hive-commits mailing list archives

From prasan...@apache.org
Subject svn commit: r1659112 - in /hive/branches/llap: llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/ llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/ ql/src/java/...
Date Thu, 12 Feb 2015 00:25:39 GMT
Author: prasanthj
Date: Thu Feb 12 00:25:38 2015
New Revision: 1659112

URL: http://svn.apache.org/r1659112
Log:
HIVE-9419p2: LLAP: ORC decoding of row-groups (Partial patch2)

Added:
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/DoubleStreamReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/FloatStreamReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/IntStreamReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/LongStreamReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/ShortStreamReader.java
Modified:
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java

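This revision replaces the earlier ad-hoc ColumnStream decoders with per-type stream readers that extend RecordReaderImpl.TreeReader and are assembled through builders, so that a single row group of a single column can be decoded without a full RecordReader. As a hedged orientation sketch (the class and builder names are taken from the diffs below; the method and its parameters are hypothetical stand-ins for values that OrcColumnVectorProducer.createTreeReaders has in scope), decoding one LONG column of one row group now looks roughly like this:

    import java.io.IOException;

    import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
    import org.apache.hadoop.hive.llap.io.decode.orc.streams.LongStreamReader;
    import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
    import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
    import org.apache.hadoop.hive.ql.io.orc.OrcProto;

    // Hedged sketch: build a reader positioned at a row group, then decode one
    // batch. build() seeks to the row-group start via the supplied row index.
    static ColumnVector decodeLongColumn(String file, int colIx,
        EncodedColumnBatch.StreamBuffer present, EncodedColumnBatch.StreamBuffer data,
        CompressionCodec codec, int bufferSize, OrcProto.RowIndexEntry rowIndex,
        OrcProto.ColumnEncoding encoding, boolean skipCorrupt, int batchSize)
        throws IOException {
      LongStreamReader reader = LongStreamReader.builder()
          .setFileName(file)
          .setColumnIndex(colIx)
          .setPresentStream(present)      // null when the column has no nulls
          .setDataStream(data)
          .setCompressionCodec(codec)
          .setBufferSize(bufferSize)
          .setRowIndex(rowIndex)
          .setColumnEncodingKind(encoding.getKind())
          .skipCorrupt(skipCorrupt)       // LONG only; the other builders omit this
          .build();
      return (ColumnVector) reader.nextVector(null, batchSize);
    }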
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java?rev=1659112&r1=1659111&r2=1659112&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java Thu Feb 12 00:25:38 2015
@@ -23,15 +23,16 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
-import org.apache.hadoop.hive.llap.io.decode.orc.streams.ColumnStream;
-import org.apache.hadoop.hive.llap.io.decode.orc.streams.DoubleColumnStream;
-import org.apache.hadoop.hive.llap.io.decode.orc.streams.FloatColumnStream;
-import org.apache.hadoop.hive.llap.io.decode.orc.streams.IntegerColumnStream;
-import org.apache.hadoop.hive.llap.io.decode.orc.streams.StringColumnStream;
+import org.apache.hadoop.hive.llap.io.decode.orc.streams.DoubleStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.streams.FloatStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.streams.IntStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.streams.LongStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.streams.ShortStreamReader;
 import org.apache.hadoop.hive.llap.io.encoded.EncodedDataProducer;
 import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataProducer;
 import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
@@ -41,6 +42,7 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
 
 import com.google.common.collect.Lists;
 
@@ -48,6 +50,7 @@ public class OrcColumnVectorProducer ext
   private final OrcEncodedDataProducer edp;
   private final OrcMetadataCache metadataCache;
   private ColumnVectorBatch cvb;
+  private boolean skipCorrupt;
 
   public OrcColumnVectorProducer(
       ExecutorService executor, OrcEncodedDataProducer edp, Configuration conf) {
@@ -55,6 +58,7 @@ public class OrcColumnVectorProducer ext
     this.edp = edp;
     this.metadataCache = OrcMetadataCache.getInstance();
     this.cvb = null;
+    this.skipCorrupt = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA);
   }
 
   @Override
@@ -87,7 +91,7 @@ public class OrcColumnVectorProducer ext
       int maxBatchesRG = (int) ((nonNullRowCount / VectorizedRowBatch.DEFAULT_SIZE) + 1);
       int batchSize = VectorizedRowBatch.DEFAULT_SIZE;
       int numCols = batch.columnIxs.length;
-      ColumnStream[] columnStreams = createColumnStreamReaders(numCols, batch, fileMetadata,
+      RecordReaderImpl.TreeReader[] columnStreams = createTreeReaders(numCols, batch, fileMetadata,
           stripeMetadata);
       for (int i = 0; i < maxBatchesRG; i++) {
         if (i == maxBatchesRG - 1) {
@@ -95,7 +99,7 @@ public class OrcColumnVectorProducer ext
         }
 
         for (int idx = 0; idx < batch.columnIxs.length; idx++) {
-          cvb.cols[idx] = columnStreams[idx].nextVector(null, batchSize);
+          cvb.cols[idx] = (ColumnVector) columnStreams[idx].nextVector(null, batchSize);
         }
 
         // we are done reading a batch, send it to consumer for processing
@@ -108,12 +112,12 @@ public class OrcColumnVectorProducer ext
     }
   }
 
-  private ColumnStream[] createColumnStreamReaders(int numCols,
+  private RecordReaderImpl.TreeReader[] createTreeReaders(int numCols,
       EncodedColumnBatch<OrcBatchKey> batch,
       OrcFileMetadata fileMetadata,
       OrcStripeMetadata stripeMetadata) throws IOException {
     String file = batch.batchKey.file;
-    ColumnStream[] columnStreams = new ColumnStream[numCols];
+    RecordReaderImpl.TreeReader[] treeReaders = new RecordReaderImpl.TreeReader[numCols];
 
     for (int i = 0; i < numCols; i++) {
       int colIx = batch.columnIxs[i];
@@ -131,69 +135,109 @@ public class OrcColumnVectorProducer ext
       OrcProto.ColumnEncoding columnEncoding = stripeMetadata.getEncodings().get(colIx);
       ColumnVector cv = null;
 
-      // FIXME: See if the stream buffers are in this same order. What will happen if some stream
-      // does not exist? Will stream buffer be null?
       EncodedColumnBatch.StreamBuffer present = null;
       EncodedColumnBatch.StreamBuffer data = null;
       EncodedColumnBatch.StreamBuffer dictionary = null;
       EncodedColumnBatch.StreamBuffer lengths = null;
       EncodedColumnBatch.StreamBuffer secondary = null;
+      for (EncodedColumnBatch.StreamBuffer streamBuffer : streamBuffers) {
+        switch(streamBuffer.streamKind) {
+          case 0:
+            // PRESENT stream
+            present = streamBuffer;
+            break;
+          case 1:
+            // DATA stream
+            data = streamBuffer;
+            break;
+          case 2:
+            // LENGTH stream
+            lengths = streamBuffer;
+            break;
+          case 3:
+            // DICTIONARY_DATA stream
+            dictionary = streamBuffer;
+            break;
+          case 5:
+            // SECONDARY stream
+            secondary = streamBuffer;
+            break;
+          default:
+            throw new IOException("Unexpected stream kind: " + streamBuffer.streamKind);
+        }
+      }
+
       switch (colType.getKind()) {
         case SHORT:
+          treeReaders[i] = ShortStreamReader.builder()
+              .setFileName(file)
+              .setColumnIndex(colIx)
+              .setPresentStream(present)
+              .setDataStream(data)
+              .setCompressionCodec(codec)
+              .setBufferSize(bufferSize)
+              .setRowIndex(rowIndex)
+              .setColumnEncodingKind(columnEncoding.getKind())
+              .build();
+          break;
         case INT:
+          treeReaders[i] = IntStreamReader.builder()
+              .setFileName(file)
+              .setColumnIndex(colIx)
+              .setPresentStream(present)
+              .setDataStream(data)
+              .setCompressionCodec(codec)
+              .setBufferSize(bufferSize)
+              .setRowIndex(rowIndex)
+              .setColumnEncodingKind(columnEncoding.getKind())
+              .build();
+          break;
         case LONG:
-          // TODO: EncodedDataProducer should produce stream buffers in enum order of stream kind.
-          // So if a stream does not exist, it should have null instead.
-          if (streamBuffers.length != 2) {
-            present = null;
-            data = streamBuffers[0];
-          } else {
-            present = streamBuffers[0];
-            data = streamBuffers[1];
-          }
-          // FIXME: Creating column stream readers for every row group will be expensive.
-          columnStreams[i] = new IntegerColumnStream(file, colIx, present, data, columnEncoding, codec,
-              bufferSize, rowIndex);
+          treeReaders[i] = LongStreamReader.builder()
+              .setFileName(file)
+              .setColumnIndex(colIx)
+              .setPresentStream(present)
+              .setDataStream(data)
+              .setCompressionCodec(codec)
+              .setBufferSize(bufferSize)
+              .setRowIndex(rowIndex)
+              .setColumnEncodingKind(columnEncoding.getKind())
+              .skipCorrupt(skipCorrupt)
+              .build();
           break;
         case FLOAT:
-          if (streamBuffers.length != 2) {
-            present = null;
-            data = streamBuffers[0];
-          } else {
-            present = streamBuffers[0];
-            data = streamBuffers[1];
-          }
-          columnStreams[i] = new FloatColumnStream(file, colIx, present, data, codec, bufferSize,
-              rowIndex);
+          treeReaders[i] = FloatStreamReader.builder()
+              .setFileName(file)
+              .setColumnIndex(colIx)
+              .setPresentStream(present)
+              .setDataStream(data)
+              .setCompressionCodec(codec)
+              .setBufferSize(bufferSize)
+              .setRowIndex(rowIndex)
+              .build();
           break;
         case DOUBLE:
-          if (streamBuffers.length != 2) {
-            present = null;
-            data = streamBuffers[0];
-          } else {
-            present = streamBuffers[0];
-            data = streamBuffers[1];
-          }
-          columnStreams[i] = new DoubleColumnStream(file, colIx, present, data, codec, bufferSize,
-              rowIndex);
+          treeReaders[i] = DoubleStreamReader.builder()
+              .setFileName(file)
+              .setColumnIndex(colIx)
+              .setPresentStream(present)
+              .setDataStream(data)
+              .setCompressionCodec(codec)
+              .setBufferSize(bufferSize)
+              .setRowIndex(rowIndex)
+              .build();
           break;
         case CHAR:
         case VARCHAR:
         case STRING:
-          // FIXME: This is hacky! Will never work. Fix it cleanly everywhere. Hopefully encoded
-          // data producer will provide streams in enum order of stream kind
-          present = streamBuffers[0];
-          data = streamBuffers[1];
-          dictionary = streamBuffers[2];
-          lengths = streamBuffers[3];
-          columnStreams[i] = new StringColumnStream(file, colIx, present, data, dictionary, lengths,
-              columnEncoding, codec, bufferSize, rowIndex);
+//          columnStreams[i] = new StringColumnStream(file, colIx, present, data, dictionary, lengths,
+//              columnEncoding, codec, bufferSize, rowIndex);
           break;
         default:
           throw new UnsupportedOperationException("Data type not supported yet! " + colType);
       }
     }
-    return columnStreams;
+    return treeReaders;
   }
 
   private List<OrcProto.Stream> getDataStreams(int colIx, List<OrcProto.Stream> streams) {

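The numeric cases in the streamKind switch above (0, 1, 2, 3, 5) match the wire numbers of the OrcProto.Stream.Kind protobuf enum: PRESENT, DATA, LENGTH, DICTIONARY_DATA and SECONDARY (4 is DICTIONARY_COUNT, which never appears here and so would fall to the default). Assuming streamBuffer.streamKind really does carry the protobuf wire number, an equivalent dispatch against the enum would read:

    // Hedged alternative to the magic-number switch above. Stream.Kind.valueOf(int)
    // returns null for an unknown wire number, hence the explicit check.
    OrcProto.Stream.Kind kind = OrcProto.Stream.Kind.valueOf(streamBuffer.streamKind);
    if (kind == null) {
      throw new IOException("Unexpected stream kind: " + streamBuffer.streamKind);
    }
    switch (kind) {
      case PRESENT:         present = streamBuffer; break;
      case DATA:            data = streamBuffer; break;
      case LENGTH:          lengths = streamBuffer; break;
      case DICTIONARY_DATA: dictionary = streamBuffer; break;
      case SECONDARY:       secondary = streamBuffer; break;
      default:
        throw new IOException("Unexpected stream kind: " + kind);
    }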
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/DoubleStreamReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/DoubleStreamReader.java?rev=1659112&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/DoubleStreamReader.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/DoubleStreamReader.java Thu Feb 12 00:25:38 2015
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.streams;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+
+/**
+ * Stream reader for an ORC DOUBLE column; positions itself at the given row-group index on construction.
+ */
+public class DoubleStreamReader extends RecordReaderImpl.DoubleTreeReader {
+  private boolean isFileCompressed;
+  private OrcProto.RowIndexEntry rowIndex;
+
+  private DoubleStreamReader(int columnId, InStream present,
+      InStream data, boolean isFileCompressed,
+      OrcProto.RowIndexEntry rowIndex) throws IOException {
+    super(columnId, present, data);
+    this.isFileCompressed = isFileCompressed;
+    this.rowIndex = rowIndex;
+
+    // position the readers based on the specified row index
+    PositionProvider positionProvider = new RecordReaderImpl.PositionProviderImpl(rowIndex);
+    seek(positionProvider);
+  }
+
+  public void seek(PositionProvider positionProvider) throws IOException {
+    super.seek(positionProvider);
+  }
+
+  public static class StreamReaderBuilder {
+    private String fileName;
+    private int columnIndex;
+    private EncodedColumnBatch.StreamBuffer presentStream;
+    private EncodedColumnBatch.StreamBuffer dataStream;
+    private CompressionCodec compressionCodec;
+    private int bufferSize;
+    private OrcProto.RowIndexEntry rowIndex;
+
+    public StreamReaderBuilder setFileName(String fileName) {
+      this.fileName = fileName;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnIndex(int columnIndex) {
+      this.columnIndex = columnIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      this.presentStream = presentStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      this.dataStream = dataStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+      this.compressionCodec = compressionCodec;
+      return this;
+    }
+
+    public StreamReaderBuilder setBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) {
+      this.rowIndex = rowIndex;
+      return this;
+    }
+
+    public DoubleStreamReader build() throws IOException {
+      InStream present = null;
+      if (presentStream != null) {
+        present = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, null, bufferSize,
+                presentStream);
+      }
+
+      InStream data = null;
+      if (dataStream != null) {
+        data = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, null, bufferSize,
+                dataStream);
+      }
+
+      return new DoubleStreamReader(columnIndex, present, data, compressionCodec != null, rowIndex);
+    }
+  }
+
+  public static StreamReaderBuilder builder() {
+    return new StreamReaderBuilder();
+  }
+}

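Each of these readers positions itself in its constructor: RecordReaderImpl.PositionProviderImpl wraps the RowIndexEntry for the target row group, and the overridden seek(PositionProvider) replays the stream positions that were recorded at write time. A hedged sketch of repositioning an already-built reader (rowIndexEntry is a hypothetical OrcProto.RowIndexEntry taken from the stripe's row index for this column):

    // Replay recorded stream positions to jump to another row group,
    // mirroring what the stream-reader constructors do during build().
    PositionProvider positions = new RecordReaderImpl.PositionProviderImpl(rowIndexEntry);
    reader.seek(positions);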
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/FloatStreamReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/FloatStreamReader.java?rev=1659112&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/FloatStreamReader.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/FloatStreamReader.java Thu Feb 12 00:25:38 2015
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.streams;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+
+/**
+ * Stream reader for an ORC FLOAT column; positions itself at the given row-group index on construction.
+ */
+public class FloatStreamReader extends RecordReaderImpl.FloatTreeReader {
+  private boolean isFileCompressed;
+  private OrcProto.RowIndexEntry rowIndex;
+
+  private FloatStreamReader(int columnId, InStream present,
+      InStream data, boolean isFileCompressed,
+      OrcProto.RowIndexEntry rowIndex) throws IOException {
+    super(columnId, present, data);
+    this.isFileCompressed = isFileCompressed;
+    this.rowIndex = rowIndex;
+
+    // position the readers based on the specified row index
+    PositionProvider positionProvider = new RecordReaderImpl.PositionProviderImpl(rowIndex);
+    seek(positionProvider);
+  }
+
+  public void seek(PositionProvider positionProvider) throws IOException {
+    super.seek(positionProvider);
+  }
+
+  public static class StreamReaderBuilder {
+    private String fileName;
+    private int columnIndex;
+    private EncodedColumnBatch.StreamBuffer presentStream;
+    private EncodedColumnBatch.StreamBuffer dataStream;
+    private CompressionCodec compressionCodec;
+    private int bufferSize;
+    private OrcProto.RowIndexEntry rowIndex;
+
+    public StreamReaderBuilder setFileName(String fileName) {
+      this.fileName = fileName;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnIndex(int columnIndex) {
+      this.columnIndex = columnIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      this.presentStream = presentStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      this.dataStream = dataStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+      this.compressionCodec = compressionCodec;
+      return this;
+    }
+
+    public StreamReaderBuilder setBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) {
+      this.rowIndex = rowIndex;
+      return this;
+    }
+
+    public FloatStreamReader build() throws IOException {
+      InStream present = null;
+      if (presentStream != null) {
+        present = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, null, bufferSize,
+                presentStream);
+      }
+
+      InStream data = null;
+      if (dataStream != null) {
+        data = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, null, bufferSize,
+                dataStream);
+      }
+
+      return new FloatStreamReader(columnIndex, present, data, compressionCodec != null, rowIndex);
+    }
+  }
+
+  public static StreamReaderBuilder builder() {
+    return new StreamReaderBuilder();
+  }
+
+}

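Note that, as the RecordReaderImpl diff further down shows, FloatTreeReader.nextVector fills a DoubleColumnVector, so FLOAT and DOUBLE columns decode into the same vector type. A hedged usage sketch (floatReader is a hypothetical, already-built FloatStreamReader):

    // FLOAT values are widened into a DoubleColumnVector, just like DOUBLE
    // (see FloatTreeReader.nextVector in the RecordReaderImpl diff below).
    DoubleColumnVector dcv = (DoubleColumnVector)
        floatReader.nextVector(null, VectorizedRowBatch.DEFAULT_SIZE);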
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/IntStreamReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/IntStreamReader.java?rev=1659112&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/IntStreamReader.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/IntStreamReader.java Thu Feb 12 00:25:38 2015
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.streams;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+
+/**
+ * Stream reader for an ORC INT column; positions itself at the given row-group index on construction.
+ */
+public class IntStreamReader extends RecordReaderImpl.IntTreeReader {
+  private boolean isFileCompressed;
+  private OrcProto.RowIndexEntry rowIndex;
+
+  private IntStreamReader(int columnId, InStream present,
+      InStream data, boolean isFileCompressed,
+      OrcProto.ColumnEncoding.Kind kind,
+      OrcProto.RowIndexEntry rowIndex) throws IOException {
+    super(columnId, present, data, kind);
+    this.isFileCompressed = isFileCompressed;
+    this.rowIndex = rowIndex;
+
+    // position the readers based on the specified row index
+    PositionProvider positionProvider = new RecordReaderImpl.PositionProviderImpl(rowIndex);
+    seek(positionProvider);
+  }
+
+  public void seek(PositionProvider positionProvider) throws IOException {
+    super.seek(positionProvider);
+  }
+
+  public static class StreamReaderBuilder {
+    private String fileName;
+    private int columnIndex;
+    private EncodedColumnBatch.StreamBuffer presentStream;
+    private EncodedColumnBatch.StreamBuffer dataStream;
+    private CompressionCodec compressionCodec;
+    private int bufferSize;
+    private OrcProto.RowIndexEntry rowIndex;
+    private OrcProto.ColumnEncoding.Kind columnEncodingKind;
+
+    public StreamReaderBuilder setFileName(String fileName) {
+      this.fileName = fileName;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnIndex(int columnIndex) {
+      this.columnIndex = columnIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      this.presentStream = presentStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      this.dataStream = dataStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+      this.compressionCodec = compressionCodec;
+      return this;
+    }
+
+    public StreamReaderBuilder setBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) {
+      this.rowIndex = rowIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnEncodingKind(OrcProto.ColumnEncoding.Kind kind) {
+      this.columnEncodingKind = kind;
+      return this;
+    }
+
+    public IntStreamReader build() throws IOException {
+      InStream present = null;
+      if (presentStream != null) {
+        present = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, null, bufferSize,
+                presentStream);
+      }
+
+      InStream data = null;
+      if (dataStream != null) {
+        data = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, null, bufferSize,
+                dataStream);
+      }
+
+      return new IntStreamReader(columnIndex, present, data,
+          compressionCodec != null, columnEncodingKind, rowIndex);
+    }
+  }
+
+  public static StreamReaderBuilder builder() {
+    return new StreamReaderBuilder();
+  }
+
+}

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/LongStreamReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/LongStreamReader.java?rev=1659112&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/LongStreamReader.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/LongStreamReader.java Thu Feb 12 00:25:38 2015
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.streams;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+
+/**
+ * Stream reader for an ORC LONG (bigint) column; positions itself at the given row-group index on construction.
+ */
+public class LongStreamReader extends RecordReaderImpl.LongTreeReader {
+  private boolean isFileCompressed;
+  private OrcProto.RowIndexEntry rowIndex;
+
+  private LongStreamReader(int columnId, InStream present,
+      InStream data, boolean isFileCompressed,
+      OrcProto.ColumnEncoding.Kind kind, boolean skipCorrupt,
+      OrcProto.RowIndexEntry rowIndex) throws IOException {
+    super(columnId, present, data, kind, skipCorrupt);
+    this.isFileCompressed = isFileCompressed;
+    this.rowIndex = rowIndex;
+
+    // position the readers based on the specified row index
+    PositionProvider positionProvider = new RecordReaderImpl.PositionProviderImpl(rowIndex);
+    seek(positionProvider);
+  }
+
+  public void seek(PositionProvider positionProvider) throws IOException {
+    super.seek(positionProvider);
+  }
+
+  public static class StreamReaderBuilder {
+    private String fileName;
+    private int columnIndex;
+    private EncodedColumnBatch.StreamBuffer presentStream;
+    private EncodedColumnBatch.StreamBuffer dataStream;
+    private CompressionCodec compressionCodec;
+    private int bufferSize;
+    private OrcProto.RowIndexEntry rowIndex;
+    private OrcProto.ColumnEncoding.Kind columnEncodingKind;
+    private boolean skipCorrupt;
+
+    public StreamReaderBuilder setFileName(String fileName) {
+      this.fileName = fileName;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnIndex(int columnIndex) {
+      this.columnIndex = columnIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      this.presentStream = presentStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      this.dataStream = dataStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+      this.compressionCodec = compressionCodec;
+      return this;
+    }
+
+    public StreamReaderBuilder setBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) {
+      this.rowIndex = rowIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnEncodingKind(OrcProto.ColumnEncoding.Kind kind) {
+      this.columnEncodingKind = kind;
+      return this;
+    }
+
+    public StreamReaderBuilder skipCorrupt(boolean skipCorrupt) {
+      this.skipCorrupt = skipCorrupt;
+      return this;
+    }
+
+    public LongStreamReader build() throws IOException {
+      InStream present = null;
+      if (presentStream != null) {
+        present = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, null, bufferSize,
+                presentStream);
+      }
+
+      InStream data = null;
+      if (dataStream != null) {
+        data = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, null, bufferSize,
+                dataStream);
+      }
+
+      return new LongStreamReader(columnIndex, present, data,
+          compressionCodec != null, columnEncodingKind, skipCorrupt, rowIndex);
+    }
+  }
+
+  public static StreamReaderBuilder builder() {
+    return new StreamReaderBuilder();
+  }
+}

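Among the new stream readers, LongStreamReader is the only one that accepts the skip-corrupt flag, because createIntegerReader (see the RecordReaderImpl diff below) only forwards it to RunLengthIntegerReaderV2 for the V2 encodings. The flag is now resolved once from the configuration and threaded through the builder, instead of every TreeReader consulting HiveConf per column, as OrcColumnVectorProducer does above:

    // Hedged sketch: resolve the flag once, then hand it to the builder.
    boolean skipCorrupt = HiveConf.getBoolVar(conf,
        HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA);
    LongStreamReader.StreamReaderBuilder builder =
        LongStreamReader.builder().skipCorrupt(skipCorrupt);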
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/ShortStreamReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/ShortStreamReader.java?rev=1659112&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/ShortStreamReader.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/ShortStreamReader.java Thu Feb 12 00:25:38 2015
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.streams;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+
+/**
+ * Stream reader for an ORC SHORT column; positions itself at the given row-group index on construction.
+ */
+public class ShortStreamReader extends RecordReaderImpl.ShortTreeReader {
+  private boolean isFileCompressed;
+  private OrcProto.RowIndexEntry rowIndex;
+
+  private ShortStreamReader(int columnId, InStream present,
+      InStream data, boolean isFileCompressed,
+      OrcProto.ColumnEncoding.Kind kind,
+      OrcProto.RowIndexEntry rowIndex) throws IOException {
+    super(columnId, present, data, kind);
+    this.isFileCompressed = isFileCompressed;
+    this.rowIndex = rowIndex;
+
+    // position the readers based on the specified row index
+    PositionProvider positionProvider = new RecordReaderImpl.PositionProviderImpl(rowIndex);
+    seek(positionProvider);
+  }
+
+  public void seek(PositionProvider positionProvider) throws IOException {
+    super.seek(positionProvider);
+  }
+
+  public static class StreamReaderBuilder {
+    private String fileName;
+    private int columnIndex;
+    private EncodedColumnBatch.StreamBuffer presentStream;
+    private EncodedColumnBatch.StreamBuffer dataStream;
+    private CompressionCodec compressionCodec;
+    private int bufferSize;
+    private OrcProto.RowIndexEntry rowIndex;
+    private OrcProto.ColumnEncoding.Kind columnEncodingKind;
+
+    public StreamReaderBuilder setFileName(String fileName) {
+      this.fileName = fileName;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnIndex(int columnIndex) {
+      this.columnIndex = columnIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      this.presentStream = presentStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      this.dataStream = dataStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+      this.compressionCodec = compressionCodec;
+      return this;
+    }
+
+    public StreamReaderBuilder setBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) {
+      this.rowIndex = rowIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnEncodingKind(OrcProto.ColumnEncoding.Kind kind) {
+      this.columnEncodingKind = kind;
+      return this;
+    }
+
+    public ShortStreamReader build() throws IOException {
+      InStream present = null;
+      if (presentStream != null) {
+        present = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, null, bufferSize,
+                presentStream);
+      }
+
+      InStream data = null;
+      if (dataStream != null) {
+        data = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, null, bufferSize,
+                dataStream);
+      }
+
+      return new ShortStreamReader(columnIndex, present, data,
+          compressionCodec != null, columnEncodingKind, rowIndex);
+    }
+  }
+
+  public static StreamReaderBuilder builder() {
+    return new StreamReaderBuilder();
+  }
+}

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java?rev=1659112&r1=1659111&r2=1659112&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java Thu Feb 12 00:25:38 2015
@@ -184,7 +184,7 @@ public class OrcEncodedDataProducer impl
       Consumer<EncodedColumnBatch<OrcBatchKey>> consumer = (cache == null) ? this.consumer : this;
       EncodedReader stripeReader = null;
       try {
-        orcReader.encodedReader(lowLevelCache, consumer);
+        stripeReader = orcReader.encodedReader(lowLevelCache, consumer);
       } catch (Throwable t) {
         // produceDataFromCache handles its own cleanup.
         consumer.setError(t);
@@ -347,7 +347,6 @@ public class OrcEncodedDataProducer impl
 
     /**
      * Reads the metadata for all stripes in the file.
-     * @param stripeReaders Array to preserve the readers used.
      */
     private ArrayList<OrcStripeMetadata> readStripesMetadata(
         OrcFileMetadata metadata, boolean[] globalInc) throws IOException {

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java?rev=1659112&r1=1659111&r2=1659112&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetadataReader.java Thu Feb 12 00:25:38 2015
@@ -59,13 +59,14 @@ public class MetadataReader {
     for (OrcProto.Stream stream : footer.getStreamsList()) {
       if (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX) {
         int col = stream.getColumn();
-        if ((included != null && !included[col]) || indexes[col] == null) continue;
-        byte[] buffer = new byte[(int) stream.getLength()];
-        file.seek(offset);
-        file.readFully(buffer);
-        indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create(null, "index",
-            Lists.<DiskRange>newArrayList(new BufferChunk(ByteBuffer.wrap(buffer), 0)),
-            stream.getLength(), codec, bufferSize, null));
+        if ((included == null || included[col]) && indexes[col] == null) {
+          byte[] buffer = new byte[(int) stream.getLength()];
+          file.seek(offset);
+          file.readFully(buffer);
+          indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create(null, "index",
+              Lists.<DiskRange>newArrayList(new BufferChunk(ByteBuffer.wrap(buffer), 0)),
+              stream.getLength(), codec, bufferSize, null));
+        }
       }
       offset += stream.getLength();
     }

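Besides flattening the early continue into a positive guard, the MetadataReader hunk fixes an inverted null test: the old condition skipped exactly the columns whose row index had not been read yet (indexes[col] == null), so a missing index could never be populated. The corrected predicate, restated as a minimal sketch:

    // Read a column's row index only when the column is wanted and its
    // index has not been loaded yet.
    static boolean shouldReadRowIndex(boolean[] included, OrcProto.RowIndex[] indexes, int col) {
      return (included == null || included[col]) && indexes[col] == null;
    }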
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1659112&r1=1659111&r2=1659112&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Thu Feb 12 00:25:38 2015
@@ -196,7 +196,8 @@ public class RecordReaderImpl implements
 
     firstRow = skippedRows;
     totalRowCount = rows;
-    reader = createTreeReader(path, 0, types, included, conf);
+    boolean skipCorrupt = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA);
+    reader = createTreeReader(0, types, included, skipCorrupt);
     indexes = new OrcProto.RowIndex[types.size()];
     advanceToNextRow(reader, 0L, true);
   }
@@ -220,39 +221,45 @@ public class RecordReaderImpl implements
     }
   }
 
-  private abstract static class TreeReader {
-    protected final Path path;
+  public abstract static class TreeReader {
     protected final int columnId;
     private BitFieldReader present = null;
     protected boolean valuePresent = false;
-    protected final Configuration conf;
 
-    TreeReader(Path path, int columnId, Configuration conf) {
-      this.path = path;
+    public TreeReader(int columnId) throws IOException {
+      this(columnId, null);
+    }
+
+    public TreeReader(int columnId, InStream in) throws IOException {
       this.columnId = columnId;
-      this.conf = conf;
+      if (in == null) {
+        present = null;
+        valuePresent = true;
+      } else {
+        present = new BitFieldReader(in, 1);
+      }
     }
 
     void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
       if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId + " of " + path);
+            columnId);
       }
     }
 
     IntegerReader createIntegerReader(OrcProto.ColumnEncoding.Kind kind,
         InStream in,
-        boolean signed) throws IOException {
+        boolean signed,
+        boolean skipCorrupt) throws IOException {
       switch (kind) {
-      case DIRECT_V2:
-      case DICTIONARY_V2:
-        boolean skipCorrupt = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA);
-        return new RunLengthIntegerReaderV2(in, signed, skipCorrupt);
-      case DIRECT:
-      case DICTIONARY:
-        return new RunLengthIntegerReader(in, signed);
-      default:
-        throw new IllegalArgumentException("Unknown encoding " + kind);
+        case DIRECT_V2:
+        case DICTIONARY_V2:
+          return new RunLengthIntegerReaderV2(in, signed, skipCorrupt);
+        case DIRECT:
+        case DICTIONARY:
+          return new RunLengthIntegerReader(in, signed);
+        default:
+          throw new IllegalArgumentException("Unknown encoding " + kind);
       }
     }
 
@@ -276,8 +283,12 @@ public class RecordReaderImpl implements
      * @throws IOException
      */
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    void seek(PositionProvider index) throws IOException {
       if (present != null) {
-        present.seek(index[columnId]);
+        present.seek(index);
       }
     }
 
@@ -312,7 +323,7 @@ public class RecordReaderImpl implements
      * @return
      * @throws IOException
      */
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       ColumnVector result = (ColumnVector) previousVector;
       if (present != null) {
         // Set noNulls and isNull vector of the ColumnVector based on
@@ -336,11 +347,11 @@ public class RecordReaderImpl implements
     }
   }
 
-  private static class BooleanTreeReader extends TreeReader {
+  public static class BooleanTreeReader extends TreeReader {
     private BitFieldReader reader = null;
 
-    BooleanTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
+    public BooleanTreeReader(int columnId) throws IOException {
+      super(columnId);
     }
 
     @Override
@@ -354,8 +365,13 @@ public class RecordReaderImpl implements
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      reader.seek(index[columnId]);
+      reader.seek(index);
     }
 
     @Override
@@ -379,7 +395,7 @@ public class RecordReaderImpl implements
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       LongColumnVector result = null;
       if (previousVector == null) {
         result = new LongColumnVector();
@@ -399,8 +415,8 @@ public class RecordReaderImpl implements
   private static class ByteTreeReader extends TreeReader{
     private RunLengthByteReader reader = null;
 
-    ByteTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
+    ByteTreeReader(int columnId) throws IOException {
+      super(columnId);
     }
 
     @Override
@@ -414,8 +430,13 @@ public class RecordReaderImpl implements
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      reader.seek(index[columnId]);
+      reader.seek(index);
     }
 
     @Override
@@ -434,7 +455,7 @@ public class RecordReaderImpl implements
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       LongColumnVector result = null;
       if (previousVector == null) {
         result = new LongColumnVector();
@@ -456,11 +477,20 @@ public class RecordReaderImpl implements
     }
   }
 
-  private static class ShortTreeReader extends TreeReader{
+  public static class ShortTreeReader extends TreeReader {
     private IntegerReader reader = null;
 
-    ShortTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
+    public ShortTreeReader(int columnId) throws IOException {
+      this(columnId, null, null, null);
+    }
+
+    public ShortTreeReader(int columnId, InStream present, InStream data,
+        OrcProto.ColumnEncoding.Kind encoding)
+        throws IOException {
+      super(columnId, present);
+      if (data != null && encoding != null) {
+        this.reader = createIntegerReader(encoding, data, true, false);
+      }
     }
 
     @Override
@@ -468,7 +498,7 @@ public class RecordReaderImpl implements
       if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
           (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId + " of " + path);
+            columnId);
       }
     }
 
@@ -479,13 +509,19 @@ public class RecordReaderImpl implements
       super.startStripe(streams, encodings);
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DATA);
-      reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
+      reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true,
+          false);
     }
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      reader.seek(index[columnId]);
+      reader.seek(index);
     }
 
     @Override
@@ -504,7 +540,7 @@ public class RecordReaderImpl implements
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       LongColumnVector result = null;
       if (previousVector == null) {
         result = new LongColumnVector();
@@ -526,11 +562,20 @@ public class RecordReaderImpl implements
     }
   }
 
-  private static class IntTreeReader extends TreeReader{
+  public static class IntTreeReader extends TreeReader {
     private IntegerReader reader = null;
 
-    IntTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
+    public IntTreeReader(int columnId) throws IOException {
+      this(columnId, null, null, null);
+    }
+
+    public IntTreeReader(int columnId, InStream present, InStream data,
+        OrcProto.ColumnEncoding.Kind encoding)
+        throws IOException {
+      super(columnId, present);
+      if (data != null && encoding != null) {
+        this.reader = createIntegerReader(encoding, data, true, false);
+      }
     }
 
     @Override
@@ -538,7 +583,7 @@ public class RecordReaderImpl implements
       if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
           (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId + " of " + path);
+            columnId);
       }
     }
 
@@ -549,13 +594,19 @@ public class RecordReaderImpl implements
       super.startStripe(streams, encodings);
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DATA);
-      reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
+      reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true,
+          false);
     }
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      reader.seek(index[columnId]);
+      reader.seek(index);
     }
 
     @Override
@@ -574,7 +625,7 @@ public class RecordReaderImpl implements
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       LongColumnVector result = null;
       if (previousVector == null) {
         result = new LongColumnVector();
@@ -596,11 +647,23 @@ public class RecordReaderImpl implements
     }
   }
 
-  private static class LongTreeReader extends TreeReader{
+  public static class LongTreeReader extends TreeReader {
     private IntegerReader reader = null;
+    private final boolean skipCorrupt;
 
-    LongTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
+    LongTreeReader(int columnId, boolean skipCorrupt) throws IOException {
+      this(columnId, null, null, null, skipCorrupt);
+    }
+
+    public LongTreeReader(int columnId, InStream present, InStream data,
+        OrcProto.ColumnEncoding.Kind encoding,
+        boolean skipCorrupt)
+        throws IOException {
+      super(columnId, present);
+      this.skipCorrupt = skipCorrupt;
+      if (data != null && encoding != null) {
+        this.reader = createIntegerReader(encoding, data, true, skipCorrupt);
+      }
     }
 
     @Override
@@ -608,7 +671,7 @@ public class RecordReaderImpl implements
       if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
           (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId + " of " + path);
+            columnId);
       }
     }
 
@@ -619,13 +682,19 @@ public class RecordReaderImpl implements
       super.startStripe(streams, encodings);
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DATA);
-      reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
+      reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true,
+          false);
     }
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      reader.seek(index[columnId]);
+      reader.seek(index);
     }
 
     @Override
@@ -644,7 +713,7 @@ public class RecordReaderImpl implements
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       LongColumnVector result = null;
       if (previousVector == null) {
         result = new LongColumnVector();
@@ -666,13 +735,18 @@ public class RecordReaderImpl implements
     }
   }
 
-  private static class FloatTreeReader extends TreeReader{
+  public static class FloatTreeReader extends TreeReader {
     private InStream stream;
     private final SerializationUtils utils;
 
-    FloatTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
+    public FloatTreeReader(int columnId) throws IOException {
+      this(columnId, null, null);
+    }
+
+    public FloatTreeReader(int columnId, InStream present, InStream data) throws IOException {
+      super(columnId, present);
       this.utils = new SerializationUtils();
+      this.stream = data;
     }
 
     @Override
@@ -687,8 +761,13 @@ public class RecordReaderImpl implements
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      stream.seek(index[columnId]);
+      stream.seek(index);
     }
 
     @Override
@@ -707,7 +786,7 @@ public class RecordReaderImpl implements
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       DoubleColumnVector result = null;
       if (previousVector == null) {
         result = new DoubleColumnVector();
@@ -740,7 +819,7 @@ public class RecordReaderImpl implements
     }
 
     @Override
-    void skipRows(long items) throws IOException {
+    protected void skipRows(long items) throws IOException {
       items = countNonNulls(items);
       for(int i=0; i < items; ++i) {
         utils.readFloat(stream);
@@ -748,13 +827,18 @@ public class RecordReaderImpl implements
     }
   }
 
-  private static class DoubleTreeReader extends TreeReader{
+  public static class DoubleTreeReader extends TreeReader {
     private InStream stream;
     private final SerializationUtils utils;
 
-    DoubleTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
+    public DoubleTreeReader(int columnId) throws IOException {
+      this(columnId, null, null);
+    }
+
+    public DoubleTreeReader(int columnId, InStream present, InStream data) throws IOException {
+      super(columnId, present);
       this.utils = new SerializationUtils();
+      this.stream = data;
     }
 
     @Override
@@ -770,8 +854,13 @@ public class RecordReaderImpl implements
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      stream.seek(index[columnId]);
+      stream.seek(index);
     }
 
     @Override
@@ -790,7 +879,7 @@ public class RecordReaderImpl implements
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       DoubleColumnVector result = null;
       if (previousVector == null) {
         result = new DoubleColumnVector();
@@ -834,8 +923,8 @@ public class RecordReaderImpl implements
 
     protected final LongColumnVector scratchlcv;
 
-    BinaryTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
+    BinaryTreeReader(int columnId) throws IOException {
+      super(columnId);
       scratchlcv = new LongColumnVector();
     }
 
@@ -844,7 +933,7 @@ public class RecordReaderImpl implements
       if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
           (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId + " of " + path);
+            columnId);
       }
     }
 
@@ -857,14 +946,19 @@ public class RecordReaderImpl implements
           OrcProto.Stream.Kind.DATA);
       stream = streams.get(name);
       lengths = createIntegerReader(encodings.get(columnId).getKind(), streams.get(new
-          StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), false);
+          StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), false, false);
     }
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      stream.seek(index[columnId]);
-      lengths.seek(index[columnId]);
+      stream.seek(index);
+      lengths.seek(index);
     }
 
     @Override
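
[Every createIntegerReader call in this patch gains a fourth boolean; the TimestampTreeReader hunks below pass their new skipCorrupt flag there, while streams that never carry corrupt-prone data (LENGTH here, dictionary offsets later) pass false. The factch itself is not part of this diff, so the following is only a sketch of the signature these call sites imply, reusing the decoder types already in this file:

    // Assumed shape of the extended factory: only the V2 run-length decoder
    // has anything to do with skipCorrupt, so the flag is forwarded there
    // and ignored for the V1 encodings.
    static IntegerReader createIntegerReader(OrcProto.ColumnEncoding.Kind kind,
        InStream in, boolean signed, boolean skipCorrupt) throws IOException {
      switch (kind) {
        case DIRECT_V2:
        case DICTIONARY_V2:
          return new RunLengthIntegerReaderV2(in, signed, skipCorrupt);
        case DIRECT:
        case DICTIONARY:
          return new RunLengthIntegerReader(in, signed);
        default:
          throw new IllegalArgumentException("Unknown encoding " + kind);
      }
    }
]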
@@ -893,7 +987,7 @@ public class RecordReaderImpl implements
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       BytesColumnVector result = null;
       if (previousVector == null) {
         result = new BytesColumnVector();
@@ -923,9 +1017,11 @@ public class RecordReaderImpl implements
     private IntegerReader data = null;
     private IntegerReader nanos = null;
     private final LongColumnVector nanoVector = new LongColumnVector();
+    private final boolean skipCorrupt;
 
-    TimestampTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
+    TimestampTreeReader(int columnId, boolean skipCorrupt) throws IOException {
+      super(columnId);
+      this.skipCorrupt = skipCorrupt;
     }
 
     @Override
@@ -933,7 +1029,7 @@ public class RecordReaderImpl implements
       if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
           (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId + " of " + path);
+            columnId);
       }
     }
 
@@ -944,17 +1040,22 @@ public class RecordReaderImpl implements
       super.startStripe(streams, encodings);
       data = createIntegerReader(encodings.get(columnId).getKind(),
           streams.get(new StreamName(columnId,
-              OrcProto.Stream.Kind.DATA)), true);
+              OrcProto.Stream.Kind.DATA)), true, skipCorrupt);
       nanos = createIntegerReader(encodings.get(columnId).getKind(),
           streams.get(new StreamName(columnId,
-              OrcProto.Stream.Kind.SECONDARY)), false);
+              OrcProto.Stream.Kind.SECONDARY)), false, skipCorrupt);
     }
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      data.seek(index[columnId]);
-      nanos.seek(index[columnId]);
+      data.seek(index);
+      nanos.seek(index);
     }
 
     @Override
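
[TimestampTreeReader now records skipCorrupt and hands it to both its seconds (DATA) and nanos (SECONDARY) readers, since either stream can hit the corrupt data this flag tells the reader to skip rather than fail on. The patch only threads the boolean through; where it originates is not shown, so the wiring below is a guess based on Hive's existing skip-corrupt setting:

    // Hypothetical wiring from configuration; the ConfVars name is an
    // assumption, not part of this patch.
    boolean skipCorrupt = HiveConf.getBoolVar(conf,
        HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA);
    TreeReader reader = new TimestampTreeReader(columnId, skipCorrupt);
]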
@@ -985,7 +1086,7 @@ public class RecordReaderImpl implements
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       LongColumnVector result = null;
       if (previousVector == null) {
         result = new LongColumnVector();
@@ -1032,8 +1133,8 @@ public class RecordReaderImpl implements
   private static class DateTreeReader extends TreeReader{
     private IntegerReader reader = null;
 
-    DateTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
+    DateTreeReader(int columnId) throws IOException {
+      super(columnId);
     }
 
     @Override
@@ -1041,7 +1142,7 @@ public class RecordReaderImpl implements
       if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
           (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId + " of " + path);
+            columnId);
       }
     }
 
@@ -1052,13 +1153,18 @@ public class RecordReaderImpl implements
       super.startStripe(streams, encodings);
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DATA);
-      reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
+      reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true, false);
     }
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      reader.seek(index[columnId]);
+      reader.seek(index);
     }
 
     @Override
@@ -1077,7 +1183,7 @@ public class RecordReaderImpl implements
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       LongColumnVector result = null;
       if (previousVector == null) {
         result = new LongColumnVector();
@@ -1107,8 +1213,8 @@ public class RecordReaderImpl implements
     private final int precision;
     private final int scale;
 
-    DecimalTreeReader(Path path, int columnId, int precision, int scale, Configuration conf) {
-      super(path, columnId, conf);
+    DecimalTreeReader(int columnId, int precision, int scale) throws IOException {
+      super(columnId);
       this.precision = precision;
       this.scale = scale;
     }
@@ -1118,7 +1224,7 @@ public class RecordReaderImpl implements
       if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
           (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId + " of " + path);
+            columnId);
       }
     }
 
@@ -1130,14 +1236,19 @@ public class RecordReaderImpl implements
       valueStream = streams.get(new StreamName(columnId,
           OrcProto.Stream.Kind.DATA));
       scaleStream = createIntegerReader(encodings.get(columnId).getKind(), streams.get(
-          new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true);
+          new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true, false);
     }
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      valueStream.seek(index[columnId]);
-      scaleStream.seek(index[columnId]);
+      valueStream.seek(index);
+      scaleStream.seek(index);
     }
 
     @Override
@@ -1158,7 +1269,7 @@ public class RecordReaderImpl implements
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       DecimalColumnVector result = null;
       if (previousVector == null) {
         result = new DecimalColumnVector(precision, scale);
@@ -1218,8 +1329,8 @@ public class RecordReaderImpl implements
   private static class StringTreeReader extends TreeReader {
     private TreeReader reader;
 
-    StringTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
+    StringTreeReader(int columnId) throws IOException {
+      super(columnId);
     }
 
     @Override
@@ -1236,11 +1347,11 @@ public class RecordReaderImpl implements
       switch (encodings.get(columnId).getKind()) {
         case DIRECT:
         case DIRECT_V2:
-          reader = new StringDirectTreeReader(path, columnId, conf);
+          reader = new StringDirectTreeReader(columnId);
           break;
         case DICTIONARY:
         case DICTIONARY_V2:
-          reader = new StringDictionaryTreeReader(path, columnId, conf);
+          reader = new StringDictionaryTreeReader(columnId);
           break;
         default:
           throw new IllegalArgumentException("Unsupported encoding " +
@@ -1255,12 +1366,17 @@ public class RecordReaderImpl implements
     }
 
     @Override
+    public void seek(PositionProvider index) throws IOException {
+      reader.seek(index);
+    }
+
+    @Override
     Object next(Object previous) throws IOException {
       return reader.next(previous);
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       return reader.nextVector(previousVector, batchSize);
     }
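
[StringTreeReader remains a pure dispatcher: startStripe picks a DIRECT or DICTIONARY sub-reader from the stripe's encodings, and every entry point, including the new per-column seek above, forwards to it. The forwarding seek deliberately does not call super.seek: the delegate was handed all of this column's streams, PRESENT included, so the outer reader has nothing of its own to position. A compressed sketch of the composite, reusing the PositionProvider stub from the earlier sketch:

    // Composite sketch: the outer reader owns no streams of its own once a
    // delegate exists, so it forwards rather than calling super.seek.
    class DispatchingReaderSketch extends TreeReaderSketch {
      private TreeReaderSketch delegate; // assigned in startStripe (not shown)

      DispatchingReaderSketch(int columnId) {
        super(columnId);
      }

      @Override
      public void seek(PositionProvider index) throws IOException {
        delegate.seek(index); // no super.seek: delegate handles PRESENT too
      }
    }
]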
 
@@ -1350,8 +1466,8 @@ public class RecordReaderImpl implements
 
     private final LongColumnVector scratchlcv;
 
-    StringDirectTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
+    StringDirectTreeReader(int columnId) throws IOException {
+      super(columnId);
       scratchlcv = new LongColumnVector();
     }
 
@@ -1360,7 +1476,7 @@ public class RecordReaderImpl implements
       if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT &&
           encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId + " of " + path);
+            columnId);
       }
     }
 
@@ -1374,14 +1490,19 @@ public class RecordReaderImpl implements
       stream = streams.get(name);
       lengths = createIntegerReader(encodings.get(columnId).getKind(),
           streams.get(new StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
-          false);
+          false, false);
     }
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      stream.seek(index[columnId]);
-      lengths.seek(index[columnId]);
+      stream.seek(index);
+      lengths.seek(index);
     }
 
     @Override
@@ -1411,7 +1532,7 @@ public class RecordReaderImpl implements
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       BytesColumnVector result = null;
       if (previousVector == null) {
         result = new BytesColumnVector();
@@ -1449,8 +1570,8 @@ public class RecordReaderImpl implements
     private byte[] dictionaryBufferInBytesCache = null;
     private final LongColumnVector scratchlcv;
 
-    StringDictionaryTreeReader(Path path, int columnId, Configuration conf) {
-      super(path, columnId, conf);
+    StringDictionaryTreeReader(int columnId) throws IOException {
+      super(columnId);
       scratchlcv = new LongColumnVector();
     }
 
@@ -1459,7 +1580,7 @@ public class RecordReaderImpl implements
       if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY &&
           encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId + " of " + path);
+            columnId);
       }
     }
 
@@ -1491,7 +1612,7 @@ public class RecordReaderImpl implements
       in = streams.get(name);
       if (in != null) { // Guard against empty LENGTH stream.
         IntegerReader lenReader = createIntegerReader(encodings.get(columnId)
-            .getKind(), in, false);
+            .getKind(), in, false, false);
         int offset = 0;
         if (dictionaryOffsets == null ||
             dictionaryOffsets.length < dictionarySize + 1) {
@@ -1508,13 +1629,18 @@ public class RecordReaderImpl implements
       // set up the row reader
       name = new StreamName(columnId, OrcProto.Stream.Kind.DATA);
       reader = createIntegerReader(encodings.get(columnId).getKind(),
-          streams.get(name), false);
+          streams.get(name), false, false);
     }
 
     @Override
     void seek(PositionProvider[] index) throws IOException {
+      seek(index[columnId]);
+    }
+
+    @Override
+    public void seek(PositionProvider index) throws IOException {
       super.seek(index);
-      reader.seek(index[columnId]);
+      reader.seek(index);
     }
 
     @Override
@@ -1543,7 +1669,7 @@ public class RecordReaderImpl implements
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       BytesColumnVector result = null;
       int offset = 0, length = 0;
       if (previousVector == null) {
@@ -1619,8 +1745,8 @@ public class RecordReaderImpl implements
   private static class CharTreeReader extends StringTreeReader {
     int maxLength;
 
-    CharTreeReader(Path path, int columnId, int maxLength, Configuration conf) {
-      super(path, columnId, conf);
+    CharTreeReader(int columnId, int maxLength) throws IOException {
+      super(columnId);
       this.maxLength = maxLength;
     }
 
@@ -1644,7 +1770,7 @@ public class RecordReaderImpl implements
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       // Get the vector of strings from StringTreeReader, then make a 2nd pass to
       // adjust down the length (right trim and truncate) if necessary.
       BytesColumnVector result = (BytesColumnVector) super.nextVector(previousVector, batchSize);
@@ -1683,8 +1809,8 @@ public class RecordReaderImpl implements
   private static class VarcharTreeReader extends StringTreeReader {
     int maxLength;
 
-    VarcharTreeReader(Path path, int columnId, int maxLength, Configuration conf) {
-      super(path, columnId, conf);
+    VarcharTreeReader(int columnId, int maxLength) throws IOException {
+      super(columnId);
       this.maxLength = maxLength;
     }
 
@@ -1708,7 +1834,7 @@ public class RecordReaderImpl implements
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       // Get the vector of strings from StringTreeReader, then make a 2nd pass to
       // adjust down the length (truncate) if necessary.
       BytesColumnVector result = (BytesColumnVector) super.nextVector(previousVector, batchSize);
@@ -1749,10 +1875,11 @@ public class RecordReaderImpl implements
     private final String[] fieldNames;
     private final List<TreeReader> readers;
 
-    StructTreeReader(Path path, int columnId,
+    StructTreeReader(int columnId,
                      List<OrcProto.Type> types,
-                     boolean[] included, Configuration conf) throws IOException {
-      super(path, columnId, conf);
+                     boolean[] included,
+                     boolean skipCorrupt) throws IOException {
+      super(columnId);
       OrcProto.Type type = types.get(columnId);
       int fieldCount = type.getFieldNamesCount();
       this.fields = new TreeReader[fieldCount];
@@ -1761,7 +1888,7 @@ public class RecordReaderImpl implements
       for(int i=0; i < fieldCount; ++i) {
         int subtype = type.getSubtypes(i);
         if (included == null || included[subtype]) {
-          this.fields[i] = createTreeReader(path, subtype, types, included, conf);
+          this.fields[i] = createTreeReader(subtype, types, included, skipCorrupt);
           readers.add(this.fields[i]);
         }
         this.fieldNames[i] = type.getFieldNames(i);
@@ -1820,7 +1947,7 @@ public class RecordReaderImpl implements
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       ColumnVector[] result = null;
       if (previousVector == null) {
         result = new ColumnVector[fields.length];
@@ -1868,17 +1995,18 @@ public class RecordReaderImpl implements
     private final TreeReader[] fields;
     private RunLengthByteReader tags;
 
-    UnionTreeReader(Path path, int columnId,
+    UnionTreeReader(int columnId,
                     List<OrcProto.Type> types,
-                    boolean[] included, Configuration conf) throws IOException {
-      super(path, columnId, conf);
+                    boolean[] included,
+                    boolean skipCorrupt) throws IOException {
+      super(columnId);
       OrcProto.Type type = types.get(columnId);
       int fieldCount = type.getSubtypesCount();
       this.fields = new TreeReader[fieldCount];
       for(int i=0; i < fieldCount; ++i) {
         int subtype = type.getSubtypes(i);
         if (included == null || included[subtype]) {
-          this.fields[i] = createTreeReader(path, subtype, types, included, conf);
+          this.fields[i] = createTreeReader(subtype, types, included, skipCorrupt);
         }
       }
     }
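
[As with StructTreeReader above, the compound readers now take the type list, the included-columns mask, and skipCorrupt instead of a Path and Configuration; child readers are only instantiated when included[] selects their column, leaving null slots for pruned subtrees. Hypothetical usage for a projected read, where types comes from the ORC footer:

    // Hypothetical projection: build readers for the root struct (column 0)
    // and two of its children only; all other subtrees stay null.
    boolean[] included = new boolean[types.size()];
    included[0] = true;
    included[1] = true;
    included[3] = true;
    TreeReader root = createTreeReader(0, types, included, /* skipCorrupt */ false);
]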
@@ -1911,7 +2039,7 @@ public class RecordReaderImpl implements
     }
 
     @Override
-    Object nextVector(Object previousVector, long batchSize) throws IOException {
+    public Object nextVector(Object previousVector, long batchSize) throws IOException {
       throw new UnsupportedOperationException(
           "NextVector is not supported operation for Union type");
     }
@@ -1947,13 +2075,13 @@ public class RecordReaderImpl implements
     private final TreeReader elementReader;
     private IntegerReader lengths = null;
 
-    ListTreeReader(Path path, int columnId,
+    ListTreeReader(int columnId,
                    List<OrcProto.Type> types,
-                   boolean[] included, Configuration conf) throws IOException {
-      super(path, columnId, conf);
+                   boolean[] included,
+                   boolean skipCorrupt) throws IOException {
+      super(columnId);
       OrcProto.Type type = types.get(columnId);
-      elementReader = createTreeReader(path, type.getSubtypes(0), types,
-          included, conf);
+      elementReader = createTreeReader(type.getSubtypes(0), types, included, skipCorrupt);
     }
 
     @Override
@@ -1994,7 +2122,7 @@ public class RecordReaderImpl implements
     }
 
     @Override
-    Object nextVector(Object previous, long batchSize) throws IOException {
+    public Object nextVector(Object previous, long batchSize) throws IOException {
       throw new UnsupportedOperationException(
           "NextVector is not supported operation for List type");
     }
@@ -2004,7 +2132,7 @@ public class RecordReaderImpl implements
       if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
           (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId + " of " + path);
+            columnId);
       }
     }
 
@@ -2015,7 +2143,7 @@ public class RecordReaderImpl implements
       super.startStripe(streams, encodings);
       lengths = createIntegerReader(encodings.get(columnId).getKind(),
           streams.get(new StreamName(columnId,
-              OrcProto.Stream.Kind.LENGTH)), false);
+              OrcProto.Stream.Kind.LENGTH)), false, false);
       if (elementReader != null) {
         elementReader.startStripe(streams, encodings);
       }
@@ -2037,21 +2165,21 @@ public class RecordReaderImpl implements
     private final TreeReader valueReader;
     private IntegerReader lengths = null;
 
-    MapTreeReader(Path path,
-                  int columnId,
+    MapTreeReader(int columnId,
                   List<OrcProto.Type> types,
-                  boolean[] included, Configuration conf) throws IOException {
-      super(path, columnId, conf);
+                  boolean[] included,
+                  boolean skipCorrupt) throws IOException {
+      super(columnId);
       OrcProto.Type type = types.get(columnId);
       int keyColumn = type.getSubtypes(0);
       int valueColumn = type.getSubtypes(1);
       if (included == null || included[keyColumn]) {
-        keyReader = createTreeReader(path, keyColumn, types, included, conf);
+        keyReader = createTreeReader(keyColumn, types, included, skipCorrupt);
       } else {
         keyReader = null;
       }
       if (included == null || included[valueColumn]) {
-        valueReader = createTreeReader(path, valueColumn, types, included, conf);
+        valueReader = createTreeReader(valueColumn, types, included, skipCorrupt);
       } else {
         valueReader = null;
       }
@@ -2088,7 +2216,7 @@ public class RecordReaderImpl implements
     }
 
     @Override
-    Object nextVector(Object previous, long batchSize) throws IOException {
+    public Object nextVector(Object previous, long batchSize) throws IOException {
       throw new UnsupportedOperationException(
           "NextVector is not supported operation for Map type");
     }
@@ -2098,7 +2226,7 @@ public class RecordReaderImpl implements
       if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
           (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
-            columnId + " of " + path);
+            columnId);
       }
     }
 
@@ -2109,7 +2237,7 @@ public class RecordReaderImpl implements
       super.startStripe(streams, encodings);
       lengths = createIntegerReader(encodings.get(columnId).getKind(),
           streams.get(new StreamName(columnId,
-              OrcProto.Stream.Kind.LENGTH)), false);
+              OrcProto.Stream.Kind.LENGTH)), false, false);
       if (keyReader != null) {
         keyReader.startStripe(streams, encodings);
       }
@@ -2130,58 +2258,57 @@ public class RecordReaderImpl implements
     }
   }
 
-  private static TreeReader createTreeReader(Path path,
-                                             int columnId,
+  private static TreeReader createTreeReader(int columnId,
                                              List<OrcProto.Type> types,
                                              boolean[] included,
-                                             Configuration conf
+                                             boolean skipCorrupt
                                             ) throws IOException {
     OrcProto.Type type = types.get(columnId);
     switch (type.getKind()) {
       case BOOLEAN:
-        return new BooleanTreeReader(path, columnId, conf);
+        return new BooleanTreeReader(columnId);
       case BYTE:
-        return new ByteTreeReader(path, columnId, conf);
+        return new ByteTreeReader(columnId);
       case DOUBLE:
-        return new DoubleTreeReader(path, columnId, conf);
+        return new DoubleTreeReader(columnId);
       case FLOAT:
-        return new FloatTreeReader(path, columnId, conf);
+        return new FloatTreeReader(columnId);
       case SHORT:
-        return new ShortTreeReader(path, columnId, conf);
+        return new ShortTreeReader(columnId);
       case INT:
-        return new IntTreeReader(path, columnId, conf);
+        return new IntTreeReader(columnId);
       case LONG:
-        return new LongTreeReader(path, columnId, conf);
+        return new LongTreeReader(columnId, skipCorrupt);
       case STRING:
-        return new StringTreeReader(path, columnId, conf);
+        return new StringTreeReader(columnId);
       case CHAR:
         if (!type.hasMaximumLength()) {
           throw new IllegalArgumentException("ORC char type has no length specified");
         }
-        return new CharTreeReader(path, columnId, type.getMaximumLength(), conf);
+        return new CharTreeReader(columnId, type.getMaximumLength());
       case VARCHAR:
         if (!type.hasMaximumLength()) {
           throw new IllegalArgumentException("ORC varchar type has no length specified");
         }
-        return new VarcharTreeReader(path, columnId, type.getMaximumLength(), conf);
+        return new VarcharTreeReader(columnId, type.getMaximumLength());
       case BINARY:
-        return new BinaryTreeReader(path, columnId, conf);
+        return new BinaryTreeReader(columnId);
       case TIMESTAMP:
-        return new TimestampTreeReader(path, columnId, conf);
+        return new TimestampTreeReader(columnId, skipCorrupt);
       case DATE:
-        return new DateTreeReader(path, columnId, conf);
+        return new DateTreeReader(columnId);
       case DECIMAL:
         int precision = type.hasPrecision() ? type.getPrecision() : HiveDecimal.SYSTEM_DEFAULT_PRECISION;
         int scale =  type.hasScale()? type.getScale() : HiveDecimal.SYSTEM_DEFAULT_SCALE;
-        return new DecimalTreeReader(path, columnId, precision, scale, conf);
+        return new DecimalTreeReader(columnId, precision, scale);
       case STRUCT:
-        return new StructTreeReader(path, columnId, types, included, conf);
+        return new StructTreeReader(columnId, types, included, skipCorrupt);
       case LIST:
-        return new ListTreeReader(path, columnId, types, included, conf);
+        return new ListTreeReader(columnId, types, included, skipCorrupt);
       case MAP:
-        return new MapTreeReader(path, columnId, types, included, conf);
+        return new MapTreeReader(columnId, types, included, skipCorrupt);
       case UNION:
-        return new UnionTreeReader(path, columnId, types, included, conf);
+        return new UnionTreeReader(columnId, types, included, skipCorrupt);
       default:
         throw new IllegalArgumentException("Unsupported type " +
           type.getKind());
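
[With Path and Configuration gone from the factory, nothing in a reader tree is tied to a particular file anymore: the tree is built from the footer's type list alone and only meets actual bytes when startStripe hands it streams. That is what lets the LLAP decode path construct these readers per row group rather than per file. A hypothetical caller, where types, streams, and encodings stand in for values the caller already holds:

    // Build the reader tree once from the ORC schema, then attach a
    // stripe's streams; no file handle or job conf is needed.
    TreeReader root = createTreeReader(0, types, /* included */ null,
        /* skipCorrupt */ false);
    root.startStripe(streams, encodings);
]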
@@ -2914,10 +3041,6 @@ public class RecordReaderImpl implements
     // find the next row
     rowInStripe += 1;
     advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
-    if (isLogTraceEnabled) {
-      LOG.trace("row from " + reader.path);
-      LOG.trace("orc row = " + result);
-    }
     return result;
   }
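
[The dropped trace lines are a knock-on effect of the same refactor: reader.path no longer exists because the readers no longer know which file they are reading. Conversely, the new public stream-taking constructors are what the StreamReader classes added under llap-server build on; a hedged sketch of decoding one row group of a float column, where present, data, batchSize, and rowGroupPosition are assumed to come from the LLAP column-batch machinery:

    // Hypothetical LLAP-side decode: build the reader straight from the
    // column's InStreams, seek to this row group's recorded positions, and
    // vectorize. Note that FloatTreeReader fills a DoubleColumnVector.
    FloatTreeReader reader = new FloatTreeReader(columnId, present, data);
    reader.seek(rowGroupPosition);      // per-column PositionProvider
    DoubleColumnVector result =
        (DoubleColumnVector) reader.nextVector(null, batchSize);
]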
 


