drill-commits mailing list archives

From: amansi...@apache.org
Subject: [5/5] drill git commit: DRILL-4982: Separate Hive reader classes for different data formats to improve performance.
Date: Sun, 04 Dec 2016 19:21:17 GMT
DRILL-4982: Separate Hive reader classes for different data formats to improve performance.

1. Separating the Hive reader classes allows optimizations to be applied to each class in a format-specific way. This separation effectively avoids the performance degradation of the scan.

2. Do not apply the skip header/footer mechanism to most Hive formats. This mechanism introduces extra checks on each incoming record.
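
For illustration, here is a minimal, self-contained Java sketch of the trade-off behind point 2. The names (SkipCostSketch, readPlain, readWithSkips) are hypothetical and only mirror the shape of the generated next() implementations further down in this diff: the skip-aware loop pays for a header check and footer buffering on every record, while the plain loop used by the other formats does not.

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

// Illustrative stand-ins for the two generated next() loop shapes; not Drill classes.
public class SkipCostSketch {

  // What non-text readers effectively do per batch: read until the batch is full.
  static int readPlain(Iterator<String> rows, int target) {
    int count = 0;
    while (count < target && rows.hasNext()) {
      rows.next();            // the real reader deserializes and copies into value vectors here
      count++;
    }
    return count;
  }

  // What the text reader does: a per-record header check plus footer buffering.
  static int readWithSkips(Iterator<String> rows, int target, int headerLines, int footerLines) {
    Queue<String> footerBuffer = new ArrayDeque<>();
    int seen = 0;
    int emitted = 0;
    while (emitted < target && rows.hasNext()) {
      String row = rows.next();
      if (seen++ < headerLines) {
        continue;                           // drop header lines
      }
      footerBuffer.add(row);
      if (footerBuffer.size() <= footerLines) {
        continue;                           // hold back rows that may still belong to the footer
      }
      footerBuffer.poll();                  // the oldest buffered row is now safe to emit
      emitted++;
    }
    return emitted;
  }

  public static void main(String[] args) {
    List<String> file = Arrays.asList("header", "r1", "r2", "r3", "footer1", "footer2");
    System.out.println(readPlain(file.iterator(), 4000));            // 6: nothing skipped
    System.out.println(readWithSkips(file.iterator(), 4000, 1, 2));  // 3: header and footer dropped
  }
}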

close apache/drill#638


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/68bd27a1
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/68bd27a1
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/68bd27a1

Branch: refs/heads/master
Commit: 68bd27a128c2f244bd504369dce510727ea28da7
Parents: 42006ad
Author: chunhui-shi <cshi@maprtech.com>
Authored: Sun Oct 30 01:29:06 2016 -0700
Committer: Aman Sinha <asinha@maprtech.com>
Committed: Sun Dec 4 08:35:05 2016 -0800

----------------------------------------------------------------------
 .../core/src/main/codegen/config.fmpp           |   1 +
 .../core/src/main/codegen/data/HiveFormats.tdd  |  50 ++
 .../codegen/templates/HiveRecordReaders.java    | 300 +++++++++++
 .../exec/store/hive/HiveAbstractReader.java     | 361 +++++++++++++
 .../hive/HiveDrillNativeScanBatchCreator.java   |   2 +-
 .../drill/exec/store/hive/HiveRecordReader.java | 515 -------------------
 .../exec/store/hive/HiveScanBatchCreator.java   |  58 ++-
 7 files changed, 752 insertions(+), 535 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/68bd27a1/contrib/storage-hive/core/src/main/codegen/config.fmpp
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/codegen/config.fmpp b/contrib/storage-hive/core/src/main/codegen/config.fmpp
index cd36891..d8ca3fa 100644
--- a/contrib/storage-hive/core/src/main/codegen/config.fmpp
+++ b/contrib/storage-hive/core/src/main/codegen/config.fmpp
@@ -16,6 +16,7 @@
 
 data: {
     drillOI:tdd(../data/HiveTypes.tdd)
+    hiveFormat:tdd(../data/HiveFormats.tdd)
 }
 freemarkerLinks: {
     includes: includes/

http://git-wip-us.apache.org/repos/asf/drill/blob/68bd27a1/contrib/storage-hive/core/src/main/codegen/data/HiveFormats.tdd
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/codegen/data/HiveFormats.tdd b/contrib/storage-hive/core/src/main/codegen/data/HiveFormats.tdd
new file mode 100644
index 0000000..5200e4a
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/codegen/data/HiveFormats.tdd
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http:# www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+  map: [
+    {
+      hiveFormat: "HiveAvro",
+      hiveReader: "Avro",
+      hasHeaderFooter: false,
+    },
+    {
+      hiveFormat: "HiveParquet",
+      hiveReader: "Parquet",
+      hasHeaderFooter: false,
+    },
+    {
+      hiveFormat: "HiveText",
+      hiveReader: "Text",
+      hasHeaderFooter: true,
+    },
+    {
+      hiveFormat: "HiveOrc",
+      hiveReader: "Orc",
+      hasHeaderFooter: false,
+    },
+    {
+       hiveFormat: "HiveRCFile",
+       hiveReader: "RCFile",
+       hasHeaderFooter: false,
+    },
+    {
+      hiveFormat: "HiveDefault",
+      hiveReader: "Default",
+      hasHeaderFooter: false,
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/68bd27a1/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java b/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
new file mode 100644
index 0000000..0dc8c08
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/codegen/templates/HiveRecordReaders.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This template is used to generate different Hive record reader classes for different data formats
+ * to avoid JIT profile pollution. These readers are derived from HiveAbstractReader, which implements
+ * the init and setup stages, while the repeated - and performance-critical - next() method is
+ * implemented separately in the classes generated from this template. The internal SkipRecordsInspector
+ * class is also separated out for the same reason.
+ *
+ * As to the performance gain with this change, please refer to:
+ * https://issues.apache.org/jira/browse/DRILL-4982
+ *
+ */
+<@pp.dropOutputFile />
+<#list hiveFormat.map as entry>
+<@pp.changeOutputFile name="/org/apache/drill/exec/store/hive/Hive${entry.hiveReader}Reader.java" />
+<#include "/@includes/license.ftl" />
+
+package org.apache.drill.exec.store.hive;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import org.apache.hadoop.hive.serde2.SerDeException;
+
+import org.apache.hadoop.mapred.RecordReader;
+<#if entry.hasHeaderFooter == true>
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.serde.serdeConstants;
+</#if>
+
+public class Hive${entry.hiveReader}Reader extends HiveAbstractReader {
+
+  Object key;
+<#if entry.hasHeaderFooter == true>
+  SkipRecordsInspector skipRecordsInspector;
+<#else>
+  Object value;
+</#if>
+
+  public Hive${entry.hiveReader}Reader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns,
+                       FragmentContext context, final HiveConf hiveConf,
+                       UserGroupInformation proxyUgi) throws ExecutionSetupException {
+    super(table, partition, inputSplit, projectedColumns, context, hiveConf, proxyUgi);
+  }
+
+  public  void internalInit(Properties tableProperties, RecordReader<Object, Object> reader) {
+
+    key = reader.createKey();
+<#if entry.hasHeaderFooter == true>
+    skipRecordsInspector = new SkipRecordsInspector(tableProperties, reader);
+<#else>
+    value = reader.createValue();
+</#if>
+
+  }
+  private void readHiveRecordAndInsertIntoRecordBatch(Object deSerializedValue, int outputRecordIndex) {
+    for (int i = 0; i < selectedStructFieldRefs.size(); i++) {
+      Object hiveValue = finalOI.getStructFieldData(deSerializedValue, selectedStructFieldRefs.get(i));
+      if (hiveValue != null) {
+        selectedColumnFieldConverters.get(i).setSafeValue(selectedColumnObjInspectors.get(i), hiveValue,
+          vectors.get(i), outputRecordIndex);
+      }
+    }
+  }
+
+<#if entry.hasHeaderFooter == true>
+  @Override
+  public int next() {
+    for (ValueVector vv : vectors) {
+      AllocationHelper.allocateNew(vv, TARGET_RECORD_COUNT);
+    }
+    if (empty) {
+      setValueCountAndPopulatePartitionVectors(0);
+      return 0;
+    }
+
+    try {
+      skipRecordsInspector.reset();
+      Object value;
+
+      int recordCount = 0;
+
+      while (recordCount < TARGET_RECORD_COUNT && reader.next(key, value = skipRecordsInspector.getNextValue())) {
+        if (skipRecordsInspector.doSkipHeader(recordCount++)) {
+          continue;
+        }
+        Object bufferedValue = skipRecordsInspector.bufferAdd(value);
+        if (bufferedValue != null) {
+          Object deSerializedValue = partitionSerDe.deserialize((Writable) bufferedValue);
+          if (partTblObjectInspectorConverter != null) {
+            deSerializedValue = partTblObjectInspectorConverter.convert(deSerializedValue);
+          }
+          readHiveRecordAndInsertIntoRecordBatch(deSerializedValue, skipRecordsInspector.getActualCount());
+          skipRecordsInspector.incrementActualCount();
+        }
+        skipRecordsInspector.incrementTempCount();
+      }
+
+      setValueCountAndPopulatePartitionVectors(skipRecordsInspector.getActualCount());
+      skipRecordsInspector.updateContinuance();
+      return skipRecordsInspector.getActualCount();
+    } catch (IOException | SerDeException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+
+/**
+ * SkipRecordsInspector encapsulates the logic for skipping the header and footer of a file.
+ * The logic applies only to the file formats predefined in the constructor.
+ */
+protected class SkipRecordsInspector {
+
+  private final Set<Object> fileFormats;
+  private int headerCount;
+  private int footerCount;
+  private Queue<Object> footerBuffer;
+  // indicates if we continue reading the same file
+  private boolean continuance;
+  private int holderIndex;
+  private List<Object> valueHolder;
+  private int actualCount;
+  // actualCount without headerCount, used to determine holderIndex
+  private int tempCount;
+
+  protected SkipRecordsInspector(Properties tableProperties, RecordReader reader) {
+    this.fileFormats = new HashSet<Object>(Arrays.asList(org.apache.hadoop.mapred.TextInputFormat.class.getName()));
+    this.headerCount = retrievePositiveIntProperty(tableProperties, serdeConstants.HEADER_COUNT, 0);
+    this.footerCount = retrievePositiveIntProperty(tableProperties, serdeConstants.FOOTER_COUNT, 0);
+    logger.debug("skipRecordInspector: fileFormat {}, headerCount {}, footerCount {}",
+        this.fileFormats, this.headerCount, this.footerCount);
+    this.footerBuffer = Lists.newLinkedList();
+    this.continuance = false;
+    this.holderIndex = -1;
+    this.valueHolder = initializeValueHolder(reader, footerCount);
+    this.actualCount = 0;
+    this.tempCount = 0;
+  }
+
+  protected boolean doSkipHeader(int recordCount) {
+    return !continuance && recordCount < headerCount;
+  }
+
+  protected void reset() {
+    tempCount = holderIndex + 1;
+    actualCount = 0;
+    if (!continuance) {
+      footerBuffer.clear();
+    }
+  }
+
+  protected Object bufferAdd(Object value) throws SerDeException {
+    footerBuffer.add(value);
+    if (footerBuffer.size() <= footerCount) {
+      return null;
+    }
+    return footerBuffer.poll();
+  }
+
+  protected Object getNextValue() {
+    holderIndex = tempCount % getHolderSize();
+    return valueHolder.get(holderIndex);
+  }
+
+  private int getHolderSize() {
+    return valueHolder.size();
+  }
+
+  protected void updateContinuance() {
+    this.continuance = actualCount != 0;
+  }
+
+  protected int incrementTempCount() {
+    return ++tempCount;
+  }
+
+  protected int getActualCount() {
+    return actualCount;
+  }
+
+  protected int incrementActualCount() {
+    return ++actualCount;
+  }
+
+  /**
+   * Retrieves a positive numeric property from the Properties object by name.
+   * Returns the default value if
+   * 1. the file format is absent from the predefined file formats list
+   * 2. the property doesn't exist in the table properties
+   * 3. the property value is negative
+   * otherwise casts the value to int.
+   *
+   * @param tableProperties property holder
+   * @param propertyName    name of the property
+   * @param defaultValue    default value
+   * @return property numeric value
+   * @throws NumberFormatException if property value is non-numeric
+   */
+  protected int retrievePositiveIntProperty(Properties tableProperties, String propertyName, int defaultValue) {
+    int propertyIntValue = defaultValue;
+    if (!fileFormats.contains(tableProperties.get(hive_metastoreConstants.FILE_INPUT_FORMAT))) {
+      return propertyIntValue;
+    }
+    Object propertyObject = tableProperties.get(propertyName);
+    if (propertyObject != null) {
+      try {
+        propertyIntValue = Integer.valueOf((String) propertyObject);
+      } catch (NumberFormatException e) {
+        throw new NumberFormatException(String.format("Hive table property %s value '%s' is non-numeric", propertyName, propertyObject.toString()));
+      }
+    }
+    return propertyIntValue < 0 ? defaultValue : propertyIntValue;
+  }
+
+  /**
+   * Creates a buffer of objects to be used as values, so these values can be re-used.
+   * The number of objects equals the number of lines to skip at the end of the file plus one.
+   *
+   * @param reader          RecordReader to return value object
+   * @param skipFooterLines number of lines to skip at the end of the file
+   * @return list of objects to be used as values
+   */
+  private List<Object> initializeValueHolder(RecordReader reader, int skipFooterLines) {
+    List<Object> valueHolder = new ArrayList<>(skipFooterLines + 1);
+    for (int i = 0; i <= skipFooterLines; i++) {
+      valueHolder.add(reader.createValue());
+    }
+    return valueHolder;
+  }
+ }
+
+<#else>
+  @Override
+  public int next() {
+    for (ValueVector vv : vectors) {
+      AllocationHelper.allocateNew(vv, TARGET_RECORD_COUNT);
+    }
+    if (empty) {
+      setValueCountAndPopulatePartitionVectors(0);
+      return 0;
+    }
+
+    try {
+      int recordCount = 0;
+      while (recordCount < TARGET_RECORD_COUNT && reader.next(key, value)) {
+        Object deSerializedValue = partitionSerDe.deserialize((Writable) value);
+        if (partTblObjectInspectorConverter != null) {
+          deSerializedValue = partTblObjectInspectorConverter.convert(deSerializedValue);
+        }
+        readHiveRecordAndInsertIntoRecordBatch(deSerializedValue, recordCount);
+        recordCount++;
+      }
+
+      setValueCountAndPopulatePartitionVectors(recordCount);
+      return recordCount;
+    } catch (IOException | SerDeException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+</#if>
+
+}
+</#list>
\ No newline at end of file
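
For reference, here is a rough, self-contained Java summary of what the template above produces from HiveFormats.tdd; it is a hypothetical sketch (the real expansion is done by FreeMarker at build time), showing that each map entry yields one concrete Hive<hiveReader>Reader class extending HiveAbstractReader, and that only entries with hasHeaderFooter: true receive the skip-aware next().

import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the codegen mapping only; class name and next() variant per HiveFormats.tdd entry.
public class HiveReaderCodegenSketch {
  public static void main(String[] args) {
    // Mirrors the entries in HiveFormats.tdd: hiveReader name -> hasHeaderFooter.
    Map<String, Boolean> formats = new LinkedHashMap<>();
    formats.put("Avro", false);
    formats.put("Parquet", false);
    formats.put("Text", true);
    formats.put("Orc", false);
    formats.put("RCFile", false);
    formats.put("Default", false);

    for (Map.Entry<String, Boolean> e : formats.entrySet()) {
      String className = "Hive" + e.getKey() + "Reader";
      String variant = e.getValue() ? "next() with SkipRecordsInspector" : "plain next()";
      System.out.printf("%s extends HiveAbstractReader (%s)%n", className, variant);
    }
  }
}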

http://git-wip-us.apache.org/repos/asf/drill/blob/68bd27a1/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAbstractReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAbstractReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAbstractReader.java
new file mode 100644
index 0000000..107fc66
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveAbstractReader.java
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.security.UserGroupInformation;
+
+
+public abstract class HiveAbstractReader extends AbstractRecordReader {
+  protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveAbstractReader.class);
+
+  protected final DrillBuf managedBuffer;
+
+  protected Table table;
+  protected Partition partition;
+  protected InputSplit inputSplit;
+  protected List<String> selectedColumnNames;
+  protected List<StructField> selectedStructFieldRefs = Lists.newArrayList();
+  protected List<TypeInfo> selectedColumnTypes = Lists.newArrayList();
+  protected List<ObjectInspector> selectedColumnObjInspectors = Lists.newArrayList();
+  protected List<HiveFieldConverter> selectedColumnFieldConverters = Lists.newArrayList();
+  protected List<String> selectedPartitionNames = Lists.newArrayList();
+  protected List<TypeInfo> selectedPartitionTypes = Lists.newArrayList();
+  protected List<Object> selectedPartitionValues = Lists.newArrayList();
+
+  // SerDe of the reading partition (or table if the table is non-partitioned)
+  protected SerDe partitionSerDe;
+
+  // ObjectInspector to read data from partitionSerDe (for a non-partitioned table this is same as the table
+  // ObjectInspector).
+  protected StructObjectInspector partitionOI;
+
+  // Final ObjectInspector. We may not use the partitionOI directly if there are schema changes between the table and
+  // partition. If there are no schema changes then this is same as the partitionOI.
+  protected StructObjectInspector finalOI;
+
+  // Converter which converts data from partition schema to table schema.
+  protected Converter partTblObjectInspectorConverter;
+
+  protected Object key;
+  protected RecordReader<Object, Object> reader;
+  protected List<ValueVector> vectors = Lists.newArrayList();
+  protected List<ValueVector> pVectors = Lists.newArrayList();
+  protected boolean empty;
+  protected HiveConf hiveConf;
+  protected FragmentContext fragmentContext;
+  protected String defaultPartitionValue;
+  protected final UserGroupInformation proxyUgi;
+
+
+  protected static final int TARGET_RECORD_COUNT = 4000;
+
+  public HiveAbstractReader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns,
+                       FragmentContext context, final HiveConf hiveConf,
+                       UserGroupInformation proxyUgi) throws ExecutionSetupException {
+    this.table = table;
+    this.partition = partition;
+    this.inputSplit = inputSplit;
+    this.empty = (inputSplit == null && partition == null);
+    this.hiveConf = hiveConf;
+    this.fragmentContext = context;
+    this.proxyUgi = proxyUgi;
+    this.managedBuffer = fragmentContext.getManagedBuffer().reallocIfNeeded(256);
+    setColumns(projectedColumns);
+  }
+
+  public abstract void internalInit(Properties tableProperties, RecordReader<Object, Object> reader);
+
+  private void init() throws ExecutionSetupException {
+    final JobConf job = new JobConf(hiveConf);
+
+    // Get the configured default val
+    defaultPartitionValue = hiveConf.get(ConfVars.DEFAULTPARTITIONNAME.varname);
+
+    Properties tableProperties;
+    try {
+      tableProperties = MetaStoreUtils.getTableMetadata(table);
+      final Properties partitionProperties =
+          (partition == null) ?  tableProperties :
+              HiveUtilities.getPartitionMetadata(partition, table);
+      HiveUtilities.addConfToJob(job, partitionProperties);
+
+      final SerDe tableSerDe = createSerDe(job, table.getSd().getSerdeInfo().getSerializationLib(), tableProperties);
+      final StructObjectInspector tableOI = getStructOI(tableSerDe);
+
+      if (partition != null) {
+        partitionSerDe = createSerDe(job, partition.getSd().getSerdeInfo().getSerializationLib(), partitionProperties);
+        partitionOI = getStructOI(partitionSerDe);
+
+        finalOI = (StructObjectInspector)ObjectInspectorConverters.getConvertedOI(partitionOI, tableOI);
+        partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(partitionOI, finalOI);
+        job.setInputFormat(HiveUtilities.getInputFormatClass(job, partition.getSd(), table));
+      } else {
+        // For non-partitioned tables, there is no need to create converter as there are no schema changes expected.
+        partitionSerDe = tableSerDe;
+        partitionOI = tableOI;
+        partTblObjectInspectorConverter = null;
+        finalOI = tableOI;
+        job.setInputFormat(HiveUtilities.getInputFormatClass(job, table.getSd(), table));
+      }
+
+      if (logger.isTraceEnabled()) {
+        for (StructField field: finalOI.getAllStructFieldRefs()) {
+          logger.trace("field in finalOI: {}", field.getClass().getName());
+        }
+        logger.trace("partitionSerDe class is {} {}", partitionSerDe.getClass().getName());
+      }
+      // Get list of partition column names
+      final List<String> partitionNames = Lists.newArrayList();
+      for (FieldSchema field : table.getPartitionKeys()) {
+        partitionNames.add(field.getName());
+      }
+
+      // We should always get the column names from ObjectInspector. For some of the tables (ex. avro) metastore
+      // may not contain the schema, instead it is derived from other sources such as table properties or external file.
+      // SerDe object knows how to get the schema with all the config and table properties passed in initialization.
+      // ObjectInspector created from the SerDe object has the schema.
+      final StructTypeInfo sTypeInfo = (StructTypeInfo) TypeInfoUtils.getTypeInfoFromObjectInspector(finalOI);
+      final List<String> tableColumnNames = sTypeInfo.getAllStructFieldNames();
+
+      // Select list of columns for project pushdown into Hive SerDe readers.
+      final List<Integer> columnIds = Lists.newArrayList();
+      if (isStarQuery()) {
+        selectedColumnNames = tableColumnNames;
+        for(int i=0; i<selectedColumnNames.size(); i++) {
+          columnIds.add(i);
+        }
+        selectedPartitionNames = partitionNames;
+      } else {
+        selectedColumnNames = Lists.newArrayList();
+        for (SchemaPath field : getColumns()) {
+          String columnName = field.getRootSegment().getPath();
+          if (partitionNames.contains(columnName)) {
+            selectedPartitionNames.add(columnName);
+          } else {
+            columnIds.add(tableColumnNames.indexOf(columnName));
+            selectedColumnNames.add(columnName);
+          }
+        }
+      }
+      ColumnProjectionUtils.appendReadColumns(job, columnIds, selectedColumnNames);
+
+      for (String columnName : selectedColumnNames) {
+        StructField fieldRef = finalOI.getStructFieldRef(columnName);
+        selectedStructFieldRefs.add(fieldRef);
+        ObjectInspector fieldOI = fieldRef.getFieldObjectInspector();
+
+        TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(fieldOI.getTypeName());
+
+        selectedColumnObjInspectors.add(fieldOI);
+        selectedColumnTypes.add(typeInfo);
+        selectedColumnFieldConverters.add(HiveFieldConverter.create(typeInfo, fragmentContext));
+      }
+
+      for(int i=0; i<selectedColumnNames.size(); ++i){
+        logger.trace("inspector:typeName={}, className={}, TypeInfo: {}, converter:{}",
+            selectedColumnObjInspectors.get(i).getTypeName(),
+            selectedColumnObjInspectors.get(i).getClass().getName(),
+            selectedColumnTypes.get(i).toString(),
+            selectedColumnFieldConverters.get(i).getClass().getName());
+      }
+
+      for (int i = 0; i < table.getPartitionKeys().size(); i++) {
+        FieldSchema field = table.getPartitionKeys().get(i);
+        if (selectedPartitionNames.contains(field.getName())) {
+          TypeInfo pType = TypeInfoUtils.getTypeInfoFromTypeString(field.getType());
+          selectedPartitionTypes.add(pType);
+
+          if (partition != null) {
+            selectedPartitionValues.add(
+                HiveUtilities.convertPartitionType(pType, partition.getValues().get(i), defaultPartitionValue));
+          }
+        }
+      }
+    } catch (Exception e) {
+      throw new ExecutionSetupException("Failure while initializing Hive Reader " + this.getClass().getName(), e);
+    }
+
+    if (!empty) {
+      try {
+        reader = (org.apache.hadoop.mapred.RecordReader<Object, Object>) job.getInputFormat().getRecordReader(inputSplit, job, Reporter.NULL);
+        logger.trace("hive reader created: {} for inputSplit {}", reader.getClass().getName(), inputSplit.toString());
+      } catch (Exception e) {
+        throw new ExecutionSetupException("Failed to get o.a.hadoop.mapred.RecordReader from Hive InputFormat", e);
+      }
+
+      internalInit(tableProperties, reader);
+    }
+  }
+
+  /**
+   * Utility method which creates a SerDe object for given SerDe class name and properties.
+   */
+  private static SerDe createSerDe(final JobConf job, final String sLib, final Properties properties) throws Exception {
+    final Class<? extends SerDe> c = Class.forName(sLib).asSubclass(SerDe.class);
+    final SerDe serde = c.getConstructor().newInstance();
+    serde.initialize(job, properties);
+
+    return serde;
+  }
+
+  private static StructObjectInspector getStructOI(final SerDe serDe) throws Exception {
+    ObjectInspector oi = serDe.getObjectInspector();
+    if (oi.getCategory() != ObjectInspector.Category.STRUCT) {
+      throw new UnsupportedOperationException(String.format("%s category not supported", oi.getCategory()));
+    }
+    return (StructObjectInspector) oi;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output)
+      throws ExecutionSetupException {
+    // initializes "reader"
+    final Callable<Void> readerInitializer = new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        init();
+        return null;
+      }
+    };
+
+    final ListenableFuture<Void> result = context.runCallableAs(proxyUgi, readerInitializer);
+    try {
+      result.get();
+    } catch (InterruptedException e) {
+      result.cancel(true);
+      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+      // interruption and respond to it if it wants to.
+      Thread.currentThread().interrupt();
+    } catch (ExecutionException e) {
+      throw ExecutionSetupException.fromThrowable(e.getMessage(), e);
+    }
+    try {
+      final OptionManager options = fragmentContext.getOptions();
+      for (int i = 0; i < selectedColumnNames.size(); i++) {
+        MajorType type = HiveUtilities.getMajorTypeFromHiveTypeInfo(selectedColumnTypes.get(i), options);
+        MaterializedField field = MaterializedField.create(selectedColumnNames.get(i), type);
+        Class<? extends ValueVector> vvClass = TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode());
+        vectors.add(output.addField(field, vvClass));
+      }
+
+      for (int i = 0; i < selectedPartitionNames.size(); i++) {
+        MajorType type = HiveUtilities.getMajorTypeFromHiveTypeInfo(selectedPartitionTypes.get(i), options);
+        MaterializedField field = MaterializedField.create(selectedPartitionNames.get(i), type);
+        Class<? extends ValueVector> vvClass = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getDataMode());
+        pVectors.add(output.addField(field, vvClass));
+      }
+    } catch(SchemaChangeException e) {
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  /**
+   * To take the Hive "skip.header.lines.count" property into account, the first N values from the file are skipped.
+   * Since a file can be read in batches (depending on TARGET_RECORD_COUNT), additional checks are made
+   * to determine whether a batch starts a new file or continues the previous one.
+   *
+   * To take the Hive "skip.footer.lines.count" property into account, values are buffered in a queue
+   * until the queue size exceeds the number of footer lines to skip; only then is the first value in the queue retrieved.
+   * A buffer of value objects is used so that value objects can be re-used, reducing the number of objects created.
+   * For each new file the queue is cleared to drop the footer lines of the previous file.
+   */
+  @Override
+  public abstract int next();
+
+
+
+  protected void setValueCountAndPopulatePartitionVectors(int recordCount) {
+    for (ValueVector v : vectors) {
+      v.getMutator().setValueCount(recordCount);
+    }
+
+    if (partition != null) {
+      populatePartitionVectors(recordCount);
+    }
+  }
+
+  @Override
+  public void close() {
+    try {
+      if (reader != null) {
+        reader.close();
+        reader = null;
+      }
+    } catch (Exception e) {
+      logger.warn("Failure while closing Hive Record reader.", e);
+    }
+  }
+
+  protected void populatePartitionVectors(int recordCount) {
+    for (int i = 0; i < pVectors.size(); i++) {
+      final ValueVector vector = pVectors.get(i);
+      final Object val = selectedPartitionValues.get(i);
+
+      AllocationHelper.allocateNew(vector, recordCount);
+
+      if (val != null) {
+        HiveUtilities.populateVector(vector, managedBuffer, val, 0, recordCount);
+      }
+
+      vector.getMutator().setValueCount(recordCount);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/68bd27a1/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
index 4be2ced..81be529 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
@@ -169,7 +169,7 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa
     // If there are no readers created (which is possible when the table is empty or no row groups are matched),
     // create an empty RecordReader to output the schema
     if (readers.size() == 0) {
-      readers.add(new HiveRecordReader(table, null, null, columns, context, conf,
+      readers.add(new HiveDefaultReader(table, null, null, columns, context, conf,
         ImpersonationUtil.createProxyUgi(config.getUserName(), context.getQueryUserName())));
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/68bd27a1/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
deleted file mode 100644
index 8631b8d..0000000
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
+++ /dev/null
@@ -1,515 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.hive;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import io.netty.buffer.DrillBuf;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.security.UserGroupInformation;
-
-public class HiveRecordReader extends AbstractRecordReader {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveRecordReader.class);
-
-  private final DrillBuf managedBuffer;
-
-  protected Table table;
-  protected Partition partition;
-  protected InputSplit inputSplit;
-  protected List<String> selectedColumnNames;
-  protected List<TypeInfo> selectedColumnTypes = Lists.newArrayList();
-  protected List<ObjectInspector> selectedColumnObjInspectors = Lists.newArrayList();
-  protected List<HiveFieldConverter> selectedColumnFieldConverters = Lists.newArrayList();
-  protected List<String> selectedPartitionNames = Lists.newArrayList();
-  protected List<TypeInfo> selectedPartitionTypes = Lists.newArrayList();
-  protected List<Object> selectedPartitionValues = Lists.newArrayList();
-
-  // SerDe of the reading partition (or table if the table is non-partitioned)
-  protected SerDe partitionSerDe;
-
-  // ObjectInspector to read data from partitionSerDe (for a non-partitioned table this is same as the table
-  // ObjectInspector).
-  protected StructObjectInspector partitionOI;
-
-  // Final ObjectInspector. We may not use the partitionOI directly if there are schema changes between the table and
-  // partition. If there are no schema changes then this is same as the partitionOI.
-  protected StructObjectInspector finalOI;
-
-  // Converter which converts data from partition schema to table schema.
-  private Converter partTblObjectInspectorConverter;
-
-  protected Object key;
-  protected RecordReader<Object, Object> reader;
-  protected List<ValueVector> vectors = Lists.newArrayList();
-  protected List<ValueVector> pVectors = Lists.newArrayList();
-  protected boolean empty;
-  private HiveConf hiveConf;
-  private FragmentContext fragmentContext;
-  private String defaultPartitionValue;
-  private final UserGroupInformation proxyUgi;
-  private SkipRecordsInspector skipRecordsInspector;
-
-  protected static final int TARGET_RECORD_COUNT = 4000;
-
-  public HiveRecordReader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns,
-                          FragmentContext context, final HiveConf hiveConf,
-                          UserGroupInformation proxyUgi) throws ExecutionSetupException {
-    this.table = table;
-    this.partition = partition;
-    this.inputSplit = inputSplit;
-    this.empty = (inputSplit == null && partition == null);
-    this.hiveConf = hiveConf;
-    this.fragmentContext = context;
-    this.proxyUgi = proxyUgi;
-    this.managedBuffer = fragmentContext.getManagedBuffer().reallocIfNeeded(256);
-    setColumns(projectedColumns);
-  }
-
-  private void init() throws ExecutionSetupException {
-    final JobConf job = new JobConf(hiveConf);
-
-    // Get the configured default val
-    defaultPartitionValue = hiveConf.get(ConfVars.DEFAULTPARTITIONNAME.varname);
-
-    Properties tableProperties;
-    try {
-      tableProperties = MetaStoreUtils.getTableMetadata(table);
-      final Properties partitionProperties =
-          (partition == null) ?  tableProperties :
-              HiveUtilities.getPartitionMetadata(partition, table);
-      HiveUtilities.addConfToJob(job, partitionProperties);
-
-      final SerDe tableSerDe = createSerDe(job, table.getSd().getSerdeInfo().getSerializationLib(), tableProperties);
-      final StructObjectInspector tableOI = getStructOI(tableSerDe);
-
-      if (partition != null) {
-        partitionSerDe = createSerDe(job, partition.getSd().getSerdeInfo().getSerializationLib(), partitionProperties);
-        partitionOI = getStructOI(partitionSerDe);
-
-        finalOI = (StructObjectInspector)ObjectInspectorConverters.getConvertedOI(partitionOI, tableOI);
-        partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(partitionOI, finalOI);
-        job.setInputFormat(HiveUtilities.getInputFormatClass(job, partition.getSd(), table));
-      } else {
-        // For non-partitioned tables, there is no need to create converter as there are no schema changes expected.
-        partitionSerDe = tableSerDe;
-        partitionOI = tableOI;
-        partTblObjectInspectorConverter = null;
-        finalOI = tableOI;
-        job.setInputFormat(HiveUtilities.getInputFormatClass(job, table.getSd(), table));
-      }
-
-      // Get list of partition column names
-      final List<String> partitionNames = Lists.newArrayList();
-      for (FieldSchema field : table.getPartitionKeys()) {
-        partitionNames.add(field.getName());
-      }
-
-      // We should always get the columns names from ObjectInspector. For some of the tables (ex. avro) metastore
-      // may not contain the schema, instead it is derived from other sources such as table properties or external file.
-      // SerDe object knows how to get the schema with all the config and table properties passed in initialization.
-      // ObjectInspector created from the SerDe object has the schema.
-      final StructTypeInfo sTypeInfo = (StructTypeInfo) TypeInfoUtils.getTypeInfoFromObjectInspector(finalOI);
-      final List<String> tableColumnNames = sTypeInfo.getAllStructFieldNames();
-
-      // Select list of columns for project pushdown into Hive SerDe readers.
-      final List<Integer> columnIds = Lists.newArrayList();
-      if (isStarQuery()) {
-        selectedColumnNames = tableColumnNames;
-        for(int i=0; i<selectedColumnNames.size(); i++) {
-          columnIds.add(i);
-        }
-        selectedPartitionNames = partitionNames;
-      } else {
-        selectedColumnNames = Lists.newArrayList();
-        for (SchemaPath field : getColumns()) {
-          String columnName = field.getRootSegment().getPath();
-          if (partitionNames.contains(columnName)) {
-            selectedPartitionNames.add(columnName);
-          } else {
-            columnIds.add(tableColumnNames.indexOf(columnName));
-            selectedColumnNames.add(columnName);
-          }
-        }
-      }
-      ColumnProjectionUtils.appendReadColumns(job, columnIds, selectedColumnNames);
-
-      for (String columnName : selectedColumnNames) {
-        ObjectInspector fieldOI = finalOI.getStructFieldRef(columnName).getFieldObjectInspector();
-        TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(fieldOI.getTypeName());
-
-        selectedColumnObjInspectors.add(fieldOI);
-        selectedColumnTypes.add(typeInfo);
-        selectedColumnFieldConverters.add(HiveFieldConverter.create(typeInfo, fragmentContext));
-      }
-
-      for (int i = 0; i < table.getPartitionKeys().size(); i++) {
-        FieldSchema field = table.getPartitionKeys().get(i);
-        if (selectedPartitionNames.contains(field.getName())) {
-          TypeInfo pType = TypeInfoUtils.getTypeInfoFromTypeString(field.getType());
-          selectedPartitionTypes.add(pType);
-
-          if (partition != null) {
-            selectedPartitionValues.add(
-                HiveUtilities.convertPartitionType(pType, partition.getValues().get(i), defaultPartitionValue));
-          }
-        }
-      }
-    } catch (Exception e) {
-      throw new ExecutionSetupException("Failure while initializing HiveRecordReader: " + e.getMessage(), e);
-    }
-
-    if (!empty) {
-      try {
-        reader = (org.apache.hadoop.mapred.RecordReader<Object, Object>) job.getInputFormat().getRecordReader(inputSplit, job, Reporter.NULL);
-      } catch (Exception e) {
-        throw new ExecutionSetupException("Failed to get o.a.hadoop.mapred.RecordReader from Hive InputFormat", e);
-      }
-      key = reader.createKey();
-      skipRecordsInspector = new SkipRecordsInspector(tableProperties, reader);
-    }
-  }
-
-  /**
-   * Utility method which creates a SerDe object for given SerDe class name and properties.
-   */
-  private static SerDe createSerDe(final JobConf job, final String sLib, final Properties properties) throws Exception {
-    final Class<? extends SerDe> c = Class.forName(sLib).asSubclass(SerDe.class);
-    final SerDe serde = c.getConstructor().newInstance();
-    serde.initialize(job, properties);
-
-    return serde;
-  }
-
-  private static StructObjectInspector getStructOI(final SerDe serDe) throws Exception {
-    ObjectInspector oi = serDe.getObjectInspector();
-    if (oi.getCategory() != ObjectInspector.Category.STRUCT) {
-      throw new UnsupportedOperationException(String.format("%s category not supported", oi.getCategory()));
-    }
-    return (StructObjectInspector) oi;
-  }
-
-  @Override
-  public void setup(OperatorContext context, OutputMutator output)
-      throws ExecutionSetupException {
-    // initializes "reader"
-    final Callable<Void> readerInitializer = new Callable<Void>() {
-      @Override
-      public Void call() throws Exception {
-        init();
-        return null;
-      }
-    };
-
-    final ListenableFuture<Void> result = context.runCallableAs(proxyUgi, readerInitializer);
-    try {
-      result.get();
-    } catch (InterruptedException e) {
-      result.cancel(true);
-      // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
-      // interruption and respond to it if it wants to.
-      Thread.currentThread().interrupt();
-    } catch (ExecutionException e) {
-      throw ExecutionSetupException.fromThrowable(e.getMessage(), e);
-    }
-    try {
-      final OptionManager options = fragmentContext.getOptions();
-      for (int i = 0; i < selectedColumnNames.size(); i++) {
-        MajorType type = HiveUtilities.getMajorTypeFromHiveTypeInfo(selectedColumnTypes.get(i), options);
-        MaterializedField field = MaterializedField.create(selectedColumnNames.get(i), type);
-        Class<? extends ValueVector> vvClass = TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode());
-        vectors.add(output.addField(field, vvClass));
-      }
-
-      for (int i = 0; i < selectedPartitionNames.size(); i++) {
-        MajorType type = HiveUtilities.getMajorTypeFromHiveTypeInfo(selectedPartitionTypes.get(i), options);
-        MaterializedField field = MaterializedField.create(selectedPartitionNames.get(i), type);
-        Class<? extends ValueVector> vvClass = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getDataMode());
-        pVectors.add(output.addField(field, vvClass));
-      }
-    } catch(SchemaChangeException e) {
-      throw new ExecutionSetupException(e);
-    }
-  }
-
-  /**
-   * To take into account Hive "skip.header.lines.count" property first N values from file are skipped.
-   * Since file can be read in batches (depends on TARGET_RECORD_COUNT), additional checks are made
-   * to determine if it's new file or continuance.
-   *
-   * To take into account Hive "skip.footer.lines.count" property values are buffered in queue
-   * until queue size exceeds number of footer lines to skip, then first value in queue is retrieved.
-   * Buffer of value objects is used to re-use value objects in order to reduce number of created value objects.
-   * For each new file queue is cleared to drop footer lines from previous file.
-   */
-  @Override
-  public int next() {
-    for (ValueVector vv : vectors) {
-      AllocationHelper.allocateNew(vv, TARGET_RECORD_COUNT);
-    }
-    if (empty) {
-      setValueCountAndPopulatePartitionVectors(0);
-      return 0;
-    }
-
-    try {
-      skipRecordsInspector.reset();
-      int recordCount = 0;
-      Object value;
-      while (recordCount < TARGET_RECORD_COUNT && reader.next(key, value = skipRecordsInspector.getNextValue())) {
-        if (skipRecordsInspector.doSkipHeader(recordCount++)) {
-          continue;
-        }
-        Object bufferedValue = skipRecordsInspector.bufferAdd(value);
-        if (bufferedValue != null) {
-          Object deSerializedValue = partitionSerDe.deserialize((Writable) bufferedValue);
-          if (partTblObjectInspectorConverter != null) {
-            deSerializedValue = partTblObjectInspectorConverter.convert(deSerializedValue);
-          }
-          readHiveRecordAndInsertIntoRecordBatch(deSerializedValue, skipRecordsInspector.getActualCount());
-          skipRecordsInspector.incrementActualCount();
-        }
-        skipRecordsInspector.incrementTempCount();
-      }
-
-      setValueCountAndPopulatePartitionVectors(skipRecordsInspector.getActualCount());
-      skipRecordsInspector.updateContinuance();
-      return skipRecordsInspector.getActualCount();
-    } catch (IOException | SerDeException e) {
-      throw new DrillRuntimeException(e);
-    }
-  }
-
-  private void readHiveRecordAndInsertIntoRecordBatch(Object deSerializedValue, int outputRecordIndex) {
-    for (int i = 0; i < selectedColumnNames.size(); i++) {
-      final String columnName = selectedColumnNames.get(i);
-      Object hiveValue = finalOI.getStructFieldData(deSerializedValue, finalOI.getStructFieldRef(columnName));
-
-      if (hiveValue != null) {
-        selectedColumnFieldConverters.get(i).setSafeValue(selectedColumnObjInspectors.get(i), hiveValue,
-            vectors.get(i), outputRecordIndex);
-      }
-    }
-  }
-
-  private void setValueCountAndPopulatePartitionVectors(int recordCount) {
-    for (ValueVector v : vectors) {
-      v.getMutator().setValueCount(recordCount);
-    }
-
-    if (partition != null) {
-      populatePartitionVectors(recordCount);
-    }
-  }
-
-  @Override
-  public void close() {
-    try {
-      if (reader != null) {
-        reader.close();
-        reader = null;
-      }
-    } catch (Exception e) {
-      logger.warn("Failure while closing Hive Record reader.", e);
-    }
-  }
-
-  protected void populatePartitionVectors(int recordCount) {
-    for (int i = 0; i < pVectors.size(); i++) {
-      final ValueVector vector = pVectors.get(i);
-      final Object val = selectedPartitionValues.get(i);
-
-      AllocationHelper.allocateNew(vector, recordCount);
-
-      if (val != null) {
-        HiveUtilities.populateVector(vector, managedBuffer, val, 0, recordCount);
-      }
-
-      vector.getMutator().setValueCount(recordCount);
-    }
-  }
-
-  /**
-   * SkipRecordsInspector encapsulates logic to skip header and footer from file.
-   * Logic is applicable only for predefined in constructor file formats.
-   */
-  private class SkipRecordsInspector {
-
-    private final Set<Object> fileFormats;
-    private int headerCount;
-    private int footerCount;
-    private Queue<Object> footerBuffer;
-    // indicates if we continue reading the same file
-    private boolean continuance;
-    private int holderIndex;
-    private List<Object> valueHolder;
-    private int actualCount;
-    // actualCount without headerCount, used to determine holderIndex
-    private int tempCount;
-
-    private SkipRecordsInspector(Properties tableProperties, RecordReader reader) {
-      this.fileFormats = new HashSet<Object>(Arrays.asList(org.apache.hadoop.mapred.TextInputFormat.class.getName()));
-      this.headerCount = retrievePositiveIntProperty(tableProperties, serdeConstants.HEADER_COUNT, 0);
-      this.footerCount = retrievePositiveIntProperty(tableProperties, serdeConstants.FOOTER_COUNT, 0);
-      this.footerBuffer = Lists.newLinkedList();
-      this.continuance = false;
-      this.holderIndex = -1;
-      this.valueHolder = initializeValueHolder(reader, footerCount);
-      this.actualCount = 0;
-      this.tempCount = 0;
-    }
-
-    private boolean doSkipHeader(int recordCount) {
-      return !continuance && recordCount < headerCount;
-    }
-
-    private void reset() {
-      tempCount = holderIndex + 1;
-      actualCount = 0;
-      if (!continuance) {
-        footerBuffer.clear();
-      }
-    }
-
-    private Object bufferAdd(Object value) throws SerDeException {
-      footerBuffer.add(value);
-      if (footerBuffer.size() <= footerCount) {
-        return null;
-      }
-      return footerBuffer.poll();
-    }
-
-    private Object getNextValue() {
-      holderIndex = tempCount % getHolderSize();
-      return valueHolder.get(holderIndex);
-    }
-
-    private int getHolderSize() {
-      return valueHolder.size();
-    }
-
-    private void updateContinuance() {
-      this.continuance = actualCount != 0;
-    }
-
-    private int incrementTempCount() {
-      return ++tempCount;
-    }
-
-    private int getActualCount() {
-      return actualCount;
-    }
-
-    private int incrementActualCount() {
-      return ++actualCount;
-    }
-
-    /**
-     * Retrieves positive numeric property from Properties object by name.
-     * Return default value if
-     * 1. file format is absent in predefined file formats list
-     * 2. property doesn't exist in table properties
-     * 3. property value is negative
-     * otherwise casts value to int.
-     *
-     * @param tableProperties property holder
-     * @param propertyName    name of the property
-     * @param defaultValue    default value
-     * @return property numeric value
-     * @throws NumberFormatException if property value is non-numeric
-     */
-    private int retrievePositiveIntProperty(Properties tableProperties, String propertyName, int defaultValue) {
-      int propertyIntValue = defaultValue;
-      if (!fileFormats.contains(tableProperties.get(hive_metastoreConstants.FILE_INPUT_FORMAT))) {
-        return propertyIntValue;
-      }
-      Object propertyObject = tableProperties.get(propertyName);
-      if (propertyObject != null) {
-        try {
-          propertyIntValue = Integer.valueOf((String) propertyObject);
-        } catch (NumberFormatException e) {
-          throw new NumberFormatException(String.format("Hive table property %s value '%s' is non-numeric", propertyName, propertyObject.toString()));
-        }
-      }
-      return propertyIntValue < 0 ? defaultValue : propertyIntValue;
-    }
-
-    /**
-     * Creates buffer of objects to be used as values, so these values can be re-used.
-     * Objects number depends on number of lines to skip in the end of the file plus one object.
-     *
-     * @param reader          RecordReader to return value object
-     * @param skipFooterLines number of lines to skip at the end of the file
-     * @return list of objects to be used as values
-     */
-    private List<Object> initializeValueHolder(RecordReader reader, int skipFooterLines) {
-      List<Object> valueHolder = new ArrayList<>(skipFooterLines + 1);
-      for (int i = 0; i <= skipFooterLines; i++) {
-        valueHolder.add(reader.createValue());
-      }
-      return valueHolder;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/68bd27a1/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
index eee7343..7aece71 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
@@ -17,7 +17,10 @@
  */
 package org.apache.drill.exec.store.hive;
 
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -29,14 +32,33 @@ import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 
 import com.google.common.collect.Lists;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 
 @SuppressWarnings("unused")
 public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> {
 
+  /**
+   * Use different classes for different Hive native formats:
+   * ORC, AVRO, RCFile, Text and Parquet.
+   * If the input format is none of these, fall back to the default reader.
+   */
+  static Map<String, Class> readerMap = new HashMap<>();
+  static {
+    readerMap.put(OrcInputFormat.class.getCanonicalName(), HiveOrcReader.class);
+    readerMap.put(AvroContainerInputFormat.class.getCanonicalName(), HiveAvroReader.class);
+    readerMap.put(RCFileInputFormat.class.getCanonicalName(), HiveRCFileReader.class);
+    readerMap.put(MapredParquetInputFormat.class.getCanonicalName(), HiveParquetReader.class);
+    readerMap.put(TextInputFormat.class.getCanonicalName(), HiveTextReader.class);
+  }
+
   @Override
   public ScanBatch getBatch(FragmentContext context, HiveSubScan config, List<RecordBatch> children)
       throws ExecutionSetupException {
@@ -51,29 +73,27 @@ public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> {
 
     final HiveConf hiveConf = config.getHiveConf();
 
-    // Native hive text record reader doesn't handle all types currently. For now use HiveRecordReader which uses
-    // Hive InputFormat and SerDe classes to read the data.
-    //if (table.getSd().getInputFormat().equals(TextInputFormat.class.getCanonicalName()) &&
-    //        table.getSd().getSerdeInfo().getSerializationLib().equals(LazySimpleSerDe.class.getCanonicalName()) &&
-    //        config.getColumns() != null) {
-    //  for (InputSplit split : splits) {
-    //    readers.add(new HiveTextRecordReader(table,
-    //        (hasPartitions ? partitions.get(i++) : null),
-    //        split, config.getColumns(), context));
-    //  }
-    //} else {
+    final String formatName = table.getSd().getInputFormat();
+    Class<? extends HiveAbstractReader> readerClass = HiveDefaultReader.class;
+    if (readerMap.containsKey(formatName)) {
+      readerClass = readerMap.get(formatName);
+    }
+    Constructor<? extends HiveAbstractReader> readerConstructor = null;
+    try {
+      readerConstructor = readerClass.getConstructor(Table.class, Partition.class,
+          InputSplit.class, List.class, FragmentContext.class, HiveConf.class,
+          UserGroupInformation.class);
       for (InputSplit split : splits) {
-        readers.add(new HiveRecordReader(table,
+        readers.add(readerConstructor.newInstance(table,
             (hasPartitions ? partitions.get(i++) : null), split, config.getColumns(), context, hiveConf, proxyUgi));
       }
-    //}
-
-    // If there are no readers created (which is possible when the table is empty), create an empty RecordReader to
-    // output the schema
-    if (readers.size() == 0) {
-      readers.add(new HiveRecordReader(table, null, null, config.getColumns(), context, hiveConf, proxyUgi));
+      if (readers.size() == 0) {
+        readers.add(readerConstructor.newInstance(
+            table, null, null, config.getColumns(), context, hiveConf, proxyUgi));
+      }
+    } catch(Exception e) {
+      logger.error("No constructor for {}, thrown {}", readerClass.getName(), e);
     }
-
     return new ScanBatch(config, context, readers.iterator());
   }
 }
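
To make the dispatch pattern above easier to follow in isolation, here is a self-contained sketch with hypothetical stand-in classes (FormatReader, TextReader, DefaultReader and ReaderDispatchSketch are illustrative, not Drill types): the input format's class name selects a reader class from a map, a constructor is resolved reflectively and reused for every split, and an unknown format falls back to the default reader.

import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins; the real code maps Hive InputFormat class names to Hive*Reader classes.
abstract class FormatReader {
  public abstract String describe();
}

class TextReader extends FormatReader {
  public TextReader() {}
  public String describe() { return "text reader with header/footer skipping"; }
}

class DefaultReader extends FormatReader {
  public DefaultReader() {}
  public String describe() { return "default reader"; }
}

public class ReaderDispatchSketch {
  // Format class name -> reader class, analogous to readerMap in HiveScanBatchCreator.
  static final Map<String, Class<? extends FormatReader>> READER_MAP = new HashMap<>();
  static {
    READER_MAP.put("org.apache.hadoop.mapred.TextInputFormat", TextReader.class);
  }

  static FormatReader createReader(String inputFormatName) throws Exception {
    Class<? extends FormatReader> readerClass = READER_MAP.getOrDefault(inputFormatName, DefaultReader.class);
    Constructor<? extends FormatReader> ctor = readerClass.getConstructor();
    return ctor.newInstance();
  }

  public static void main(String[] args) throws Exception {
    System.out.println(createReader("org.apache.hadoop.mapred.TextInputFormat").describe());
    System.out.println(createReader("unknown.Format").describe());
  }
}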

