hive-commits mailing list archives

From prasan...@apache.org
Subject svn commit: r1659465 - in /hive/branches/llap: llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/ ql/src/java/org/apache/hadoop/hive/ql/io/orc/
Date Fri, 13 Feb 2015 08:17:53 GMT
Author: prasanthj
Date: Fri Feb 13 08:17:53 2015
New Revision: 1659465

URL: http://svn.apache.org/r1659465
Log:
HIVE-9419p4: LLAP: ORC decoding of row-groups (Partial patch4)
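
This patch wires the remaining ORC primitive types (binary, boolean, byte, char/varchar, string, decimal, timestamp and date) into OrcColumnVectorProducer through one builder-based stream reader per type. For illustration, a caller assembles one of these readers roughly as follows; this is a sketch only, and the variables (file, colIx, present, data, codec, bufferSize, rowIndex, columnEncoding) are assumed to have been pulled from the current EncodedColumnBatch exactly as in the createTreeReaders switch shown below:

    // Sketch only: inputs are assumed to come from the EncodedColumnBatch
    // being decoded, as in OrcColumnVectorProducer.createTreeReaders below.
    IntStreamReader reader = IntStreamReader.builder()
        .setFileName(file)                  // used to label the InStreams
        .setColumnIndex(colIx)
        .setPresentStream(present)          // may be null if there is no PRESENT stream
        .setDataStream(data)
        .setCompressionCodec(codec)         // a non-null codec marks the file as compressed
        .setBufferSize(bufferSize)
        .setRowIndex(rowIndex)              // the reader seeks here during build()
        .setColumnEncoding(columnEncoding)  // full encoding, no longer just its Kind
        .build();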

Added:
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/BinaryStreamReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/BooleanStreamReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/ByteStreamReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/CharacterStreamReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DateStreamReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DecimalStreamReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/StringStreamReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/TimestampStreamReader.java
Modified:
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/IntStreamReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/LongStreamReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/ShortStreamReader.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java?rev=1659465&r1=1659464&r2=1659465&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java Fri Feb 13 08:17:53 2015
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.llap.io.decode;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hadoop.conf.Configuration;
@@ -28,11 +27,19 @@ import org.apache.hadoop.hive.llap.Consu
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.BinaryStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.BooleanStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.ByteStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.CharacterStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.DateStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.DecimalStreamReader;
 import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.DoubleStreamReader;
 import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.FloatStreamReader;
 import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.IntStreamReader;
 import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.LongStreamReader;
 import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.ShortStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.StringStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.TimestampStreamReader;
 import org.apache.hadoop.hive.llap.io.encoded.EncodedDataProducer;
 import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataProducer;
 import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
@@ -44,8 +51,6 @@ import org.apache.hadoop.hive.ql.io.orc.
 import org.apache.hadoop.hive.ql.io.orc.OrcProto;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
 
-import com.google.common.collect.Lists;
-
 public class OrcColumnVectorProducer extends ColumnVectorProducer<OrcBatchKey> {
   private final OrcEncodedDataProducer edp;
   private final OrcMetadataCache metadataCache;
@@ -96,6 +101,7 @@ public class OrcColumnVectorProducer ext
       for (int i = 0; i < maxBatchesRG; i++) {
         if (i == maxBatchesRG - 1) {
           batchSize = (int) (nonNullRowCount % VectorizedRowBatch.DEFAULT_SIZE);
+          cvb.size = batchSize;
         }
 
         for (int idx = 0; idx < batch.columnIxs.length; idx++) {
@@ -168,6 +174,41 @@ public class OrcColumnVectorProducer ext
       }
 
       switch (colType.getKind()) {
+        case BINARY:
+          treeReaders[i] = BinaryStreamReader.builder()
+              .setFileName(file)
+              .setColumnIndex(colIx)
+              .setPresentStream(present)
+              .setDataStream(data)
+              .setLengthStream(lengths)
+              .setCompressionCodec(codec)
+              .setBufferSize(bufferSize)
+              .setRowIndex(rowIndex)
+              .setColumnEncoding(columnEncoding)
+              .build();
+          break;
+        case BOOLEAN:
+          treeReaders[i] = BooleanStreamReader.builder()
+              .setFileName(file)
+              .setColumnIndex(colIx)
+              .setPresentStream(present)
+              .setDataStream(data)
+              .setCompressionCodec(codec)
+              .setBufferSize(bufferSize)
+              .setRowIndex(rowIndex)
+              .build();
+          break;
+        case BYTE:
+          treeReaders[i] = ByteStreamReader.builder()
+              .setFileName(file)
+              .setColumnIndex(colIx)
+              .setPresentStream(present)
+              .setDataStream(data)
+              .setCompressionCodec(codec)
+              .setBufferSize(bufferSize)
+              .setRowIndex(rowIndex)
+              .build();
+          break;
         case SHORT:
           treeReaders[i] = ShortStreamReader.builder()
               .setFileName(file)
@@ -177,7 +218,7 @@ public class OrcColumnVectorProducer ext
               .setCompressionCodec(codec)
               .setBufferSize(bufferSize)
               .setRowIndex(rowIndex)
-              .setColumnEncodingKind(columnEncoding.getKind())
+              .setColumnEncoding(columnEncoding)
               .build();
           break;
         case INT:
@@ -189,7 +230,7 @@ public class OrcColumnVectorProducer ext
               .setCompressionCodec(codec)
               .setBufferSize(bufferSize)
               .setRowIndex(rowIndex)
-              .setColumnEncodingKind(columnEncoding.getKind())
+              .setColumnEncoding(columnEncoding)
               .build();
           break;
         case LONG:
@@ -201,7 +242,7 @@ public class OrcColumnVectorProducer ext
               .setCompressionCodec(codec)
               .setBufferSize(bufferSize)
               .setRowIndex(rowIndex)
-              .setColumnEncodingKind(columnEncoding.getKind())
+              .setColumnEncoding(columnEncoding)
               .skipCorrupt(skipCorrupt)
               .build();
           break;
@@ -229,9 +270,75 @@ public class OrcColumnVectorProducer ext
           break;
         case CHAR:
         case VARCHAR:
+          treeReaders[i] = CharacterStreamReader.builder()
+              .setFileName(file)
+              .setColumnIndex(colIx)
+              .setMaxLength(colType.getMaximumLength())
+              .setCharacterType(colType)
+              .setPresentStream(present)
+              .setDataStream(data)
+              .setLengthStream(lengths)
+              .setDictionaryStream(dictionary)
+              .setCompressionCodec(codec)
+              .setBufferSize(bufferSize)
+              .setRowIndex(rowIndex)
+              .setColumnEncoding(columnEncoding)
+              .build();
+          break;
         case STRING:
-//          columnStreams[i] = new StringColumnStream(file, colIx, present, data, dictionary, lengths,
-//              columnEncoding, codec, bufferSize, rowIndex);
+          treeReaders[i] = StringStreamReader.builder()
+              .setFileName(file)
+              .setColumnIndex(colIx)
+              .setPresentStream(present)
+              .setDataStream(data)
+              .setLengthStream(lengths)
+              .setDictionaryStream(dictionary)
+              .setCompressionCodec(codec)
+              .setBufferSize(bufferSize)
+              .setRowIndex(rowIndex)
+              .setColumnEncoding(columnEncoding)
+              .build();
+          break;
+        case DECIMAL:
+          treeReaders[i] = DecimalStreamReader.builder()
+              .setFileName(file)
+              .setColumnIndex(colIx)
+              .setPrecision(colType.getPrecision())
+              .setScale(colType.getScale())
+              .setPresentStream(present)
+              .setValueStream(data)
+              .setScaleStream(secondary)
+              .setCompressionCodec(codec)
+              .setBufferSize(bufferSize)
+              .setRowIndex(rowIndex)
+              .setColumnEncoding(columnEncoding)
+              .build();
+          break;
+        case TIMESTAMP:
+          treeReaders[i] = TimestampStreamReader.builder()
+              .setFileName(file)
+              .setColumnIndex(colIx)
+              .setPresentStream(present)
+              .setSecondsStream(data)
+              .setNanosStream(secondary)
+              .setCompressionCodec(codec)
+              .setBufferSize(bufferSize)
+              .setRowIndex(rowIndex)
+              .setColumnEncoding(columnEncoding)
+              .skipCorrupt(skipCorrupt)
+              .build();
+          break;
+        case DATE:
+          treeReaders[i] = DateStreamReader.builder()
+              .setFileName(file)
+              .setColumnIndex(colIx)
+              .setPresentStream(present)
+              .setDataStream(data)
+              .setCompressionCodec(codec)
+              .setBufferSize(bufferSize)
+              .setRowIndex(rowIndex)
+              .setColumnEncoding(columnEncoding)
+              .build();
           break;
         default:
           throw new UnsupportedOperationException("Data type not supported yet! " + colType);
@@ -240,25 +347,6 @@ public class OrcColumnVectorProducer ext
     return treeReaders;
   }
 
-  private List<OrcProto.Stream> getDataStreams(int colIx, List<OrcProto.Stream> streams) {
-    List<OrcProto.Stream> result = Lists.newArrayList();
-    for (OrcProto.Stream stream : streams) {
-      if (stream.getColumn() == colIx) {
-        switch (stream.getKind()) {
-          case PRESENT:
-          case DATA:
-          case LENGTH:
-          case DICTIONARY_DATA:
-          case SECONDARY:
-            result.add(stream);
-          default:
-            // ignore
-        }
-      }
-    }
-    return result;
-  }
-
   private long getRowCount(OrcProto.RowIndexEntry rowIndexEntry) {
     return rowIndexEntry.getStatistics().getNumberOfValues();
   }
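
Every reader added in this commit follows the same construction pattern: build() turns each StreamBuffer into an InStream via StreamUtils.createInStream, and the private constructor then positions the wrapped tree reader at the requested row group before any value is read. The positioning step common to all of them amounts to the following sketch, where reader and rowIndexEntry are assumed to be in scope:

    // Seek the freshly built reader to the row group described by the
    // OrcProto.RowIndexEntry; every constructor below does this.
    PositionProvider positionProvider =
        new RecordReaderImpl.PositionProviderImpl(rowIndexEntry);
    reader.seek(positionProvider);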

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/BinaryStreamReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/BinaryStreamReader.java?rev=1659465&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/BinaryStreamReader.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/BinaryStreamReader.java Fri Feb 13 08:17:53 2015
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+
+/**
+ * Stream reader for ORC BINARY columns; positions the wrapped BinaryTreeReader at the given row index entry on construction.
+ */
+public class BinaryStreamReader extends RecordReaderImpl.BinaryTreeReader {
+  private boolean isFileCompressed;
+  private OrcProto.RowIndexEntry rowIndex;
+
+  private BinaryStreamReader(int columnId, InStream present,
+      InStream data, InStream length, boolean isFileCompressed,
+      OrcProto.ColumnEncoding encoding, OrcProto.RowIndexEntry rowIndex) throws IOException {
+    super(columnId, present, data, length, encoding);
+    this.isFileCompressed = isFileCompressed;
+    this.rowIndex = rowIndex;
+
+    // position the readers based on the specified row index
+    PositionProvider positionProvider = new RecordReaderImpl.PositionProviderImpl(rowIndex);
+    seek(positionProvider);
+  }
+
+  public void seek(PositionProvider positionProvider) throws IOException {
+    super.seek(positionProvider);
+  }
+
+  public static class StreamReaderBuilder {
+    private String fileName;
+    private int columnIndex;
+    private EncodedColumnBatch.StreamBuffer presentStream;
+    private EncodedColumnBatch.StreamBuffer dataStream;
+    private EncodedColumnBatch.StreamBuffer lengthStream;
+    private CompressionCodec compressionCodec;
+    private int bufferSize;
+    private OrcProto.RowIndexEntry rowIndex;
+    private OrcProto.ColumnEncoding columnEncoding;
+
+    public StreamReaderBuilder setFileName(String fileName) {
+      this.fileName = fileName;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnIndex(int columnIndex) {
+      this.columnIndex = columnIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      this.presentStream = presentStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      this.dataStream = dataStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setLengthStream(EncodedColumnBatch.StreamBuffer secondaryStream) {
+      this.lengthStream = secondaryStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+      this.compressionCodec = compressionCodec;
+      return this;
+    }
+
+    public StreamReaderBuilder setBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) {
+      this.rowIndex = rowIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) {
+      this.columnEncoding = encoding;
+      return this;
+    }
+
+    public BinaryStreamReader build() throws IOException {
+      InStream present = null;
+      if (presentStream != null) {
+        present = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, null, bufferSize,
+                presentStream);
+      }
+
+      InStream data = null;
+      if (dataStream != null) {
+        data = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, null, bufferSize,
+                dataStream);
+      }
+
+      InStream length = null;
+      if (lengthStream != null) {
+        length = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.LENGTH.name(), fileName, null, bufferSize,
+                lengthStream);
+      }
+      return new BinaryStreamReader(columnIndex, present, data, length,
+          compressionCodec != null, columnEncoding, rowIndex);
+    }
+  }
+
+  public static StreamReaderBuilder builder() {
+    return new StreamReaderBuilder();
+  }
+}

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/BooleanStreamReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/BooleanStreamReader.java?rev=1659465&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/BooleanStreamReader.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/BooleanStreamReader.java Fri Feb 13 08:17:53 2015
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+
+/**
+ * Stream reader for ORC BOOLEAN columns; positions the wrapped BooleanTreeReader at the given row index entry on construction.
+ */
+public class BooleanStreamReader extends RecordReaderImpl.BooleanTreeReader {
+  private boolean isFileCompressed;
+  private OrcProto.RowIndexEntry rowIndex;
+
+  private BooleanStreamReader(int columnId, InStream present,
+      InStream data, boolean isFileCompressed,
+      OrcProto.RowIndexEntry rowIndex) throws IOException {
+    super(columnId, present, data);
+    this.isFileCompressed = isFileCompressed;
+    this.rowIndex = rowIndex;
+
+    // position the readers based on the specified row index
+    PositionProvider positionProvider = new RecordReaderImpl.PositionProviderImpl(rowIndex);
+    seek(positionProvider);
+  }
+
+  public void seek(PositionProvider positionProvider) throws IOException {
+    super.seek(positionProvider);
+  }
+
+  public static class StreamReaderBuilder {
+    private String fileName;
+    private int columnIndex;
+    private EncodedColumnBatch.StreamBuffer presentStream;
+    private EncodedColumnBatch.StreamBuffer dataStream;
+    private CompressionCodec compressionCodec;
+    private int bufferSize;
+    private OrcProto.RowIndexEntry rowIndex;
+
+    public StreamReaderBuilder setFileName(String fileName) {
+      this.fileName = fileName;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnIndex(int columnIndex) {
+      this.columnIndex = columnIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      this.presentStream = presentStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      this.dataStream = dataStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+      this.compressionCodec = compressionCodec;
+      return this;
+    }
+
+    public StreamReaderBuilder setBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) {
+      this.rowIndex = rowIndex;
+      return this;
+    }
+
+    public BooleanStreamReader build() throws IOException {
+      InStream present = null;
+      if (presentStream != null) {
+        present = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, null, bufferSize,
+                presentStream);
+      }
+
+      InStream data = null;
+      if (dataStream != null) {
+        data = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, null, bufferSize,
+                dataStream);
+      }
+
+      return new BooleanStreamReader(columnIndex, present, data,
+          compressionCodec != null, rowIndex);
+    }
+  }
+
+  public static StreamReaderBuilder builder() {
+    return new StreamReaderBuilder();
+  }
+}

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/ByteStreamReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/ByteStreamReader.java?rev=1659465&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/ByteStreamReader.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/ByteStreamReader.java Fri Feb 13 08:17:53 2015
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+
+/**
+ * Stream reader for ORC BYTE columns; positions the wrapped ByteTreeReader at the given row index entry on construction.
+ */
+public class ByteStreamReader extends RecordReaderImpl.ByteTreeReader {
+  private boolean isFileCompressed;
+  private OrcProto.RowIndexEntry rowIndex;
+
+  private ByteStreamReader(int columnId, InStream present,
+      InStream data, boolean isFileCompressed,
+      OrcProto.RowIndexEntry rowIndex) throws IOException {
+    super(columnId, present, data);
+    this.isFileCompressed = isFileCompressed;
+    this.rowIndex = rowIndex;
+
+    // position the readers based on the specified row index
+    PositionProvider positionProvider = new RecordReaderImpl.PositionProviderImpl(rowIndex);
+    seek(positionProvider);
+  }
+
+  public void seek(PositionProvider positionProvider) throws IOException {
+    super.seek(positionProvider);
+  }
+
+  public static class StreamReaderBuilder {
+    private String fileName;
+    private int columnIndex;
+    private EncodedColumnBatch.StreamBuffer presentStream;
+    private EncodedColumnBatch.StreamBuffer dataStream;
+    private CompressionCodec compressionCodec;
+    private int bufferSize;
+    private OrcProto.RowIndexEntry rowIndex;
+
+    public StreamReaderBuilder setFileName(String fileName) {
+      this.fileName = fileName;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnIndex(int columnIndex) {
+      this.columnIndex = columnIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      this.presentStream = presentStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      this.dataStream = dataStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+      this.compressionCodec = compressionCodec;
+      return this;
+    }
+
+    public StreamReaderBuilder setBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) {
+      this.rowIndex = rowIndex;
+      return this;
+    }
+
+    public ByteStreamReader build() throws IOException {
+      InStream present = null;
+      if (presentStream != null) {
+        present = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, null, bufferSize,
+                presentStream);
+      }
+
+      InStream data = null;
+      if (dataStream != null) {
+        data = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, null, bufferSize,
+                dataStream);
+      }
+
+      return new ByteStreamReader(columnIndex, present, data, compressionCodec != null, rowIndex);
+    }
+  }
+
+  public static StreamReaderBuilder builder() {
+    return new StreamReaderBuilder();
+  }
+}

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/CharacterStreamReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/CharacterStreamReader.java?rev=1659465&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/CharacterStreamReader.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/CharacterStreamReader.java Fri Feb 13 08:17:53 2015
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+
+/**
+ * Stream reader for ORC CHAR/VARCHAR columns; delegates to a CharTreeReader or VarcharTreeReader and seeks it to the given row index entry.
+ */
+public class CharacterStreamReader extends RecordReaderImpl.StringTreeReader {
+  private boolean isFileCompressed;
+  private OrcProto.RowIndexEntry rowIndex;
+
+  private CharacterStreamReader(int columnId, int maxLength, OrcProto.Type charType,
+      InStream present,
+      InStream data, InStream length, InStream dictionary,
+      boolean isFileCompressed,
+      OrcProto.ColumnEncoding encoding,
+      OrcProto.RowIndexEntry rowIndex) throws IOException {
+    super(columnId);
+
+    if (charType.getKind() == OrcProto.Type.Kind.CHAR) {
+      reader = new RecordReaderImpl.CharTreeReader(columnId, maxLength, present, data, length,
+          dictionary, encoding);
+    } else if (charType.getKind() == OrcProto.Type.Kind.VARCHAR) {
+      reader = new RecordReaderImpl.VarcharTreeReader(columnId, maxLength, present, data,
+          length, dictionary, encoding);
+    } else {
+      throw new IOException("Unknown character type " + charType + ". Expected CHAR or VARCHAR.");
+    }
+
+    this.isFileCompressed = isFileCompressed;
+
+    // position the readers based on the specified row index
+    PositionProvider positionProvider = new RecordReaderImpl.PositionProviderImpl(rowIndex);
+    seek(positionProvider);
+  }
+
+  public void seek(PositionProvider positionProvider) throws IOException {
+    reader.seek(positionProvider);
+  }
+
+  public static class StreamReaderBuilder {
+    private String fileName;
+    private int columnIndex;
+    private int maxLength;
+    private OrcProto.Type charType;
+    private EncodedColumnBatch.StreamBuffer presentStream;
+    private EncodedColumnBatch.StreamBuffer dataStream;
+    private EncodedColumnBatch.StreamBuffer dictionaryStream;
+    private EncodedColumnBatch.StreamBuffer lengthStream;
+    private CompressionCodec compressionCodec;
+    private int bufferSize;
+    private OrcProto.RowIndexEntry rowIndex;
+    private OrcProto.ColumnEncoding columnEncoding;
+
+    public StreamReaderBuilder setFileName(String fileName) {
+      this.fileName = fileName;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnIndex(int columnIndex) {
+      this.columnIndex = columnIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setMaxLength(int maxLength) {
+      this.maxLength = maxLength;
+      return this;
+    }
+
+    public StreamReaderBuilder setCharacterType(OrcProto.Type charType) {
+      this.charType = charType;
+      return this;
+    }
+
+    public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      this.presentStream = presentStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      this.dataStream = dataStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setLengthStream(EncodedColumnBatch.StreamBuffer lengthStream) {
+      this.lengthStream = lengthStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setDictionaryStream(EncodedColumnBatch.StreamBuffer dictStream) {
+      this.dictionaryStream = dictStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+      this.compressionCodec = compressionCodec;
+      return this;
+    }
+
+    public StreamReaderBuilder setBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) {
+      this.rowIndex = rowIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) {
+      this.columnEncoding = encoding;
+      return this;
+    }
+
+    public CharacterStreamReader build() throws IOException {
+      InStream present = null;
+      if (presentStream != null) {
+        present = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, null, bufferSize,
+                presentStream);
+      }
+
+      InStream data = null;
+      if (dataStream != null) {
+        data = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, null, bufferSize,
+                dataStream);
+      }
+
+      InStream length = null;
+      if (lengthStream != null) {
+        length = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.LENGTH.name(), fileName, null, bufferSize,
+                lengthStream);
+      }
+
+      InStream dictionary = null;
+      if (dictionaryStream != null) {
+        dictionary = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.DICTIONARY_DATA.name(), fileName, null, bufferSize,
+                dictionaryStream);
+      }
+      return new CharacterStreamReader(columnIndex, maxLength, charType, present, data, length,
+          dictionary, compressionCodec != null, columnEncoding, rowIndex);
+    }
+  }
+
+  public static StreamReaderBuilder builder() {
+    return new StreamReaderBuilder();
+  }
+
+}

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DateStreamReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DateStreamReader.java?rev=1659465&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DateStreamReader.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DateStreamReader.java Fri Feb 13 08:17:53 2015
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+
+/**
+ * Stream reader for ORC DATE columns; positions the wrapped DateTreeReader at the given row index entry on construction.
+ */
+public class DateStreamReader extends RecordReaderImpl.DateTreeReader {
+  private boolean isFileCompressed;
+  private OrcProto.RowIndexEntry rowIndex;
+
+  private DateStreamReader(int columnId, InStream present,
+      InStream data, boolean isFileCompressed,
+      OrcProto.ColumnEncoding encoding,
+      OrcProto.RowIndexEntry rowIndex) throws IOException {
+    super(columnId, present, data, encoding);
+    this.isFileCompressed = isFileCompressed;
+    this.rowIndex = rowIndex;
+
+    // position the readers based on the specified row index
+    PositionProvider positionProvider = new RecordReaderImpl.PositionProviderImpl(rowIndex);
+    seek(positionProvider);
+  }
+
+  public void seek(PositionProvider positionProvider) throws IOException {
+    super.seek(positionProvider);
+  }
+
+  public static class StreamReaderBuilder {
+    private String fileName;
+    private int columnIndex;
+    private EncodedColumnBatch.StreamBuffer presentStream;
+    private EncodedColumnBatch.StreamBuffer dataStream;
+    private CompressionCodec compressionCodec;
+    private int bufferSize;
+    private OrcProto.RowIndexEntry rowIndex;
+    private OrcProto.ColumnEncoding columnEncoding;
+
+    public StreamReaderBuilder setFileName(String fileName) {
+      this.fileName = fileName;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnIndex(int columnIndex) {
+      this.columnIndex = columnIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      this.presentStream = presentStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      this.dataStream = dataStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+      this.compressionCodec = compressionCodec;
+      return this;
+    }
+
+    public StreamReaderBuilder setBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) {
+      this.rowIndex = rowIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) {
+      this.columnEncoding = encoding;
+      return this;
+    }
+
+    public DateStreamReader build() throws IOException {
+      InStream present = null;
+      if (presentStream != null) {
+        present = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, null, bufferSize,
+                presentStream);
+      }
+
+      InStream data = null;
+      if (dataStream != null) {
+        data = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, null, bufferSize,
+                dataStream);
+      }
+
+      return new DateStreamReader(columnIndex, present, data,
+          compressionCodec != null, columnEncoding, rowIndex);
+    }
+  }
+
+  public static StreamReaderBuilder builder() {
+    return new StreamReaderBuilder();
+  }
+}

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DecimalStreamReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DecimalStreamReader.java?rev=1659465&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DecimalStreamReader.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DecimalStreamReader.java Fri Feb 13 08:17:53 2015
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+
+/**
+ * Stream reader for ORC DECIMAL columns; positions the wrapped DecimalTreeReader at the given row index entry on construction.
+ */
+public class DecimalStreamReader extends RecordReaderImpl.DecimalTreeReader {
+  private boolean isFileCompressed;
+  private OrcProto.RowIndexEntry rowIndex;
+
+  private DecimalStreamReader(int columnId, int precision, int scale, InStream presentStream,
+      InStream valueStream, InStream scaleStream, boolean isFileCompressed,
+      OrcProto.RowIndexEntry rowIndex, OrcProto.ColumnEncoding encoding) throws IOException {
+    super(columnId, precision, scale, presentStream, valueStream, scaleStream, encoding);
+    this.isFileCompressed = isFileCompressed;
+    this.rowIndex = rowIndex;
+
+    // position the readers based on the specified row index
+    PositionProvider positionProvider = new RecordReaderImpl.PositionProviderImpl(rowIndex);
+    seek(positionProvider);
+  }
+
+  public void seek(PositionProvider positionProvider) throws IOException {
+    super.seek(positionProvider);
+  }
+
+  public static class StreamReaderBuilder {
+    private String fileName;
+    private int columnIndex;
+    private EncodedColumnBatch.StreamBuffer presentStream;
+    private EncodedColumnBatch.StreamBuffer valueStream;
+    private EncodedColumnBatch.StreamBuffer scaleStream;
+    private int scale;
+    private int precision;
+    private CompressionCodec compressionCodec;
+    private int bufferSize;
+    private OrcProto.RowIndexEntry rowIndex;
+    private OrcProto.ColumnEncoding columnEncoding;
+
+    public StreamReaderBuilder setFileName(String fileName) {
+      this.fileName = fileName;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnIndex(int columnIndex) {
+      this.columnIndex = columnIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setPrecision(int precision) {
+      this.precision = precision;
+      return this;
+    }
+
+    public StreamReaderBuilder setScale(int scale) {
+      this.scale = scale;
+      return this;
+    }
+
+    public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      this.presentStream = presentStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setValueStream(EncodedColumnBatch.StreamBuffer valueStream) {
+      this.valueStream = valueStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setScaleStream(EncodedColumnBatch.StreamBuffer scaleStream) {
+      this.scaleStream = scaleStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+      this.compressionCodec = compressionCodec;
+      return this;
+    }
+
+    public StreamReaderBuilder setBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) {
+      this.rowIndex = rowIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) {
+      this.columnEncoding = encoding;
+      return this;
+    }
+
+    public DecimalStreamReader build() throws IOException {
+      InStream presentInStream = null;
+      if (presentStream != null) {
+        presentInStream = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, null, bufferSize,
+                presentStream);
+      }
+
+      InStream valueInStream = null;
+      if (valueStream != null) {
+        valueInStream = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, null, bufferSize,
+                valueStream);
+      }
+
+      InStream scaleInStream = null;
+      if (scaleStream != null) {
+        scaleInStream = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.SECONDARY.name(), fileName, null, bufferSize,
+                scaleStream);
+      }
+
+      return new DecimalStreamReader(columnIndex, precision, scale, presentInStream, valueInStream,
+          scaleInStream, compressionCodec != null, rowIndex, columnEncoding);
+    }
+  }
+
+  public static StreamReaderBuilder builder() {
+    return new StreamReaderBuilder();
+  }
+}

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/IntStreamReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/IntStreamReader.java?rev=1659465&r1=1659464&r2=1659465&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/IntStreamReader.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/IntStreamReader.java Fri Feb 13 08:17:53 2015
@@ -36,9 +36,9 @@ public class IntStreamReader extends Rec
 
   private IntStreamReader(int columnId, InStream present,
       InStream data, boolean isFileCompressed,
-      OrcProto.ColumnEncoding.Kind kind,
+      OrcProto.ColumnEncoding encoding,
       OrcProto.RowIndexEntry rowIndex) throws IOException {
-    super(columnId, present, data, kind);
+    super(columnId, present, data, encoding);
     this.isFileCompressed = isFileCompressed;
     this.rowIndex = rowIndex;
 
@@ -59,7 +59,7 @@ public class IntStreamReader extends Rec
     private CompressionCodec compressionCodec;
     private int bufferSize;
     private OrcProto.RowIndexEntry rowIndex;
-    private OrcProto.ColumnEncoding.Kind columnEncodingKind;
+    private OrcProto.ColumnEncoding columnEncoding;
 
     public StreamReaderBuilder setFileName(String fileName) {
       this.fileName = fileName;
@@ -96,8 +96,8 @@ public class IntStreamReader extends Rec
       return this;
     }
 
-    public StreamReaderBuilder setColumnEncodingKind(OrcProto.ColumnEncoding.Kind kind) {
-      this.columnEncodingKind = kind;
+    public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) {
+      this.columnEncoding = encoding;
       return this;
     }
 
@@ -117,7 +117,7 @@ public class IntStreamReader extends Rec
       }
 
       return new IntStreamReader(columnIndex, present, data,
-          compressionCodec != null, columnEncodingKind, rowIndex);
+          compressionCodec != null, columnEncoding, rowIndex);
     }
   }
 

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/LongStreamReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/LongStreamReader.java?rev=1659465&r1=1659464&r2=1659465&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/LongStreamReader.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/LongStreamReader.java Fri Feb 13 08:17:53 2015
@@ -36,9 +36,9 @@ public class LongStreamReader extends Re
 
   private LongStreamReader(int columnId, InStream present,
       InStream data, boolean isFileCompressed,
-      OrcProto.ColumnEncoding.Kind kind, boolean skipCorrupt,
+      OrcProto.ColumnEncoding encoding, boolean skipCorrupt,
       OrcProto.RowIndexEntry rowIndex) throws IOException {
-    super(columnId, present, data, kind, skipCorrupt);
+    super(columnId, present, data, encoding, skipCorrupt);
     this.isFileCompressed = isFileCompressed;
     this.rowIndex = rowIndex;
 
@@ -59,7 +59,7 @@ public class LongStreamReader extends Re
     private CompressionCodec compressionCodec;
     private int bufferSize;
     private OrcProto.RowIndexEntry rowIndex;
-    private OrcProto.ColumnEncoding.Kind columnEncodingKind;
+    private OrcProto.ColumnEncoding columnEncoding;
     private boolean skipCorrupt;
 
     public StreamReaderBuilder setFileName(String fileName) {
@@ -97,8 +97,8 @@ public class LongStreamReader extends Re
       return this;
     }
 
-    public StreamReaderBuilder setColumnEncodingKind(OrcProto.ColumnEncoding.Kind kind) {
-      this.columnEncodingKind = kind;
+    public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) {
+      this.columnEncoding = encoding;
       return this;
     }
 
@@ -123,7 +123,7 @@ public class LongStreamReader extends Re
       }
 
       return new LongStreamReader(columnIndex, present, data,
-          compressionCodec != null, columnEncodingKind, skipCorrupt, rowIndex);
+          compressionCodec != null, columnEncoding, skipCorrupt, rowIndex);
     }
   }
 

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/ShortStreamReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/ShortStreamReader.java?rev=1659465&r1=1659464&r2=1659465&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/ShortStreamReader.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/ShortStreamReader.java Fri Feb 13 08:17:53 2015
@@ -36,9 +36,9 @@ public class ShortStreamReader extends R
 
   private ShortStreamReader(int columnId, InStream present,
       InStream data, boolean isFileCompressed,
-      OrcProto.ColumnEncoding.Kind kind,
+      OrcProto.ColumnEncoding encoding,
       OrcProto.RowIndexEntry rowIndex) throws IOException {
-    super(columnId, present, data, kind);
+    super(columnId, present, data, encoding);
     this.isFileCompressed = isFileCompressed;
     this.rowIndex = rowIndex;
 
@@ -59,7 +59,7 @@ public class ShortStreamReader extends R
     private CompressionCodec compressionCodec;
     private int bufferSize;
     private OrcProto.RowIndexEntry rowIndex;
-    private OrcProto.ColumnEncoding.Kind columnEncodingKind;
+    private OrcProto.ColumnEncoding columnEncoding;
 
     public StreamReaderBuilder setFileName(String fileName) {
       this.fileName = fileName;
@@ -96,8 +96,8 @@ public class ShortStreamReader extends R
       return this;
     }
 
-    public StreamReaderBuilder setColumnEncodingKind(OrcProto.ColumnEncoding.Kind kind) {
-      this.columnEncodingKind = kind;
+    public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) {
+      this.columnEncoding = encoding;
       return this;
     }
 
@@ -117,7 +117,7 @@ public class ShortStreamReader extends R
       }
 
       return new ShortStreamReader(columnIndex, present, data,
-          compressionCodec != null, columnEncodingKind, rowIndex);
+          compressionCodec != null, columnEncoding, rowIndex);
     }
   }
 

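The Int/Long/Short reader changes above make one consistent substitution: the builders now carry the whole OrcProto.ColumnEncoding instead of only its Kind. A plausible reading (the log does not spell it out) is that the protobuf message carries more than the kind; for the dictionary-encoded string readers added in this commit it also exposes the dictionary size. A sketch, assuming a populated encoding instance:

    // OrcProto.ColumnEncoding is a protobuf message; keeping the whole
    // message preserves fields beyond the kind (sketch).
    OrcProto.ColumnEncoding.Kind kind = encoding.getKind();  // e.g. DICTIONARY_V2
    int dictionarySize = encoding.getDictionarySize();       // used by dictionary readers
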
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/StringStreamReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/StringStreamReader.java?rev=1659465&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/StringStreamReader.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/StringStreamReader.java Fri Feb 13 08:17:53 2015
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+
+/**
+ * Stream reader for ORC STRING columns; positions the wrapped StringTreeReader at the given row index entry on construction.
+ */
+public class StringStreamReader extends RecordReaderImpl.StringTreeReader {
+  private boolean isFileCompressed;
+  private OrcProto.RowIndexEntry rowIndex;
+
+  private StringStreamReader(int columnId, InStream present,
+      InStream data, InStream length, InStream dictionary,
+      boolean isFileCompressed,
+      OrcProto.ColumnEncoding encoding,
+      OrcProto.RowIndexEntry rowIndex) throws IOException {
+    super(columnId, present, data, length, dictionary, encoding);
+    this.isFileCompressed = isFileCompressed;
+
+    // position the readers based on the specified row index
+    PositionProvider positionProvider = new RecordReaderImpl.PositionProviderImpl(rowIndex);
+    seek(positionProvider);
+  }
+
+  public void seek(PositionProvider positionProvider) throws IOException {
+    super.seek(positionProvider);
+  }
+
+  public static class StreamReaderBuilder {
+    private String fileName;
+    private int columnIndex;
+    private EncodedColumnBatch.StreamBuffer presentStream;
+    private EncodedColumnBatch.StreamBuffer dataStream;
+    private EncodedColumnBatch.StreamBuffer dictionaryStream;
+    private EncodedColumnBatch.StreamBuffer lengthStream;
+    private CompressionCodec compressionCodec;
+    private int bufferSize;
+    private OrcProto.RowIndexEntry rowIndex;
+    private OrcProto.ColumnEncoding columnEncoding;
+
+    public StreamReaderBuilder setFileName(String fileName) {
+      this.fileName = fileName;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnIndex(int columnIndex) {
+      this.columnIndex = columnIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      this.presentStream = presentStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      this.dataStream = dataStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setLengthStream(EncodedColumnBatch.StreamBuffer lengthStream) {
+      this.lengthStream = lengthStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setDictionaryStream(EncodedColumnBatch.StreamBuffer dictStream) {
+      this.dictionaryStream = dictStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+      this.compressionCodec = compressionCodec;
+      return this;
+    }
+
+    public StreamReaderBuilder setBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) {
+      this.rowIndex = rowIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) {
+      this.columnEncoding = encoding;
+      return this;
+    }
+
+    public StringStreamReader build() throws IOException {
+      InStream present = null;
+      if (presentStream != null) {
+        present = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, null, bufferSize,
+                presentStream);
+      }
+
+      InStream data = null;
+      if (dataStream != null) {
+        data = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, null, bufferSize,
+                dataStream);
+      }
+
+      InStream length = null;
+      if (lengthStream != null) {
+        length = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.LENGTH.name(), fileName, null, bufferSize,
+                lengthStream);
+      }
+
+      InStream dictionary = null;
+      if (dictionaryStream != null) {
+        dictionary = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.DICTIONARY_DATA.name(), fileName, null, bufferSize,
+                dictionaryStream);
+      }
+      return new StringStreamReader(columnIndex, present, data, length, dictionary,
+          compressionCodec != null, columnEncoding, rowIndex);
+    }
+  }
+
+  public static StreamReaderBuilder builder() {
+    return new StreamReaderBuilder();
+  }
+
+}
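
For context, a hedged sketch of how this builder is meant to be driven (e.g.
from OrcColumnVectorProducer). The helper below is hypothetical; only the
builder calls mirror the class above, and the StreamBuffer/codec/index inputs
are assumed to come from the LLAP cache layer:

  // Hypothetical helper: builds a string reader already positioned at the
  // given row group. Imports are the same as in StringStreamReader above.
  static StringStreamReader buildStringReader(String fileName, int colIx,
      EncodedColumnBatch.StreamBuffer present,    // null if the column has no nulls
      EncodedColumnBatch.StreamBuffer data,
      EncodedColumnBatch.StreamBuffer length,
      EncodedColumnBatch.StreamBuffer dictionary, // null for DIRECT encodings
      CompressionCodec codec,                     // null signals an uncompressed file
      int bufferSize,
      OrcProto.RowIndexEntry rowIndexEntry,
      OrcProto.ColumnEncoding encoding) throws IOException {
    return StringStreamReader.builder()
        .setFileName(fileName)
        .setColumnIndex(colIx)
        .setPresentStream(present)
        .setDataStream(data)
        .setLengthStream(length)
        .setDictionaryStream(dictionary)
        .setCompressionCodec(codec)
        .setBufferSize(bufferSize)
        .setRowIndex(rowIndexEntry)   // build() seeks here immediately
        .setColumnEncoding(encoding)
        .build();
  }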

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/TimestampStreamReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/TimestampStreamReader.java?rev=1659465&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/TimestampStreamReader.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/TimestampStreamReader.java Fri Feb 13 08:17:53 2015
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+
+/**
+ * Stream reader for timestamp columns of a single ORC row group. Wraps
+ * {@link RecordReaderImpl.TimestampTreeReader} and, at construction time,
+ * positions it at the supplied row index entry.
+ */
+public class TimestampStreamReader extends RecordReaderImpl.TimestampTreeReader {
+  private boolean isFileCompressed;
+  private OrcProto.RowIndexEntry rowIndex;
+
+  private TimestampStreamReader(int columnId, InStream present,
+      InStream data, InStream nanos, boolean isFileCompressed,
+      OrcProto.ColumnEncoding encoding, boolean skipCorrupt,
+      OrcProto.RowIndexEntry rowIndex) throws IOException {
+    super(columnId, present, data, nanos, encoding, skipCorrupt);
+    this.isFileCompressed = isFileCompressed;
+    this.rowIndex = rowIndex;
+
+    // position the readers based on the specified row index
+    PositionProvider positionProvider = new RecordReaderImpl.PositionProviderImpl(rowIndex);
+    seek(positionProvider);
+  }
+
+  @Override
+  public void seek(PositionProvider positionProvider) throws IOException {
+    super.seek(positionProvider);
+  }
+
+  public static class StreamReaderBuilder {
+    private String fileName;
+    private int columnIndex;
+    private EncodedColumnBatch.StreamBuffer presentStream;
+    private EncodedColumnBatch.StreamBuffer dataStream;
+    private EncodedColumnBatch.StreamBuffer nanosStream;
+    private CompressionCodec compressionCodec;
+    private int bufferSize;
+    private OrcProto.RowIndexEntry rowIndex;
+    private OrcProto.ColumnEncoding columnEncoding;
+    private boolean skipCorrupt;
+
+    public StreamReaderBuilder setFileName(String fileName) {
+      this.fileName = fileName;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnIndex(int columnIndex) {
+      this.columnIndex = columnIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      this.presentStream = presentStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setSecondsStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      this.dataStream = dataStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setNanosStream(EncodedColumnBatch.StreamBuffer secondaryStream) {
+      this.nanosStream = secondaryStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+      this.compressionCodec = compressionCodec;
+      return this;
+    }
+
+    public StreamReaderBuilder setBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) {
+      this.rowIndex = rowIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) {
+      this.columnEncoding = encoding;
+      return this;
+    }
+
+    public StreamReaderBuilder skipCorrupt(boolean skipCorrupt) {
+      this.skipCorrupt = skipCorrupt;
+      return this;
+    }
+
+    public TimestampStreamReader build() throws IOException {
+      InStream present = null;
+      if (presentStream != null) {
+        present = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, null, bufferSize,
+                presentStream);
+      }
+
+      InStream data = null;
+      if (dataStream != null) {
+        data = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, null, bufferSize,
+                dataStream);
+      }
+
+      InStream nanos = null;
+      if (nanosStream != null) {
+        nanos = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.SECONDARY.name(), fileName, null, bufferSize,
+                nanosStream);
+      }
+      return new TimestampStreamReader(columnIndex, present, data, nanos,
+          compressionCodec != null, columnEncoding, skipCorrupt, rowIndex);
+    }
+  }
+
+  public static StreamReaderBuilder builder() {
+    return new StreamReaderBuilder();
+  }
+}
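
One naming subtlety worth noting: setSecondsStream() feeds the ORC DATA stream
and setNanosStream() feeds the SECONDARY stream, mirroring how
TimestampTreeReader splits a timestamp into seconds and nanoseconds. A hedged
fragment, assuming the surrounding buffers and settings are in scope
(presentBuf, secondsBuf, nanosBuf and friends are hypothetical):

  TimestampStreamReader tsReader = TimestampStreamReader.builder()
      .setFileName(fileName)
      .setColumnIndex(colIx)
      .setPresentStream(presentBuf)
      .setSecondsStream(secondsBuf)   // becomes the DATA InStream
      .setNanosStream(nanosBuf)       // becomes the SECONDARY InStream
      .setCompressionCodec(codec)
      .setBufferSize(bufferSize)
      .setRowIndex(rowIndexEntry)
      .setColumnEncoding(encoding)
      .skipCorrupt(false)             // typically wired to hive.exec.orc.skip.corrupt.data
      .build();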

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1659465&r1=1659464&r2=1659465&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Fri Feb 13 08:17:53 2015
@@ -286,7 +286,7 @@ public class RecordReaderImpl implements
       seek(index[columnId]);
     }
 
-    void seek(PositionProvider index) throws IOException {
+    public void seek(PositionProvider index) throws IOException {
       if (present != null) {
         present.seek(index);
       }
@@ -351,7 +351,14 @@ public class RecordReaderImpl implements
     private BitFieldReader reader = null;
 
     public BooleanTreeReader(int columnId) throws IOException {
-      super(columnId);
+      this(columnId, null, null);
+    }
+
+    public BooleanTreeReader(int columnId, InStream present, InStream data) throws IOException {
+      super(columnId, present);
+      if (data != null) {
+        reader = new BitFieldReader(data, 1);
+      }
     }
 
     @Override
@@ -412,11 +419,16 @@ public class RecordReaderImpl implements
     }
   }
 
-  private static class ByteTreeReader extends TreeReader{
+  public static class ByteTreeReader extends TreeReader {
     private RunLengthByteReader reader = null;
 
     ByteTreeReader(int columnId) throws IOException {
-      super(columnId);
+      this(columnId, null, null);
+    }
+
+    public ByteTreeReader(int columnId, InStream present, InStream data) throws IOException {
+      super(columnId, present);
+      if (data != null) {
+        this.reader = new RunLengthByteReader(data);
+      }
     }
 
     @Override
@@ -485,11 +497,12 @@ public class RecordReaderImpl implements
     }
 
     public ShortTreeReader(int columnId, InStream present, InStream data,
-        OrcProto.ColumnEncoding.Kind encoding)
+        OrcProto.ColumnEncoding encoding)
         throws IOException {
       super(columnId, present);
       if (data != null && encoding != null) {
-        this.reader = createIntegerReader(encoding, data, true, false);
+        checkEncoding(encoding);
+        this.reader = createIntegerReader(encoding.getKind(), data, true, false);
       }
     }
 
@@ -570,11 +583,12 @@ public class RecordReaderImpl implements
     }
 
     public IntTreeReader(int columnId, InStream present, InStream data,
-        OrcProto.ColumnEncoding.Kind encoding)
+        OrcProto.ColumnEncoding encoding)
         throws IOException {
       super(columnId, present);
       if (data != null && encoding != null) {
-        this.reader = createIntegerReader(encoding, data, true, false);
+        checkEncoding(encoding);
+        this.reader = createIntegerReader(encoding.getKind(), data, true, false);
       }
     }
 
@@ -649,20 +663,19 @@ public class RecordReaderImpl implements
 
   public static class LongTreeReader extends TreeReader{
     private IntegerReader reader = null;
-    private final boolean skipCorrupt;
 
     LongTreeReader(int columnId, boolean skipCorrupt) throws IOException {
       this(columnId, null, null, null, skipCorrupt);
     }
 
     public LongTreeReader(int columnId, InStream present, InStream data,
-        OrcProto.ColumnEncoding.Kind encoding,
+        OrcProto.ColumnEncoding encoding,
         boolean skipCorrupt)
         throws IOException {
       super(columnId, present);
-      this.skipCorrupt = skipCorrupt;
       if (data != null && encoding != null) {
-        this.reader = createIntegerReader(encoding, data, true, skipCorrupt);
+        checkEncoding(encoding);
+        this.reader = createIntegerReader(encoding.getKind(), data, true, skipCorrupt);
       }
     }
 
@@ -917,15 +930,25 @@ public class RecordReaderImpl implements
     }
   }
 
-  private static class BinaryTreeReader extends TreeReader{
+  public static class BinaryTreeReader extends TreeReader {
     protected InStream stream;
     protected IntegerReader lengths = null;
 
     protected final LongColumnVector scratchlcv;
 
     BinaryTreeReader(int columnId) throws IOException {
-      super(columnId);
+      this(columnId, null, null, null, null);
+    }
+
+    public BinaryTreeReader(int columnId, InStream present, InStream data, InStream length,
+        OrcProto.ColumnEncoding encoding) throws IOException {
+      super(columnId, present);
       scratchlcv = new LongColumnVector();
+      this.stream = data;
+      if (length != null && encoding != null) {
+        checkEncoding(encoding);
+        this.lengths = createIntegerReader(encoding.getKind(), length, false, false);
+      }
     }
 
     @Override
@@ -1013,15 +1036,31 @@ public class RecordReaderImpl implements
     }
   }
 
-  private static class TimestampTreeReader extends TreeReader {
+  public static class TimestampTreeReader extends TreeReader {
     private IntegerReader data = null;
     private IntegerReader nanos = null;
-    private final LongColumnVector nanoVector = new LongColumnVector();
     private final boolean skipCorrupt;
 
     TimestampTreeReader(int columnId, boolean skipCorrupt) throws IOException {
-      super(columnId);
+      this(columnId, null, null, null, null, skipCorrupt);
+    }
+
+    public TimestampTreeReader(int columnId, InStream presentStream, InStream dataStream,
+        InStream nanosStream, OrcProto.ColumnEncoding encoding, boolean skipCorrupt)
+        throws IOException {
+      super(columnId, presentStream);
       this.skipCorrupt = skipCorrupt;
+      if (encoding != null) {
+        checkEncoding(encoding);
+
+        if (dataStream != null) {
+          this.data = createIntegerReader(encoding.getKind(), dataStream, true, skipCorrupt);
+        }
+
+        if (nanosStream != null) {
+          this.nanos = createIntegerReader(encoding.getKind(), nanosStream, false, skipCorrupt);
+        }
+      }
     }
 
     @Override
@@ -1130,11 +1169,20 @@ public class RecordReaderImpl implements
     }
   }
 
-  private static class DateTreeReader extends TreeReader{
+  public static class DateTreeReader extends TreeReader {
     private IntegerReader reader = null;
 
     DateTreeReader(int columnId) throws IOException {
-      super(columnId);
+      this(columnId, null, null, null);
+    }
+
+    public DateTreeReader(int columnId, InStream present, InStream data,
+        OrcProto.ColumnEncoding encoding) throws IOException {
+      super(columnId, present);
+      if (data != null && encoding != null) {
+        checkEncoding(encoding);
+        reader = createIntegerReader(encoding.getKind(), data, true, false);
+      }
     }
 
     @Override
@@ -1205,18 +1253,30 @@ public class RecordReaderImpl implements
     }
   }
 
-  private static class DecimalTreeReader extends TreeReader{
+  public static class DecimalTreeReader extends TreeReader {
     private InStream valueStream;
-    private IntegerReader scaleStream = null;
-    private LongColumnVector scratchScaleVector = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+    private IntegerReader scaleReader = null;
+    private LongColumnVector scratchScaleVector;
 
     private final int precision;
     private final int scale;
 
     DecimalTreeReader(int columnId, int precision, int scale) throws IOException {
-      super(columnId);
+      this(columnId, precision, scale, null, null, null, null);
+    }
+
+    public DecimalTreeReader(int columnId, int precision, int scale, InStream present,
+        InStream valueStream, InStream scaleStream, OrcProto.ColumnEncoding encoding)
+        throws IOException {
+      super(columnId, present);
       this.precision = precision;
       this.scale = scale;
+      this.scratchScaleVector = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+      this.valueStream = valueStream;
+      if (scaleStream != null && encoding != null) {
+        checkEncoding(encoding);
+        this.scaleReader = createIntegerReader(encoding.getKind(), scaleStream, true, false);
+      }
     }
 
     @Override
@@ -1235,7 +1295,7 @@ public class RecordReaderImpl implements
       super.startStripe(streams, encodings);
       valueStream = streams.get(new StreamName(columnId,
           OrcProto.Stream.Kind.DATA));
-      scaleStream = createIntegerReader(encodings.get(columnId).getKind(), streams.get(
+      scaleReader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(
           new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true, false);
     }
 
@@ -1248,7 +1308,7 @@ public class RecordReaderImpl implements
     public void seek(PositionProvider index) throws IOException {
       super.seek(index);
       valueStream.seek(index);
-      scaleStream.seek(index);
+      scaleReader.seek(index);
     }
 
     @Override
@@ -1262,7 +1322,7 @@ public class RecordReaderImpl implements
           result = (HiveDecimalWritable) previous;
         }
         result.set(HiveDecimal.create(SerializationUtils.readBigInteger(valueStream),
-            (int) scaleStream.next()));
+            (int) scaleReader.next()));
         return HiveDecimalUtils.enforcePrecisionScale(result, precision, scale);
       }
       return null;
@@ -1287,7 +1347,7 @@ public class RecordReaderImpl implements
       if (result.isRepeating) {
         if (!result.isNull[0]) {
           BigInteger bInt = SerializationUtils.readBigInteger(valueStream);
-          short scaleInData = (short) scaleStream.next();
+          short scaleInData = (short) scaleReader.next();
           HiveDecimal dec = HiveDecimal.create(bInt, scaleInData);
           dec = HiveDecimalUtils.enforcePrecisionScale(dec, precision, scale);
           result.set(0, dec);
@@ -1295,7 +1355,7 @@ public class RecordReaderImpl implements
       } else {
         // result vector has isNull values set, use the same to read scale vector.
         scratchScaleVector.isNull = result.isNull;
-        scaleStream.nextVector(scratchScaleVector, batchSize);
+        scaleReader.nextVector(scratchScaleVector, batchSize);
         for (int i = 0; i < batchSize; i++) {
           if (!result.isNull[i]) {
             BigInteger bInt = SerializationUtils.readBigInteger(valueStream);
@@ -1317,7 +1377,7 @@ public class RecordReaderImpl implements
       for(int i=0; i < items; i++) {
         SerializationUtils.readBigInteger(valueStream);
       }
-      scaleStream.skip(items);
+      scaleReader.skip(items);
     }
   }
 
@@ -1326,13 +1386,35 @@ public class RecordReaderImpl implements
    * stripe, it creates an internal reader based on whether a direct or
    * dictionary encoding was used.
    */
-  private static class StringTreeReader extends TreeReader {
-    private TreeReader reader;
+  public static class StringTreeReader extends TreeReader {
+    protected TreeReader reader;
 
-    StringTreeReader(int columnId) throws IOException {
+    public StringTreeReader(int columnId) throws IOException {
       super(columnId);
     }
 
+    public StringTreeReader(int columnId, InStream present, InStream data, InStream length,
+        InStream dictionary, OrcProto.ColumnEncoding encoding) throws IOException {
+      super(columnId, present);
+      if (encoding != null) {
+        switch (encoding.getKind()) {
+          case DIRECT:
+          case DIRECT_V2:
+            reader = new StringDirectTreeReader(columnId, present, data, length,
+                encoding.getKind());
+            break;
+          case DICTIONARY:
+          case DICTIONARY_V2:
+            reader = new StringDictionaryTreeReader(columnId, present, data, length, dictionary,
+                encoding);
+            break;
+          default:
+            throw new IllegalArgumentException("Unsupported encoding " +
+                encoding.getKind());
+        }
+      }
+    }
+
     @Override
     void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
       reader.checkEncoding(encoding);
@@ -1460,15 +1542,23 @@ public class RecordReaderImpl implements
    * A reader for string columns that are direct encoded in the current
    * stripe.
    */
-  private static class StringDirectTreeReader extends TreeReader {
+  public static class StringDirectTreeReader extends TreeReader {
     private InStream stream;
     private IntegerReader lengths;
-
     private final LongColumnVector scratchlcv;
 
     StringDirectTreeReader(int columnId) throws IOException {
-      super(columnId);
-      scratchlcv = new LongColumnVector();
+      this(columnId, null, null, null, null);
+    }
+
+    public StringDirectTreeReader(int columnId, InStream present, InStream data, InStream length,
+        OrcProto.ColumnEncoding.Kind encoding) throws IOException {
+      super(columnId, present);
+      this.scratchlcv = new LongColumnVector();
+      this.stream = data;
+      if (length != null && encoding != null) {
+        this.lengths = createIntegerReader(encoding, length, false, false);
+      }
     }
 
     @Override
@@ -1562,7 +1652,7 @@ public class RecordReaderImpl implements
    * A reader for string columns that are dictionary encoded in the current
    * stripe.
    */
-  private static class StringDictionaryTreeReader extends TreeReader {
+  public static class StringDictionaryTreeReader extends TreeReader {
     private DynamicByteArray dictionaryBuffer;
     private int[] dictionaryOffsets;
     private IntegerReader reader;
@@ -1571,8 +1661,25 @@ public class RecordReaderImpl implements
     private final LongColumnVector scratchlcv;
 
     StringDictionaryTreeReader(int columnId) throws IOException {
-      super(columnId);
+      this(columnId, null, null, null, null, null);
+    }
+
+    public StringDictionaryTreeReader(int columnId, InStream present, InStream data,
+        InStream length, InStream dictionary, OrcProto.ColumnEncoding encoding)
+        throws IOException {
+      super(columnId, present);
       scratchlcv = new LongColumnVector();
+      if (data != null && encoding != null) {
+        this.reader = createIntegerReader(encoding.getKind(), data, false, false);
+      }
+
+      if (dictionary != null && encoding != null) {
+        readDictionaryStream(dictionary);
+      }
+
+      if (length != null && encoding != null) {
+        readDictionaryLengthStream(length, encoding);
+      }
     }
 
     @Override
@@ -1591,28 +1698,27 @@ public class RecordReaderImpl implements
       super.startStripe(streams, encodings);
 
       // read the dictionary blob
-      int dictionarySize = encodings.get(columnId).getDictionarySize();
       StreamName name = new StreamName(columnId,
           OrcProto.Stream.Kind.DICTIONARY_DATA);
       InStream in = streams.get(name);
-      if (in != null) { // Guard against empty dictionary stream.
-        if (in.available() > 0) {
-          dictionaryBuffer = new DynamicByteArray(64, in.available());
-          dictionaryBuffer.readAll(in);
-          // Since its start of strip invalidate the cache.
-          dictionaryBufferInBytesCache = null;
-        }
-        in.close();
-      } else {
-        dictionaryBuffer = null;
-      }
+      readDictionaryStream(in);
 
       // read the lengths
       name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH);
       in = streams.get(name);
+      readDictionaryLengthStream(in, encodings.get(columnId));
+
+      // set up the row reader
+      name = new StreamName(columnId, OrcProto.Stream.Kind.DATA);
+      reader = createIntegerReader(encodings.get(columnId).getKind(),
+          streams.get(name), false, false);
+    }
+
+    private void readDictionaryLengthStream(InStream in, OrcProto.ColumnEncoding encoding)
+        throws IOException {
+      int dictionarySize = encoding.getDictionarySize();
       if (in != null) { // Guard against empty LENGTH stream.
-        IntegerReader lenReader = createIntegerReader(encodings.get(columnId)
-            .getKind(), in, false, false);
+        IntegerReader lenReader = createIntegerReader(encoding.getKind(), in, false, false);
         int offset = 0;
         if (dictionaryOffsets == null ||
             dictionaryOffsets.length < dictionarySize + 1) {
@@ -1626,10 +1732,20 @@ public class RecordReaderImpl implements
         in.close();
       }
 
-      // set up the row reader
-      name = new StreamName(columnId, OrcProto.Stream.Kind.DATA);
-      reader = createIntegerReader(encodings.get(columnId).getKind(),
-          streams.get(name), false, false);
+    }
+
+    private void readDictionaryStream(InStream in) throws IOException {
+      if (in != null) { // Guard against empty dictionary stream.
+        if (in.available() > 0) {
+          dictionaryBuffer = new DynamicByteArray(64, in.available());
+          dictionaryBuffer.readAll(in);
+          // Since it's the start of a stripe, invalidate the cache.
+          dictionaryBufferInBytesCache = null;
+        }
+        in.close();
+      } else {
+        dictionaryBuffer = null;
+      }
     }
 
     @Override
@@ -1742,11 +1858,16 @@ public class RecordReaderImpl implements
     }
   }
 
-  private static class CharTreeReader extends StringTreeReader {
+  public static class CharTreeReader extends StringTreeReader {
     int maxLength;
 
-    CharTreeReader(int columnId, int maxLength) throws IOException {
-      super(columnId);
+    public CharTreeReader(int columnId, int maxLength) throws IOException {
+      this(columnId, maxLength, null, null, null, null, null);
+    }
+
+    public CharTreeReader(int columnId, int maxLength, InStream present, InStream data,
+        InStream length, InStream dictionary, OrcProto.ColumnEncoding encoding) throws IOException {
+      super(columnId, present, data, length, dictionary, encoding);
       this.maxLength = maxLength;
     }
 
@@ -1806,11 +1927,16 @@ public class RecordReaderImpl implements
     }
   }
 
-  private static class VarcharTreeReader extends StringTreeReader {
+  public static class VarcharTreeReader extends StringTreeReader {
     int maxLength;
 
-    VarcharTreeReader(int columnId, int maxLength) throws IOException {
-      super(columnId);
+    public VarcharTreeReader(int columnId, int maxLength) throws IOException {
+      this(columnId, maxLength, null, null, null, null, null);
+    }
+
+    public VarcharTreeReader(int columnId, int maxLength, InStream present, InStream data,
+        InStream length, InStream dictionary, OrcProto.ColumnEncoding encoding) throws IOException {
+      super(columnId, present, data, length, dictionary, encoding);
       this.maxLength = maxLength;
     }
 

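Net effect of the RecordReaderImpl changes: the tree readers can now be
constructed directly from per-row-group InStreams and repositioned without
going through startStripe(). A hedged sketch, assuming InStream objects
already decoded from cached buffers and a public seek() override as on the
base class above:

  // presentIn/dataIn and rowIndexEntry are hypothetical stand-ins.
  RecordReaderImpl.LongTreeReader longReader =
      new RecordReaderImpl.LongTreeReader(colIx, presentIn, dataIn,
          encoding, /* skipCorrupt */ false);
  // seek(PositionProvider) is public now, so callers outside the package can
  // jump to any row-group boundary recorded in the row index.
  longReader.seek(new RecordReaderImpl.PositionProviderImpl(rowIndexEntry));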

