hive-commits mailing list archives

From hashut...@apache.org
Subject svn commit: r1668750 [5/8] - in /hive/branches/cbo: ./ common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/common/type/ common/src/java/org/apache/hadoop/hive/conf/ common/src/java/org/apache/hive/common/util/ common/s...
Date Mon, 23 Mar 2015 22:02:16 GMT
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1668750&r1=1668749&r2=1668750&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Mon Mar 23 22:02:13 2015
@@ -30,12 +30,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -43,6 +42,9 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.common.DiskRangeList;
+import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
@@ -53,6 +55,7 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.exec.vector.TimestampUtils;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils.ByteBufferAllocatorPool;
 import org.apache.hadoop.hive.ql.io.filters.BloomFilter;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -67,9 +70,7 @@ import org.apache.hadoop.hive.serde2.io.
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
 import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
-import org.apache.hadoop.hive.shims.HadoopShims.ByteBufferPoolShim;
 import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.FloatWritable;
@@ -77,11 +78,11 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 
-import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.Lists;
 
 class RecordReaderImpl implements RecordReader {
 
-  private static final Log LOG = LogFactory.getLog(RecordReaderImpl.class);
+  static final Log LOG = LogFactory.getLog(RecordReaderImpl.class);
   private static final boolean isLogTraceEnabled = LOG.isTraceEnabled();
   private static final boolean isLogDebugEnabled = LOG.isDebugEnabled();
 
@@ -103,20 +104,15 @@ class RecordReaderImpl implements Record
   private long rowCountInStripe = 0;
   private final Map<StreamName, InStream> streams =
       new HashMap<StreamName, InStream>();
-  List<BufferChunk> bufferChunks = new ArrayList<BufferChunk>(0);
+  DiskRangeList bufferChunks = null;
   private final TreeReader reader;
   private final OrcProto.RowIndex[] indexes;
   private final OrcProto.BloomFilterIndex[] bloomFilterIndices;
-  private final SearchArgument sarg;
-  // the leaf predicates for the sarg
-  private final List<PredicateLeaf> sargLeaves;
-  // an array the same length as the sargLeaves that map them to column ids
-  private final int[] filterColumns;
-  // same as the above array, but indices are set to true
-  private final boolean[] sargColumns;
+  private final SargApplier sargApp;
   // an array indicating which row groups aren't skipped
   private boolean[] includedRowGroups = null;
   private final Configuration conf;
+  private final MetadataReader metadata;
 
   private final ByteBufferAllocatorPool pool = new ByteBufferAllocatorPool();
   private final ZeroCopyReaderShim zcr;
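
An aside on the data-structure change above: bufferChunks moves from an ArrayList of BufferChunk to the new hive-common DiskRangeList (see the added imports near the top of this diff). The sketch below is illustrative only, with assumed names rather than the real org.apache.hadoop.hive.common API; the point of a doubly-linked list of byte ranges is that planned reads can be split and merged in place, without shifting a flat list.

    // Illustrative only; names are assumptions, not the DiskRangeList API.
    class RangeNode {
      long offset, end;       // half-open byte range [offset, end)
      RangeNode prev, next;

      RangeNode(long offset, long end) { this.offset = offset; this.end = end; }

      // Splice a node in after this one; O(1), no copying of neighbors.
      RangeNode insertAfter(RangeNode node) {
        node.prev = this;
        node.next = this.next;
        if (this.next != null) this.next.prev = node;
        this.next = node;
        return node;
      }
    }
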
@@ -137,85 +133,9 @@ class RecordReaderImpl implements Record
     public OrcProto.BloomFilterIndex[] getBloomFilterIndex() {
       return bloomFilterIndex;
     }
-  }
-
-  // this is an implementation copied from ElasticByteBufferPool in hadoop-2,
-  // which lacks a clear()/clean() operation
-  public final static class ByteBufferAllocatorPool implements ByteBufferPoolShim {
-    private static final class Key implements Comparable<Key> {
-      private final int capacity;
-      private final long insertionGeneration;
-
-      Key(int capacity, long insertionGeneration) {
-        this.capacity = capacity;
-        this.insertionGeneration = insertionGeneration;
-      }
-
-      @Override
-      public int compareTo(Key other) {
-        return ComparisonChain.start().compare(capacity, other.capacity)
-            .compare(insertionGeneration, other.insertionGeneration).result();
-      }
-
-      @Override
-      public boolean equals(Object rhs) {
-        if (rhs == null) {
-          return false;
-        }
-        try {
-          Key o = (Key) rhs;
-          return (compareTo(o) == 0);
-        } catch (ClassCastException e) {
-          return false;
-        }
-      }
-
-      @Override
-      public int hashCode() {
-        return new HashCodeBuilder().append(capacity).append(insertionGeneration)
-            .toHashCode();
-      }
-    }
 
-    private final TreeMap<Key, ByteBuffer> buffers = new TreeMap<Key, ByteBuffer>();
-
-    private final TreeMap<Key, ByteBuffer> directBuffers = new TreeMap<Key, ByteBuffer>();
-
-    private long currentGeneration = 0;
-
-    private final TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {
-      return direct ? directBuffers : buffers;
-    }
-
-    public void clear() {
-      buffers.clear();
-      directBuffers.clear();
-    }
-
-    @Override
-    public ByteBuffer getBuffer(boolean direct, int length) {
-      TreeMap<Key, ByteBuffer> tree = getBufferTree(direct);
-      Map.Entry<Key, ByteBuffer> entry = tree.ceilingEntry(new Key(length, 0));
-      if (entry == null) {
-        return direct ? ByteBuffer.allocateDirect(length) : ByteBuffer
-            .allocate(length);
-      }
-      tree.remove(entry.getKey());
-      return entry.getValue();
-    }
-
-    @Override
-    public void putBuffer(ByteBuffer buffer) {
-      TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect());
-      while (true) {
-        Key key = new Key(buffer.capacity(), currentGeneration++);
-        if (!tree.containsKey(key)) {
-          tree.put(key, buffer);
-          return;
-        }
-        // Buffers are indexed by (capacity, generation).
-        // If our key is not unique on the first try, we try again
-      }
+    public void setRowGroupIndex(OrcProto.RowIndex[] rowGroupIndex) {
+      this.rowGroupIndex = rowGroupIndex;
     }
   }
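
The ByteBufferAllocatorPool deleted above is not gone: the new import at the top of this diff shows it relocated to RecordReaderUtils. The pattern it implements is worth noting: buffers are keyed by (capacity, insertionGeneration) in a TreeMap, so ceilingEntry() hands back the smallest pooled buffer whose capacity covers the request, and the generation counter keeps keys unique when capacities collide. A condensed sketch of the same pattern, packing the key into a long where the deleted code used its Key class:

    import java.nio.ByteBuffer;
    import java.util.Map;
    import java.util.TreeMap;

    final class PoolSketch {
      private final TreeMap<Long, ByteBuffer> buffers = new TreeMap<Long, ByteBuffer>();
      private long generation = 0;

      // capacity in the high bits: order by capacity first, then insertion.
      private static long key(int capacity, long gen) {
        return ((long) capacity << 32) | (gen & 0xFFFFFFFFL);
      }

      ByteBuffer getBuffer(int length) {
        Map.Entry<Long, ByteBuffer> e = buffers.ceilingEntry(key(length, 0));
        if (e == null) {
          return ByteBuffer.allocate(length); // pool miss: allocate fresh
        }
        buffers.remove(e.getKey());
        return e.getValue();
      }

      void putBuffer(ByteBuffer buffer) {
        buffers.put(key(buffer.capacity(), generation++), buffer);
      }
    }
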
 
@@ -245,7 +165,7 @@ class RecordReaderImpl implements Record
    *                   result
    * @return an array mapping the sarg leaves to concrete column numbers
    */
-  static int[] mapSargColumns(List<PredicateLeaf> sargLeaves,
+  public static int[] mapSargColumns(List<PredicateLeaf> sargLeaves,
                              String[] columnNames,
                              int rootColumn) {
     int[] result = new int[sargLeaves.size()];
@@ -257,7 +177,7 @@ class RecordReaderImpl implements Record
     return result;
   }
 
-  RecordReaderImpl(List<StripeInformation> stripes,
+  protected RecordReaderImpl(List<StripeInformation> stripes,
                    FileSystem fileSystem,
                    Path path,
                    Reader.Options options,
@@ -274,22 +194,14 @@ class RecordReaderImpl implements Record
     this.bufferSize = bufferSize;
     this.included = options.getInclude();
     this.conf = conf;
-    this.sarg = options.getSearchArgument();
-    if (sarg != null) {
-      sargLeaves = sarg.getLeaves();
-      filterColumns = mapSargColumns(sargLeaves, options.getColumnNames(), 0);
-      // included will not be null, row options will fill the array with trues if null
-      sargColumns = new boolean[included.length];
-      for (int i : filterColumns) {
-        // filter columns may have -1 as index which could be partition column in SARG.
-        if (i > 0) {
-          sargColumns[i] = true;
-        }
-      }
+    this.rowIndexStride = strideRate;
+    this.metadata = new MetadataReader(file, codec, bufferSize, types.size());
+    SearchArgument sarg = options.getSearchArgument();
+    if (sarg != null && strideRate != 0) {
+      sargApp = new SargApplier(
+          sarg, options.getColumnNames(), strideRate, types, included.length);
     } else {
-      sargLeaves = null;
-      filterColumns = null;
-      sargColumns = null;
+      sargApp = null;
     }
     long rows = 0;
     long skippedRows = 0;
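
The bookkeeping deleted above now lives in the new SargApplier, and the guard also skips predicate pushdown entirely when the row-index stride is 0, since without row groups there is nothing to prune. Restated from the deleted lines (sarg, options and included as in this constructor):

    List<PredicateLeaf> sargLeaves = sarg.getLeaves();
    int[] filterColumns = mapSargColumns(sargLeaves, options.getColumnNames(), 0);
    boolean[] sargColumns = new boolean[included.length];
    for (int i : filterColumns) {
      // -1 marks a leaf with no matching file column (e.g. a partition column)
      if (i > 0) {
        sargColumns[i] = true;
      }
    }
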
@@ -307,31 +219,28 @@ class RecordReaderImpl implements Record
 
     final boolean zeroCopy = (conf != null)
         && (HiveConf.getBoolVar(conf, HIVE_ORC_ZEROCOPY));
-
-    if (zeroCopy
-        && (codec == null || ((codec instanceof DirectDecompressionCodec)
-            && ((DirectDecompressionCodec) codec).isAvailable()))) {
-      /* codec is null or is available */
-      this.zcr = ShimLoader.getHadoopShims().getZeroCopyReader(file, pool);
-    } else {
-      this.zcr = null;
-    }
+    zcr = zeroCopy ? RecordReaderUtils.createZeroCopyShim(file, codec, pool) : null;
 
     firstRow = skippedRows;
     totalRowCount = rows;
-    reader = createTreeReader(path, 0, types, included, conf);
+    boolean skipCorrupt = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA);
+    reader = createTreeReader(0, types, included, skipCorrupt);
     indexes = new OrcProto.RowIndex[types.size()];
     bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
-    rowIndexStride = strideRate;
-    advanceToNextRow(0L);
+    advanceToNextRow(reader, 0L, true);
   }
 
-  private static final class PositionProviderImpl implements PositionProvider {
+  public static final class PositionProviderImpl implements PositionProvider {
     private final OrcProto.RowIndexEntry entry;
-    private int index = 0;
+    private int index;
 
-    PositionProviderImpl(OrcProto.RowIndexEntry entry) {
+    public PositionProviderImpl(OrcProto.RowIndexEntry entry) {
+      this(entry, 0);
+    }
+
+    public PositionProviderImpl(OrcProto.RowIndexEntry entry, int startPos) {
       this.entry = entry;
+      this.index = startPos;
     }
 
     @Override
@@ -340,33 +249,39 @@ class RecordReaderImpl implements Record
     }
   }
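
PositionProviderImpl is now public and gains a startPos overload, which pairs with the public single-column seek(PositionProvider) methods added throughout this diff. A hypothetical caller might look as follows; rowIndex, rowGroup, startPos and colReader are assumed names, not from this patch, and getEntry(int) is the protobuf accessor for the repeated entry field:

    OrcProto.RowIndexEntry entry = rowIndex.getEntry(rowGroup);
    colReader.seek(new PositionProviderImpl(entry));           // from position 0
    colReader.seek(new PositionProviderImpl(entry, startPos)); // resume mid-entry
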
 
-  private abstract static class TreeReader {
-    protected final Path path;
+  public abstract static class TreeReader {
     protected final int columnId;
-    private BitFieldReader present = null;
+    public BitFieldReader present = null;
     protected boolean valuePresent = false;
-    protected final Configuration conf;
 
-    TreeReader(Path path, int columnId, Configuration conf) {
-      this.path = path;
+    public TreeReader(int columnId) throws IOException {
+      this(columnId, null);
+    }
+
+    public TreeReader(int columnId, InStream in) throws IOException {
       this.columnId = columnId;
-      this.conf = conf;
+      if (in == null) {
+        present = null;
+        valuePresent = true;
+      } else {
+        present = new BitFieldReader(in, 1);
+      }
     }
 
     void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
       if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId + " of " + path);
+            columnId);
       }
     }
 
     IntegerReader createIntegerReader(OrcProto.ColumnEncoding.Kind kind,
         InStream in,
-        boolean signed) throws IOException {
+        boolean signed, boolean skipCorrupt) throws IOException {
       switch (kind) {
       case DIRECT_V2:
       case DICTIONARY_V2:
-        return new RunLengthIntegerReaderV2(in, signed, conf);
+        return new RunLengthIntegerReaderV2(in, signed, skipCorrupt);
       case DIRECT:
       case DICTIONARY:
         return new RunLengthIntegerReader(in, signed);
@@ -395,8 +310,12 @@ class RecordReaderImpl implements Record
      * @throws IOException
      */
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    public void seek(PositionProvider index) throws IOException {
       if (present != null) {
-        present.seek(index[columnId]);
+        present.seek(index);
       }
     }
 
@@ -431,8 +350,7 @@ class RecordReaderImpl implements Record
      * @return
      * @throws IOException
      */
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
-
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       ColumnVector result = (ColumnVector) previousVector;
       if (present != null) {
         // Set noNulls and isNull vector of the ColumnVector based on
@@ -456,11 +374,18 @@ class RecordReaderImpl implements Record
     }
   }
 
-  private static class BooleanTreeReader extends TreeReader{
-    private BitFieldReader reader = null;
+  public static class BooleanTreeReader extends TreeReader {
+    protected BitFieldReader reader = null;
+
+    public BooleanTreeReader(int columnId) throws IOException {
+      this(columnId, null, null);
+    }
 
-    BooleanTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
+    public BooleanTreeReader(int columnId, InStream present, InStream data) throws IOException {
+      super(columnId, present);
+      if (data != null) {
+        reader = new BitFieldReader(data, 1);
+      }
     }
 
     @Override
@@ -474,8 +399,13 @@ class RecordReaderImpl implements Record
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      reader.seek(index[columnId]);
+      reader.seek(index);
     }
 
     @Override
@@ -499,7 +429,7 @@ class RecordReaderImpl implements Record
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       LongColumnVector result = null;
       if (previousVector == null) {
         result = new LongColumnVector();
@@ -516,11 +446,16 @@ class RecordReaderImpl implements Record
     }
   }
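
BooleanTreeReader reuses the same BitFieldReader(data, 1) machinery for its DATA stream, so boolean values and PRESENT flags share one decoder. A sketch of extracting a single bit, assuming ORC's MSB-first bit order within each byte (the run-length framing underneath is omitted):

    int nextBit(byte[] decoded, int bitIndex) {
      int b = decoded[bitIndex >> 3] & 0xFF;  // byte holding this bit
      return (b >> (7 - (bitIndex & 7))) & 1; // MSB-first within the byte
    }
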
 
-  private static class ByteTreeReader extends TreeReader{
-    private RunLengthByteReader reader = null;
+  public static class ByteTreeReader extends TreeReader {
+    protected RunLengthByteReader reader = null;
+
+    ByteTreeReader(int columnId) throws IOException {
+      this(columnId, null, null);
+    }
 
-    ByteTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
+    public ByteTreeReader(int columnId, InStream present, InStream data) throws IOException {
+      super(columnId, present);
+      this.reader = new RunLengthByteReader(data);
     }
 
     @Override
@@ -534,8 +469,13 @@ class RecordReaderImpl implements Record
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      reader.seek(index[columnId]);
+      reader.seek(index);
     }
 
     @Override
@@ -554,7 +494,7 @@ class RecordReaderImpl implements Record
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       LongColumnVector result = null;
       if (previousVector == null) {
         result = new LongColumnVector();
@@ -576,11 +516,21 @@ class RecordReaderImpl implements Record
     }
   }
 
-  private static class ShortTreeReader extends TreeReader{
-    private IntegerReader reader = null;
+  public static class ShortTreeReader extends TreeReader {
+    protected IntegerReader reader = null;
+
+    public ShortTreeReader(int columnId) throws IOException {
+      this(columnId, null, null, null);
+    }
 
-    ShortTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
+    public ShortTreeReader(int columnId, InStream present, InStream data,
+        OrcProto.ColumnEncoding encoding)
+        throws IOException {
+      super(columnId, present);
+      if (data != null && encoding != null) {
+        checkEncoding(encoding);
+        this.reader = createIntegerReader(encoding.getKind(), data, true, false);
+      }
     }
 
     @Override
@@ -588,7 +538,7 @@ class RecordReaderImpl implements Record
       if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
           (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId + " of " + path);
+            columnId);
       }
     }
 
@@ -599,13 +549,19 @@ class RecordReaderImpl implements Record
       super.startStripe(streams, encodings);
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DATA);
-      reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
+      reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true,
+          false);
     }
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      reader.seek(index[columnId]);
+      reader.seek(index);
     }
 
     @Override
@@ -624,7 +580,7 @@ class RecordReaderImpl implements Record
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       LongColumnVector result = null;
       if (previousVector == null) {
         result = new LongColumnVector();
@@ -646,11 +602,21 @@ class RecordReaderImpl implements Record
     }
   }
 
-  private static class IntTreeReader extends TreeReader{
-    private IntegerReader reader = null;
+  public static class IntTreeReader extends TreeReader {
+    protected IntegerReader reader = null;
 
-    IntTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
+    public IntTreeReader(int columnId) throws IOException {
+      this(columnId, null, null, null);
+    }
+
+    public IntTreeReader(int columnId, InStream present, InStream data,
+        OrcProto.ColumnEncoding encoding)
+        throws IOException {
+      super(columnId, present);
+      if (data != null && encoding != null) {
+        checkEncoding(encoding);
+        this.reader = createIntegerReader(encoding.getKind(), data, true, false);
+      }
     }
 
     @Override
@@ -658,7 +624,7 @@ class RecordReaderImpl implements Record
       if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
           (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId + " of " + path);
+            columnId);
       }
     }
 
@@ -669,13 +635,19 @@ class RecordReaderImpl implements Record
       super.startStripe(streams, encodings);
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DATA);
-      reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
+      reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true,
+          false);
     }
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      reader.seek(index[columnId]);
+      reader.seek(index);
     }
 
     @Override
@@ -694,7 +666,7 @@ class RecordReaderImpl implements Record
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       LongColumnVector result = null;
       if (previousVector == null) {
         result = new LongColumnVector();
@@ -716,11 +688,22 @@ class RecordReaderImpl implements Record
     }
   }
 
-  private static class LongTreeReader extends TreeReader{
-    private IntegerReader reader = null;
+  public static class LongTreeReader extends TreeReader {
+    protected IntegerReader reader = null;
+
+    LongTreeReader(int columnId, boolean skipCorrupt) throws IOException {
+      this(columnId, null, null, null, skipCorrupt);
+    }
 
-    LongTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
+    public LongTreeReader(int columnId, InStream present, InStream data,
+        OrcProto.ColumnEncoding encoding,
+        boolean skipCorrupt)
+        throws IOException {
+      super(columnId, present);
+      if (data != null && encoding != null) {
+        checkEncoding(encoding);
+        this.reader = createIntegerReader(encoding.getKind(), data, true, skipCorrupt);
+      }
     }
 
     @Override
@@ -728,7 +711,7 @@ class RecordReaderImpl implements Record
       if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
           (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId + " of " + path);
+            columnId);
       }
     }
 
@@ -739,13 +722,19 @@ class RecordReaderImpl implements Record
       super.startStripe(streams, encodings);
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DATA);
-      reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
+      reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true,
+          false);
     }
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      reader.seek(index[columnId]);
+      reader.seek(index);
     }
 
     @Override
@@ -764,7 +753,7 @@ class RecordReaderImpl implements Record
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       LongColumnVector result = null;
       if (previousVector == null) {
         result = new LongColumnVector();
@@ -786,13 +775,18 @@ class RecordReaderImpl implements Record
     }
   }
 
-  private static class FloatTreeReader extends TreeReader{
-    private InStream stream;
+  public static class FloatTreeReader extends TreeReader {
+    protected InStream stream;
     private final SerializationUtils utils;
 
-    FloatTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
+    public FloatTreeReader(int columnId) throws IOException {
+      this(columnId, null, null);
+    }
+
+    public FloatTreeReader(int columnId, InStream present, InStream data) throws IOException {
+      super(columnId, present);
       this.utils = new SerializationUtils();
+      this.stream = data;
     }
 
     @Override
@@ -807,8 +801,13 @@ class RecordReaderImpl implements Record
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      stream.seek(index[columnId]);
+      stream.seek(index);
     }
 
     @Override
@@ -827,7 +826,7 @@ class RecordReaderImpl implements Record
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       DoubleColumnVector result = null;
       if (previousVector == null) {
         result = new DoubleColumnVector();
@@ -860,7 +859,7 @@ class RecordReaderImpl implements Record
     }
 
     @Override
-    void skipRows(long items) throws IOException {
+    protected void skipRows(long items) throws IOException {
       items = countNonNulls(items);
       for(int i=0; i < items; ++i) {
         utils.readFloat(stream);
@@ -868,13 +867,18 @@ class RecordReaderImpl implements Record
     }
   }
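
skipRows above drains skipped values one at a time with utils.readFloat(stream). Assuming ORC's fixed-width float layout (four little-endian IEEE-754 bytes, which is what SerializationUtils.readFloat consumes), the decode reduces to:

    static float readFloatLE(java.io.InputStream in) throws java.io.IOException {
      int bits = in.read() | (in.read() << 8) | (in.read() << 16) | (in.read() << 24);
      return Float.intBitsToFloat(bits); // reinterpret 4 bytes as IEEE-754
    }
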
 
-  private static class DoubleTreeReader extends TreeReader{
-    private InStream stream;
+  public static class DoubleTreeReader extends TreeReader {
+    protected InStream stream;
     private final SerializationUtils utils;
 
-    DoubleTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
+    public DoubleTreeReader(int columnId) throws IOException {
+      this(columnId, null, null);
+    }
+
+    public DoubleTreeReader(int columnId, InStream present, InStream data) throws IOException {
+      super(columnId, present);
       this.utils = new SerializationUtils();
+      this.stream = data;
     }
 
     @Override
@@ -890,8 +894,13 @@ class RecordReaderImpl implements Record
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      stream.seek(index[columnId]);
+      stream.seek(index);
     }
 
     @Override
@@ -910,7 +919,7 @@ class RecordReaderImpl implements Record
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       DoubleColumnVector result = null;
       if (previousVector == null) {
         result = new DoubleColumnVector();
@@ -948,15 +957,25 @@ class RecordReaderImpl implements Record
     }
   }
 
-  private static class BinaryTreeReader extends TreeReader{
+  public static class BinaryTreeReader extends TreeReader {
     protected InStream stream;
     protected IntegerReader lengths = null;
 
     protected final LongColumnVector scratchlcv;
 
-    BinaryTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
+    BinaryTreeReader(int columnId) throws IOException {
+      this(columnId, null, null, null, null);
+    }
+
+    public BinaryTreeReader(int columnId, InStream present, InStream data, InStream length,
+        OrcProto.ColumnEncoding encoding) throws IOException {
+      super(columnId, present);
       scratchlcv = new LongColumnVector();
+      this.stream = data;
+      if (length != null && encoding != null) {
+        checkEncoding(encoding);
+        this.lengths = createIntegerReader(encoding.getKind(), length, false, false);
+      }
     }
 
     @Override
@@ -964,7 +983,7 @@ class RecordReaderImpl implements Record
       if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
           (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId + " of " + path);
+            columnId);
       }
     }
 
@@ -977,14 +996,19 @@ class RecordReaderImpl implements Record
           OrcProto.Stream.Kind.DATA);
       stream = streams.get(name);
       lengths = createIntegerReader(encodings.get(columnId).getKind(), streams.get(new
-          StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), false);
+          StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), false, false);
     }
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      stream.seek(index[columnId]);
-      lengths.seek(index[columnId]);
+      stream.seek(index);
+      lengths.seek(index);
     }
 
     @Override
@@ -1013,7 +1037,7 @@ class RecordReaderImpl implements Record
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       BytesColumnVector result = null;
       if (previousVector == null) {
         result = new BytesColumnVector();
@@ -1039,13 +1063,31 @@ class RecordReaderImpl implements Record
     }
   }
 
-  private static class TimestampTreeReader extends TreeReader{
-    private IntegerReader data = null;
-    private IntegerReader nanos = null;
-    private final LongColumnVector nanoVector = new LongColumnVector();
+  public static class TimestampTreeReader extends TreeReader {
+    protected IntegerReader data = null;
+    protected IntegerReader nanos = null;
+    private final boolean skipCorrupt;
 
-    TimestampTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
+    TimestampTreeReader(int columnId, boolean skipCorrupt) throws IOException {
+      this(columnId, null, null, null, null, skipCorrupt);
+    }
+
+    public TimestampTreeReader(int columnId, InStream presentStream, InStream dataStream,
+        InStream nanosStream, OrcProto.ColumnEncoding encoding, boolean skipCorrupt)
+        throws IOException {
+      super(columnId, presentStream);
+      this.skipCorrupt = skipCorrupt;
+      if (encoding != null) {
+        checkEncoding(encoding);
+
+        if (dataStream != null) {
+          this.data = createIntegerReader(encoding.getKind(), dataStream, true, skipCorrupt);
+        }
+
+        if (nanosStream != null) {
+          this.nanos = createIntegerReader(encoding.getKind(), nanosStream, false, skipCorrupt);
+        }
+      }
     }
 
     @Override
@@ -1053,7 +1095,7 @@ class RecordReaderImpl implements Record
       if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
           (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId + " of " + path);
+            columnId);
       }
     }
 
@@ -1064,17 +1106,22 @@ class RecordReaderImpl implements Record
       super.startStripe(streams, encodings);
       data = createIntegerReader(encodings.get(columnId).getKind(),
           streams.get(new StreamName(columnId,
-              OrcProto.Stream.Kind.DATA)), true);
+              OrcProto.Stream.Kind.DATA)), true, skipCorrupt);
       nanos = createIntegerReader(encodings.get(columnId).getKind(),
           streams.get(new StreamName(columnId,
-              OrcProto.Stream.Kind.SECONDARY)), false);
+              OrcProto.Stream.Kind.SECONDARY)), false, skipCorrupt);
     }
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      data.seek(index[columnId]);
-      nanos.seek(index[columnId]);
+      data.seek(index);
+      nanos.seek(index);
     }
 
     @Override
@@ -1105,7 +1152,7 @@ class RecordReaderImpl implements Record
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       LongColumnVector result = null;
       if (previousVector == null) {
         result = new LongColumnVector();
@@ -1149,11 +1196,20 @@ class RecordReaderImpl implements Record
     }
   }
 
-  private static class DateTreeReader extends TreeReader{
-    private IntegerReader reader = null;
+  public static class DateTreeReader extends TreeReader {
+    protected IntegerReader reader = null;
 
-    DateTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
+    DateTreeReader(int columnId) throws IOException {
+      this(columnId, null, null, null);
+    }
+
+    public DateTreeReader(int columnId, InStream present, InStream data,
+        OrcProto.ColumnEncoding encoding) throws IOException {
+      super(columnId, present);
+      if (data != null && encoding != null) {
+        checkEncoding(encoding);
+        reader = createIntegerReader(encoding.getKind(), data, true, false);
+      }
     }
 
     @Override
@@ -1161,7 +1217,7 @@ class RecordReaderImpl implements Record
       if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
           (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId + " of " + path);
+            columnId);
       }
     }
 
@@ -1172,13 +1228,18 @@ class RecordReaderImpl implements Record
       super.startStripe(streams, encodings);
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DATA);
-      reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
+      reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true, false);
     }
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      reader.seek(index[columnId]);
+      reader.seek(index);
     }
 
     @Override
@@ -1197,7 +1258,7 @@ class RecordReaderImpl implements Record
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       LongColumnVector result = null;
       if (previousVector == null) {
         result = new LongColumnVector();
@@ -1219,18 +1280,30 @@ class RecordReaderImpl implements Record
     }
   }
 
-  private static class DecimalTreeReader extends TreeReader{
-    private InStream valueStream;
-    private IntegerReader scaleStream = null;
-    private LongColumnVector scratchScaleVector = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  public static class DecimalTreeReader extends TreeReader {
+    protected InStream valueStream;
+    protected IntegerReader scaleReader = null;
+    private LongColumnVector scratchScaleVector;
 
     private final int precision;
     private final int scale;
 
-    DecimalTreeReader(Path path, int columnId, int precision, int scale, Configuration conf) {
-      super(path, columnId, conf);
+    DecimalTreeReader(int columnId, int precision, int scale) throws IOException {
+      this(columnId, precision, scale, null, null, null, null);
+    }
+
+    public DecimalTreeReader(int columnId, int precision, int scale, InStream present,
+        InStream valueStream, InStream scaleStream, OrcProto.ColumnEncoding encoding)
+        throws IOException {
+      super(columnId, present);
       this.precision = precision;
       this.scale = scale;
+      this.scratchScaleVector = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+      this.valueStream = valueStream;
+      if (scaleStream != null && encoding != null) {
+        checkEncoding(encoding);
+        this.scaleReader = createIntegerReader(encoding.getKind(), scaleStream, true, false);
+      }
     }
 
     @Override
@@ -1238,7 +1311,7 @@ class RecordReaderImpl implements Record
       if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
           (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId + " of " + path);
+            columnId);
       }
     }
 
@@ -1249,15 +1322,20 @@ class RecordReaderImpl implements Record
       super.startStripe(streams, encodings);
       valueStream = streams.get(new StreamName(columnId,
           OrcProto.Stream.Kind.DATA));
-      scaleStream = createIntegerReader(encodings.get(columnId).getKind(), streams.get(
-          new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true);
+      scaleReader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(
+          new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true, false);
     }
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      valueStream.seek(index[columnId]);
-      scaleStream.seek(index[columnId]);
+      valueStream.seek(index);
+      scaleReader.seek(index);
     }
 
     @Override
@@ -1271,14 +1349,14 @@ class RecordReaderImpl implements Record
           result = (HiveDecimalWritable) previous;
         }
         result.set(HiveDecimal.create(SerializationUtils.readBigInteger(valueStream),
-            (int) scaleStream.next()));
+            (int) scaleReader.next()));
         return HiveDecimalUtils.enforcePrecisionScale(result, precision, scale);
       }
       return null;
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       DecimalColumnVector result = null;
       if (previousVector == null) {
         result = new DecimalColumnVector(precision, scale);
@@ -1296,7 +1374,7 @@ class RecordReaderImpl implements Record
       if (result.isRepeating) {
         if (!result.isNull[0]) {
           BigInteger bInt = SerializationUtils.readBigInteger(valueStream);
-          short scaleInData = (short) scaleStream.next();
+          short scaleInData = (short) scaleReader.next();
           HiveDecimal dec = HiveDecimal.create(bInt, scaleInData);
           dec = HiveDecimalUtils.enforcePrecisionScale(dec, precision, scale);
           result.set(0, dec);
@@ -1304,7 +1382,7 @@ class RecordReaderImpl implements Record
       } else {
         // result vector has isNull values set, use the same to read scale vector.
         scratchScaleVector.isNull = result.isNull;
-        scaleStream.nextVector(scratchScaleVector, batchSize);
+        scaleReader.nextVector(scratchScaleVector, batchSize);
         for (int i = 0; i < batchSize; i++) {
           if (!result.isNull[i]) {
             BigInteger bInt = SerializationUtils.readBigInteger(valueStream);
@@ -1326,7 +1404,7 @@ class RecordReaderImpl implements Record
       for(int i=0; i < items; i++) {
         SerializationUtils.readBigInteger(valueStream);
       }
-      scaleStream.skip(items);
+      scaleReader.skip(items);
     }
   }
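
Beyond the scaleStream-to-scaleReader rename, this hunk shows how each decimal is assembled: an unbounded unscaled value from the DATA stream, a per-value scale from SECONDARY, then a clamp to the declared precision and scale. Restated from the calls visible above:

    BigInteger unscaled = SerializationUtils.readBigInteger(valueStream);
    int scaleInData = (int) scaleReader.next();
    HiveDecimal dec = HiveDecimal.create(unscaled, scaleInData);
    dec = HiveDecimalUtils.enforcePrecisionScale(dec, precision, scale);
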
 
@@ -1335,11 +1413,33 @@ class RecordReaderImpl implements Record
    * stripe, it creates an internal reader based on whether a direct or
    * dictionary encoding was used.
    */
-  private static class StringTreeReader extends TreeReader {
-    private TreeReader reader;
+  public static class StringTreeReader extends TreeReader {
+    protected TreeReader reader;
+
+    public StringTreeReader(int columnId) throws IOException {
+      super(columnId);
+    }
 
-    StringTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
+    public StringTreeReader(int columnId, InStream present, InStream data, InStream length,
+        InStream dictionary, OrcProto.ColumnEncoding encoding) throws IOException {
+      super(columnId, present);
+      if (encoding != null) {
+        switch (encoding.getKind()) {
+          case DIRECT:
+          case DIRECT_V2:
+            reader = new StringDirectTreeReader(columnId, present, data, length,
+                encoding.getKind());
+            break;
+          case DICTIONARY:
+          case DICTIONARY_V2:
+            reader = new StringDictionaryTreeReader(columnId, present, data, length, dictionary,
+                encoding);
+            break;
+          default:
+            throw new IllegalArgumentException("Unsupported encoding " +
+                encoding.getKind());
+        }
+      }
     }
 
     @Override
@@ -1356,11 +1456,11 @@ class RecordReaderImpl implements Record
       switch (encodings.get(columnId).getKind()) {
         case DIRECT:
         case DIRECT_V2:
-          reader = new StringDirectTreeReader(path, columnId, conf);
+          reader = new StringDirectTreeReader(columnId);
           break;
         case DICTIONARY:
         case DICTIONARY_V2:
-          reader = new StringDictionaryTreeReader(path, columnId, conf);
+          reader = new StringDictionaryTreeReader(columnId);
           break;
         default:
           throw new IllegalArgumentException("Unsupported encoding " +
@@ -1375,12 +1475,17 @@ class RecordReaderImpl implements Record
     }
 
     @Override
+    public void seek(PositionProvider index) throws IOException {
+      reader.seek(index);
+    }
+
+    @Override
     Object next(Object previous) throws IOException {
       return reader.next(previous);
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       return reader.nextVector(previousVector, batchSize);
     }
 
@@ -1393,7 +1498,7 @@ class RecordReaderImpl implements Record
   // This class collects together very similar methods for reading an ORC vector of byte arrays and
   // creating the BytesColumnVector.
   //
-  private static class BytesColumnVectorUtil {
+  public static class BytesColumnVectorUtil {
 
     private static byte[] commonReadByteArrays(InStream stream, IntegerReader lengths, LongColumnVector scratchlcv,
             BytesColumnVector result, long batchSize) throws IOException {
@@ -1464,15 +1569,23 @@ class RecordReaderImpl implements Record
    * A reader for string columns that are direct encoded in the current
    * stripe.
    */
-  private static class StringDirectTreeReader extends TreeReader {
-    private InStream stream;
-    private IntegerReader lengths;
-
+  public static class StringDirectTreeReader extends TreeReader {
+    public InStream stream;
+    public IntegerReader lengths;
     private final LongColumnVector scratchlcv;
 
-    StringDirectTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
-      scratchlcv = new LongColumnVector();
+    StringDirectTreeReader(int columnId) throws IOException {
+      this(columnId, null, null, null, null);
+    }
+
+    public StringDirectTreeReader(int columnId, InStream present, InStream data, InStream length,
+        OrcProto.ColumnEncoding.Kind encoding) throws IOException {
+      super(columnId, present);
+      this.scratchlcv = new LongColumnVector();
+      this.stream = data;
+      if (length != null && encoding != null) {
+        this.lengths = createIntegerReader(encoding, length, false, false);
+      }
     }
 
     @Override
@@ -1480,7 +1593,7 @@ class RecordReaderImpl implements Record
       if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT &&
           encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId + " of " + path);
+            columnId);
       }
     }
 
@@ -1494,14 +1607,19 @@ class RecordReaderImpl implements Record
       stream = streams.get(name);
       lengths = createIntegerReader(encodings.get(columnId).getKind(),
           streams.get(new StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
-          false);
+          false, false);
     }
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      stream.seek(index[columnId]);
-      lengths.seek(index[columnId]);
+      stream.seek(index);
+      lengths.seek(index);
     }
 
     @Override
@@ -1531,7 +1649,7 @@ class RecordReaderImpl implements Record
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       BytesColumnVector result = null;
       if (previousVector == null) {
         result = new BytesColumnVector();
@@ -1561,17 +1679,34 @@ class RecordReaderImpl implements Record
    * A reader for string columns that are dictionary encoded in the current
    * stripe.
    */
-  private static class StringDictionaryTreeReader extends TreeReader {
+  public static class StringDictionaryTreeReader extends TreeReader {
     private DynamicByteArray dictionaryBuffer;
     private int[] dictionaryOffsets;
-    private IntegerReader reader;
+    public IntegerReader reader;
 
     private byte[] dictionaryBufferInBytesCache = null;
     private final LongColumnVector scratchlcv;
 
-    StringDictionaryTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
+    StringDictionaryTreeReader(int columnId) throws IOException {
+      this(columnId, null, null, null, null, null);
+    }
+
+    public StringDictionaryTreeReader(int columnId, InStream present, InStream data,
+        InStream length, InStream dictionary, OrcProto.ColumnEncoding encoding)
+        throws IOException {
+      super(columnId, present);
       scratchlcv = new LongColumnVector();
+      if (data != null && encoding != null) {
+        this.reader = createIntegerReader(encoding.getKind(), data, false, false);
+      }
+
+      if (dictionary != null && encoding != null) {
+        readDictionaryStream(dictionary);
+      }
+
+      if (length != null && encoding != null) {
+        readDictionaryLengthStream(length, encoding);
+      }
     }
 
     @Override
@@ -1579,7 +1714,7 @@ class RecordReaderImpl implements Record
       if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY &&
           encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId + " of " + path);
+            columnId);
       }
     }
 
@@ -1590,28 +1725,27 @@ class RecordReaderImpl implements Record
       super.startStripe(streams, encodings);
 
       // read the dictionary blob
-      int dictionarySize = encodings.get(columnId).getDictionarySize();
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DICTIONARY_DATA);
       InStream in = streams.get(name);
-      if (in != null) { // Guard against empty dictionary stream.
-        if (in.available() > 0) {
-          dictionaryBuffer = new DynamicByteArray(64, in.available());
-          dictionaryBuffer.readAll(in);
-          // Since its start of strip invalidate the cache.
-          dictionaryBufferInBytesCache = null;
-        }
-        in.close();
-      } else {
-        dictionaryBuffer = null;
-      }
+      readDictionaryStream(in);
 
       // read the lengths
       name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH);
       in = streams.get(name);
+      readDictionaryLengthStream(in, encodings.get(columnId));
+
+      // set up the row reader
+      name = new StreamName(columnId, OrcProto.Stream.Kind.DATA);
+      reader = createIntegerReader(encodings.get(columnId).getKind(),
+          streams.get(name), false, false);
+    }
+
+    private void readDictionaryLengthStream(InStream in, OrcProto.ColumnEncoding encoding)
+        throws IOException {
+      int dictionarySize = encoding.getDictionarySize();
       if (in != null) { // Guard against empty LENGTH stream.
-        IntegerReader lenReader = createIntegerReader(encodings.get(columnId)
-            .getKind(), in, false);
+        IntegerReader lenReader = createIntegerReader(encoding.getKind(), in, false, false);
         int offset = 0;
         if (dictionaryOffsets == null ||
             dictionaryOffsets.length < dictionarySize + 1) {
@@ -1625,16 +1759,31 @@ class RecordReaderImpl implements Record
         in.close();
       }
 
-      // set up the row reader
-      name = new StreamName(columnId, OrcProto.Stream.Kind.DATA);
-      reader = createIntegerReader(encodings.get(columnId).getKind(),
-          streams.get(name), false);
+    }
+
+    private void readDictionaryStream(InStream in) throws IOException {
+      if (in != null) { // Guard against empty dictionary stream.
+        if (in.available() > 0) {
+          dictionaryBuffer = new DynamicByteArray(64, in.available());
+          dictionaryBuffer.readAll(in);
+          // Since it's the start of a stripe, invalidate the cache.
+          dictionaryBufferInBytesCache = null;
+        }
+        in.close();
+      } else {
+        dictionaryBuffer = null;
+      }
     }
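
readDictionaryLengthStream above turns the LENGTH stream (one length per dictionary entry) into dictionaryOffsets with a running sum, so entry i occupies [offsets[i], offsets[i+1]) in the dictionary blob. The core transform as a standalone sketch:

    static int[] toOffsets(int[] entryLengths) {
      int[] offsets = new int[entryLengths.length + 1];
      for (int i = 0; i < entryLengths.length; i++) {
        offsets[i + 1] = offsets[i] + entryLengths[i]; // running sum
      }
      return offsets; // entry i spans [offsets[i], offsets[i + 1])
    }
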
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      reader.seek(index[columnId]);
+      reader.seek(index);
     }
 
     @Override
@@ -1663,7 +1812,7 @@ class RecordReaderImpl implements Record
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       BytesColumnVector result = null;
       int offset = 0, length = 0;
       if (previousVector == null) {
@@ -1736,11 +1885,16 @@ class RecordReaderImpl implements Record
     }
   }
 
-  private static class CharTreeReader extends StringTreeReader {
+  public static class CharTreeReader extends StringTreeReader {
     int maxLength;
 
-    CharTreeReader(Path path, int columnId, int maxLength, Configuration conf) {
-      super(path, columnId, conf);
+    public CharTreeReader(int columnId, int maxLength) throws IOException {
+      this(columnId, maxLength, null, null, null, null, null);
+    }
+
+    public CharTreeReader(int columnId, int maxLength, InStream present, InStream data,
+        InStream length, InStream dictionary, OrcProto.ColumnEncoding encoding) throws IOException {
+      super(columnId, present, data, length, dictionary, encoding);
       this.maxLength = maxLength;
     }
 
@@ -1764,7 +1918,7 @@ class RecordReaderImpl implements Record
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       // Get the vector of strings from StringTreeReader, then make a 2nd pass to
       // adjust down the length (right trim and truncate) if necessary.
       BytesColumnVector result = (BytesColumnVector) super.nextVector(previousVector, batchSize);
@@ -1800,11 +1954,16 @@ class RecordReaderImpl implements Record
     }
   }
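
As the comment in nextVector above notes, CHAR values are right trimmed and truncated to maxLength after the underlying string read. A String-level sketch of those semantics (the actual code operates on the byte arrays inside BytesColumnVector):

    static String enforceChar(String value, int maxLength) {
      int end = value.length();
      while (end > 0 && value.charAt(end - 1) == ' ') {
        end--;                                 // right trim padding spaces
      }
      return end <= maxLength ? value.substring(0, end)
                              : value.substring(0, maxLength); // truncate
    }
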
 
-  private static class VarcharTreeReader extends StringTreeReader {
+  public static class VarcharTreeReader extends StringTreeReader {
     int maxLength;
 
-    VarcharTreeReader(Path path, int columnId, int maxLength, Configuration conf) {
-      super(path, columnId, conf);
+    public VarcharTreeReader(int columnId, int maxLength) throws IOException {
+      this(columnId, maxLength, null, null, null, null, null);
+    }
+
+    public VarcharTreeReader(int columnId, int maxLength, InStream present, InStream data,
+        InStream length, InStream dictionary, OrcProto.ColumnEncoding encoding) throws IOException {
+      super(columnId, present, data, length, dictionary, encoding);
       this.maxLength = maxLength;
     }
 
@@ -1828,7 +1987,7 @@ class RecordReaderImpl implements Record
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       // Get the vector of strings from StringTreeReader, then make a 2nd pass to
       // adjust down the length (truncate) if necessary.
       BytesColumnVector result = (BytesColumnVector) super.nextVector(previousVector, batchSize);
@@ -1867,19 +2026,23 @@ class RecordReaderImpl implements Record
   private static class StructTreeReader extends TreeReader {
     private final TreeReader[] fields;
     private final String[] fieldNames;
+    private final List<TreeReader> readers;
 
-    StructTreeReader(Path path, int columnId,
+    StructTreeReader(int columnId,
                      List<OrcProto.Type> types,
-                     boolean[] included, Configuration conf) throws IOException {
-      super(path, columnId, conf);
+                     boolean[] included,
+                     boolean skipCorrupt) throws IOException {
+      super(columnId);
       OrcProto.Type type = types.get(columnId);
       int fieldCount = type.getFieldNamesCount();
       this.fields = new TreeReader[fieldCount];
       this.fieldNames = new String[fieldCount];
+      this.readers = new ArrayList<TreeReader>();
       for(int i=0; i < fieldCount; ++i) {
         int subtype = type.getSubtypes(i);
         if (included == null || included[subtype]) {
-          this.fields[i] = createTreeReader(path, subtype, types, included, conf);
+          this.fields[i] = createTreeReader(subtype, types, included, skipCorrupt);
+          readers.add(this.fields[i]);
         }
         this.fieldNames[i] = type.getFieldNames(i);
       }
@@ -1922,7 +2085,7 @@ class RecordReaderImpl implements Record
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       ColumnVector[] result = null;
       if (previousVector == null) {
         result = new ColumnVector[fields.length];
@@ -1970,17 +2133,18 @@ class RecordReaderImpl implements Record
     private final TreeReader[] fields;
     private RunLengthByteReader tags;
 
-    UnionTreeReader(Path path, int columnId,
+    UnionTreeReader(int columnId,
                     List<OrcProto.Type> types,
-                    boolean[] included, Configuration conf) throws IOException {
-      super(path, columnId, conf);
+                    boolean[] included,
+                    boolean skipCorrupt) throws IOException {
+      super(columnId);
       OrcProto.Type type = types.get(columnId);
       int fieldCount = type.getSubtypesCount();
       this.fields = new TreeReader[fieldCount];
       for(int i=0; i < fieldCount; ++i) {
         int subtype = type.getSubtypes(i);
         if (included == null || included[subtype]) {
-          this.fields[i] = createTreeReader(path, subtype, types, included, conf);
+          this.fields[i] = createTreeReader(subtype, types, included, skipCorrupt);
         }
       }
     }
@@ -2013,7 +2177,7 @@ class RecordReaderImpl implements Record
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       throw new UnsupportedOperationException(
           "NextVector is not supported operation for Union type");
     }
@@ -2049,13 +2213,13 @@ class RecordReaderImpl implements Record
     private final TreeReader elementReader;
     private IntegerReader lengths = null;
 
-    ListTreeReader(Path path, int columnId,
+    ListTreeReader(int columnId,
                    List<OrcProto.Type> types,
-                   boolean[] included, Configuration conf) throws IOException {
-      super(path, columnId, conf);
+                   boolean[] included,
+                   boolean skipCorrupt) throws IOException {
+      super(columnId);
       OrcProto.Type type = types.get(columnId);
-      elementReader = createTreeReader(path, type.getSubtypes(0), types,
-          included, conf);
+      elementReader = createTreeReader(type.getSubtypes(0), types, included, skipCorrupt);
     }
 
     @Override
@@ -2096,7 +2260,7 @@ class RecordReaderImpl implements Record
     }
 
     @Override
-    Object nextVector(Object previous, long batchSize) throws IOException {
+    public Object nextVector(Object previous, long batchSize) throws IOException {
       throw new UnsupportedOperationException(
           "NextVector is not supported operation for List type");
     }
@@ -2106,7 +2270,7 @@ class RecordReaderImpl implements Record
       if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
           (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId + " of " + path);
+            columnId);
       }
     }
 
@@ -2117,7 +2281,7 @@ class RecordReaderImpl implements Record
       super.startStripe(streams, encodings);
       lengths = createIntegerReader(encodings.get(columnId).getKind(),
           streams.get(new StreamName(columnId,
-              OrcProto.Stream.Kind.LENGTH)), false);
+              OrcProto.Stream.Kind.LENGTH)), false, false);
       if (elementReader != null) {
         elementReader.startStripe(streams, encodings);
       }
@@ -2139,21 +2303,21 @@ class RecordReaderImpl implements Record
     private final TreeReader valueReader;
     private IntegerReader lengths = null;
 
-    MapTreeReader(Path path,
-                  int columnId,
+    MapTreeReader(int columnId,
                   List<OrcProto.Type> types,
-                  boolean[] included, Configuration conf) throws IOException {
-      super(path, columnId, conf);
+                  boolean[] included,
+                  boolean skipCorrupt) throws IOException {
+      super(columnId);
       OrcProto.Type type = types.get(columnId);
       int keyColumn = type.getSubtypes(0);
       int valueColumn = type.getSubtypes(1);
       if (included == null || included[keyColumn]) {
-        keyReader = createTreeReader(path, keyColumn, types, included, conf);
+        keyReader = createTreeReader(keyColumn, types, included, skipCorrupt);
       } else {
         keyReader = null;
       }
       if (included == null || included[valueColumn]) {
-        valueReader = createTreeReader(path, valueColumn, types, included, conf);
+        valueReader = createTreeReader(valueColumn, types, included, skipCorrupt);
       } else {
         valueReader = null;
       }
@@ -2190,7 +2354,7 @@ class RecordReaderImpl implements Record
     }
 
     @Override
-    Object nextVector(Object previous, long batchSize) throws IOException {
+    public Object nextVector(Object previous, long batchSize) throws IOException {
       throw new UnsupportedOperationException(
           "NextVector is not supported operation for Map type");
     }
@@ -2200,7 +2364,7 @@ class RecordReaderImpl implements Record
       if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
           (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId + " of " + path);
+            columnId);
       }
     }
 
@@ -2211,7 +2375,7 @@ class RecordReaderImpl implements Record
       super.startStripe(streams, encodings);
       lengths = createIntegerReader(encodings.get(columnId).getKind(),
           streams.get(new StreamName(columnId,
-              OrcProto.Stream.Kind.LENGTH)), false);
+              OrcProto.Stream.Kind.LENGTH)), false, false);
       if (keyReader != null) {
         keyReader.startStripe(streams, encodings);
       }
@@ -2232,77 +2396,65 @@ class RecordReaderImpl implements Record
     }
   }
 
-  private static TreeReader createTreeReader(Path path,
-                                             int columnId,
+  private static TreeReader createTreeReader(int columnId,
                                              List<OrcProto.Type> types,
                                              boolean[] included,
-                                             Configuration conf
+                                             boolean skipCorrupt
                                             ) throws IOException {
     OrcProto.Type type = types.get(columnId);
     switch (type.getKind()) {
       case BOOLEAN:
-        return new BooleanTreeReader(path, columnId, conf);
+        return new BooleanTreeReader(columnId);
       case BYTE:
-        return new ByteTreeReader(path, columnId, conf);
+        return new ByteTreeReader(columnId);
       case DOUBLE:
-        return new DoubleTreeReader(path, columnId, conf);
+        return new DoubleTreeReader(columnId);
       case FLOAT:
-        return new FloatTreeReader(path, columnId, conf);
+        return new FloatTreeReader(columnId);
       case SHORT:
-        return new ShortTreeReader(path, columnId, conf);
+        return new ShortTreeReader(columnId);
       case INT:
-        return new IntTreeReader(path, columnId, conf);
+        return new IntTreeReader(columnId);
       case LONG:
-        return new LongTreeReader(path, columnId, conf);
+        return new LongTreeReader(columnId, skipCorrupt);
       case STRING:
-        return new StringTreeReader(path, columnId, conf);
+        return new StringTreeReader(columnId);
       case CHAR:
         if (!type.hasMaximumLength()) {
           throw new IllegalArgumentException("ORC char type has no length specified");
         }
-        return new CharTreeReader(path, columnId, type.getMaximumLength(), conf);
+        return new CharTreeReader(columnId, type.getMaximumLength());
       case VARCHAR:
         if (!type.hasMaximumLength()) {
           throw new IllegalArgumentException("ORC varchar type has no length specified");
         }
-        return new VarcharTreeReader(path, columnId, type.getMaximumLength(), conf);
+        return new VarcharTreeReader(columnId, type.getMaximumLength());
       case BINARY:
-        return new BinaryTreeReader(path, columnId, conf);
+        return new BinaryTreeReader(columnId);
       case TIMESTAMP:
-        return new TimestampTreeReader(path, columnId, conf);
+        return new TimestampTreeReader(columnId, skipCorrupt);
       case DATE:
-        return new DateTreeReader(path, columnId, conf);
+        return new DateTreeReader(columnId);
       case DECIMAL:
         int precision = type.hasPrecision() ? type.getPrecision() : HiveDecimal.SYSTEM_DEFAULT_PRECISION;
         int scale =  type.hasScale()? type.getScale() : HiveDecimal.SYSTEM_DEFAULT_SCALE;
-        return new DecimalTreeReader(path, columnId, precision, scale, conf);
+        return new DecimalTreeReader(columnId, precision, scale);
       case STRUCT:
-        return new StructTreeReader(path, columnId, types, included, conf);
+        return new StructTreeReader(columnId, types, included, skipCorrupt);
       case LIST:
-        return new ListTreeReader(path, columnId, types, included, conf);
+        return new ListTreeReader(columnId, types, included, skipCorrupt);
       case MAP:
-        return new MapTreeReader(path, columnId, types, included, conf);
+        return new MapTreeReader(columnId, types, included, skipCorrupt);
       case UNION:
-        return new UnionTreeReader(path, columnId, types, included, conf);
+        return new UnionTreeReader(columnId, types, included, skipCorrupt);
       default:
         throw new IllegalArgumentException("Unsupported type " +
           type.getKind());
     }
   }
 
-  OrcProto.StripeFooter readStripeFooter(StripeInformation stripe
-                                         ) throws IOException {
-    long offset = stripe.getOffset() + stripe.getIndexLength() +
-        stripe.getDataLength();
-    int tailLength = (int) stripe.getFooterLength();
-
-    // read the footer
-    ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
-    file.seek(offset);
-    file.readFully(tailBuf.array(), tailBuf.arrayOffset(), tailLength);
-    return OrcProto.StripeFooter.parseFrom(InStream.create("footer",
-        new ByteBuffer[]{tailBuf}, new long[]{0}, tailLength, codec,
-        bufferSize));
+  OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
+    return metadata.readStripeFooter(stripe);
   }
 
   static enum Location {
@@ -2692,58 +2844,95 @@ class RecordReaderImpl implements Record
     return statsObj;
   }
 
+  public static class SargApplier {
+    private final SearchArgument sarg;
+    private final List<PredicateLeaf> sargLeaves;
+    private final int[] filterColumns;
+    private final long rowIndexStride;
+    private final OrcProto.BloomFilterIndex[] bloomFilterIndices;
+    // the same columns as filterColumns above, expressed as boolean flags indexed by column
+    private final boolean[] sargColumns;
+    public SargApplier(SearchArgument sarg, String[] columnNames, long rowIndexStride,
+        List<OrcProto.Type> types, int includedCount) {
+      this.sarg = sarg;
+      sargLeaves = sarg.getLeaves();
+      filterColumns = mapSargColumns(sargLeaves, columnNames, 0);
+      bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
+      this.rowIndexStride = rowIndexStride;
+      // 'included' will not be null here; the row reader options fill the array with trues when it is unset
+      sargColumns = new boolean[includedCount];
+      for (int i : filterColumns) {
+        // filter columns may have an index of -1, e.g. when the SARG references a partition column.
+        if (i > 0) {
+          sargColumns[i] = true;
+        }
+      }
+    }
+
+    /**
+     * Pick the row groups that we need to load from the current stripe.
+     *
+     * @return an array with a boolean for each row group or null if all of the
+     * row groups must be read.
+     * @throws IOException
+     */
+    public boolean[] pickRowGroups(
+        StripeInformation stripe, OrcProto.RowIndex[] indexes) throws IOException {
+      long rowsInStripe = stripe.getNumberOfRows();
+      int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
+      boolean[] result = new boolean[groupsInStripe]; // TODO: avoid alloc?
+      TruthValue[] leafValues = new TruthValue[sargLeaves.size()];
+      for (int rowGroup = 0; rowGroup < result.length; ++rowGroup) {
+        for (int pred = 0; pred < leafValues.length; ++pred) {
+          if (filterColumns[pred] != -1) {
+            OrcProto.ColumnStatistics stats =
+                indexes[filterColumns[pred]].getEntry(rowGroup).getStatistics();
+            OrcProto.BloomFilter bf = null;
+            if (bloomFilterIndices[filterColumns[pred]] != null) {
+              bf = bloomFilterIndices[filterColumns[pred]].getBloomFilter(rowGroup);
+            }
+            leafValues[pred] = evaluatePredicateProto(stats, sargLeaves.get(pred), bf);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Stats = " + stats);
+              LOG.debug("Setting " + sargLeaves.get(pred) + " to " +
+                  leafValues[pred]);
+            }
+          } else {
+            // the column is a virtual column
+            leafValues[pred] = TruthValue.YES_NO_NULL;
+          }
+        }
+        result[rowGroup] = sarg.evaluate(leafValues).isNeeded();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Row group " + (rowIndexStride * rowGroup) + " to " +
+              (rowIndexStride * (rowGroup + 1) - 1) + " is " +
+              (result[rowGroup] ? "" : "not ") + "included.");
+        }
+      }
+
+      // if we found something to skip, use the array. otherwise, return null.
+      for (boolean b : result) {
+        if (!b) {
+          return result;
+        }
+      }
+      return null;
+    }
+  }
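
In outline, pickRowGroups above evaluates every SARG leaf against the row group's column statistics (and bloom filter, when one is present), then asks the SearchArgument whether the group can possibly contain a match. A minimal sketch of that evaluate-then-skip loop, assuming AND-ed leaves as a simplification of sarg.evaluate(); Truth and Leaf are hypothetical stand-ins, not Hive types:

    enum Truth { YES, NO, MAYBE }

    final class RowGroupPruner {
      interface Leaf {
        // Evaluate one predicate leaf against a row group's min/max statistics.
        Truth evaluate(long min, long max);
      }

      // Returns a keep/skip flag per row group, or null when nothing can be
      // skipped -- the same contract pickRowGroups documents above.
      static boolean[] pick(Leaf[] leaves, long[][] minMaxPerGroup) {
        boolean[] result = new boolean[minMaxPerGroup.length];
        boolean anySkipped = false;
        for (int group = 0; group < result.length; ++group) {
          boolean needed = true;
          for (Leaf leaf : leaves) {
            // With AND-ed leaves, one definite NO over the stats proves the
            // group holds no matching rows.
            if (leaf.evaluate(minMaxPerGroup[group][0], minMaxPerGroup[group][1]) == Truth.NO) {
              needed = false;
              break;
            }
          }
          result[group] = needed;
          anySkipped |= !needed;
        }
        return anySkipped ? result : null;
      }
    }
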
+
   /**
    * Pick the row groups that we need to load from the current stripe.
    * @return an array with a boolean for each row group or null if all of the
    *    row groups must be read.
    * @throws IOException
    */
-  private boolean[] pickRowGroups() throws IOException {
+  protected boolean[] pickRowGroups() throws IOException {
     // if we don't have a sarg or indexes, we read everything
-    if (sarg == null || rowIndexStride == 0) {
+    if (sargApp == null) {
       return null;
     }
-    readRowIndex(currentStripe, sargColumns);
-    long rowsInStripe = stripes.get(currentStripe).getNumberOfRows();
-    int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) /
-        rowIndexStride);
-    boolean[] result = new boolean[groupsInStripe];
-    TruthValue[] leafValues = new TruthValue[sargLeaves.size()];
-    for(int rowGroup=0; rowGroup < result.length; ++rowGroup) {
-      for(int pred=0; pred < leafValues.length; ++pred) {
-        if (filterColumns[pred] != -1) {
-          OrcProto.ColumnStatistics stats =
-              indexes[filterColumns[pred]].getEntry(rowGroup).getStatistics();
-          OrcProto.BloomFilter bf = null;
-          if (bloomFilterIndices[filterColumns[pred]] != null) {
-            bf = bloomFilterIndices[filterColumns[pred]].getBloomFilter(rowGroup);
-          }
-          leafValues[pred] = evaluatePredicateProto(stats, sargLeaves.get(pred), bf);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Stats = " + stats);
-            LOG.debug("Setting " + sargLeaves.get(pred) + " to " +
-                leafValues[pred]);
-          }
-        } else {
-          // the column is a virtual column
-          leafValues[pred] = TruthValue.YES_NO_NULL;
-        }
-      }
-      result[rowGroup] = sarg.evaluate(leafValues).isNeeded();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Row group " + (rowIndexStride * rowGroup) + " to " +
-            (rowIndexStride * (rowGroup+1) - 1) + " is " +
-            (result[rowGroup] ? "" : "not ") + "included.");
-      }
-    }
-
-    // if we found something to skip, use the array. otherwise, return null.
-    for(boolean b: result) {
-      if (!b) {
-        return result;
-      }
-    }
-    return null;
+    readRowIndex(currentStripe, included, sargApp.sargColumns);
+    return sargApp.pickRowGroups(stripes.get(currentStripe), indexes);
   }
 
   private void clearStreams() throws IOException {
@@ -2751,13 +2940,14 @@ class RecordReaderImpl implements Record
     for(InStream is: streams.values()) {
       is.close();
     }
-    if(bufferChunks != null) {
-      if(zcr != null) {
-        for (BufferChunk bufChunk : bufferChunks) {
-          zcr.releaseBuffer(bufChunk.chunk);
+    if (bufferChunks != null) {
+      if (zcr != null) {
+        for (DiskRangeList range = bufferChunks; range != null; range = range.next) {
+          if (!(range instanceof BufferChunk)) continue;
+          zcr.releaseBuffer(((BufferChunk)range).chunk);
         }
       }
-      bufferChunks.clear();
+      bufferChunks = null;
     }
     streams.clear();
   }
@@ -2767,20 +2957,7 @@ class RecordReaderImpl implements Record
    * @throws IOException
    */
   private void readStripe() throws IOException {
-    StripeInformation stripe = stripes.get(currentStripe);
-    stripeFooter = readStripeFooter(stripe);
-    clearStreams();
-    // setup the position in the stripe
-    rowCountInStripe = stripe.getNumberOfRows();
-    rowInStripe = 0;
-    rowBaseInStripe = 0;
-    for(int i=0; i < currentStripe; ++i) {
-      rowBaseInStripe += stripes.get(i).getNumberOfRows();
-    }
-    // reset all of the indexes
-    for(int i=0; i < indexes.length; ++i) {
-      indexes[i] = null;
-    }
+    StripeInformation stripe = beginReadStripe();
     includedRowGroups = pickRowGroups();
 
     // move forward to the first unskipped row
@@ -2802,167 +2979,88 @@ class RecordReaderImpl implements Record
       reader.startStripe(streams, stripeFooter.getColumnsList());
       // if we skipped the first row group, move the pointers forward
       if (rowInStripe != 0) {
-        seekToRowEntry((int) (rowInStripe / rowIndexStride));
+        seekToRowEntry(reader, (int) (rowInStripe / rowIndexStride));
       }
     }
   }
 
-  private void readAllDataStreams(StripeInformation stripe
-                                  ) throws IOException {
+  private StripeInformation beginReadStripe() throws IOException {
+    StripeInformation stripe = stripes.get(currentStripe);
+    stripeFooter = readStripeFooter(stripe);
+    clearStreams();
+    // setup the position in the stripe
+    rowCountInStripe = stripe.getNumberOfRows();
+    rowInStripe = 0;
+    rowBaseInStripe = 0;
+    for(int i=0; i < currentStripe; ++i) {
+      rowBaseInStripe += stripes.get(i).getNumberOfRows();
+    }
+    // reset all of the indexes
+    for(int i=0; i < indexes.length; ++i) {
+      indexes[i] = null;
+    }
+    return stripe;
+  }
+
+  private void readAllDataStreams(StripeInformation stripe) throws IOException {
     long start = stripe.getIndexLength();
     long end = start + stripe.getDataLength();
     // explicitly trigger 1 big read
-    DiskRange[] ranges = new DiskRange[]{new DiskRange(start, end)};
-    bufferChunks = readDiskRanges(file, stripe.getOffset(), Arrays.asList(ranges));
+    DiskRangeList toRead = new DiskRangeList(start, end);
+    bufferChunks = RecordReaderUtils.readDiskRanges(file, zcr, stripe.getOffset(), toRead, false);
     List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
-    createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams);
-  }
-
-  /**
-   * The sections of stripe that we need to read.
-   */
-  static class DiskRange {
-    /** the first address we need to read. */
-    long offset;
-    /** the first address afterwards. */
-    long end;
-
-    DiskRange(long offset, long end) {
-      this.offset = offset;
-      this.end = end;
-      if (end < offset) {
-        throw new IllegalArgumentException("invalid range " + this);
-      }
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (other == null || other.getClass() != getClass()) {
-        return false;
-      }
-      DiskRange otherR = (DiskRange) other;
-      return otherR.offset == offset && otherR.end == end;
-    }
-
-    @Override
-    public String toString() {
-      return "range start: " + offset + " end: " + end;
-    }
+    createStreams(
+        streamDescriptions, bufferChunks, null, codec, bufferSize, streams);
   }
 
   /**
    * The sections of stripe that we have read.
   * This might not match the disk ranges: one disk range can map to multiple buffer chunks, depending on DFS block boundaries.
    */
-  static class BufferChunk {
+  public static class BufferChunk extends DiskRangeList {
     final ByteBuffer chunk;
-    /** the first address we need to read. */
-    final long offset;
-    /** end of the buffer **/
-    final long end;
 
     BufferChunk(ByteBuffer chunk, long offset) {
-      this.offset = offset;
+      super(offset, offset + chunk.remaining());
       this.chunk = chunk;
-      end = offset + chunk.remaining();
     }
 
     @Override
-    public final String toString() {
-      return "range start: " + offset + " size: " + chunk.remaining() + " type: "
-          + (chunk.isDirect() ? "direct" : "array-backed");
+    public boolean hasData() {
+      return chunk != null;
     }
-  }
-
-  private static final int BYTE_STREAM_POSITIONS = 1;
-  private static final int RUN_LENGTH_BYTE_POSITIONS =
-      BYTE_STREAM_POSITIONS + 1;
-  private static final int BITFIELD_POSITIONS = RUN_LENGTH_BYTE_POSITIONS + 1;
-  private static final int RUN_LENGTH_INT_POSITIONS =
-    BYTE_STREAM_POSITIONS + 1;
 
-  /**
-   * Get the offset in the index positions for the column that the given
-   * stream starts.
-   * @param encoding the encoding of the column
-   * @param type the type of the column
-   * @param stream the kind of the stream
-   * @param isCompressed is the file compressed
-   * @param hasNulls does the column have a PRESENT stream?
-   * @return the number of positions that will be used for that stream
-   */
-  static int getIndexPosition(OrcProto.ColumnEncoding.Kind encoding,
-                              OrcProto.Type.Kind type,
-                              OrcProto.Stream.Kind stream,
-                              boolean isCompressed,
-                              boolean hasNulls) {
-    if (stream == OrcProto.Stream.Kind.PRESENT) {
-      return 0;
-    }
-    int compressionValue = isCompressed ? 1 : 0;
-    int base = hasNulls ? (BITFIELD_POSITIONS + compressionValue) : 0;
-    switch (type) {
-      case BOOLEAN:
-      case BYTE:
-      case SHORT:
-      case INT:
-      case LONG:
-      case FLOAT:
-      case DOUBLE:
-      case DATE:
-      case STRUCT:
-      case MAP:
-      case LIST:
-      case UNION:
-        return base;
-      case CHAR:
-      case VARCHAR:
-      case STRING:
-        if (encoding == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
-            encoding == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
-          return base;
-        } else {
-          if (stream == OrcProto.Stream.Kind.DATA) {
-            return base;
-          } else {
-            return base + BYTE_STREAM_POSITIONS + compressionValue;
-          }
-        }
-      case BINARY:
-        if (stream == OrcProto.Stream.Kind.DATA) {
-          return base;
-        }
-        return base + BYTE_STREAM_POSITIONS + compressionValue;
-      case DECIMAL:
-        if (stream == OrcProto.Stream.Kind.DATA) {
-          return base;
-        }
-        return base + BYTE_STREAM_POSITIONS + compressionValue;
-      case TIMESTAMP:
-        if (stream == OrcProto.Stream.Kind.DATA) {
-          return base;
-        }
-        return base + RUN_LENGTH_INT_POSITIONS + compressionValue;
-      default:
-        throw new IllegalArgumentException("Unknown type " + type);
+    @Override
+    public final String toString() {
+      boolean makesSense = chunk.remaining() == (end - offset);
+      return "data range [" + offset + ", " + end + "), size: " + chunk.remaining()
+          + (makesSense ? "" : "(!)") + " type: " + (chunk.isDirect() ? "direct" : "array-backed");
     }
-  }
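
As a worked example of the removed index-position arithmetic: for a compressed, DIRECT-encoded STRING column that has a PRESENT stream, compressionValue is 1, so base = BITFIELD_POSITIONS + 1 = 4; the DATA stream therefore starts at index position 4, the LENGTH stream at base + BYTE_STREAM_POSITIONS + 1 = 6, and the PRESENT stream itself always starts at 0.
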
 
-  // for uncompressed streams, what is the most overlap with the following set
-  // of rows (long vint literal group).
-  static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512;
+    @Override
+    public DiskRange sliceAndShift(long offset, long end, long shiftBy) {
+      assert offset <= end && offset >= this.offset && end <= this.end;
+      assert offset + shiftBy >= 0;
+      ByteBuffer sliceBuf = chunk.slice();
+      int newPos = (int)(offset - this.offset);
+      int newLimit = newPos + (int)(end - offset);
+      try {
+        sliceBuf.position(newPos);
+        sliceBuf.limit(newLimit);
+      } catch (Throwable t) {
+        LOG.error("Failed to slice buffer chunk with range" + " [" + this.offset + ", " + this.end
+            + "), position: " + chunk.position() + " limit: " + chunk.limit() + ", "
+            + (chunk.isDirect() ? "direct" : "array") + "; to [" + offset + ", " + end + ") "
+            + t.getClass());
+        throw new RuntimeException(t);
+      }
+      return new BufferChunk(sliceBuf, offset + shiftBy);
+    }
 
-  /**
-   * Is this stream part of a dictionary?
-   * @return is this part of a dictionary?
-   */
-  static boolean isDictionary(OrcProto.Stream.Kind kind,
-                              OrcProto.ColumnEncoding encoding) {
-    OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind();
-    return kind == OrcProto.Stream.Kind.DICTIONARY_DATA ||
-      (kind == OrcProto.Stream.Kind.LENGTH &&
-       (encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
-        encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2));
+    @Override
+    public ByteBuffer getData() {
+      return chunk;
+    }
   }
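
sliceAndShift above hands out a zero-copy view of a byte sub-range: slice() shares the backing storage, and position/limit narrow the window. The same ByteBuffer mechanics in a self-contained sketch (plain JDK, illustrative names):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class SliceDemo {
      // Carve the logical range [from, to) out of a chunk whose first byte sits
      // at file offset baseOffset, without copying any bytes.
      static ByteBuffer sliceRange(ByteBuffer chunk, long baseOffset, long from, long to) {
        ByteBuffer sliced = chunk.slice();            // shares the backing bytes
        int newPos = (int) (from - baseOffset);
        sliced.position(newPos);
        sliced.limit(newPos + (int) (to - from));
        return sliced;
      }

      public static void main(String[] args) {
        ByteBuffer chunk = ByteBuffer.wrap("0123456789".getBytes(StandardCharsets.UTF_8));
        ByteBuffer view = sliceRange(chunk, 100L, 103L, 107L);
        byte[] out = new byte[view.remaining()];
        view.get(out);
        System.out.println(new String(out, StandardCharsets.UTF_8)); // prints 3456
      }
    }
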
 
   /**
@@ -2978,7 +3076,7 @@ class RecordReaderImpl implements Record
    * @param compressionSize the compression block size
    * @return the list of disk ranges that will be loaded
    */
-  static List<DiskRange> planReadPartialDataStreams
+  static DiskRangeList planReadPartialDataStreams
       (List<OrcProto.Stream> streamList,
        OrcProto.RowIndex[] indexes,
        boolean[] includedColumns,
@@ -2986,17 +3084,13 @@ class RecordReaderImpl implements Record
        boolean isCompressed,
        List<OrcProto.ColumnEncoding> encodings,
        List<OrcProto.Type> types,
-       int compressionSize) {
-    List<DiskRange> result = new ArrayList<DiskRange>();
+       int compressionSize,
+       boolean doMergeBuffers) {
     long offset = 0;
     // figure out which columns have a present stream
-    boolean[] hasNull = new boolean[types.size()];
-    for(OrcProto.Stream stream: streamList) {
-      if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.PRESENT)) {
-        hasNull[stream.getColumn()] = true;
-      }
-    }
-    for(OrcProto.Stream stream: streamList) {
+    boolean[] hasNull = RecordReaderUtils.findPresentStreamsByColumn(streamList, types);
+    DiskRangeListCreateHelper list = new DiskRangeListCreateHelper();
+    for (OrcProto.Stream stream : streamList) {
       long length = stream.getLength();
       int column = stream.getColumn();
       OrcProto.Stream.Kind streamKind = stream.getKind();
@@ -3005,203 +3099,58 @@ class RecordReaderImpl implements Record
           (StreamName.getArea(streamKind) == StreamName.Area.DATA) &&
           includedColumns[column]) {
         // if we aren't filtering or it is a dictionary, load it.
-        if (includedRowGroups == null ||
-            isDictionary(streamKind, encodings.get(column))) {
-          result.add(new DiskRange(offset, offset + length));
-        } else {
-          for(int group=0; group < includedRowGroups.length; ++group) {
-            if (includedRowGroups[group]) {
-              int posn = getIndexPosition(encodings.get(column).getKind(),
-                  types.get(column).getKind(), stream.getKind(), isCompressed,
-                  hasNull[column]);
-              long start = indexes[column].getEntry(group).getPositions(posn);
-              final long nextGroupOffset;
-              if (group < includedRowGroups.length - 1) {
-                nextGroupOffset = indexes[column].getEntry(group + 1).getPositions(posn);
-              } else {
-                nextGroupOffset = length;
-              }
-
-              // figure out the worst case last location
-
-              // if adjacent groups have the same compressed block offset then stretch the slop
-              // by factor of 2 to safely accommodate the next compression block.
-              // One for the current compression block and another for the next compression block.
-              final long slop = isCompressed ? 2 * (OutStream.HEADER_SIZE + compressionSize)
-                  : WORST_UNCOMPRESSED_SLOP;
-              long end = (group == includedRowGroups.length - 1) ? length : Math.min(length,
-                  nextGroupOffset + slop);
-              result.add(new DiskRange(offset + start, offset + end));
-            }
-          }
+        if (includedRowGroups == null
+            || RecordReaderUtils.isDictionary(streamKind, encodings.get(column))) {
+          RecordReaderUtils.addEntireStreamToRanges(offset, length, list, doMergeBuffers);
+        } else {
+          RecordReaderUtils.addRgFilteredStreamToRanges(stream, includedRowGroups,
+              isCompressed, indexes[column], encodings.get(column), types.get(column),
+              compressionSize, hasNull[column], offset, length, list, doMergeBuffers);
         }
       }
       offset += length;
     }
-    return result;
-  }
-
-  /**
-   * Update the disk ranges to collapse adjacent or overlapping ranges. It
-   * assumes that the ranges are sorted.
-   * @param ranges the list of disk ranges to merge
-   */
-  static void mergeDiskRanges(List<DiskRange> ranges) {
-    DiskRange prev = null;
-    for(int i=0; i < ranges.size(); ++i) {
-      DiskRange current = ranges.get(i);
-      if (prev != null && overlap(prev.offset, prev.end,
-          current.offset, current.end)) {
-        prev.offset = Math.min(prev.offset, current.offset);
-        prev.end = Math.max(prev.end, current.end);
-        ranges.remove(i);
-        i -= 1;
-      } else {
-        prev = current;
-      }
-    }
-  }
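
The collapse that the removed mergeDiskRanges performed (now handled by DiskRangeList) is a single pass over sorted ranges, folding each range into its predecessor whenever the two touch or overlap. A standalone sketch over simple [offset, end) pairs; Range here is illustrative, not Hive's DiskRange:

    import java.util.ArrayList;
    import java.util.List;

    public class MergeRanges {
      static final class Range {
        long offset, end;
        Range(long offset, long end) { this.offset = offset; this.end = end; }
      }

      // Collapse adjacent or overlapping ranges; assumes the input is sorted by
      // offset, matching the precondition documented on mergeDiskRanges above.
      static List<Range> merge(List<Range> sorted) {
        List<Range> out = new ArrayList<Range>();
        for (Range r : sorted) {
          Range prev = out.isEmpty() ? null : out.get(out.size() - 1);
          if (prev != null && r.offset <= prev.end) {   // touching or overlapping
            prev.end = Math.max(prev.end, r.end);
          } else {
            out.add(new Range(r.offset, r.end));
          }
        }
        return out;
      }

      public static void main(String[] args) {
        List<Range> in = new ArrayList<Range>();
        in.add(new Range(0, 100));
        in.add(new Range(50, 200));
        in.add(new Range(300, 400));
        for (Range r : merge(in)) {
          System.out.println(r.offset + ".." + r.end);  // 0..200, then 300..400
        }
      }
    }
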
-
-  /**
-   * Read the list of ranges from the file.
-   * @param file the file to read
-   * @param base the base of the stripe
-   * @param ranges the disk ranges within the stripe to read
-   * @return the bytes read for each disk range, which is the same length as
-   *    ranges
-   * @throws IOException
-   */
-  List<BufferChunk> readDiskRanges(FSDataInputStream file,
-                                 long base,
-                                 List<DiskRange> ranges) throws IOException {
-    ArrayList<BufferChunk> result = new ArrayList<RecordReaderImpl.BufferChunk>(ranges.size());
-    for(DiskRange range: ranges) {
-      int len = (int) (range.end - range.offset);
-      long off = range.offset;
-      file.seek(base + off);
-      if(zcr != null) {
-        while(len > 0) {
-          ByteBuffer partial = zcr.readBuffer(len, false);
-          result.add(new BufferChunk(partial, off));
-          int read = partial.remaining();
-          len -= read;
-          off += read;
-        }
-      } else {
-        byte[] buffer = new byte[len];
-        file.readFully(buffer, 0, buffer.length);
-        result.add(new BufferChunk(ByteBuffer.wrap(buffer), range.offset));
-      }
-    }
-    return result;
-  }
-
-  /**
-   * Does region A overlap region B? The end points are inclusive on both sides.
-   * @param leftA A's left point
-   * @param rightA A's right point
-   * @param leftB B's left point
-   * @param rightB B's right point
-   * @return Does region A overlap region B?
-   */
-  static boolean overlap(long leftA, long rightA, long leftB, long rightB) {
-    if (leftA <= leftB) {
-      return rightA >= leftB;
-    }
-    return rightB >= leftA;
-  }
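
For instance, with inclusive end points, [10, 20] and [20, 30] overlap (10 <= 20 and 20 >= 20), while [10, 19] and [21, 30] do not (19 < 21).
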
-
-  /**
-   * Build a string representation of a list of disk ranges.
-   * @param ranges ranges to stringify
-   * @return the resulting string
-   */
-  static String stringifyDiskRanges(List<DiskRange> ranges) {
-    StringBuilder buffer = new StringBuilder();
-    buffer.append("[");
-    for(int i=0; i < ranges.size(); ++i) {
-      if (i != 0) {
-        buffer.append(", ");
-      }
-      buffer.append(ranges.get(i).toString());
-    }
-    buffer.append("]");
-    return buffer.toString();
+    return list.extract();
   }
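
The slop reasoning that moved into RecordReaderUtils.addRgFilteredStreamToRanges is clearest with concrete numbers: for a compressed stream, a selected row group's read range must run past the next group's recorded start far enough to cover both the current compression block and the next one, hence 2 * (header + compressionSize). A worked sketch with assumed values (3-byte header, 256 KB blocks; the offsets are made up):

    public class SlopDemo {
      public static void main(String[] args) {
        int headerSize = 3;               // stands in for OutStream.HEADER_SIZE
        int compressionSize = 256 * 1024;
        long slop = 2L * (headerSize + compressionSize);  // current + next block

        long groupStart = 1000000L;       // index position of the selected group
        long nextGroupStart = 1400000L;   // index position of the following group
        long streamLength = 8000000L;

        // Read from this group's start up to the next group's start plus slop,
        // never past the end of the stream.
        long end = Math.min(streamLength, nextGroupStart + slop);
        System.out.println("read [" + groupStart + ", " + end + ")");
        // prints read [1000000, 1924294)
      }
    }
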
 
-  static void createStreams(List<OrcProto.Stream> streamDescriptions,
-                            List<BufferChunk> ranges,
+  void createStreams(List<OrcProto.Stream> streamDescriptions,
+                            DiskRangeList ranges,
                             boolean[] includeColumn,
                             CompressionCodec codec,
                             int bufferSize,
-                            Map<StreamName, InStream> streams
-                           ) throws IOException {
-    long offset = 0;
-    for(OrcProto.Stream streamDesc: streamDescriptions) {
+                            Map<StreamName, InStream> streams) throws IOException {
+    long streamOffset = 0;
+    for (OrcProto.Stream streamDesc: streamDescriptions) {
       int column = streamDesc.getColumn();
-      // do not create stream if stream kind does not exist
-      if ((includeColumn == null || includeColumn[column]) &&
+      if ((includeColumn != null && !includeColumn[column]) ||
           streamDesc.hasKind() &&
-          (StreamName.getArea(streamDesc.getKind()) == StreamName.Area.DATA)) {
-        long length = streamDesc.getLength();
-        int first = -1;
-        int last = -2;
-        for(int i=0; i < ranges.size(); ++i) {
-          BufferChunk range = ranges.get(i);
-          if (overlap(offset, offset+length, range.offset, range.end)) {
-            if (first == -1) {
-              first = i;
-            }
-            last = i;
-          }
-        }
-        ByteBuffer[] buffers = new ByteBuffer[last - first + 1];
-        long[] offsets = new long[last - first + 1];
-        for(int i=0; i < buffers.length; ++i) {
-          BufferChunk range = ranges.get(i + first);
-          long start = Math.max(range.offset, offset);
-          long end = Math.min(range.end, offset+length);
-          buffers[i] = range.chunk.slice();
-          assert range.chunk.position() == 0; // otherwise we'll mix up positions
-          /*
-           * buffers are positioned in-wards if the offset > range.offset
-           * offsets[i] == range.offset - offset, except if offset > range.offset
-           */
-          if(offset > range.offset) {
-            buffers[i].position((int)(offset - range.offset));
-            buffers[i].limit((int)(end - range.offset));
-            offsets[i] = 0;
-          } else {
-            buffers[i].position(0);
-            buffers[i].limit((int)(end - range.offset));

[... 332 lines stripped ...]

