hive-commits mailing list archives

From: omal...@apache.org
Subject: [29/37] hive git commit: HIVE-17118. Move the hive-orc source files to make the package names unique.
Date: Wed, 19 Jul 2017 16:58:52 GMT
http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/RecordReaderImpl.java b/orc/src/java/org/apache/hive/orc/impl/RecordReaderImpl.java
new file mode 100644
index 0000000..d0edce5
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/RecordReaderImpl.java
@@ -0,0 +1,1238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.orc.impl;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hive.orc.BooleanColumnStatistics;
+import org.apache.hive.orc.ColumnStatistics;
+import org.apache.hive.orc.CompressionCodec;
+import org.apache.hive.orc.DoubleColumnStatistics;
+import org.apache.hive.orc.IntegerColumnStatistics;
+import org.apache.hive.orc.Reader;
+import org.apache.hive.orc.RecordReader;
+import org.apache.hive.orc.TypeDescription;
+import org.apache.hive.orc.DataReader;
+import org.apache.hive.orc.DateColumnStatistics;
+import org.apache.hive.orc.DecimalColumnStatistics;
+import org.apache.hive.orc.OrcConf;
+import org.apache.hive.orc.StringColumnStatistics;
+import org.apache.hive.orc.StripeInformation;
+import org.apache.hive.orc.TimestampColumnStatistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hive.orc.BloomFilterIO;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.ql.util.TimestampUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hive.orc.OrcProto;
+
+public class RecordReaderImpl implements RecordReader {
+  static final Logger LOG = LoggerFactory.getLogger(RecordReaderImpl.class);
+  private static final boolean isLogDebugEnabled = LOG.isDebugEnabled();
+  private static final Object UNKNOWN_VALUE = new Object();
+  protected final Path path;
+  private final long firstRow;
+  private final List<StripeInformation> stripes =
+      new ArrayList<StripeInformation>();
+  private OrcProto.StripeFooter stripeFooter;
+  private final long totalRowCount;
+  private final CompressionCodec codec;
+  protected final TypeDescription schema;
+  private final List<OrcProto.Type> types;
+  private final int bufferSize;
+  private final SchemaEvolution evolution;
+  // which columns of the file are included, indexed by the file's column ids.
+  private final boolean[] included;
+  private final long rowIndexStride;
+  private long rowInStripe = 0;
+  private int currentStripe = -1;
+  private long rowBaseInStripe = 0;
+  private long rowCountInStripe = 0;
+  private final Map<StreamName, InStream> streams =
+      new HashMap<StreamName, InStream>();
+  DiskRangeList bufferChunks = null;
+  private final TreeReaderFactory.TreeReader reader;
+  private final OrcProto.RowIndex[] indexes;
+  private final OrcProto.BloomFilterIndex[] bloomFilterIndices;
+  private final SargApplier sargApp;
+  // a flag for each row group in the current stripe: true if the row group is not skipped
+  private boolean[] includedRowGroups = null;
+  private final DataReader dataReader;
+
+  /**
+   * Find the named top-level column in the reader schema and return its id in the file schema.
+   *
+   * @param evolution the mapping from reader to file schema
+   * @param columnName  the column name to look for
+   * @return the file column id or -1 if the column wasn't found
+   */
+  static int findColumns(SchemaEvolution evolution,
+                         String columnName) {
+    TypeDescription readerSchema = evolution.getReaderBaseSchema();
+    List<String> fieldNames = readerSchema.getFieldNames();
+    List<TypeDescription> children = readerSchema.getChildren();
+    for (int i = 0; i < fieldNames.size(); ++i) {
+      if (columnName.equals(fieldNames.get(i))) {
+        TypeDescription result = evolution.getFileType(children.get(i).getId());
+        return result == null ? -1 : result.getId();
+      }
+    }
+    return -1;
+  }
+
+  /**
+   * Find the mapping from predicate leaves to columns.
+   * @param sargLeaves the search argument that we need to map
+   * @param evolution the mapping from reader to file schema
+   * @return an array mapping the sarg leaves to file column ids
+   */
+  public static int[] mapSargColumnsToOrcInternalColIdx(List<PredicateLeaf> sargLeaves,
+                             SchemaEvolution evolution) {
+    int[] result = new int[sargLeaves.size()];
+    Arrays.fill(result, -1);
+    for(int i=0; i < result.length; ++i) {
+      String colName = sargLeaves.get(i).getColumnName();
+      result[i] = findColumns(evolution, colName);
+    }
+    return result;
+  }
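// --- Editor's note: illustrative sketch, not part of this patch. ---
// How the two helpers above resolve SARG columns, assuming a caller in this
// package (findColumns is package-private) and a hypothetical file schema
// struct<name:string,age:int> (column ids: root = 0, name = 1, age = 2).
TypeDescription fileSchema = TypeDescription.createStruct()
    .addField("name", TypeDescription.createString())
    .addField("age", TypeDescription.createInt());
SchemaEvolution evolution = new SchemaEvolution(fileSchema, null); // reader schema == file schema
int ageId = RecordReaderImpl.findColumns(evolution, "age");   // 2: the file column id
int missing = RecordReaderImpl.findColumns(evolution, "zip"); // -1: not a top-level column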
+
+  protected RecordReaderImpl(ReaderImpl fileReader,
+                             Reader.Options options) throws IOException {
+    boolean[] readerIncluded = options.getInclude();
+    if (options.getSchema() == null) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Reader schema not provided -- using file schema " +
+            fileReader.getSchema());
+      }
+      evolution = new SchemaEvolution(fileReader.getSchema(), readerIncluded);
+    } else {
+
+      // Now that we are creating a record reader for a file, validate that the schema to read
+      // is compatible with the file schema.
+      //
+      evolution = new SchemaEvolution(fileReader.getSchema(),
+          options.getSchema(), readerIncluded);
+      if (LOG.isDebugEnabled() && evolution.hasConversion()) {
+        LOG.debug("ORC file " + fileReader.path.toString() +
+            " has data type conversion --\n" +
+            "reader schema: " + options.getSchema().toString() + "\n" +
+            "file schema:   " + fileReader.getSchema());
+      }
+    }
+    this.schema = evolution.getReaderSchema();
+    this.path = fileReader.path;
+    this.codec = fileReader.codec;
+    this.types = fileReader.types;
+    this.bufferSize = fileReader.bufferSize;
+    this.rowIndexStride = fileReader.rowIndexStride;
+    SearchArgument sarg = options.getSearchArgument();
+    if (sarg != null && rowIndexStride != 0) {
+      sargApp = new SargApplier(sarg, options.getColumnNames(), rowIndexStride,
+          evolution);
+    } else {
+      sargApp = null;
+    }
+    long rows = 0;
+    long skippedRows = 0;
+    long offset = options.getOffset();
+    long maxOffset = options.getMaxOffset();
+    for(StripeInformation stripe: fileReader.getStripes()) {
+      long stripeStart = stripe.getOffset();
+      if (offset > stripeStart) {
+        skippedRows += stripe.getNumberOfRows();
+      } else if (stripeStart < maxOffset) {
+        this.stripes.add(stripe);
+        rows += stripe.getNumberOfRows();
+      }
+    }
+
+    Boolean zeroCopy = options.getUseZeroCopy();
+    if (zeroCopy == null) {
+      zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(fileReader.conf);
+    }
+    if (options.getDataReader() != null) {
+      this.dataReader = options.getDataReader();
+    } else {
+      this.dataReader = RecordReaderUtils.createDefaultDataReader(
+          DataReaderProperties.builder()
+              .withBufferSize(bufferSize)
+              .withCompression(fileReader.compressionKind)
+              .withFileSystem(fileReader.fileSystem)
+              .withPath(fileReader.path)
+              .withTypeCount(types.size())
+              .withZeroCopy(zeroCopy)
+              .build());
+    }
+    this.dataReader.open();
+
+    firstRow = skippedRows;
+    totalRowCount = rows;
+    Boolean skipCorrupt = options.getSkipCorruptRecords();
+    if (skipCorrupt == null) {
+      skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(fileReader.conf);
+    }
+
+    reader = TreeReaderFactory.createTreeReader(evolution.getReaderSchema(),
+        evolution, readerIncluded, skipCorrupt);
+    indexes = new OrcProto.RowIndex[types.size()];
+    bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
+    this.included = evolution.getFileIncluded();
+    advanceToNextRow(reader, 0L, true);
+  }
+
+  public static final class PositionProviderImpl implements PositionProvider {
+    private final OrcProto.RowIndexEntry entry;
+    private int index;
+
+    public PositionProviderImpl(OrcProto.RowIndexEntry entry) {
+      this(entry, 0);
+    }
+
+    public PositionProviderImpl(OrcProto.RowIndexEntry entry, int startPos) {
+      this.entry = entry;
+      this.index = startPos;
+    }
+
+    @Override
+    public long getNext() {
+      return entry.getPositions(index++);
+    }
+
+    @Override
+    public String toString() {
+      return "{" + entry.getPositionsList() + "; " + index + "}";
+    }
+  }
+
+  public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe
+                                                ) throws IOException {
+    return dataReader.readStripeFooter(stripe);
+  }
+
+  enum Location {
+    BEFORE, MIN, MIDDLE, MAX, AFTER
+  }
+
+  /**
+   * Given a point and min and max, determine if the point is before, at the
+   * min, in the middle, at the max, or after the range.
+   * @param point the point to test
+   * @param min the minimum point
+   * @param max the maximum point
+   * @param <T> the type of the comparison
+   * @return the location of the point
+   */
+  static <T> Location compareToRange(Comparable<T> point, T min, T max) {
+    int minCompare = point.compareTo(min);
+    if (minCompare < 0) {
+      return Location.BEFORE;
+    } else if (minCompare == 0) {
+      return Location.MIN;
+    }
+    int maxCompare = point.compareTo(max);
+    if (maxCompare > 0) {
+      return Location.AFTER;
+    } else if (maxCompare == 0) {
+      return Location.MAX;
+    }
+    return Location.MIDDLE;
+  }
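// --- Editor's note: illustrative sketch, not part of this patch. ---
// What compareToRange() reports for a predicate literal against a row group's
// min/max statistics (String examples; callers must be in this package since
// both the method and the Location enum are package-private).
RecordReaderImpl.compareToRange("a", "c", "f"); // BEFORE: literal < min
RecordReaderImpl.compareToRange("c", "c", "f"); // MIN:    literal == min
RecordReaderImpl.compareToRange("d", "c", "f"); // MIDDLE: min < literal < max
RecordReaderImpl.compareToRange("f", "c", "f"); // MAX:    literal == max
RecordReaderImpl.compareToRange("z", "c", "f"); // AFTER:  literal > max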
+
+  /**
+   * Get the maximum value out of an index entry.
+   * @param index
+   *          the index entry
+   * @return the object for the maximum value or null if there isn't one
+   */
+  static Object getMax(ColumnStatistics index) {
+    if (index instanceof IntegerColumnStatistics) {
+      return ((IntegerColumnStatistics) index).getMaximum();
+    } else if (index instanceof DoubleColumnStatistics) {
+      return ((DoubleColumnStatistics) index).getMaximum();
+    } else if (index instanceof StringColumnStatistics) {
+      return ((StringColumnStatistics) index).getMaximum();
+    } else if (index instanceof DateColumnStatistics) {
+      return ((DateColumnStatistics) index).getMaximum();
+    } else if (index instanceof DecimalColumnStatistics) {
+      return ((DecimalColumnStatistics) index).getMaximum();
+    } else if (index instanceof TimestampColumnStatistics) {
+      return ((TimestampColumnStatistics) index).getMaximum();
+    } else if (index instanceof BooleanColumnStatistics) {
+      if (((BooleanColumnStatistics)index).getTrueCount()!=0) {
+        return Boolean.TRUE;
+      } else {
+        return Boolean.FALSE;
+      }
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Get the minimum value out of an index entry.
+   * @param index
+   *          the index entry
+   * @return the object for the minimum value or null if there isn't one
+   */
+  static Object getMin(ColumnStatistics index) {
+    if (index instanceof IntegerColumnStatistics) {
+      return ((IntegerColumnStatistics) index).getMinimum();
+    } else if (index instanceof DoubleColumnStatistics) {
+      return ((DoubleColumnStatistics) index).getMinimum();
+    } else if (index instanceof StringColumnStatistics) {
+      return ((StringColumnStatistics) index).getMinimum();
+    } else if (index instanceof DateColumnStatistics) {
+      return ((DateColumnStatistics) index).getMinimum();
+    } else if (index instanceof DecimalColumnStatistics) {
+      return ((DecimalColumnStatistics) index).getMinimum();
+    } else if (index instanceof TimestampColumnStatistics) {
+      return ((TimestampColumnStatistics) index).getMinimum();
+    } else if (index instanceof BooleanColumnStatistics) {
+      if (((BooleanColumnStatistics)index).getFalseCount()!=0) {
+        return Boolean.FALSE;
+      } else {
+        return Boolean.TRUE;
+      }
+    } else {
+      return UNKNOWN_VALUE; // null is not safe here
+    }
+  }
+
+  /**
+   * Evaluate a predicate with respect to the statistics from the column
+   * that is referenced in the predicate.
+   * @param statsProto the statistics for the column mentioned in the predicate
+   * @param predicate the leaf predicate we need to evaluate
+   * @param bloomFilter the bloom filter for the column, or null if there isn't one
+   * @return the set of truth values that may be returned for the given
+   *   predicate.
+   */
+  static TruthValue evaluatePredicateProto(OrcProto.ColumnStatistics statsProto,
+      PredicateLeaf predicate, OrcProto.BloomFilter bloomFilter) {
+    ColumnStatistics cs = ColumnStatisticsImpl.deserialize(statsProto);
+    Object minValue = getMin(cs);
+    Object maxValue = getMax(cs);
+    BloomFilterIO bf = null;
+    if (bloomFilter != null) {
+      bf = new BloomFilterIO(bloomFilter);
+    }
+    return evaluatePredicateRange(predicate, minValue, maxValue, cs.hasNull(), bf);
+  }
+
+  /**
+   * Evaluate a predicate with respect to the statistics from the column
+   * that is referenced in the predicate.
+   * @param stats the statistics for the column mentioned in the predicate
+   * @param predicate the leaf predicate we need to evaluate
+   * @return the set of truth values that may be returned for the given
+   *   predicate.
+   */
+  public static TruthValue evaluatePredicate(ColumnStatistics stats,
+                                             PredicateLeaf predicate,
+                                             BloomFilterIO bloomFilter) {
+    Object minValue = getMin(stats);
+    Object maxValue = getMax(stats);
+    return evaluatePredicateRange(predicate, minValue, maxValue, stats.hasNull(), bloomFilter);
+  }
+
+  static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min,
+      Object max, boolean hasNull, BloomFilterIO bloomFilter) {
+    // if we didn't have any values, everything must have been null
+    if (min == null) {
+      if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) {
+        return TruthValue.YES;
+      } else {
+        return TruthValue.NULL;
+      }
+    } else if (min == UNKNOWN_VALUE) {
+      return TruthValue.YES_NO_NULL;
+    }
+
+    // TODO: Enabling PPD for timestamp requires ORC-101 and ORC-135
+    if (min != null && min instanceof Timestamp) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not using predication pushdown on {} because it doesn't " +
+          "include ORC-135.", predicate.getColumnName());
+      }
+      return TruthValue.YES_NO_NULL;
+    }
+
+    TruthValue result;
+    Object baseObj = predicate.getLiteral();
+    try {
+      // Predicate object and stats objects are converted to the type of the predicate object.
+      Object minValue = getBaseObjectForComparison(predicate.getType(), min);
+      Object maxValue = getBaseObjectForComparison(predicate.getType(), max);
+      Object predObj = getBaseObjectForComparison(predicate.getType(), baseObj);
+
+      result = evaluatePredicateMinMax(predicate, predObj, minValue, maxValue, hasNull);
+      if (shouldEvaluateBloomFilter(predicate, result, bloomFilter)) {
+        result = evaluatePredicateBloomFilter(predicate, predObj, bloomFilter, hasNull);
+      }
+      // in case of a failed conversion, return the default YES_NO_NULL truth value
+    } catch (Exception e) {
+      if (LOG.isDebugEnabled()) {
+        final String statsType = min == null ?
+            (max == null ? "null" : max.getClass().getSimpleName()) :
+            min.getClass().getSimpleName();
+        final String predicateType = baseObj == null ? "null" : baseObj.getClass().getSimpleName();
+        final String reason = e.getClass().getSimpleName() + " when evaluating predicate." +
+            " Skipping ORC PPD." +
+            " Exception: " + e.getMessage() +
+            " StatsType: " + statsType +
+            " PredicateType: " + predicateType;
+        LOG.debug(reason);
+      }
+      if (predicate.getOperator().equals(PredicateLeaf.Operator.NULL_SAFE_EQUALS) || !hasNull) {
+        result = TruthValue.YES_NO;
+      } else {
+        result = TruthValue.YES_NO_NULL;
+      }
+    }
+    return result;
+  }
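// --- Editor's note: illustrative sketch, not part of this patch. ---
// How a caller interprets the truth value for one row group. "stats" and "leaf"
// are assumed to come from the stripe's row index and the SearchArgument leaves;
// SargApplier.pickRowGroups below does the same thing via the proto-based variant.
TruthValue tv = RecordReaderImpl.evaluatePredicate(stats, leaf, null); // no bloom filter
if (tv == TruthValue.NO || tv == TruthValue.NO_NULL) {
  // no row in this row group can satisfy the predicate, so the row group is skipped
} else {
  // YES, YES_NO and the *_NULL variants mean the row group must still be read
}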
+
+  private static boolean shouldEvaluateBloomFilter(PredicateLeaf predicate,
+      TruthValue result, BloomFilterIO bloomFilter) {
+    // evaluate bloom filter only when
+    // 1) Bloom filter is available
+    // 2) Min/Max evaluation yields YES or MAYBE
+    // 3) Predicate is EQUALS or IN list
+    if (bloomFilter != null
+        && result != TruthValue.NO_NULL && result != TruthValue.NO
+        && (predicate.getOperator().equals(PredicateLeaf.Operator.EQUALS)
+            || predicate.getOperator().equals(PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+            || predicate.getOperator().equals(PredicateLeaf.Operator.IN))) {
+      return true;
+    }
+    return false;
+  }
+
+  private static TruthValue evaluatePredicateMinMax(PredicateLeaf predicate, Object predObj,
+      Object minValue,
+      Object maxValue,
+      boolean hasNull) {
+    Location loc;
+
+    switch (predicate.getOperator()) {
+      case NULL_SAFE_EQUALS:
+        loc = compareToRange((Comparable) predObj, minValue, maxValue);
+        if (loc == Location.BEFORE || loc == Location.AFTER) {
+          return TruthValue.NO;
+        } else {
+          return TruthValue.YES_NO;
+        }
+      case EQUALS:
+        loc = compareToRange((Comparable) predObj, minValue, maxValue);
+        if (minValue.equals(maxValue) && loc == Location.MIN) {
+          return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
+        } else if (loc == Location.BEFORE || loc == Location.AFTER) {
+          return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+        } else {
+          return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+        }
+      case LESS_THAN:
+        loc = compareToRange((Comparable) predObj, minValue, maxValue);
+        if (loc == Location.AFTER) {
+          return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
+        } else if (loc == Location.BEFORE || loc == Location.MIN) {
+          return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+        } else {
+          return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+        }
+      case LESS_THAN_EQUALS:
+        loc = compareToRange((Comparable) predObj, minValue, maxValue);
+        if (loc == Location.AFTER || loc == Location.MAX) {
+          return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
+        } else if (loc == Location.BEFORE) {
+          return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+        } else {
+          return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+        }
+      case IN:
+        if (minValue.equals(maxValue)) {
+          // for a single value, look through to see if that value is in the
+          // set
+          for (Object arg : predicate.getLiteralList()) {
+            predObj = getBaseObjectForComparison(predicate.getType(), arg);
+            loc = compareToRange((Comparable) predObj, minValue, maxValue);
+            if (loc == Location.MIN) {
+              return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
+            }
+          }
+          return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+        } else {
+          // are all of the values outside of the range?
+          for (Object arg : predicate.getLiteralList()) {
+            predObj = getBaseObjectForComparison(predicate.getType(), arg);
+            loc = compareToRange((Comparable) predObj, minValue, maxValue);
+            if (loc == Location.MIN || loc == Location.MIDDLE ||
+                loc == Location.MAX) {
+              return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+            }
+          }
+          return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+        }
+      case BETWEEN:
+        List<Object> args = predicate.getLiteralList();
+        Object predObj1 = getBaseObjectForComparison(predicate.getType(), args.get(0));
+
+        loc = compareToRange((Comparable) predObj1, minValue, maxValue);
+        if (loc == Location.BEFORE || loc == Location.MIN) {
+          Object predObj2 = getBaseObjectForComparison(predicate.getType(), args.get(1));
+
+          Location loc2 = compareToRange((Comparable) predObj2, minValue, maxValue);
+          if (loc2 == Location.AFTER || loc2 == Location.MAX) {
+            return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
+          } else if (loc2 == Location.BEFORE) {
+            return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+          } else {
+            return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+          }
+        } else if (loc == Location.AFTER) {
+          return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+        } else {
+          return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+        }
+      case IS_NULL:
+        // min = null condition above handles the all-nulls YES case
+        return hasNull ? TruthValue.YES_NO : TruthValue.NO;
+      default:
+        return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+    }
+  }
+
+  private static TruthValue evaluatePredicateBloomFilter(PredicateLeaf predicate,
+      final Object predObj, BloomFilterIO bloomFilter, boolean hasNull) {
+    switch (predicate.getOperator()) {
+      case NULL_SAFE_EQUALS:
+        // null-safe equals does not return the *_NULL variants, so pass hasNull as false
+        return checkInBloomFilter(bloomFilter, predObj, false);
+      case EQUALS:
+        return checkInBloomFilter(bloomFilter, predObj, hasNull);
+      case IN:
+        for (Object arg : predicate.getLiteralList()) {
+          // if at least one value in the IN list exists in the bloom filter, qualify the row group/stripe
+          Object predObjItem = getBaseObjectForComparison(predicate.getType(), arg);
+          TruthValue result = checkInBloomFilter(bloomFilter, predObjItem, hasNull);
+          if (result == TruthValue.YES_NO_NULL || result == TruthValue.YES_NO) {
+            return result;
+          }
+        }
+        return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+      default:
+        return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+    }
+  }
+
+  private static TruthValue checkInBloomFilter(BloomFilterIO bf, Object predObj, boolean hasNull) {
+    TruthValue result = hasNull ? TruthValue.NO_NULL : TruthValue.NO;
+
+    if (predObj instanceof Long) {
+      if (bf.testLong(((Long) predObj).longValue())) {
+        result = TruthValue.YES_NO_NULL;
+      }
+    } else if (predObj instanceof Double) {
+      if (bf.testDouble(((Double) predObj).doubleValue())) {
+        result = TruthValue.YES_NO_NULL;
+      }
+    } else if (predObj instanceof String || predObj instanceof Text ||
+        predObj instanceof HiveDecimalWritable ||
+        predObj instanceof BigDecimal) {
+      if (bf.testString(predObj.toString())) {
+        result = TruthValue.YES_NO_NULL;
+      }
+    } else if (predObj instanceof Timestamp) {
+      if (bf.testLong(((Timestamp) predObj).getTime())) {
+        result = TruthValue.YES_NO_NULL;
+      }
+    } else if (predObj instanceof Date) {
+      if (bf.testLong(DateWritable.dateToDays((Date) predObj))) {
+        result = TruthValue.YES_NO_NULL;
+      }
+    } else {
+      // if the predicate object is null and hasNull says there are no nulls, return NO
+      if (predObj == null && !hasNull) {
+        result = TruthValue.NO;
+      } else {
+        result = TruthValue.YES_NO_NULL;
+      }
+    }
+
+    if (result == TruthValue.YES_NO_NULL && !hasNull) {
+      result = TruthValue.YES_NO;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Bloom filter evaluation: " + result.toString());
+    }
+
+    return result;
+  }
+
+  private static Object getBaseObjectForComparison(PredicateLeaf.Type type, Object obj) {
+    if (obj == null) {
+      return null;
+    }
+    switch (type) {
+      case BOOLEAN:
+        if (obj instanceof Boolean) {
+          return obj;
+        } else {
+          // will only be true if the string conversion yields "true"; all other values are
+          // considered false
+          return Boolean.valueOf(obj.toString());
+        }
+      case DATE:
+        if (obj instanceof Date) {
+          return obj;
+        } else if (obj instanceof String) {
+          return Date.valueOf((String) obj);
+        } else if (obj instanceof Timestamp) {
+          return DateWritable.timeToDate(((Timestamp) obj).getTime() / 1000L);
+        }
+        // always string, but prevent the comparison to numbers (are they days/seconds/milliseconds?)
+        break;
+      case DECIMAL:
+        if (obj instanceof Boolean) {
+          return new HiveDecimalWritable(((Boolean) obj).booleanValue() ?
+              HiveDecimal.ONE : HiveDecimal.ZERO);
+        } else if (obj instanceof Integer) {
+          return new HiveDecimalWritable(((Integer) obj).intValue());
+        } else if (obj instanceof Long) {
+          return new HiveDecimalWritable(((Long) obj));
+        } else if (obj instanceof Float || obj instanceof Double ||
+            obj instanceof String) {
+          return new HiveDecimalWritable(obj.toString());
+        } else if (obj instanceof BigDecimal) {
+          return new HiveDecimalWritable(HiveDecimal.create((BigDecimal) obj));
+        } else if (obj instanceof HiveDecimal) {
+          return new HiveDecimalWritable((HiveDecimal) obj);
+        } else if (obj instanceof HiveDecimalWritable) {
+          return obj;
+        } else if (obj instanceof Timestamp) {
+          return new HiveDecimalWritable(Double.toString(
+              TimestampUtils.getDouble((Timestamp) obj)));
+        }
+        break;
+      case FLOAT:
+        if (obj instanceof Number) {
+          // widening conversion
+          return ((Number) obj).doubleValue();
+        } else if (obj instanceof HiveDecimal) {
+          return ((HiveDecimal) obj).doubleValue();
+        } else if (obj instanceof String) {
+          return Double.valueOf(obj.toString());
+        } else if (obj instanceof Timestamp) {
+          return TimestampUtils.getDouble((Timestamp) obj);
+        } else if (obj instanceof BigDecimal) {
+          return ((BigDecimal) obj).doubleValue();
+        }
+        break;
+      case LONG:
+        if (obj instanceof Number) {
+          // widening conversion
+          return ((Number) obj).longValue();
+        } else if (obj instanceof HiveDecimal) {
+          return ((HiveDecimal) obj).longValue();
+        } else if (obj instanceof String) {
+          return Long.valueOf(obj.toString());
+        }
+        break;
+      case STRING:
+        if (obj != null) {
+          return (obj.toString());
+        }
+        break;
+      case TIMESTAMP:
+        if (obj instanceof Timestamp) {
+          return obj;
+        } else if (obj instanceof Integer) {
+          return new Timestamp(((Number) obj).longValue());
+        } else if (obj instanceof Float) {
+          return TimestampUtils.doubleToTimestamp(((Float) obj).doubleValue());
+        } else if (obj instanceof Double) {
+          return TimestampUtils.doubleToTimestamp(((Double) obj).doubleValue());
+        } else if (obj instanceof HiveDecimal) {
+          return TimestampUtils.decimalToTimestamp((HiveDecimal) obj);
+        } else if (obj instanceof HiveDecimalWritable) {
+          return TimestampUtils.decimalToTimestamp(((HiveDecimalWritable) obj).getHiveDecimal());
+        } else if (obj instanceof Date) {
+          return new Timestamp(((Date) obj).getTime());
+        }
+        // float/double conversion to timestamp is interpreted as seconds whereas integer conversion
+        // to timestamp is interpreted as milliseconds by default. The integer to timestamp casting
+        // is also config driven. The filter operator changes its promotion based on config:
+        // "int.timestamp.conversion.in.seconds". Disable PPD for integer cases.
+        break;
+      default:
+        break;
+    }
+
+    throw new IllegalArgumentException(String.format(
+        "ORC SARGS could not convert from %s to %s", obj == null ? "(null)" : obj.getClass()
+            .getSimpleName(), type));
+  }
+
+  public static class SargApplier {
+    public final static boolean[] READ_ALL_RGS = null;
+    public final static boolean[] READ_NO_RGS = new boolean[0];
+
+    private final SearchArgument sarg;
+    private final List<PredicateLeaf> sargLeaves;
+    private final int[] filterColumns;
+    private final long rowIndexStride;
+    // the same columns as filterColumns above, expressed as a boolean array indexed by file column id
+    private final boolean[] sargColumns;
+    private SchemaEvolution evolution;
+
+    public SargApplier(SearchArgument sarg, String[] columnNames,
+                       long rowIndexStride,
+                       SchemaEvolution evolution) {
+      this.sarg = sarg;
+      sargLeaves = sarg.getLeaves();
+      filterColumns = mapSargColumnsToOrcInternalColIdx(sargLeaves, evolution);
+      this.rowIndexStride = rowIndexStride;
+      // getFileIncluded() will not be null; the reader options fill the include array with trues if it was null
+      sargColumns = new boolean[evolution.getFileIncluded().length];
+      for (int i : filterColumns) {
+        // filter columns may have -1 as their index, e.g. for a partition column referenced in the SARG.
+        if (i > 0) {
+          sargColumns[i] = true;
+        }
+      }
+      this.evolution = evolution;
+    }
+
+    /**
+     * Pick the row groups that we need to load from the current stripe.
+     *
+     * @return an array with a boolean for each row group or null if all of the
+     * row groups must be read.
+     * @throws IOException
+     */
+    public boolean[] pickRowGroups(StripeInformation stripe, OrcProto.RowIndex[] indexes,
+        OrcProto.BloomFilterIndex[] bloomFilterIndices, boolean returnNone) throws IOException {
+      long rowsInStripe = stripe.getNumberOfRows();
+      int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
+      boolean[] result = new boolean[groupsInStripe]; // TODO: avoid alloc?
+      TruthValue[] leafValues = new TruthValue[sargLeaves.size()];
+      boolean hasSelected = false, hasSkipped = false;
+      for (int rowGroup = 0; rowGroup < result.length; ++rowGroup) {
+        for (int pred = 0; pred < leafValues.length; ++pred) {
+          int columnIx = filterColumns[pred];
+          if (columnIx != -1) {
+            if (indexes[columnIx] == null) {
+              throw new AssertionError("Index is not populated for " + columnIx);
+            }
+            OrcProto.RowIndexEntry entry = indexes[columnIx].getEntry(rowGroup);
+            if (entry == null) {
+              throw new AssertionError("RG is not populated for " + columnIx + " rg " + rowGroup);
+            }
+            OrcProto.ColumnStatistics stats = entry.getStatistics();
+            OrcProto.BloomFilter bf = null;
+            if (bloomFilterIndices != null && bloomFilterIndices[columnIx] != null) {
+              bf = bloomFilterIndices[columnIx].getBloomFilter(rowGroup);
+            }
+            if (evolution != null && evolution.isPPDSafeConversion(columnIx)) {
+              leafValues[pred] = evaluatePredicateProto(stats, sargLeaves.get(pred), bf);
+            } else {
+              leafValues[pred] = TruthValue.YES_NO_NULL;
+            }
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Stats = " + stats);
+              LOG.trace("Setting " + sargLeaves.get(pred) + " to " + leafValues[pred]);
+            }
+          } else {
+            // the column is a virtual column
+            leafValues[pred] = TruthValue.YES_NO_NULL;
+          }
+        }
+        result[rowGroup] = sarg.evaluate(leafValues).isNeeded();
+        hasSelected = hasSelected || result[rowGroup];
+        hasSkipped = hasSkipped || (!result[rowGroup]);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Row group " + (rowIndexStride * rowGroup) + " to " +
+              (rowIndexStride * (rowGroup + 1) - 1) + " is " +
+              (result[rowGroup] ? "" : "not ") + "included.");
+        }
+      }
+
+      return hasSkipped ? ((hasSelected || !returnNone) ? result : READ_NO_RGS) : READ_ALL_RGS;
+    }
+  }
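// --- Editor's note: illustrative sketch, not part of this patch. ---
// Interpreting a pickRowGroups() result per the READ_ALL_RGS / READ_NO_RGS
// contract above. "sargApplier", "stripe", "indexes" and "bloomFilterIndices"
// are assumed to be in scope, as they are in readStripe()/pickRowGroups() below.
boolean[] rowGroups =
    sargApplier.pickRowGroups(stripe, indexes, bloomFilterIndices, true);
if (rowGroups == SargApplier.READ_ALL_RGS) {        // null: nothing was pruned
  // read every row group in the stripe
} else if (rowGroups == SargApplier.READ_NO_RGS) {  // empty: the whole stripe is pruned
  // skip the stripe entirely
} else {
  // read only the row groups whose flag is true
}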
+
+  /**
+   * Pick the row groups that we need to load from the current stripe.
+   *
+   * @return an array with a boolean for each row group or null if all of the
+   * row groups must be read.
+   * @throws IOException
+   */
+  protected boolean[] pickRowGroups() throws IOException {
+    // if we don't have a sarg or indexes, we read everything
+    if (sargApp == null) {
+      return null;
+    }
+    readRowIndex(currentStripe, included, sargApp.sargColumns);
+    return sargApp.pickRowGroups(stripes.get(currentStripe), indexes, bloomFilterIndices, false);
+  }
+
+  private void clearStreams() {
+    // explicit close of all streams to de-ref ByteBuffers
+    for (InStream is : streams.values()) {
+      is.close();
+    }
+    if (bufferChunks != null) {
+      if (dataReader.isTrackingDiskRanges()) {
+        for (DiskRangeList range = bufferChunks; range != null; range = range.next) {
+          if (!(range instanceof BufferChunk)) {
+            continue;
+          }
+          dataReader.releaseBuffer(((BufferChunk) range).getChunk());
+        }
+      }
+    }
+    bufferChunks = null;
+    streams.clear();
+  }
+
+  /**
+   * Read the current stripe into memory.
+   *
+   * @throws IOException
+   */
+  private void readStripe() throws IOException {
+    StripeInformation stripe = beginReadStripe();
+    includedRowGroups = pickRowGroups();
+
+    // move forward to the first unskipped row
+    if (includedRowGroups != null) {
+      while (rowInStripe < rowCountInStripe &&
+          !includedRowGroups[(int) (rowInStripe / rowIndexStride)]) {
+        rowInStripe = Math.min(rowCountInStripe, rowInStripe + rowIndexStride);
+      }
+    }
+
+    // if we haven't skipped the whole stripe, read the data
+    if (rowInStripe < rowCountInStripe) {
+      // if we aren't projecting columns or filtering rows, just read it all
+      if (included == null && includedRowGroups == null) {
+        readAllDataStreams(stripe);
+      } else {
+        readPartialDataStreams(stripe);
+      }
+      reader.startStripe(streams, stripeFooter);
+      // if we skipped the first row group, move the pointers forward
+      if (rowInStripe != 0) {
+        seekToRowEntry(reader, (int) (rowInStripe / rowIndexStride));
+      }
+    }
+  }
+
+  private StripeInformation beginReadStripe() throws IOException {
+    StripeInformation stripe = stripes.get(currentStripe);
+    stripeFooter = readStripeFooter(stripe);
+    clearStreams();
+    // setup the position in the stripe
+    rowCountInStripe = stripe.getNumberOfRows();
+    rowInStripe = 0;
+    rowBaseInStripe = 0;
+    for (int i = 0; i < currentStripe; ++i) {
+      rowBaseInStripe += stripes.get(i).getNumberOfRows();
+    }
+    // reset all of the indexes
+    for (int i = 0; i < indexes.length; ++i) {
+      indexes[i] = null;
+    }
+    return stripe;
+  }
+
+  private void readAllDataStreams(StripeInformation stripe) throws IOException {
+    long start = stripe.getIndexLength();
+    long end = start + stripe.getDataLength();
+    // explicitly trigger 1 big read
+    DiskRangeList toRead = new DiskRangeList(start, end);
+    bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false);
+    List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
+    createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams);
+  }
+
+  /**
+   * Plan the ranges of the file that we need to read given the list of
+   * columns and row groups.
+   *
+   * @param streamList        the list of streams available
+   * @param indexes           the indexes that have been loaded
+   * @param includedColumns   which columns are needed
+   * @param includedRowGroups which row groups are needed
+   * @param isCompressed      does the file have generic compression
+   * @param encodings         the encodings for each column
+   * @param types             the types of the columns
+   * @param compressionSize   the compression block size
+   * @return the list of disk ranges that will be loaded
+   */
+  static DiskRangeList planReadPartialDataStreams(
+      List<OrcProto.Stream> streamList,
+      OrcProto.RowIndex[] indexes,
+      boolean[] includedColumns,
+      boolean[] includedRowGroups,
+      boolean isCompressed,
+      List<OrcProto.ColumnEncoding> encodings,
+      List<OrcProto.Type> types,
+      int compressionSize,
+      boolean doMergeBuffers) {
+    long offset = 0;
+    // figure out which columns have a present stream
+    boolean[] hasNull = RecordReaderUtils.findPresentStreamsByColumn(streamList, types);
+    CreateHelper list = new CreateHelper();
+    for (OrcProto.Stream stream : streamList) {
+      long length = stream.getLength();
+      int column = stream.getColumn();
+      OrcProto.Stream.Kind streamKind = stream.getKind();
+      // since stream kind is optional, first check if it exists
+      if (stream.hasKind() &&
+          (StreamName.getArea(streamKind) == StreamName.Area.DATA) &&
+          (column < includedColumns.length && includedColumns[column])) {
+        // if we aren't filtering or it is a dictionary, load it.
+        if (includedRowGroups == null
+            || RecordReaderUtils.isDictionary(streamKind, encodings.get(column))) {
+          RecordReaderUtils.addEntireStreamToRanges(offset, length, list, doMergeBuffers);
+        } else {
+          RecordReaderUtils.addRgFilteredStreamToRanges(stream, includedRowGroups,
+              isCompressed, indexes[column], encodings.get(column), types.get(column),
+              compressionSize, hasNull[column], offset, length, list, doMergeBuffers);
+        }
+      }
+      offset += length;
+    }
+    return list.extract();
+  }
+
+  void createStreams(List<OrcProto.Stream> streamDescriptions,
+      DiskRangeList ranges,
+      boolean[] includeColumn,
+      CompressionCodec codec,
+      int bufferSize,
+      Map<StreamName, InStream> streams) throws IOException {
+    long streamOffset = 0;
+    for (OrcProto.Stream streamDesc : streamDescriptions) {
+      int column = streamDesc.getColumn();
+      if ((includeColumn != null &&
+          (column < included.length && !includeColumn[column])) ||
+          streamDesc.hasKind() &&
+              (StreamName.getArea(streamDesc.getKind()) != StreamName.Area.DATA)) {
+        streamOffset += streamDesc.getLength();
+        continue;
+      }
+      List<DiskRange> buffers = RecordReaderUtils.getStreamBuffers(
+          ranges, streamOffset, streamDesc.getLength());
+      StreamName name = new StreamName(column, streamDesc.getKind());
+      streams.put(name, InStream.create(name.toString(), buffers,
+          streamDesc.getLength(), codec, bufferSize));
+      streamOffset += streamDesc.getLength();
+    }
+  }
+
+  private void readPartialDataStreams(StripeInformation stripe) throws IOException {
+    List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
+    DiskRangeList toRead = planReadPartialDataStreams(streamList,
+        indexes, included, includedRowGroups, codec != null,
+        stripeFooter.getColumnsList(), types, bufferSize, true);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("chunks = " + RecordReaderUtils.stringifyDiskRanges(toRead));
+    }
+    bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("merge = " + RecordReaderUtils.stringifyDiskRanges(bufferChunks));
+    }
+
+    createStreams(streamList, bufferChunks, included, codec, bufferSize, streams);
+  }
+
+  /**
+   * Read the next stripe until we find a row that we don't skip.
+   *
+   * @throws IOException
+   */
+  private void advanceStripe() throws IOException {
+    rowInStripe = rowCountInStripe;
+    while (rowInStripe >= rowCountInStripe &&
+        currentStripe < stripes.size() - 1) {
+      currentStripe += 1;
+      readStripe();
+    }
+  }
+
+  /**
+   * Skip over rows that we aren't selecting, so that the next row is
+   * one that we will read.
+   *
+   * @param nextRow the row we want to go to
+   * @throws IOException
+   */
+  private boolean advanceToNextRow(
+      TreeReaderFactory.TreeReader reader, long nextRow, boolean canAdvanceStripe)
+      throws IOException {
+    long nextRowInStripe = nextRow - rowBaseInStripe;
+    // check for row skipping
+    if (rowIndexStride != 0 &&
+        includedRowGroups != null &&
+        nextRowInStripe < rowCountInStripe) {
+      int rowGroup = (int) (nextRowInStripe / rowIndexStride);
+      if (!includedRowGroups[rowGroup]) {
+        while (rowGroup < includedRowGroups.length && !includedRowGroups[rowGroup]) {
+          rowGroup += 1;
+        }
+        if (rowGroup >= includedRowGroups.length) {
+          if (canAdvanceStripe) {
+            advanceStripe();
+          }
+          return canAdvanceStripe;
+        }
+        nextRowInStripe = Math.min(rowCountInStripe, rowGroup * rowIndexStride);
+      }
+    }
+    if (nextRowInStripe >= rowCountInStripe) {
+      if (canAdvanceStripe) {
+        advanceStripe();
+      }
+      return canAdvanceStripe;
+    }
+    if (nextRowInStripe != rowInStripe) {
+      if (rowIndexStride != 0) {
+        int rowGroup = (int) (nextRowInStripe / rowIndexStride);
+        seekToRowEntry(reader, rowGroup);
+        reader.skipRows(nextRowInStripe - rowGroup * rowIndexStride);
+      } else {
+        reader.skipRows(nextRowInStripe - rowInStripe);
+      }
+      rowInStripe = nextRowInStripe;
+    }
+    return true;
+  }
+
+  @Override
+  public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
+    try {
+      if (rowInStripe >= rowCountInStripe) {
+        currentStripe += 1;
+        if (currentStripe >= stripes.size()) {
+          batch.size = 0;
+          return false;
+        }
+        readStripe();
+      }
+
+      int batchSize = computeBatchSize(batch.getMaxSize());
+
+      rowInStripe += batchSize;
+      reader.setVectorColumnCount(batch.getDataColumnCount());
+      reader.nextBatch(batch, batchSize);
+      batch.selectedInUse = false;
+      batch.size = batchSize;
+      advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
+      return batch.size  != 0;
+    } catch (IOException e) {
+      // Re-throw the exception with the file name included in the message
+      throw new IOException("Error reading file: " + path, e);
+    }
+  }
+
+  private int computeBatchSize(long targetBatchSize) {
+    final int batchSize;
+    // In case of PPD, the batch size should be aware of row group boundaries. If only a subset of
+    // row groups is selected, the marker position is set to the end of that range (the selected
+    // row groups within the stripe). Computing the batch size from the marker position makes sure
+    // it respects row group boundaries and will not overflow into skipped rows.
+    // An illustration of this case: https://issues.apache.org/jira/browse/HIVE-6287
+    if (rowIndexStride != 0 && includedRowGroups != null && rowInStripe < rowCountInStripe) {
+      int startRowGroup = (int) (rowInStripe / rowIndexStride);
+      if (!includedRowGroups[startRowGroup]) {
+        while (startRowGroup < includedRowGroups.length && !includedRowGroups[startRowGroup]) {
+          startRowGroup += 1;
+        }
+      }
+
+      int endRowGroup = startRowGroup;
+      while (endRowGroup < includedRowGroups.length && includedRowGroups[endRowGroup]) {
+        endRowGroup += 1;
+      }
+
+      final long markerPosition =
+          (endRowGroup * rowIndexStride) < rowCountInStripe ? (endRowGroup * rowIndexStride)
+              : rowCountInStripe;
+      batchSize = (int) Math.min(targetBatchSize, (markerPosition - rowInStripe));
+
+      if (isLogDebugEnabled && batchSize < targetBatchSize) {
+        LOG.debug("markerPosition: " + markerPosition + " batchSize: " + batchSize);
+      }
+    } else {
+      batchSize = (int) Math.min(targetBatchSize, (rowCountInStripe - rowInStripe));
+    }
+    return batchSize;
+  }
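// --- Editor's note: worked example, not part of this patch. ---
// The marker-position arithmetic above with assumed values: a 10,000-row stride,
// 25,000 rows in the stripe, the first of its three row groups pruned by the SARG,
// and the reader already positioned near the end of the selected range.
long rowIndexStride = 10_000, rowCountInStripe = 25_000, rowInStripe = 24_576;
int endRowGroup = 3;  // first row group index past the selected range
long markerPosition =
    Math.min(endRowGroup * rowIndexStride, rowCountInStripe);        // 25,000
int batchSize = (int) Math.min(1024, markerPosition - rowInStripe);  // 424 rows, not 1,024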
+
+  @Override
+  public void close() throws IOException {
+    clearStreams();
+    dataReader.close();
+  }
+
+  @Override
+  public long getRowNumber() {
+    return rowInStripe + rowBaseInStripe + firstRow;
+  }
+
+  /**
+   * Return the fraction of rows that have been read from the selected
+   * section of the file.
+   *
+   * @return fraction between 0.0 and 1.0 of rows consumed
+   */
+  @Override
+  public float getProgress() {
+    return ((float) rowBaseInStripe + rowInStripe) / totalRowCount;
+  }
+
+  private int findStripe(long rowNumber) {
+    for (int i = 0; i < stripes.size(); i++) {
+      StripeInformation stripe = stripes.get(i);
+      if (stripe.getNumberOfRows() > rowNumber) {
+        return i;
+      }
+      rowNumber -= stripe.getNumberOfRows();
+    }
+    throw new IllegalArgumentException("Seek after the end of reader range");
+  }
+
+  public OrcIndex readRowIndex(int stripeIndex, boolean[] included,
+                               boolean[] sargColumns) throws IOException {
+    return readRowIndex(stripeIndex, included, null, null, sargColumns);
+  }
+
+  public OrcIndex readRowIndex(int stripeIndex, boolean[] included,
+                               OrcProto.RowIndex[] indexes,
+                               OrcProto.BloomFilterIndex[] bloomFilterIndex,
+                               boolean[] sargColumns) throws IOException {
+    StripeInformation stripe = stripes.get(stripeIndex);
+    OrcProto.StripeFooter stripeFooter = null;
+    // if this is the current stripe, use the cached objects.
+    if (stripeIndex == currentStripe) {
+      stripeFooter = this.stripeFooter;
+      indexes = indexes == null ? this.indexes : indexes;
+      bloomFilterIndex = bloomFilterIndex == null ? this.bloomFilterIndices : bloomFilterIndex;
+      sargColumns = sargColumns == null ?
+          (sargApp == null ? null : sargApp.sargColumns) : sargColumns;
+    }
+    return dataReader.readRowIndex(stripe, stripeFooter, included, indexes, sargColumns,
+        bloomFilterIndex);
+  }
+
+  private void seekToRowEntry(TreeReaderFactory.TreeReader reader, int rowEntry)
+      throws IOException {
+    PositionProvider[] index = new PositionProvider[indexes.length];
+    for (int i = 0; i < indexes.length; ++i) {
+      if (indexes[i] != null) {
+        index[i] = new PositionProviderImpl(indexes[i].getEntry(rowEntry));
+      }
+    }
+    reader.seek(index);
+  }
+
+  @Override
+  public void seekToRow(long rowNumber) throws IOException {
+    if (rowNumber < 0) {
+      throw new IllegalArgumentException("Seek to a negative row number " +
+          rowNumber);
+    } else if (rowNumber < firstRow) {
+      throw new IllegalArgumentException("Seek before reader range " +
+          rowNumber);
+    }
+    // convert to our internal form (rows from the beginning of slice)
+    rowNumber -= firstRow;
+
+    // move to the right stripe
+    int rightStripe = findStripe(rowNumber);
+    if (rightStripe != currentStripe) {
+      currentStripe = rightStripe;
+      readStripe();
+    }
+    readRowIndex(currentStripe, included, sargApp == null ? null : sargApp.sargColumns);
+
+    // if we aren't to the right row yet, advance in the stripe.
+    advanceToNextRow(reader, rowNumber, true);
+  }
+
+  private static final String TRANSLATED_SARG_SEPARATOR = "_";
+  public static String encodeTranslatedSargColumn(int rootColumn, Integer indexInSourceTable) {
+    return rootColumn + TRANSLATED_SARG_SEPARATOR
+        + ((indexInSourceTable == null) ? -1 : indexInSourceTable);
+  }
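// --- Editor's note: illustrative sketch, not part of this patch. ---
// The translated-SARG column encoding is simply "<rootColumn>_<indexInSourceTable>",
// with -1 standing in for a missing source-table index.
String a = RecordReaderImpl.encodeTranslatedSargColumn(5, 2);    // "5_2"
String b = RecordReaderImpl.encodeTranslatedSargColumn(5, null); // "5_-1"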
+
+  public static int[] mapTranslatedSargColumns(
+      List<OrcProto.Type> types, List<PredicateLeaf> sargLeaves) {
+    int[] result = new int[sargLeaves.size()];
+    OrcProto.Type lastRoot = null; // Root will be the same for everyone as of now.
+    String lastRootStr = null;
+    for (int i = 0; i < result.length; ++i) {
+      String[] rootAndIndex = sargLeaves.get(i).getColumnName().split(TRANSLATED_SARG_SEPARATOR);
+      assert rootAndIndex.length == 2;
+      String rootStr = rootAndIndex[0], indexStr = rootAndIndex[1];
+      int index = Integer.parseInt(indexStr);
+      // First, check if the column even maps to anything.
+      if (index == -1) {
+        result[i] = -1;
+        continue;
+      }
+      assert index >= 0;
+      // Then, find the root type if needed.
+      if (!rootStr.equals(lastRootStr)) {
+        lastRoot = types.get(Integer.parseInt(rootStr));
+        lastRootStr = rootStr;
+      }
+      // Subtypes of the root type correspond, in order, to the columns in the table schema
+      // (disregarding schema evolution that doesn't presently work). Get the index for the
+      // corresponding subtype.
+      result[i] = lastRoot.getSubtypes(index);
+    }
+    return result;
+  }
+}
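// --- Editor's note: illustrative usage sketch, not part of this patch. ---
// RecordReaderImpl is normally obtained through Reader.rows(...) rather than
// constructed directly; "reader" is an assumed org.apache.hive.orc.Reader and
// the batch-iteration API is assumed to mirror upstream ORC.
RecordReader rows = reader.rows();
VectorizedRowBatch batch = reader.getSchema().createRowBatch();
while (rows.nextBatch(batch)) {
  // process batch.size rows from batch.cols[...]
}
rows.close();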

http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/RecordReaderUtils.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/RecordReaderUtils.java b/orc/src/java/org/apache/hive/orc/impl/RecordReaderUtils.java
new file mode 100644
index 0000000..16af69d
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/RecordReaderUtils.java
@@ -0,0 +1,578 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.orc.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
+import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
+import org.apache.hive.orc.CompressionCodec;
+import org.apache.hive.orc.DataReader;
+import org.apache.hive.orc.OrcProto;
+import org.apache.hive.orc.StripeInformation;
+
+import com.google.common.collect.ComparisonChain;
+
+/**
+ * Stateless methods shared between RecordReaderImpl and EncodedReaderImpl.
+ */
+public class RecordReaderUtils {
+  private static final HadoopShims SHIMS = HadoopShims.Factory.get();
+
+  private static class DefaultDataReader implements DataReader {
+    private FSDataInputStream file = null;
+    private final ByteBufferAllocatorPool pool;
+    private HadoopShims.ZeroCopyReaderShim zcr = null;
+    private final FileSystem fs;
+    private final Path path;
+    private final boolean useZeroCopy;
+    private final CompressionCodec codec;
+    private final int bufferSize;
+    private final int typeCount;
+
+    private DefaultDataReader(DefaultDataReader other) {
+      this.pool = other.pool;
+      this.bufferSize = other.bufferSize;
+      this.typeCount = other.typeCount;
+      this.fs = other.fs;
+      this.path = other.path;
+      this.useZeroCopy = other.useZeroCopy;
+      this.codec = other.codec;
+    }
+
+    private DefaultDataReader(DataReaderProperties properties) {
+      this.fs = properties.getFileSystem();
+      this.path = properties.getPath();
+      this.useZeroCopy = properties.getZeroCopy();
+      this.codec = PhysicalFsWriter.createCodec(properties.getCompression());
+      this.bufferSize = properties.getBufferSize();
+      this.typeCount = properties.getTypeCount();
+      if (useZeroCopy) {
+        this.pool = new ByteBufferAllocatorPool();
+      } else {
+        this.pool = null;
+      }
+    }
+
+    @Override
+    public void open() throws IOException {
+      this.file = fs.open(path);
+      if (useZeroCopy) {
+        zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool);
+      } else {
+        zcr = null;
+      }
+    }
+
+    @Override
+    public OrcIndex readRowIndex(StripeInformation stripe,
+                                 OrcProto.StripeFooter footer,
+                                 boolean[] included,
+                                 OrcProto.RowIndex[] indexes,
+                                 boolean[] sargColumns,
+                                 OrcProto.BloomFilterIndex[] bloomFilterIndices
+                                 ) throws IOException {
+      if (file == null) {
+        open();
+      }
+      if (footer == null) {
+        footer = readStripeFooter(stripe);
+      }
+      if (indexes == null) {
+        indexes = new OrcProto.RowIndex[typeCount];
+      }
+      if (bloomFilterIndices == null) {
+        bloomFilterIndices = new OrcProto.BloomFilterIndex[typeCount];
+      }
+      long offset = stripe.getOffset();
+      List<OrcProto.Stream> streams = footer.getStreamsList();
+      for (int i = 0; i < streams.size(); i++) {
+        OrcProto.Stream stream = streams.get(i);
+        OrcProto.Stream nextStream = null;
+        if (i < streams.size() - 1) {
+          nextStream = streams.get(i+1);
+        }
+        int col = stream.getColumn();
+        int len = (int) stream.getLength();
+        // The row index stream and bloom filter stream are interleaved. Check whether the sarg
+        // column has a bloom filter and, if so, combine the IO to read the row index and bloom
+        // filter for that column together.
+        if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX)) {
+          boolean readBloomFilter = false;
+          if (sargColumns != null && sargColumns[col] &&
+              nextStream.getKind() == OrcProto.Stream.Kind.BLOOM_FILTER) {
+            len += nextStream.getLength();
+            i += 1;
+            readBloomFilter = true;
+          }
+          if ((included == null || included[col]) && indexes[col] == null) {
+            byte[] buffer = new byte[len];
+            file.readFully(offset, buffer, 0, buffer.length);
+            ByteBuffer bb = ByteBuffer.wrap(buffer);
+            indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
+                Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), stream.getLength(),
+                codec, bufferSize));
+            if (readBloomFilter) {
+              bb.position((int) stream.getLength());
+              bloomFilterIndices[col] = OrcProto.BloomFilterIndex.parseFrom(InStream.create(
+                  "bloom_filter", Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)),
+                  nextStream.getLength(), codec, bufferSize));
+            }
+          }
+        }
+        offset += len;
+      }
+
+      return new OrcIndex(indexes, bloomFilterIndices);
+    }
+
+    @Override
+    public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
+      if (file == null) {
+        open();
+      }
+      long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength();
+      int tailLength = (int) stripe.getFooterLength();
+
+      // read the footer
+      ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
+      file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength);
+      return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer",
+          Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)),
+          tailLength, codec, bufferSize));
+    }
+
+    @Override
+    public DiskRangeList readFileData(
+        DiskRangeList range, long baseOffset, boolean doForceDirect) throws IOException {
+      return RecordReaderUtils.readDiskRanges(file, zcr, baseOffset, range, doForceDirect);
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (pool != null) {
+        pool.clear();
+      }
+      // close both zcr and file
+      try (HadoopShims.ZeroCopyReaderShim myZcr = zcr) {
+        if (file != null) {
+          file.close();
+        }
+      }
+    }
+
+    @Override
+    public boolean isTrackingDiskRanges() {
+      return zcr != null;
+    }
+
+    @Override
+    public void releaseBuffer(ByteBuffer buffer) {
+      zcr.releaseBuffer(buffer);
+    }
+
+    @Override
+    public DataReader clone() {
+      return new DefaultDataReader(this);
+    }
+
+  }
+
+  public static DataReader createDefaultDataReader(DataReaderProperties properties) {
+    return new DefaultDataReader(properties);
+  }
+
+  public static boolean[] findPresentStreamsByColumn(
+      List<OrcProto.Stream> streamList, List<OrcProto.Type> types) {
+    boolean[] hasNull = new boolean[types.size()];
+    for(OrcProto.Stream stream: streamList) {
+      if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.PRESENT)) {
+        hasNull[stream.getColumn()] = true;
+      }
+    }
+    return hasNull;
+  }
+
+  /**
+   * Does region A overlap region B? The end points are inclusive on both sides.
+   * @param leftA A's left point
+   * @param rightA A's right point
+   * @param leftB B's left point
+   * @param rightB B's right point
+   * @return Does region A overlap region B?
+   */
+  static boolean overlap(long leftA, long rightA, long leftB, long rightB) {
+    if (leftA <= leftB) {
+      return rightA >= leftB;
+    }
+    return rightB >= leftA;
+  }
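+  // For example, overlap(0, 10, 10, 20) is true because the endpoints are
+  // inclusive, while overlap(0, 9, 10, 20) is false.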
+
+  public static void addEntireStreamToRanges(
+      long offset, long length, CreateHelper list, boolean doMergeBuffers) {
+    list.addOrMerge(offset, offset + length, doMergeBuffers, false);
+  }
+
+  public static void addRgFilteredStreamToRanges(OrcProto.Stream stream,
+      boolean[] includedRowGroups, boolean isCompressed, OrcProto.RowIndex index,
+      OrcProto.ColumnEncoding encoding, OrcProto.Type type, int compressionSize, boolean hasNull,
+      long offset, long length, CreateHelper list, boolean doMergeBuffers) {
+    for (int group = 0; group < includedRowGroups.length; ++group) {
+      if (!includedRowGroups[group]) continue;
+      int posn = getIndexPosition(
+          encoding.getKind(), type.getKind(), stream.getKind(), isCompressed, hasNull);
+      long start = index.getEntry(group).getPositions(posn);
+      final long nextGroupOffset;
+      boolean isLast = group == (includedRowGroups.length - 1);
+      nextGroupOffset = isLast ? length : index.getEntry(group + 1).getPositions(posn);
+
+      start += offset;
+      long end = offset + estimateRgEndOffset(
+          isCompressed, isLast, nextGroupOffset, length, compressionSize);
+      list.addOrMerge(start, end, doMergeBuffers, true);
+    }
+  }
+
+  public static long estimateRgEndOffset(boolean isCompressed, boolean isLast,
+      long nextGroupOffset, long streamLength, int bufferSize) {
+    // figure out the worst case last location
+    // if adjacent groups have the same compressed block offset then stretch the slop
+    // by factor of 2 to safely accommodate the next compression block.
+    // One for the current compression block and another for the next compression block.
+    long slop = isCompressed ? 2 * (OutStream.HEADER_SIZE + bufferSize) : WORST_UNCOMPRESSED_SLOP;
+    return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop);
+  }
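+  // For example, with compression and the default 256K buffer, the slop is
+  // 2 * (OutStream.HEADER_SIZE + 262144) bytes: one compression block for where
+  // the row group starts, plus one more block that it may spill into.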
+
+  private static final int BYTE_STREAM_POSITIONS = 1;
+  private static final int RUN_LENGTH_BYTE_POSITIONS = BYTE_STREAM_POSITIONS + 1;
+  private static final int BITFIELD_POSITIONS = RUN_LENGTH_BYTE_POSITIONS + 1;
+  private static final int RUN_LENGTH_INT_POSITIONS = BYTE_STREAM_POSITIONS + 1;
+
+  /**
+   * Get the offset within the column's row index positions at which the given
+   * stream's positions start.
+   * @param columnEncoding the encoding of the column
+   * @param columnType the type of the column
+   * @param streamType the kind of the stream
+   * @param isCompressed is the file compressed?
+   * @param hasNulls does the column have a PRESENT stream?
+   * @return the offset of the stream's first position within the row index entry
+   */
+  public static int getIndexPosition(OrcProto.ColumnEncoding.Kind columnEncoding,
+                              OrcProto.Type.Kind columnType,
+                              OrcProto.Stream.Kind streamType,
+                              boolean isCompressed,
+                              boolean hasNulls) {
+    if (streamType == OrcProto.Stream.Kind.PRESENT) {
+      return 0;
+    }
+    int compressionValue = isCompressed ? 1 : 0;
+    int base = hasNulls ? (BITFIELD_POSITIONS + compressionValue) : 0;
+    switch (columnType) {
+      case BOOLEAN:
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+      case DATE:
+      case STRUCT:
+      case MAP:
+      case LIST:
+      case UNION:
+        return base;
+      case CHAR:
+      case VARCHAR:
+      case STRING:
+        if (columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
+            columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
+          return base;
+        } else {
+          if (streamType == OrcProto.Stream.Kind.DATA) {
+            return base;
+          } else {
+            return base + BYTE_STREAM_POSITIONS + compressionValue;
+          }
+        }
+      case BINARY:
+        if (streamType == OrcProto.Stream.Kind.DATA) {
+          return base;
+        }
+        return base + BYTE_STREAM_POSITIONS + compressionValue;
+      case DECIMAL:
+        if (streamType == OrcProto.Stream.Kind.DATA) {
+          return base;
+        }
+        return base + BYTE_STREAM_POSITIONS + compressionValue;
+      case TIMESTAMP:
+        if (streamType == OrcProto.Stream.Kind.DATA) {
+          return base;
+        }
+        return base + RUN_LENGTH_INT_POSITIONS + compressionValue;
+      default:
+        throw new IllegalArgumentException("Unknown type " + columnType);
+    }
+  }
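+  // For example, the DATA stream of an uncompressed INT column that has a
+  // PRESENT stream starts at position offset 3 (BITFIELD_POSITIONS), because the
+  // first three index positions belong to the PRESENT stream; with compression
+  // the offset is 4, adding one position for the compressed-block offset.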
+
+  // For uncompressed streams, the worst-case overlap with the following row
+  // group (a literal group of long vints).
+  static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512;
+
+  /**
+   * Is this stream part of a dictionary?
+   * @return is this part of a dictionary?
+   */
+  public static boolean isDictionary(OrcProto.Stream.Kind kind,
+                              OrcProto.ColumnEncoding encoding) {
+    assert kind != OrcProto.Stream.Kind.DICTIONARY_COUNT;
+    OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind();
+    return kind == OrcProto.Stream.Kind.DICTIONARY_DATA ||
+      (kind == OrcProto.Stream.Kind.LENGTH &&
+       (encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
+        encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2));
+  }
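+  // For example, the LENGTH stream of a string column encoded with DICTIONARY
+  // or DICTIONARY_V2 is part of the dictionary, while the LENGTH stream of a
+  // DIRECT-encoded column is not.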
+
+  /**
+   * Build a string representation of a list of disk ranges.
+   * @param range ranges to stringify
+   * @return the resulting string
+   */
+  public static String stringifyDiskRanges(DiskRangeList range) {
+    StringBuilder buffer = new StringBuilder();
+    buffer.append("[");
+    boolean isFirst = true;
+    while (range != null) {
+      if (!isFirst) {
+        buffer.append(", {");
+      } else {
+        buffer.append("{");
+      }
+      isFirst = false;
+      buffer.append(range.toString());
+      buffer.append("}");
+      range = range.next;
+    }
+    buffer.append("]");
+    return buffer.toString();
+  }
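+  // The result has the shape "[{range}, {range}, ...]", where each entry is
+  // whatever DiskRange.toString() produces for that range.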
+
+  /**
+   * Read the list of ranges from the file.
+   * @param file the file to read
+   * @param zcr the zero-copy shim to read with, or null to use regular reads
+   * @param base the base offset of the stripe
+   * @param range the disk ranges within the stripe to read
+   * @param doForceDirect if true, copy regular reads into direct byte buffers
+   * @return the bytes read for each disk range, which is the same length as
+   *    ranges
+   * @throws IOException
+   */
+  static DiskRangeList readDiskRanges(FSDataInputStream file,
+                                      HadoopShims.ZeroCopyReaderShim zcr,
+                                 long base,
+                                 DiskRangeList range,
+                                 boolean doForceDirect) throws IOException {
+    if (range == null) return null;
+    DiskRangeList prev = range.prev;
+    if (prev == null) {
+      prev = new MutateHelper(range);
+    }
+    while (range != null) {
+      if (range.hasData()) {
+        range = range.next;
+        continue;
+      }
+      int len = (int) (range.getEnd() - range.getOffset());
+      long off = range.getOffset();
+      if (zcr != null) {
+        file.seek(base + off);
+        boolean hasReplaced = false;
+        while (len > 0) {
+          ByteBuffer partial = zcr.readBuffer(len, false);
+          BufferChunk bc = new BufferChunk(partial, off);
+          if (!hasReplaced) {
+            range.replaceSelfWith(bc);
+            hasReplaced = true;
+          } else {
+            range.insertAfter(bc);
+          }
+          range = bc;
+          int read = partial.remaining();
+          len -= read;
+          off += read;
+        }
+      } else {
+        // Don't use HDFS ByteBuffer API because it has no readFully, and is buggy and pointless.
+        byte[] buffer = new byte[len];
+        file.readFully((base + off), buffer, 0, buffer.length);
+        ByteBuffer bb = null;
+        if (doForceDirect) {
+          bb = ByteBuffer.allocateDirect(len);
+          bb.put(buffer);
+          bb.position(0);
+          bb.limit(len);
+        } else {
+          bb = ByteBuffer.wrap(buffer);
+        }
+        range = range.replaceSelfWith(new BufferChunk(bb, range.getOffset()));
+      }
+      range = range.next;
+    }
+    return prev.next;
+  }
+
+
+  static List<DiskRange> getStreamBuffers(DiskRangeList range, long offset, long length) {
+    // This assumes sorted ranges (as do many other parts of the ORC code).
+    ArrayList<DiskRange> buffers = new ArrayList<DiskRange>();
+    if (length == 0) return buffers;
+    long streamEnd = offset + length;
+    boolean inRange = false;
+    while (range != null) {
+      if (!inRange) {
+        if (range.getEnd() <= offset) {
+          range = range.next;
+          continue; // Skip until we are in range.
+        }
+        inRange = true;
+        if (range.getOffset() < offset) {
+          // Partial first buffer, add a slice of it.
+          buffers.add(range.sliceAndShift(offset, Math.min(streamEnd, range.getEnd()), -offset));
+          if (range.getEnd() >= streamEnd) break; // Partial first buffer is also partial last buffer.
+          range = range.next;
+          continue;
+        }
+      } else if (range.getOffset() >= streamEnd) {
+        break;
+      }
+      if (range.getEnd() > streamEnd) {
+        // Partial last buffer (may also be the first buffer), add a slice of it.
+        buffers.add(range.sliceAndShift(range.getOffset(), streamEnd, -offset));
+        break;
+      }
+      // Buffer that belongs entirely to one stream.
+      // TODO: ideally we would want to reuse the object and remove it from the list, but we cannot
+      //       because bufferChunks is also used by clearStreams for zcr. Create a useless dup.
+      buffers.add(range.sliceAndShift(range.getOffset(), range.getEnd(), -offset));
+      if (range.getEnd() == streamEnd) break;
+      range = range.next;
+    }
+    return buffers;
+  }
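+  // For example, for a stream at file offsets [1000, 1300) covered by ranges
+  // [900, 1100) and [1100, 1400), this returns two slices shifted to the
+  // stream-relative offsets [0, 100) and [100, 300).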
+
+  static HadoopShims.ZeroCopyReaderShim createZeroCopyShim(FSDataInputStream file,
+      CompressionCodec codec, ByteBufferAllocatorPool pool) throws IOException {
+    if ((codec == null || ((codec instanceof DirectDecompressionCodec)
+            && ((DirectDecompressionCodec) codec).isAvailable()))) {
+      /* codec is null or is available */
+      return SHIMS.getZeroCopyReader(file, pool);
+    }
+    return null;
+  }
+
+  // this is an implementation copied from ElasticByteBufferPool in hadoop-2,
+  // which lacks a clear()/clean() operation
+  public final static class ByteBufferAllocatorPool implements HadoopShims.ByteBufferPoolShim {
+    private static final class Key implements Comparable<Key> {
+      private final int capacity;
+      private final long insertionGeneration;
+
+      Key(int capacity, long insertionGeneration) {
+        this.capacity = capacity;
+        this.insertionGeneration = insertionGeneration;
+      }
+
+      @Override
+      public int compareTo(Key other) {
+        return ComparisonChain.start().compare(capacity, other.capacity)
+            .compare(insertionGeneration, other.insertionGeneration).result();
+      }
+
+      @Override
+      public boolean equals(Object rhs) {
+        if (rhs == null) {
+          return false;
+        }
+        try {
+          Key o = (Key) rhs;
+          return (compareTo(o) == 0);
+        } catch (ClassCastException e) {
+          return false;
+        }
+      }
+
+      @Override
+      public int hashCode() {
+        return new HashCodeBuilder().append(capacity).append(insertionGeneration)
+            .toHashCode();
+      }
+    }
+
+    private final TreeMap<Key, ByteBuffer> buffers = new TreeMap<Key, ByteBuffer>();
+
+    private final TreeMap<Key, ByteBuffer> directBuffers = new TreeMap<Key, ByteBuffer>();
+
+    private long currentGeneration = 0;
+
+    private final TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {
+      return direct ? directBuffers : buffers;
+    }
+
+    public void clear() {
+      buffers.clear();
+      directBuffers.clear();
+    }
+
+    @Override
+    public ByteBuffer getBuffer(boolean direct, int length) {
+      TreeMap<Key, ByteBuffer> tree = getBufferTree(direct);
+      Map.Entry<Key, ByteBuffer> entry = tree.ceilingEntry(new Key(length, 0));
+      if (entry == null) {
+        return direct ? ByteBuffer.allocateDirect(length) : ByteBuffer
+            .allocate(length);
+      }
+      tree.remove(entry.getKey());
+      return entry.getValue();
+    }
+
+    @Override
+    public void putBuffer(ByteBuffer buffer) {
+      TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect());
+      while (true) {
+        Key key = new Key(buffer.capacity(), currentGeneration++);
+        if (!tree.containsKey(key)) {
+          tree.put(key, buffer);
+          return;
+        }
+        // Buffers are indexed by (capacity, generation).
+        // If our key is not unique on the first try, we try again
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/RedBlackTree.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/RedBlackTree.java b/orc/src/java/org/apache/hive/orc/impl/RedBlackTree.java
new file mode 100644
index 0000000..a340c50
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/RedBlackTree.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.orc.impl;
+
+/**
+ * A memory-efficient red-black tree that does not allocate any objects per
+ * element. This class is abstract and assumes that the child class
+ * handles the key and comparisons with the key.
+ */
+abstract class RedBlackTree {
+  public static final int NULL = -1;
+
+  // Various values controlling the offset of the data within the array.
+  private static final int LEFT_OFFSET = 0;
+  private static final int RIGHT_OFFSET = 1;
+  private static final int ELEMENT_SIZE = 2;
+
+  protected int size = 0;
+  private final DynamicIntArray data;
+  protected int root = NULL;
+  protected int lastAdd = 0;
+  private boolean wasAdd = false;
+
+  /**
+   * Create a set with the given initial capacity.
+   */
+  public RedBlackTree(int initialCapacity) {
+    data = new DynamicIntArray(initialCapacity * ELEMENT_SIZE);
+  }
+
+  /**
+   * Insert a new node into the data array, growing the array as necessary.
+   *
+   * @return the position of the new node.
+   */
+  private int insert(int left, int right, boolean isRed) {
+    int position = size;
+    size += 1;
+    setLeft(position, left, isRed);
+    setRight(position, right);
+    return position;
+  }
+
+  /**
+   * Compare the value at the given position to the new value.
+   * @return 0 if the values are the same, -1 if the new value is smaller and
+   *         1 if the new value is larger.
+   */
+  protected abstract int compareValue(int position);
+
+  /**
+   * Is the given node red as opposed to black? To avoid an extra word in the
+   * data array, we just use the low bit of the left child index.
+   */
+  protected boolean isRed(int position) {
+    return position != NULL &&
+        (data.get(position * ELEMENT_SIZE + LEFT_OFFSET) & 1) == 1;
+  }
+
+  /**
+   * Set the red bit true or false.
+   */
+  private void setRed(int position, boolean isRed) {
+    int offset = position * ELEMENT_SIZE + LEFT_OFFSET;
+    if (isRed) {
+      data.set(offset, data.get(offset) | 1);
+    } else {
+      data.set(offset, data.get(offset) & ~1);
+    }
+  }
+
+  /**
+   * Get the left field of the given position.
+   */
+  protected int getLeft(int position) {
+    return data.get(position * ELEMENT_SIZE + LEFT_OFFSET) >> 1;
+  }
+
+  /**
+   * Get the right field of the given position.
+   */
+  protected int getRight(int position) {
+    return data.get(position * ELEMENT_SIZE + RIGHT_OFFSET);
+  }
+
+  /**
+   * Set the left field of the given position.
+   * Note that we are storing the node color in the low bit of the left pointer.
+   */
+  private void setLeft(int position, int left) {
+    int offset = position * ELEMENT_SIZE + LEFT_OFFSET;
+    data.set(offset, (left << 1) | (data.get(offset) & 1));
+  }
+
+  /**
+   * Set the left field of the given position.
+   * Note that we are storing the node color in the low bit of the left pointer.
+   */
+  private void setLeft(int position, int left, boolean isRed) {
+    int offset = position * ELEMENT_SIZE + LEFT_OFFSET;
+    data.set(offset, (left << 1) | (isRed ? 1 : 0));
+  }
+
+  /**
+   * Set the right field of the given position.
+   */
+  private void setRight(int position, int right) {
+    data.set(position * ELEMENT_SIZE + RIGHT_OFFSET, right);
+  }
+
+  /**
+   * Insert or find a given key in the tree and rebalance the tree correctly.
+   * Rebalancing restores the red-black aspect of the tree to maintain the
+   * invariants:
+   *   1. If a node is red, both of its children are black.
+   *   2. Each child of a node has the same black height (the number of black
+   *      nodes between it and the leaves of the tree).
+   *
+   * Inserted nodes are at the leaves and are red, therefore there is at most a
+   * violation of rule 1 at the node we just put in. Instead of always keeping
+   * the parents, this routine passes the context down.
+   *
+   * The fix is broken down into 6 cases (1.{1,2,3} and 2.{1,2,3} that are
+   * left-right mirror images of each other). See Algorithms by Cormen,
+   * Leiserson, and Rivest for the explanation of the subcases.
+   *
+   * @param node The node that we are fixing right now.
+   * @param fromLeft Did we come down from the left?
+   * @param parent The node's parent
+   * @param grandparent Parent's parent
+   * @param greatGrandparent Grandparent's parent
+   * @return Does parent also need to be checked and/or fixed?
+   */
+  private boolean add(int node, boolean fromLeft, int parent,
+                      int grandparent, int greatGrandparent) {
+    if (node == NULL) {
+      if (root == NULL) {
+        lastAdd = insert(NULL, NULL, false);
+        root = lastAdd;
+        wasAdd = true;
+        return false;
+      } else {
+        lastAdd = insert(NULL, NULL, true);
+        node = lastAdd;
+        wasAdd = true;
+        // connect the new node into the tree
+        if (fromLeft) {
+          setLeft(parent, node);
+        } else {
+          setRight(parent, node);
+        }
+      }
+    } else {
+      int compare = compareValue(node);
+      boolean keepGoing;
+
+      // Recurse down to find where the node needs to be added
+      if (compare < 0) {
+        keepGoing = add(getLeft(node), true, node, parent, grandparent);
+      } else if (compare > 0) {
+        keepGoing = add(getRight(node), false, node, parent, grandparent);
+      } else {
+        lastAdd = node;
+        wasAdd = false;
+        return false;
+      }
+
+      // we don't need to fix the root (because it is always set to black)
+      if (node == root || !keepGoing) {
+        return false;
+      }
+    }
+
+
+    // Do we need to fix this node? Only if there are two reds right under each
+    // other.
+    if (isRed(node) && isRed(parent)) {
+      if (parent == getLeft(grandparent)) {
+        int uncle = getRight(grandparent);
+        if (isRed(uncle)) {
+          // case 1.1
+          setRed(parent, false);
+          setRed(uncle, false);
+          setRed(grandparent, true);
+          return true;
+        } else {
+          if (node == getRight(parent)) {
+            // case 1.2
+            // swap node and parent
+            int tmp = node;
+            node = parent;
+            parent = tmp;
+            // left-rotate on node
+            setLeft(grandparent, parent);
+            setRight(node, getLeft(parent));
+            setLeft(parent, node);
+          }
+
+          // case 1.2 and 1.3
+          setRed(parent, false);
+          setRed(grandparent, true);
+
+          // right-rotate on grandparent
+          if (greatGrandparent == NULL) {
+            root = parent;
+          } else if (getLeft(greatGrandparent) == grandparent) {
+            setLeft(greatGrandparent, parent);
+          } else {
+            setRight(greatGrandparent, parent);
+          }
+          setLeft(grandparent, getRight(parent));
+          setRight(parent, grandparent);
+          return false;
+        }
+      } else {
+        int uncle = getLeft(grandparent);
+        if (isRed(uncle)) {
+          // case 2.1
+          setRed(parent, false);
+          setRed(uncle, false);
+          setRed(grandparent, true);
+          return true;
+        } else {
+          if (node == getLeft(parent)) {
+            // case 2.2
+            // swap node and parent
+            int tmp = node;
+            node = parent;
+            parent = tmp;
+            // right-rotate on node
+            setRight(grandparent, parent);
+            setLeft(node, getRight(parent));
+            setRight(parent, node);
+          }
+          // case 2.2 and 2.3
+          setRed(parent, false);
+          setRed(grandparent, true);
+          // left-rotate on grandparent
+          if (greatGrandparent == NULL) {
+            root = parent;
+          } else if (getRight(greatGrandparent) == grandparent) {
+            setRight(greatGrandparent, parent);
+          } else {
+            setLeft(greatGrandparent, parent);
+          }
+          setRight(grandparent, getLeft(parent));
+          setLeft(parent, grandparent);
+          return false;
+        }
+      }
+    } else {
+      return true;
+    }
+  }
+
+  /**
+   * Add the new key to the tree.
+   * @return true if the element is a new one.
+   */
+  protected boolean add() {
+    add(root, false, NULL, NULL, NULL);
+    if (wasAdd) {
+      setRed(root, false);
+      return true;
+    } else {
+      return false;
+    }
+  }
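+  // A concrete subclass typically stashes the candidate key in a field before
+  // calling add(): compareValue(position) compares the stored element at that
+  // position against the candidate, and after the call lastAdd holds the
+  // position of the matching or newly inserted node.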
+
+  /**
+   * Get the number of elements in the set.
+   */
+  public int size() {
+    return size;
+  }
+
+  /**
+   * Reset the table to empty.
+   */
+  public void clear() {
+    root = NULL;
+    size = 0;
+    data.clear();
+  }
+
+  /**
+   * Get the buffer size in bytes.
+   */
+  public long getSizeInBytes() {
+    return data.getSizeInBytes();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/RunLengthByteReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/RunLengthByteReader.java b/orc/src/java/org/apache/hive/orc/impl/RunLengthByteReader.java
new file mode 100644
index 0000000..1dd5dab
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/RunLengthByteReader.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.orc.impl;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+
+/**
+ * A reader that reads a sequence of bytes. A control byte is read before each
+ * run; positive values 0 to 127 mean 3 to 130 repetitions of the next byte. If
+ * the control byte is -1 to -128, 1 to 128 literal byte values follow.
+ */
+public class RunLengthByteReader {
+  private InStream input;
+  private final byte[] literals =
+    new byte[RunLengthByteWriter.MAX_LITERAL_SIZE];
+  private int numLiterals = 0;
+  private int used = 0;
+  private boolean repeat = false;
+
+  public RunLengthByteReader(InStream input) throws IOException {
+    this.input = input;
+  }
+
+  public void setInStream(InStream input) {
+    this.input = input;
+  }
+
+  private void readValues(boolean ignoreEof) throws IOException {
+    int control = input.read();
+    used = 0;
+    if (control == -1) {
+      if (!ignoreEof) {
+        throw new EOFException("Read past end of buffer RLE byte from " + input);
+      }
+      used = numLiterals = 0;
+      return;
+    } else if (control < 0x80) {
+      repeat = true;
+      numLiterals = control + RunLengthByteWriter.MIN_REPEAT_SIZE;
+      int val = input.read();
+      if (val == -1) {
+        throw new EOFException("Reading RLE byte got EOF");
+      }
+      literals[0] = (byte) val;
+    } else {
+      repeat = false;
+      numLiterals = 0x100 - control;
+      int bytes = 0;
+      while (bytes < numLiterals) {
+        int result = input.read(literals, bytes, numLiterals - bytes);
+        if (result == -1) {
+          throw new EOFException("Reading RLE byte literal got EOF in " + this);
+        }
+        bytes += result;
+      }
+    }
+  }
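+  // For example, a control byte of 0x05 means the next byte repeats 8 times
+  // (5 + MIN_REPEAT_SIZE), while a control byte of 0xfd (-3) means 3 literal
+  // bytes follow (0x100 - 0xfd).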
+
+  public boolean hasNext() throws IOException {
+    return used != numLiterals || input.available() > 0;
+  }
+
+  public byte next() throws IOException {
+    byte result;
+    if (used == numLiterals) {
+      readValues(false);
+    }
+    if (repeat) {
+      result = literals[0];
+    } else {
+      result = literals[used];
+    }
+    ++used;
+    return result;
+  }
+
+  public void nextVector(ColumnVector previous, long[] data, long size)
+      throws IOException {
+    previous.isRepeating = true;
+    for (int i = 0; i < size; i++) {
+      if (!previous.isNull[i]) {
+        data[i] = next();
+      } else {
+        // The default value of null for int types in vectorized
+        // processing is 1, so set that if the value is null
+        data[i] = 1;
+      }
+
+      // The default value for nulls in Vectorization for int types is 1
+      // and given that non null value can also be 1, we need to check for isNull also
+      // when determining the isRepeating flag.
+      if (previous.isRepeating
+          && i > 0
+          && ((data[0] != data[i]) ||
+              (previous.isNull[0] != previous.isNull[i]))) {
+        previous.isRepeating = false;
+      }
+    }
+  }
+
+  /**
+   * Read the next size bytes into the data array, skipping over any slots
+   * where isNull is true.
+   * @param isNull if non-null, skip any rows where isNull[r] is true
+   * @param data the array to read into
+   * @param size the number of elements to read
+   * @throws IOException
+   */
+  public void nextVector(boolean[] isNull, int[] data,
+                         long size) throws IOException {
+    if (isNull == null) {
+      for(int i=0; i < size; ++i) {
+        data[i] = next();
+      }
+    } else {
+      for(int i=0; i < size; ++i) {
+        if (!isNull[i]) {
+          data[i] = next();
+        }
+      }
+    }
+  }
+
+  public void seek(PositionProvider index) throws IOException {
+    input.seek(index);
+    int consumed = (int) index.getNext();
+    if (consumed != 0) {
+      // a loop is required for cases where we break the run into two parts
+      while (consumed > 0) {
+        readValues(false);
+        used = consumed;
+        consumed -= numLiterals;
+      }
+    } else {
+      used = 0;
+      numLiterals = 0;
+    }
+  }
+
+  public void skip(long items) throws IOException {
+    while (items > 0) {
+      if (used == numLiterals) {
+        readValues(false);
+      }
+      long consume = Math.min(items, numLiterals - used);
+      used += consume;
+      items -= consume;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "byte rle " + (repeat ? "repeat" : "literal") + " used: " +
+        used + "/" + numLiterals + " from " + input;
+  }
+}

