hive-commits mailing list archives

From prasan...@apache.org
Subject svn commit: r1658797 [1/2] - in /hive/branches/llap: llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/ llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ llap-server/src/...
Date Tue, 10 Feb 2015 20:30:25 GMT
Author: prasanthj
Date: Tue Feb 10 20:30:24 2015
New Revision: 1658797

URL: http://svn.apache.org/r1658797
Log:
HIVE-9419: LLAP: ORC decoding of row-groups (Partial patch)

Added:
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/readers/
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/readers/DictionaryStringReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/readers/DirectStringReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/readers/StringReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/BaseColumnStream.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/ColumnStream.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/DoubleColumnStream.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/FloatColumnStream.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/IntegerColumnStream.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/StreamUtils.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/StringColumnStream.java
Modified:
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcBatchKey.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionProvider.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
    hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java

Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcBatchKey.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcBatchKey.java?rev=1658797&r1=1658796&r2=1658797&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcBatchKey.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcBatchKey.java Tue Feb 10 20:30:24 2015
@@ -49,4 +49,9 @@ public class OrcBatchKey {
     // Strings are interned and can thus be compared like this.
     return stripeIx == other.stripeIx && rgIx == other.rgIx && file == other.file;
   }
+
+  @Override
+  public OrcBatchKey clone() throws CloneNotSupportedException {
+    return new OrcBatchKey(file, stripeIx, rgIx);
+  }
 }
\ No newline at end of file

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java?rev=1658797&r1=1658796&r2=1658797&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ColumnVectorBatch.java Tue Feb 10 20:30:24 2015
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.llap.io.api.impl;
 
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 
 /**
  * Unlike VRB, doesn't have some fields, and doesn't have all columns
@@ -27,4 +28,13 @@ import org.apache.hadoop.hive.ql.exec.ve
 public class ColumnVectorBatch {
   public ColumnVector[] cols;
   public int size;
+
+  public ColumnVectorBatch(int columnCount) {
+    this(columnCount, VectorizedRowBatch.DEFAULT_SIZE);
+  }
+
+  public ColumnVectorBatch(int columnCount, int batchSize) {
+    this.cols = new ColumnVector[columnCount];
+    this.size = batchSize;
+  }
 }
\ No newline at end of file

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java?rev=1658797&r1=1658796&r2=1658797&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java Tue Feb 10 20:30:24 2015
@@ -44,12 +44,14 @@ public class LlapInputFormat
   implements InputFormat<NullWritable, VectorizedRowBatch>, VectorizedInputFormatInterface {
   private static final Log LOG = LogFactory.getLog(LlapInputFormat.class);
   private final LlapIoImpl llapIo;
+  private InputFormat sourceInputFormat;
 
   LlapInputFormat(LlapIoImpl llapIo, InputFormat sourceInputFormat) {
     // TODO: right now, we do nothing with source input format, ORC-only in the first cut.
     //       We'd need to plumb it thru and use it to get data to cache/etc.
     assert sourceInputFormat instanceof OrcInputFormat;
     this.llapIo = llapIo;
+    this.sourceInputFormat = sourceInputFormat;
   }
 
   @Override
@@ -79,7 +81,7 @@ public class LlapInputFormat
 
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    throw new UnsupportedOperationException();
+    return sourceInputFormat.getSplits(job, numSplits);
   }
 
   private static class LlapRecordReader

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java?rev=1658797&r1=1658796&r2=1658797&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java Tue Feb 10 20:30:24 2015
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.llap.io.decode;
 
+import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hadoop.conf.Configuration;
@@ -25,16 +27,34 @@ import org.apache.hadoop.hive.llap.Consu
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
+import org.apache.hadoop.hive.llap.io.decode.orc.streams.ColumnStream;
+import org.apache.hadoop.hive.llap.io.decode.orc.streams.DoubleColumnStream;
+import org.apache.hadoop.hive.llap.io.decode.orc.streams.FloatColumnStream;
+import org.apache.hadoop.hive.llap.io.decode.orc.streams.IntegerColumnStream;
+import org.apache.hadoop.hive.llap.io.decode.orc.streams.StringColumnStream;
 import org.apache.hadoop.hive.llap.io.encoded.EncodedDataProducer;
 import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataProducer;
+import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
+import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
+import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+
+import com.google.common.collect.Lists;
 
 public class OrcColumnVectorProducer extends ColumnVectorProducer<OrcBatchKey> {
   private final OrcEncodedDataProducer edp;
+  private final OrcMetadataCache metadataCache;
+  private ColumnVectorBatch cvb;
 
   public OrcColumnVectorProducer(
       ExecutorService executor, OrcEncodedDataProducer edp, Configuration conf) {
     super(executor);
     this.edp = edp;
+    this.metadataCache = OrcMetadataCache.getInstance();
+    this.cvb = null;
   }
 
   @Override
@@ -45,11 +65,157 @@ public class OrcColumnVectorProducer ext
   @Override
   protected void decodeBatch(EncodedColumnBatch<OrcBatchKey> batch,
       Consumer<ColumnVectorBatch> downstreamConsumer) {
-    throw new UnsupportedOperationException("not implemented");
-    // TODO:  HERE decode EncodedColumn-s into ColumnVector-s
-    //        sarg columns first, apply sarg, then decode others if needed; can cols skip values?
-    //        fill lockedBuffers from batch as we go and lock multiple times /after first/;
-    //        the way unlocking would work is that consumer returns CVB and we unlock lockedBuffer,
-    //        so we must either lock for each CVB or have extra refcount here and a map.
+    String fileName = batch.batchKey.file;
+    // OrcEncodedDataProducer should have just loaded cache entries from this file.
+    // The default LRU algorithm shouldn't have dropped the entries. To make this
+    // safe, untie this code from EDP into a separate class and use a loading cache.
+    try {
+      OrcFileMetadata fileMetadata = metadataCache.getFileMetadata(fileName);
+      OrcBatchKey stripeKey = batch.batchKey.clone();
+      // we are interested only in the stripe number. To make sure we get the correct stripe
+      // metadata, set row group index to 0. That's how it is cached. See OrcEncodedDataProducer
+      stripeKey.rgIx = 0;
+      OrcStripeMetadata stripeMetadata = metadataCache.getStripeMetadata(stripeKey);
+      if (cvb == null) {
+        cvb = new ColumnVectorBatch(batch.columnIxs.length);
+      }
+
+      // Get non null row count from root column
+      int rgIdx = batch.batchKey.rgIx;
+      OrcProto.RowIndexEntry rowIndex = stripeMetadata.getRowIndexes()[0].getEntry(rgIdx);
+      long nonNullRowCount = getRowCount(rowIndex);
+      int maxBatchesRG = (int) ((nonNullRowCount / VectorizedRowBatch.DEFAULT_SIZE) + 1);
+      int batchSize = VectorizedRowBatch.DEFAULT_SIZE;
+      int numCols = batch.columnIxs.length;
+      ColumnStream[] columnStreams = createColumnStreamReaders(numCols, batch, fileMetadata,
+          stripeMetadata);
+      for (int i = 0; i < maxBatchesRG; i++) {
+        if (i == maxBatchesRG - 1) {
+          batchSize = (int) (nonNullRowCount % VectorizedRowBatch.DEFAULT_SIZE);
+        }
+
+        for (int idx = 0; idx < batch.columnIxs.length; idx++) {
+          cvb.cols[idx] = columnStreams[idx].nextVector(null, batchSize);
+        }
+
+        // we are done reading a batch, send it to consumer for processing
+        downstreamConsumer.consumeData(cvb);
+      }
+    } catch (IOException ioe) {
+      downstreamConsumer.setError(ioe);
+    } catch (CloneNotSupportedException e) {
+      downstreamConsumer.setError(e);
+    }
+  }
+
+  private ColumnStream[] createColumnStreamReaders(int numCols,
+      EncodedColumnBatch<OrcBatchKey> batch,
+      OrcFileMetadata fileMetadata,
+      OrcStripeMetadata stripeMetadata) throws IOException {
+    String file = batch.batchKey.file;
+    ColumnStream[] columnStreams = new ColumnStream[numCols];
+
+    for (int i = 0; i < numCols; i++) {
+      int colIx = batch.columnIxs[i];
+      int rgIdx = batch.batchKey.rgIx;
+      OrcProto.RowIndexEntry rowIndex = stripeMetadata.getRowIndexes()[colIx].getEntry(rgIdx);
+      EncodedColumnBatch.StreamBuffer[] streamBuffers = batch.columnData[i];
+      OrcProto.Type colType = fileMetadata.getTypes().get(colIx);
+      // TODO: EncodedColumnBatch is already decompressed, so we don't really need to pass the codec.
+      // But we still need to know whether the original data is compressed, because that determines
+      // whether to skip positions in the row index: if the file is compressed, the 1st position
+      // (compressed offset) in the row index must be skipped to get the uncompressed offset;
+      // otherwise it must not be skipped.
+      CompressionCodec codec = fileMetadata.getCompressionCodec();
+      int bufferSize = fileMetadata.getCompressionBufferSize();
+      OrcProto.ColumnEncoding columnEncoding = stripeMetadata.getEncodings().get(colIx);
+      ColumnVector cv = null;
+
+      // FIXME: See if the stream buffers are in this same order. What will happen if some stream
+      // does not exist? Will stream buffer be null?
+      EncodedColumnBatch.StreamBuffer present = null;
+      EncodedColumnBatch.StreamBuffer data = null;
+      EncodedColumnBatch.StreamBuffer dictionary = null;
+      EncodedColumnBatch.StreamBuffer lengths = null;
+      EncodedColumnBatch.StreamBuffer secondary = null;
+      switch (colType.getKind()) {
+        case SHORT:
+        case INT:
+        case LONG:
+          // TODO: EncodedDataProducer should produce stream buffers in enum order of stream kind.
+          // So if a stream does not exist, it should have null instead.
+          if (streamBuffers.length != 2) {
+            present = null;
+            data = streamBuffers[0];
+          } else {
+            present = streamBuffers[0];
+            data = streamBuffers[1];
+          }
+          // FIXME: Creating column stream readers for every row group will be expensive.
+          columnStreams[i] = new IntegerColumnStream(file, colIx, present, data, columnEncoding, codec,
+              bufferSize, rowIndex);
+          break;
+        case FLOAT:
+          if (streamBuffers.length != 2) {
+            present = null;
+            data = streamBuffers[0];
+          } else {
+            present = streamBuffers[0];
+            data = streamBuffers[1];
+          }
+          columnStreams[i] = new FloatColumnStream(file, colIx, present, data, codec, bufferSize,
+              rowIndex);
+          break;
+        case DOUBLE:
+          if (streamBuffers.length != 2) {
+            present = null;
+            data = streamBuffers[0];
+          } else {
+            present = streamBuffers[0];
+            data = streamBuffers[1];
+          }
+          columnStreams[i] = new DoubleColumnStream(file, colIx, present, data, codec, bufferSize,
+              rowIndex);
+          break;
+        case CHAR:
+        case VARCHAR:
+        case STRING:
+          // FIXME: This is hacky! Will never work. Fix it cleanly everywhere. Hopefully encoded
+          // data producer will provide streams in enum order of stream kind
+          present = streamBuffers[0];
+          data = streamBuffers[1];
+          dictionary = streamBuffers[2];
+          lengths = streamBuffers[3];
+          columnStreams[i] = new StringColumnStream(file, colIx, present, data, dictionary, lengths,
+              columnEncoding, codec, bufferSize, rowIndex);
+          break;
+        default:
+          throw new UnsupportedOperationException("Data type not supported yet! " + colType);
+      }
+    }
+    return columnStreams;
+  }
+
+  private List<OrcProto.Stream> getDataStreams(int colIx, List<OrcProto.Stream> streams) {
+    List<OrcProto.Stream> result = Lists.newArrayList();
+    for (OrcProto.Stream stream : streams) {
+      if (stream.getColumn() == colIx) {
+        switch (stream.getKind()) {
+          case PRESENT:
+          case DATA:
+          case LENGTH:
+          case DICTIONARY_DATA:
+          case SECONDARY:
+            result.add(stream);
+          default:
+            // ignore
+        }
+      }
+    }
+    return result;
+  }
+
+  private long getRowCount(OrcProto.RowIndexEntry rowIndexEntry) {
+    return rowIndexEntry.getStatistics().getNumberOfValues();
   }
 }

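For context on the decodeBatch() loop above: each ORC row group is emitted as a series of VectorizedRowBatch.DEFAULT_SIZE-row batches, with the remainder going into the last batch. Below is a minimal standalone sketch of that arithmetic (the row count and batch size are illustrative, not from the patch; ceiling division is used here so an exact multiple of the batch size does not yield an empty trailing batch):

    public class RowGroupBatchingSketch {
      private static final int BATCH_SIZE = 1024; // same as VectorizedRowBatch.DEFAULT_SIZE

      public static void main(String[] args) {
        long rowCount = 2500; // hypothetical row count for one row group
        // ceiling division: number of batches needed to cover rowCount rows
        int numBatches = (int) ((rowCount + BATCH_SIZE - 1) / BATCH_SIZE);
        for (int i = 0; i < numBatches; i++) {
          // every batch is full except possibly the last one
          int batchSize = (i == numBatches - 1)
              ? (int) (rowCount - (long) i * BATCH_SIZE)
              : BATCH_SIZE;
          System.out.println("batch " + i + ": " + batchSize + " rows");
        }
      }
    }
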
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/readers/DictionaryStringReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/readers/DictionaryStringReader.java?rev=1658797&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/readers/DictionaryStringReader.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/readers/DictionaryStringReader.java Tue Feb 10 20:30:24 2015
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.readers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.io.orc.DynamicByteArray;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.IntegerReader;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.io.Text;
+
+/**
+ *
+ */
+public class DictionaryStringReader implements StringReader {
+  private DynamicByteArray dictionaryBuffer;
+  private IntegerReader data;
+  private IntegerReader lengths;
+  private InStream dictStream;
+  private int[] dictionaryOffsets;
+  private byte[] dictionaryBufferInBytesCache = null;
+  private final LongColumnVector scratchlcv;
+  private int dictionarySize;
+
+  public DictionaryStringReader(IntegerReader lengths, IntegerReader data, InStream dictionary,
+      int dictionarySize)
+      throws IOException {
+    this.lengths = lengths;
+    this.dictionaryBuffer = null;
+    this.dictStream = dictionary;
+    this.dictionaryOffsets = null;
+    this.dictionarySize = dictionarySize;
+    this.data = data;
+    this.scratchlcv = new LongColumnVector();
+    readDictionary();
+  }
+
+  private void readDictionary() throws IOException {
+    if (dictionaryBuffer == null) {
+      dictionaryBuffer = new DynamicByteArray(64, dictStream.available());
+      this.dictionaryBuffer.readAll(dictStream);
+    }
+
+    int offset = 0;
+    if (dictionaryOffsets == null ||
+        dictionaryOffsets.length < dictionarySize + 1) {
+      dictionaryOffsets = new int[dictionarySize + 1];
+    }
+    for (int i = 0; i < dictionarySize; ++i) {
+      dictionaryOffsets[i] = offset;
+      offset += (int) lengths.next();
+    }
+    dictionaryOffsets[dictionarySize] = offset;
+  }
+
+  @Override
+  public void seek(PositionProvider index) throws IOException {
+    throw new UnsupportedOperationException("Not implemented yet");
+  }
+
+  @Override
+  public void skip(long numValues) throws IOException {
+    throw new UnsupportedOperationException("Not implemented yet");
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    return data.hasNext();
+  }
+
+  @Override
+  public Text next() throws IOException {
+    int entry = (int) data.next();
+    int offset = dictionaryOffsets[entry];
+    int length = getDictionaryEntryLength(entry, offset);
+    Text result = new Text();
+    dictionaryBuffer.setText(result, offset, length);
+    return result;
+  }
+
+  int getDictionaryEntryLength(int entry, int offset) {
+    int length = 0;
+    // if it isn't the last entry, subtract the offsets otherwise use
+    // the buffer length.
+    if (entry < dictionaryOffsets.length - 1) {
+      length = dictionaryOffsets[entry + 1] - offset;
+    } else {
+      length = dictionaryBuffer.size() - offset;
+    }
+    return length;
+  }
+
+  @Override
+  public ColumnVector nextVector(ColumnVector previous, long batchSize) throws IOException {
+    BytesColumnVector result = null;
+    int offset = 0, length = 0;
+    if (previous == null) {
+      result = new BytesColumnVector();
+    } else {
+      result = (BytesColumnVector) previous;
+    }
+
+    if (dictionaryBuffer != null) {
+
+      // Load dictionaryBuffer into cache.
+      if (dictionaryBufferInBytesCache == null) {
+        dictionaryBufferInBytesCache = dictionaryBuffer.get();
+      }
+
+      // Read string offsets
+      scratchlcv.isNull = result.isNull;
+      lengths.nextVector(scratchlcv, batchSize);
+      if (!scratchlcv.isRepeating) {
+
+        // The vector has non-repeating strings. Iterate thru the batch
+        // and set strings one by one
+        for (int i = 0; i < batchSize; i++) {
+          if (!scratchlcv.isNull[i]) {
+            offset = dictionaryOffsets[(int) scratchlcv.vector[i]];
+            length = getDictionaryEntryLength((int) scratchlcv.vector[i], offset);
+            result.setRef(i, dictionaryBufferInBytesCache, offset, length);
+          } else {
+            // If the value is null then set offset and length to zero (null string)
+            result.setRef(i, dictionaryBufferInBytesCache, 0, 0);
+          }
+        }
+      } else {
+        // If the value is repeating then just set the first value in the
+        // vector and set the isRepeating flag to true. No need to iterate thru and
+        // set all the elements to the same value
+        offset = dictionaryOffsets[(int) scratchlcv.vector[0]];
+        length = getDictionaryEntryLength((int) scratchlcv.vector[0], offset);
+        result.setRef(0, dictionaryBufferInBytesCache, offset, length);
+      }
+      result.isRepeating = scratchlcv.isRepeating;
+    } else {
+      // Entire stripe contains null strings.
+      result.isRepeating = true;
+      result.noNulls = false;
+      result.isNull[0] = true;
+      result.setRef(0, "".getBytes(), 0, 0);
+    }
+    return result;
+  }
+}

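A side note on the dictionary layout handled above: the LENGTH stream carries one length per dictionary entry, and readDictionary() turns those lengths into an offsets array so that entry i can be sliced out of the flat dictionary blob as [offsets[i], offsets[i+1]). A small self-contained sketch of that scheme (the sample strings and lengths are hypothetical):

    import java.nio.charset.StandardCharsets;

    public class DictionaryOffsetsSketch {
      public static void main(String[] args) {
        // flat dictionary blob "cat" + "horse" + "ox", with per-entry lengths 3, 5, 2
        byte[] blob = "cathorseox".getBytes(StandardCharsets.UTF_8);
        int[] lengths = {3, 5, 2};

        // offsets[i] is where entry i starts; offsets[n] is the total blob size
        int[] offsets = new int[lengths.length + 1];
        int offset = 0;
        for (int i = 0; i < lengths.length; i++) {
          offsets[i] = offset;
          offset += lengths[i];
        }
        offsets[lengths.length] = offset;

        // decode entry 1 ("horse") the way the reader does: start from the offsets array,
        // length from the difference to the next offset
        int entry = 1;
        int start = offsets[entry];
        int len = offsets[entry + 1] - start;
        System.out.println(new String(blob, start, len, StandardCharsets.UTF_8));
      }
    }
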
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/readers/DirectStringReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/readers/DirectStringReader.java?rev=1658797&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/readers/DirectStringReader.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/readers/DirectStringReader.java Tue Feb 10 20:30:24 2015
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.readers;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.IntegerReader;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+import org.apache.hadoop.io.Text;
+
+/**
+ *
+ */
+public class DirectStringReader implements StringReader {
+  private IntegerReader lengths;
+  private InStream data;
+  private final LongColumnVector scratchlcv;
+
+  public DirectStringReader(IntegerReader lengths, InStream data) {
+    this.lengths = lengths;
+    this.data = data;
+    this.scratchlcv = new LongColumnVector();
+  }
+
+  @Override
+  public void seek(PositionProvider index) throws IOException {
+    throw new UnsupportedOperationException("Not implemented yet");
+  }
+
+  @Override
+  public void skip(long numValues) throws IOException {
+    throw new UnsupportedOperationException("Not implemented yet");
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    return lengths.hasNext();
+  }
+
+  @Override
+  public Text next() throws IOException {
+    int len = (int) lengths.next();
+    int offset = 0;
+    byte[] bytes = new byte[len];
+    while (len > 0) {
+      int written = data.read(bytes, offset, len);
+      if (written < 0) {
+        throw new EOFException("Can't finish byte read from " + data);
+      }
+      len -= written;
+      offset += written;
+    }
+    return new Text(bytes);
+  }
+
+  @Override
+  public ColumnVector nextVector(ColumnVector previous, long batchSize) throws IOException {
+    BytesColumnVector result = null;
+    if (previous == null) {
+      result = new BytesColumnVector();
+    } else {
+      result = (BytesColumnVector) previous;
+    }
+
+    RecordReaderImpl.BytesColumnVectorUtil
+        .readOrcByteArrays(data, lengths, scratchlcv, result, batchSize);
+    return result;
+  }
+}

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/readers/StringReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/readers/StringReader.java?rev=1658797&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/readers/StringReader.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/readers/StringReader.java Tue Feb 10 20:30:24 2015
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.readers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.io.Text;
+
+/**
+ *
+ */
+public interface StringReader {
+
+  /**
+   * Seek to the position provided by index.
+   * @param index
+   * @throws java.io.IOException
+   */
+  void seek(PositionProvider index) throws IOException;
+
+  /**
+   * Skip number of specified rows.
+   * @param numValues
+   * @throws IOException
+   */
+  void skip(long numValues) throws IOException;
+
+  /**
+   * Check if there are any more values left.
+   * @return true if more values are available
+   * @throws IOException
+   */
+  boolean hasNext() throws IOException;
+
+  /**
+   * Return the next available value.
+   * @return the next value
+   * @throws IOException
+   */
+  Text next() throws IOException;
+
+  /**
+   * Return the next available vector for values.
+   * @return a column vector populated with the next values
+   * @throws IOException
+   */
+  ColumnVector nextVector(ColumnVector previous, long previousLen)
+      throws IOException;
+}
\ No newline at end of file

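A hedged usage sketch of the interface above: given a StringReader obtained elsewhere (for example a DirectStringReader), the row-at-a-time API is just hasNext()/next(), the non-vectorized counterpart of nextVector():

    import java.io.IOException;

    import org.apache.hadoop.hive.llap.io.decode.orc.readers.StringReader;
    import org.apache.hadoop.io.Text;

    public class StringReaderUsageSketch {
      // drains a reader row by row; how the reader is constructed is out of scope here
      static void dumpAll(StringReader reader) throws IOException {
        while (reader.hasNext()) {
          Text value = reader.next();
          System.out.println(value);
        }
      }
    }
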
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/BaseColumnStream.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/BaseColumnStream.java?rev=1658797&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/BaseColumnStream.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/BaseColumnStream.java Tue Feb 10 20:30:24 2015
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.streams;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.io.orc.BitFieldReader;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+
+import com.google.common.base.Preconditions;
+
+/**
+ *
+ */
+public abstract class BaseColumnStream implements ColumnStream {
+  protected int columnId;
+  protected EncodedColumnBatch.StreamBuffer nullStream;
+  protected InStream inStream;
+  protected BitFieldReader bitFieldReader;
+  protected boolean isFileCompressed;
+
+  public BaseColumnStream(String file, int columnId, EncodedColumnBatch.StreamBuffer present,
+      CompressionCodec codec, int bufferSize) throws IOException {
+    Preconditions.checkArgument(columnId >= 0, "ColumnId cannot be negative");
+    this.columnId = columnId;
+    if (present != null) {
+      this.nullStream = present;
+      isFileCompressed = codec != null;
+      // pass null for codec as the stream is decompressed
+      this.inStream = StreamUtils.createInStream("PRESENT", file, null, bufferSize, present);
+      this.bitFieldReader = present == null ? null : new BitFieldReader(inStream, 1);
+    } else {
+      this.nullStream = null;
+      this.inStream = null;
+      this.bitFieldReader = null;
+    }
+  }
+
+  public void positionReaders(PositionProvider positionProvider) throws IOException {
+    if (bitFieldReader != null) {
+      // the stream is already decompressed; if the file is compressed, skip the 1st position in the index
+      if (isFileCompressed) {
+        positionProvider.getNext();
+      }
+      bitFieldReader.seek(positionProvider);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (nullStream != null && inStream != null) {
+      nullStream.decRef();
+      nullStream = null;
+      inStream.close();
+    }
+  }
+
+  @Override
+  public ColumnVector nextVector(ColumnVector previousVector, int batchSize) throws IOException {
+    ColumnVector result = (ColumnVector) previousVector;
+    if (bitFieldReader != null) {
+      // Set noNulls and isNull vector of the ColumnVector based on
+      // present stream
+      result.noNulls = true;
+      for (int i = 0; i < batchSize; i++) {
+        result.isNull[i] = (bitFieldReader.next() != 1);
+        if (result.noNulls && result.isNull[i]) {
+          result.noNulls = false;
+        }
+      }
+    } else {
+      // There is no present stream, which means that all the values are
+      // present.
+      result.noNulls = true;
+      for (int i = 0; i < batchSize; i++) {
+        result.isNull[i] = false;
+      }
+    }
+    return previousVector;
+  }
+}

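The PRESENT handling above boils down to one bit per row: 1 means the value is present, 0 means null. A tiny standalone illustration of that mapping (the bit values are made up):

    public class PresentBitsSketch {
      public static void main(String[] args) {
        // hypothetical PRESENT bits for an 8-row batch: 1 = value present, 0 = null
        int[] presentBits = {1, 1, 0, 1, 0, 0, 1, 1};
        boolean[] isNull = new boolean[presentBits.length];
        boolean noNulls = true;
        for (int i = 0; i < presentBits.length; i++) {
          isNull[i] = presentBits[i] != 1; // the same per-row test BaseColumnStream applies
          if (isNull[i]) {
            noNulls = false;
          }
        }
        System.out.println("noNulls = " + noNulls);
      }
    }
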
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/ColumnStream.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/ColumnStream.java?rev=1658797&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/ColumnStream.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/ColumnStream.java Tue Feb 10 20:30:24 2015
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.streams;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+
+/**
+ * Column stream reader interface.
+ */
+public interface ColumnStream {
+  /**
+   * Closes all internal stream readers.
+   * @throws IOException
+   */
+  public void close() throws IOException;
+
+  /**
+   * Returns next column vector from the stream.
+   * @param previousVector
+   * @param batchSize
+   * @return the next column vector
+   * @throws IOException
+   */
+  public ColumnVector nextVector(ColumnVector previousVector, int batchSize) throws IOException;
+}

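A sketch of how a caller might drive this interface, assuming some ColumnStream implementation and a known row count; nextVector() takes the previous vector so implementations can reuse it across batches, and close() releases the underlying buffers:

    import java.io.IOException;

    import org.apache.hadoop.hive.llap.io.decode.orc.streams.ColumnStream;
    import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;

    public class ColumnStreamUsageSketch {
      // reads 'rows' values from one column stream in fixed-size batches
      static void readColumn(ColumnStream stream, long rows, int batchSize) throws IOException {
        ColumnVector vector = null; // first call lets the implementation allocate it
        long remaining = rows;
        while (remaining > 0) {
          int n = (int) Math.min(remaining, batchSize);
          vector = stream.nextVector(vector, n);
          // ... hand the populated vector (n rows) to downstream processing here ...
          remaining -= n;
        }
        stream.close();
      }
    }
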
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/DoubleColumnStream.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/DoubleColumnStream.java?rev=1658797&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/DoubleColumnStream.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/DoubleColumnStream.java Tue Feb 10 20:30:24 2015
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.streams;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+import org.apache.hadoop.hive.ql.io.orc.SerializationUtils;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Double stream reader.
+ */
+public class DoubleColumnStream extends BaseColumnStream {
+  private InStream dataStream;
+  private SerializationUtils utils;
+
+  public DoubleColumnStream(String file, int colIx, EncodedColumnBatch.StreamBuffer presentStream,
+      EncodedColumnBatch.StreamBuffer dataStream, CompressionCodec codec, int bufferSize,
+      OrcProto.RowIndexEntry rowIndex)
+      throws IOException {
+    super(file, colIx, presentStream, codec, bufferSize);
+
+    Preconditions.checkNotNull(dataStream, "DATA stream buffer cannot be null");
+
+    // pass null for codec as stream is already decompressed
+    this.dataStream = StreamUtils.createInStream("DATA", file, null, bufferSize, dataStream);
+    this.utils = new SerializationUtils();
+
+    // position the readers based on the specified row index
+    PositionProvider positionProvider = new RecordReaderImpl.PositionProviderImpl(rowIndex);
+    positionReaders(positionProvider);
+  }
+
+  public void positionReaders(PositionProvider positionProvider) throws IOException {
+    super.positionReaders(positionProvider);
+    // the stream is already decompressed; if the file is compressed, skip the 1st position in the index
+    if (isFileCompressed) {
+      positionProvider.getNext();
+    }
+    inStream.seek(positionProvider);
+  }
+
+  @Override
+  public ColumnVector nextVector(ColumnVector previousVector, int batchSize) throws IOException {
+    DoubleColumnVector result = null;
+    if (previousVector == null) {
+      result = new DoubleColumnVector();
+    } else {
+      result = (DoubleColumnVector) previousVector;
+    }
+
+    // Read present/isNull stream
+    super.nextVector(result, batchSize);
+
+    // Read value entries based on isNull entries
+    for (int i = 0; i < batchSize; i++) {
+      if (!result.isNull[i]) {
+        result.vector[i] = utils.readDouble(dataStream);
+      } else {
+        // If the value is not present then set NaN
+        result.vector[i] = Double.NaN;
+      }
+    }
+
+    // Set isRepeating flag
+    result.isRepeating = true;
+    for (int i = 0; (i < batchSize - 1 && result.isRepeating); i++) {
+      if (result.vector[i] != result.vector[i + 1]) {
+        result.isRepeating = false;
+      }
+    }
+    return result;
+  }
+}

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/FloatColumnStream.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/FloatColumnStream.java?rev=1658797&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/FloatColumnStream.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/FloatColumnStream.java Tue Feb 10 20:30:24 2015
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.streams;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+import org.apache.hadoop.hive.ql.io.orc.SerializationUtils;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Float stream reader.
+ */
+public class FloatColumnStream extends BaseColumnStream {
+  private InStream dataStream;
+  private SerializationUtils utils;
+
+  public FloatColumnStream(String file, int colIx, EncodedColumnBatch.StreamBuffer presentStream,
+      EncodedColumnBatch.StreamBuffer dataStream, CompressionCodec codec, int bufferSize,
+      OrcProto.RowIndexEntry rowIndex)
+      throws IOException {
+    super(file, colIx, presentStream, codec, bufferSize);
+
+    Preconditions.checkNotNull(dataStream, "DATA stream buffer cannot be null");
+
+    // pass null for codec as stream is already decompressed
+    this.dataStream = StreamUtils.createInStream("DATA", file, null, bufferSize, dataStream);
+    this.utils = new SerializationUtils();
+
+    // position the readers based on the specified row index
+    PositionProvider positionProvider = new RecordReaderImpl.PositionProviderImpl(rowIndex);
+    positionReaders(positionProvider);
+  }
+
+  public void positionReaders(PositionProvider positionProvider) throws IOException {
+    super.positionReaders(positionProvider);
+
+    // the stream is already decompressed; if the file is compressed, skip the 1st position in the index
+    if (isFileCompressed) {
+      positionProvider.getNext();
+    }
+    inStream.seek(positionProvider);
+  }
+
+  @Override
+  public ColumnVector nextVector(ColumnVector previousVector, int batchSize) throws IOException {
+    DoubleColumnVector result = null;
+    if (previousVector == null) {
+      result = new DoubleColumnVector();
+    } else {
+      result = (DoubleColumnVector) previousVector;
+    }
+
+    // Read present/isNull stream
+    super.nextVector(result, batchSize);
+
+    // Read value entries based on isNull entries
+    for (int i = 0; i < batchSize; i++) {
+      if (!result.isNull[i]) {
+        result.vector[i] = utils.readFloat(dataStream);
+      } else {
+
+        // If the value is not present then set NaN
+        result.vector[i] = Double.NaN;
+      }
+    }
+
+    // Set isRepeating flag
+    result.isRepeating = true;
+    for (int i = 0; (i < batchSize - 1 && result.isRepeating); i++) {
+      if (result.vector[i] != result.vector[i + 1]) {
+        result.isRepeating = false;
+      }
+    }
+    return result;
+  }
+}

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/IntegerColumnStream.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/IntegerColumnStream.java?rev=1658797&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/IntegerColumnStream.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/IntegerColumnStream.java Tue Feb 10 20:30:24 2015
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.streams;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.IntegerReader;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Integer stream reader.
+ */
+public class IntegerColumnStream extends BaseColumnStream {
+  private IntegerReader integerReader;
+  private InStream dataStream;
+
+  public IntegerColumnStream(String file, int colIx, EncodedColumnBatch.StreamBuffer present,
+      EncodedColumnBatch.StreamBuffer data, OrcProto.ColumnEncoding columnEncoding,
+      CompressionCodec codec, int bufferSize, OrcProto.RowIndexEntry rowIndex)
+      throws IOException {
+    super(file, colIx, present, codec, bufferSize);
+
+    Preconditions.checkNotNull(data, "DATA stream buffer cannot be null");
+
+    // pass null for codec as stream is already decompressed
+    this.dataStream = StreamUtils.createInStream("DATA", file, null, bufferSize, data);
+    this.integerReader = StreamUtils.createIntegerReader(columnEncoding.getKind(), dataStream, true);
+
+    // position the readers based on the specified row index
+    PositionProvider positionProvider = new RecordReaderImpl.PositionProviderImpl(rowIndex);
+    positionReaders(positionProvider);
+  }
+
+  public void positionReaders(PositionProvider positionProvider) throws IOException {
+
+    // position the present stream
+    super.positionReaders(positionProvider);
+
+    // the stream is already decompressed; if the file is compressed, skip the 1st position in the index
+    if (isFileCompressed) {
+      positionProvider.getNext();
+    }
+    integerReader.seek(positionProvider);
+  }
+
+  @Override
+  public ColumnVector nextVector(ColumnVector previousVector, int batchSize) throws IOException {
+    LongColumnVector result = null;
+    if (previousVector == null) {
+      result = new LongColumnVector();
+    } else {
+      result = (LongColumnVector) previousVector;
+    }
+
+    // Read present/isNull stream
+    super.nextVector(result, batchSize);
+
+    // Read value entries based on isNull entries
+    integerReader.nextVector(result, batchSize);
+    return result;
+  }
+}

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/StreamUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/StreamUtils.java?rev=1658797&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/StreamUtils.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/StreamUtils.java Tue Feb 10 20:30:24 2015
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.streams;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.IntegerReader;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerReader;
+import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerReaderV2;
+
+/**
+ *
+ */
+public class StreamUtils {
+  public static IntegerReader createIntegerReader(OrcProto.ColumnEncoding.Kind kind,
+      InStream in,
+      boolean signed) throws IOException {
+    switch (kind) {
+      case DIRECT_V2:
+      case DICTIONARY_V2:
+        return new RunLengthIntegerReaderV2(in, signed, false);
+      case DIRECT:
+      case DICTIONARY:
+        return new RunLengthIntegerReader(in, signed);
+      default:
+        throw new IllegalArgumentException("Unknown encoding " + kind);
+    }
+  }
+
+  public static InStream createInStream(String streamName, String fileName, CompressionCodec codec,
+      int bufferSize, EncodedColumnBatch.StreamBuffer streamBuffer) throws IOException {
+    int numBuffers = streamBuffer.cacheBuffers.size();
+    List<DiskRange> input = new ArrayList<>(numBuffers);
+    int totalLength = 0;
+    for (int i = 0; i < numBuffers; i++) {
+      LlapMemoryBuffer lmb = streamBuffer.cacheBuffers.get(i);
+      input.add(new RecordReaderImpl.CacheChunk(lmb, 0, lmb.byteBuffer.limit()));
+      totalLength += lmb.byteBuffer.limit();
+    }
+    return InStream.create(fileName, streamName, input, totalLength, codec, bufferSize, null);
+  }
+}

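As a small illustration of the encoding-to-reader mapping above: DIRECT/DICTIONARY map to RLE v1, DIRECT_V2/DICTIONARY_V2 to RLE v2. The sketch below only shows the selection call; the InStream would normally come from createInStream() over a decompressed StreamBuffer, and the helper name forLengthStream is hypothetical:

    import java.io.IOException;

    import org.apache.hadoop.hive.llap.io.decode.orc.streams.StreamUtils;
    import org.apache.hadoop.hive.ql.io.orc.InStream;
    import org.apache.hadoop.hive.ql.io.orc.IntegerReader;
    import org.apache.hadoop.hive.ql.io.orc.OrcProto;

    public class IntegerReaderSelectionSketch {
      // picks the right RLE reader for a LENGTH stream; lengths are unsigned, hence signed = false
      static IntegerReader forLengthStream(OrcProto.ColumnEncoding encoding, InStream lengths)
          throws IOException {
        return StreamUtils.createIntegerReader(encoding.getKind(), lengths, false);
      }
    }
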
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/StringColumnStream.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/StringColumnStream.java?rev=1658797&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/StringColumnStream.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/StringColumnStream.java Tue Feb 10 20:30:24 2015
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.streams;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.decode.orc.readers.DictionaryStringReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.readers.DirectStringReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.readers.StringReader;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.IntegerReader;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * String column stream reader.
+ */
+public class StringColumnStream extends BaseColumnStream {
+  private IntegerReader lengthReader;
+  private StringReader stringReader;
+  private IntegerReader dataReader;
+  private InStream dictionaryStream;
+  private InStream lengthStream;
+  private InStream dataStream;
+  private OrcProto.ColumnEncoding.Kind kind;
+
+  public StringColumnStream(String file, int colIx, EncodedColumnBatch.StreamBuffer present,
+      EncodedColumnBatch.StreamBuffer data, EncodedColumnBatch.StreamBuffer dictionary,
+      EncodedColumnBatch.StreamBuffer lengths, OrcProto.ColumnEncoding columnEncoding,
+      CompressionCodec codec, int bufferSize, OrcProto.RowIndexEntry rowIndex)
+      throws IOException {
+    super(file, colIx, present, codec, bufferSize);
+
+    // preconditions check
+    Preconditions.checkNotNull(data, "DATA stream buffer cannot be null");
+    Preconditions.checkNotNull(columnEncoding, "ColumnEncoding cannot be null");
+    Preconditions.checkNotNull(lengths, "ColumnEncoding is " + columnEncoding + "." +
+        " Length stream cannot be null");
+
+    this.kind = columnEncoding.getKind();
+
+    this.dataStream = StreamUtils.createInStream("DATA", file, null, bufferSize, data);
+    this.dataReader = StreamUtils.createIntegerReader(kind, dataStream, false);
+
+    this.lengthStream = StreamUtils.createInStream("LENGTH", file, null, bufferSize, lengths);
+    this.lengthReader = StreamUtils.createIntegerReader(kind, lengthStream, false);
+    if (kind.equals(OrcProto.ColumnEncoding.Kind.DICTIONARY) ||
+        kind.equals(OrcProto.ColumnEncoding.Kind.DICTIONARY_V2)) {
+      Preconditions.checkNotNull(dictionary, "ColumnEncoding is " + columnEncoding + "." +
+          " Dictionary stream cannot be null");
+      this.dictionaryStream = StreamUtils.createInStream(kind.toString(), file, null, bufferSize,
+          dictionary);
+    }
+
+    if (kind.equals(OrcProto.ColumnEncoding.Kind.DIRECT) ||
+        kind.equals(OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+      this.stringReader = new DirectStringReader(lengthReader, dataStream);
+    } else {
+      this.stringReader = new DictionaryStringReader(lengthReader, dataReader, dictionaryStream,
+          columnEncoding.getDictionarySize());
+    }
+
+    // position the readers based on the specified row index
+    PositionProvider positionProvider = new RecordReaderImpl.PositionProviderImpl(rowIndex);
+    positionReaders(positionProvider);
+  }
+
+  public void positionReaders(PositionProvider positionProvider) throws IOException {
+    super.positionReaders(positionProvider);
+
+    // the cached stream data is already uncompressed; if the file is compressed,
+    // skip the first index position (the compressed-block offset)
+    if (isFileCompressed) {
+      positionProvider.getNext();
+    }
+    dataReader.seek(positionProvider);
+
+    if (kind.equals(OrcProto.ColumnEncoding.Kind.DIRECT) || kind.equals(
+        OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+      if (isFileCompressed) {
+        positionProvider.getNext();
+      }
+      lengthReader.seek(positionProvider);
+    }
+  }
+
+  @Override
+  public ColumnVector nextVector(ColumnVector previousVector, int batchSize) throws IOException {
+    super.nextVector(previousVector, batchSize);
+    return stringReader.nextVector(previousVector, batchSize);
+  }
+}
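
For orientation, a minimal sketch (not part of the patch) of how a decoder might drive the
StringColumnStream above for one row group. The stream buffers, column encoding, codec and row
index entry are assumed to come from the encoded-data producer and cached stripe metadata; the
helper class below is hypothetical.

  import java.io.IOException;

  import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
  import org.apache.hadoop.hive.llap.io.decode.orc.streams.StringColumnStream;
  import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
  import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
  import org.apache.hadoop.hive.ql.io.orc.OrcProto;

  public class StringColumnDecodeSketch {
    // Decode a single row group of one string column into a ColumnVector.
    static ColumnVector decodeStringRowGroup(String fileName, int colIx,
        EncodedColumnBatch.StreamBuffer present, EncodedColumnBatch.StreamBuffer data,
        EncodedColumnBatch.StreamBuffer dictionary, EncodedColumnBatch.StreamBuffer lengths,
        OrcProto.ColumnEncoding encoding, CompressionCodec codec, int bufferSize,
        OrcProto.RowIndexEntry rowIndexEntry, int batchSize) throws IOException {
      // The constructor positions the underlying readers at rowIndexEntry.
      StringColumnStream stream = new StringColumnStream(fileName, colIx, present, data,
          dictionary, lengths, encoding, codec, bufferSize, rowIndexEntry);
      // Passing null as the previous vector is assumed to let the reader allocate a fresh one.
      return stream.nextVector(null, batchSize);
    }
  }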

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java?rev=1658797&r1=1658796&r2=1658797&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java Tue Feb 10 20:30:24 2015
@@ -609,7 +609,7 @@ public class OrcEncodedDataProducer impl
     this.cache = cache;
     this.lowLevelCache = lowLevelCache;
     this.conf = conf;
-    this.metadataCache = new OrcMetadataCache();
+    this.metadataCache = OrcMetadataCache.getInstance();
   }
 
   @Override

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java?rev=1658797&r1=1658796&r2=1658797&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java Tue Feb 10 20:30:24 2015
@@ -19,15 +19,17 @@
 package org.apache.hadoop.hive.llap.io.metadata;
 
 import java.util.List;
-import java.util.Map;
 
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
 import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto;
 import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
+import org.apache.hadoop.hive.ql.io.orc.WriterImpl;
 
 public class OrcFileMetadata {
   private CompressionKind compressionKind;
+  private CompressionCodec codec;
   private int compressionBufferSize;
   private List<OrcProto.Type> types;
   private List<StripeInformation> stripes;
@@ -35,6 +37,7 @@ public class OrcFileMetadata {
 
   public OrcFileMetadata(Reader reader) {
     setCompressionKind(reader.getCompression());
+    setCompressionCodec(WriterImpl.createCodec(compressionKind));
     setCompressionBufferSize(reader.getCompressionSize());
     setStripes(reader.getStripes());
     setTypes(reader.getTypes());
@@ -80,4 +83,20 @@ public class OrcFileMetadata {
   public void setRowIndexStride(int rowIndexStride) {
     this.rowIndexStride = rowIndexStride;
   }
+
+  public int getColumnCount() {
+    return types.size();
+  }
+
+  public int getFlattenedColumnCount() {
+    return types.get(0).getSubtypesCount();
+  }
+
+  public CompressionCodec getCompressionCodec() {
+    return codec;
+  }
+
+  public void setCompressionCodec(CompressionCodec codec) {
+    this.codec = codec;
+  }
 }
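
A rough sketch of how the extended metadata is meant to be consumed: the codec is derived once
from the file's CompressionKind via the now-public WriterImpl.createCodec, so decoders do not
have to touch the writer-side machinery again. The ORC Reader (orcReader) is assumed to be
opened by the caller.

  // Capture per-file ORC metadata once and hand the codec to decoders.
  OrcFileMetadata metadata = new OrcFileMetadata(orcReader);
  CompressionCodec codec = metadata.getCompressionCodec();    // null when the file is uncompressed (NONE)
  int typeCount = metadata.getColumnCount();                  // entries in the flattened type tree
  int topLevelColumns = metadata.getFlattenedColumnCount();   // direct children of the root struct type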

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java?rev=1658797&r1=1658796&r2=1658797&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java Tue Feb 10 20:30:24 2015
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.llap.io.metadata;
 
 import java.io.IOException;
-import java.util.concurrent.ExecutionException;
 
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 
@@ -36,6 +35,8 @@ public class OrcMetadataCache {
   private static final int DEFAULT_MAX_STRIPE_ENTRIES = 10000;
   private static Cache<String, OrcFileMetadata> METADATA;
   private static Cache<OrcBatchKey, OrcStripeMetadata> STRIPE_METADATA;
+  private static OrcMetadataCache instance = new OrcMetadataCache();
+  private OrcMetadataCache() {}
 
   static {
     METADATA = CacheBuilder.newBuilder()
@@ -46,7 +47,11 @@ public class OrcMetadataCache {
         .concurrencyLevel(DEFAULT_CACHE_ACCESS_CONCURRENCY)
         .maximumSize(DEFAULT_MAX_STRIPE_ENTRIES)
         .build();
-    }
+  }
+
+  public static OrcMetadataCache getInstance() {
+    return instance;
+  }
 
   public void putFileMetadata(String filePath, OrcFileMetadata metaData) {
     METADATA.put(filePath, metaData);

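OrcMetadataCache is now a process-wide singleton, so every encoded-data producer shares the same
file-level and stripe-level caches. A minimal sketch of populating it; the ORC Reader and the
file path string are assumed caller-side state.

  // Shared, process-wide cache of ORC file metadata, keyed by file path.
  OrcMetadataCache metadataCache = OrcMetadataCache.getInstance();
  OrcFileMetadata fileMetadata = new OrcFileMetadata(orcReader);
  metadataCache.putFileMetadata(filePath, fileMetadata);
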
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java?rev=1658797&r1=1658796&r2=1658797&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java Tue Feb 10 20:30:24 2015
@@ -21,9 +21,8 @@ import java.io.EOFException;
 import java.io.IOException;
 
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.io.orc.LlapUtils.PresentStreamReadResult;
 
-class BitFieldReader {
+public class BitFieldReader {
   private final RunLengthByteReader input;
   /** The number of bits in one item. Non-test code always uses 1. */
   private final int bitSize;
@@ -31,8 +30,8 @@ class BitFieldReader {
   private int bitsLeft;
   private final int mask;
 
-  BitFieldReader(InStream input,
-                 int bitSize) throws IOException {
+  public BitFieldReader(InStream input,
+      int bitSize) throws IOException {
     this.input = new RunLengthByteReader(input);
     this.bitSize = bitSize;
     mask = (1 << bitSize) - 1;
@@ -47,7 +46,7 @@ class BitFieldReader {
     }
   }
 
-  int next() throws IOException {
+  public int next() throws IOException {
     int result = 0;
     int bitsLeftToRead = bitSize;
     while (bitsLeftToRead > bitsLeft) {
@@ -149,7 +148,7 @@ class BitFieldReader {
     }
   }
 
-  void seek(PositionProvider index) throws IOException {
+  public void seek(PositionProvider index) throws IOException {
     input.seek(index);
     int consumed = (int) index.getNext();
     if (consumed > 8) {

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java?rev=1658797&r1=1658796&r2=1658797&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java Tue Feb 10 20:30:24 2015
@@ -28,7 +28,7 @@ import org.apache.hadoop.io.Text;
  * A class that is a growable array of bytes. Growth is managed in terms of
  * chunks that are allocated when needed.
  */
-final class DynamicByteArray {
+public final class DynamicByteArray {
   static final int DEFAULT_CHUNKSIZE = 32 * 1024;
   static final int DEFAULT_NUM_CHUNKS = 128;
 

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1658797&r1=1658796&r2=1658797&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Tue Feb 10 20:30:24 2015
@@ -27,30 +27,50 @@ import java.util.ListIterator;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.common.DiskRange;
-import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
-import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;
-import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;
+import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
 
 import com.google.common.annotations.VisibleForTesting;
 
-abstract class InStream extends InputStream {
+public abstract class InStream extends InputStream {
 
   private static final Log LOG = LogFactory.getLog(InStream.class);
+  protected final String fileName;
+  protected final String name;
+  protected final long length;
+
+  public InStream(String fileName, String name, long length) {
+    this.fileName = fileName;
+    this.name = name;
+    this.length = length;
+  }
+
+  public String getFileName() {
+    return fileName;
+  }
+
+  public String getStreamName() {
+    return name;
+  }
+
+  public long getStreamLength() {
+    return length;
+  }
 
   private static class UncompressedStream extends InStream {
-    private final String name;
     private final List<DiskRange> bytes;
     private final long length;
     private long currentOffset;
     private ByteBuffer range;
     private int currentRange;
 
-    public UncompressedStream(String name, List<DiskRange> input, long length) {
-      this.name = name;
+    public UncompressedStream(String fileName, String name, List<DiskRange> input, long length) {
+      super(fileName, name, length);
       this.bytes = input;
       this.length = length;
       currentRange = 0;
@@ -149,11 +169,8 @@ abstract class InStream extends InputStr
   }
 
   private static class CompressedStream extends InStream {
-    private final String fileName;
-    private final String name;
     private final List<DiskRange> bytes;
     private final int bufferSize;
-    private final long length;
     private LlapMemoryBuffer cacheBuffer;
     private ByteBuffer uncompressed;
     private final CompressionCodec codec;
@@ -166,11 +183,9 @@ abstract class InStream extends InputStr
 
     public CompressedStream(String fileName, String name, List<DiskRange> input, long length,
                             CompressionCodec codec, int bufferSize, LowLevelCache cache) {
-      this.fileName = fileName;
+      super(fileName, name, length);
       this.bytes = input;
-      this.name = name;
       this.codec = codec;
-      this.length = length;
       this.bufferSize = bufferSize;
       currentOffset = 0;
       currentRange = 0;
@@ -456,7 +471,7 @@ abstract class InStream extends InputStr
   /**
    * Create an input stream from a list of buffers.
    * @param name the name of the stream
-   * @param input the list of ranges of bytes for the stream
+   * @param buffers the list of ranges of bytes for the stream
    * @param offsets a list of offsets (the same length as input) that must
    *                contain the first offset of the each set of bytes in input
    * @param length the length in bytes of the stream
@@ -499,7 +514,7 @@ abstract class InStream extends InputStr
                                 int bufferSize,
                                 LowLevelCache cache) throws IOException {
     if (codec == null) {
-      return new UncompressedStream(name, input, length);
+      return new UncompressedStream(fileName, name, input, length);
     } else {
       return new CompressedStream(fileName, name, input, length, codec, bufferSize, cache);
     }

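With the file name, stream name and length now held by InStream itself, creating a stream over
cached disk ranges mirrors the call used by the LLAP stream utilities. A sketch, assuming the
DiskRange list, total length, codec and buffer size come from the caller's read path:

  // codec == null yields an UncompressedStream; otherwise a CompressedStream.
  InStream in = InStream.create(fileName, "DATA", diskRanges, totalLength, codec, bufferSize, null);
  String owningFile = in.getFileName();       // accessors added in this revision
  long streamLength = in.getStreamLength();
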
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java?rev=1658797&r1=1658796&r2=1658797&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java Tue Feb 10 20:30:24 2015
@@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.exec.ve
 /**
  * Interface for reading integers.
  */
-interface IntegerReader {
+public interface IntegerReader {
 
   /**
    * Seek to the position provided by index.

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionProvider.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionProvider.java?rev=1658797&r1=1658796&r2=1658797&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionProvider.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/PositionProvider.java Tue Feb 10 20:30:24 2015
@@ -21,6 +21,6 @@ package org.apache.hadoop.hive.ql.io.orc
 /**
  * An interface used for seeking to a row index.
  */
-interface PositionProvider {
+public interface PositionProvider {
   long getNext();
 }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1658797&r1=1658796&r2=1658797&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Tue Feb 10 20:30:24 2015
@@ -65,7 +65,6 @@ import org.apache.hadoop.hive.ql.io.orc.
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.Stream;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto.Stream.Kind;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
@@ -303,12 +302,17 @@ public class RecordReaderImpl implements
     advanceToNextRow(reader, 0L, true);
   }
 
-  private static final class PositionProviderImpl implements PositionProvider {
+  public static final class PositionProviderImpl implements PositionProvider {
     private final OrcProto.RowIndexEntry entry;
-    private int index = 0;
+    private int index;
 
-    PositionProviderImpl(OrcProto.RowIndexEntry entry) {
+    public PositionProviderImpl(OrcProto.RowIndexEntry entry) {
+      this(entry, 0);
+    }
+
+    public PositionProviderImpl(OrcProto.RowIndexEntry entry, int startPos) {
       this.entry = entry;
+      this.index = startPos;
     }
 
     @Override
@@ -343,7 +347,8 @@ public class RecordReaderImpl implements
       switch (kind) {
       case DIRECT_V2:
       case DICTIONARY_V2:
-        return new RunLengthIntegerReaderV2(in, signed, conf);
+        boolean skipCorrupt = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA);
+        return new RunLengthIntegerReaderV2(in, signed, skipCorrupt);
       case DIRECT:
       case DICTIONARY:
         return new RunLengthIntegerReader(in, signed);
@@ -1369,7 +1374,7 @@ public class RecordReaderImpl implements
   // This class collects together very similar methods for reading an ORC vector of byte arrays and
   // creating the BytesColumnVector.
   //
-  private static class BytesColumnVectorUtil {
+  public static class BytesColumnVectorUtil {
 
     private static byte[] commonReadByteArrays(InStream stream, IntegerReader lengths, LongColumnVector scratchlcv,
             BytesColumnVector result, long batchSize) throws IOException {
@@ -2832,35 +2837,32 @@ public class RecordReaderImpl implements
     }
   }
 
-
   private static final int BYTE_STREAM_POSITIONS = 1;
-  private static final int RUN_LENGTH_BYTE_POSITIONS =
-      BYTE_STREAM_POSITIONS + 1;
+  private static final int RUN_LENGTH_BYTE_POSITIONS = BYTE_STREAM_POSITIONS + 1;
   private static final int BITFIELD_POSITIONS = RUN_LENGTH_BYTE_POSITIONS + 1;
-  private static final int RUN_LENGTH_INT_POSITIONS =
-    BYTE_STREAM_POSITIONS + 1;
+  private static final int RUN_LENGTH_INT_POSITIONS = BYTE_STREAM_POSITIONS + 1;
 
   /**
    * Get the offset in the index positions for the column that the given
    * stream starts.
-   * @param encoding the encoding of the column
-   * @param type the type of the column
-   * @param stream the kind of the stream
+   * @param columnEncoding the encoding of the column
+   * @param columnType the type of the column
+   * @param streamType the kind of the stream
    * @param isCompressed is the file compressed
    * @param hasNulls does the column have a PRESENT stream?
    * @return the number of positions that will be used for that stream
    */
-  static int getIndexPosition(OrcProto.ColumnEncoding.Kind encoding,
-                              OrcProto.Type.Kind type,
-                              OrcProto.Stream.Kind stream,
+  public static int getIndexPosition(OrcProto.ColumnEncoding.Kind columnEncoding,
+                              OrcProto.Type.Kind columnType,
+                              OrcProto.Stream.Kind streamType,
                               boolean isCompressed,
                               boolean hasNulls) {
-    if (stream == OrcProto.Stream.Kind.PRESENT) {
+    if (streamType == OrcProto.Stream.Kind.PRESENT) {
       return 0;
     }
     int compressionValue = isCompressed ? 1 : 0;
     int base = hasNulls ? (BITFIELD_POSITIONS + compressionValue) : 0;
-    switch (type) {
+    switch (columnType) {
       case BOOLEAN:
       case BYTE:
       case SHORT:
@@ -2877,33 +2879,33 @@ public class RecordReaderImpl implements
       case CHAR:
       case VARCHAR:
       case STRING:
-        if (encoding == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
-            encoding == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
+        if (columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
+            columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
           return base;
         } else {
-          if (stream == OrcProto.Stream.Kind.DATA) {
+          if (streamType == OrcProto.Stream.Kind.DATA) {
             return base;
           } else {
             return base + BYTE_STREAM_POSITIONS + compressionValue;
           }
         }
       case BINARY:
-        if (stream == OrcProto.Stream.Kind.DATA) {
+        if (streamType == OrcProto.Stream.Kind.DATA) {
           return base;
         }
         return base + BYTE_STREAM_POSITIONS + compressionValue;
       case DECIMAL:
-        if (stream == OrcProto.Stream.Kind.DATA) {
+        if (streamType == OrcProto.Stream.Kind.DATA) {
           return base;
         }
         return base + BYTE_STREAM_POSITIONS + compressionValue;
       case TIMESTAMP:
-        if (stream == OrcProto.Stream.Kind.DATA) {
+        if (streamType == OrcProto.Stream.Kind.DATA) {
           return base;
         }
         return base + RUN_LENGTH_INT_POSITIONS + compressionValue;
       default:
-        throw new IllegalArgumentException("Unknown type " + type);
+        throw new IllegalArgumentException("Unknown type " + columnType);
     }
   }
 
@@ -3504,6 +3506,22 @@ public class RecordReaderImpl implements
     public void addStream(long offset, OrcProto.Stream stream, int indexIx) {
       streams[streamCount++] = new StreamContext(stream, offset, indexIx);
     }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append(" column_index: ").append(colIx);
+      sb.append(" encoding: ").append(encoding);
+      sb.append(" stream_count: ").append(streamCount);
+      int i = 0;
+      for (StreamContext sc : streams) {
+        if (sc != null) {
+          sb.append(" stream_").append(i).append(":").append(sc.toString());
+        }
+        i++;
+      }
+      return sb.toString();
+    }
   }
 
   private static final class StreamContext {
@@ -3521,6 +3539,16 @@ public class RecordReaderImpl implements
     ListIterator<DiskRange> bufferIter;
     /** Saved stripe-level stream, to reuse for each RG (e.g. dictionaries). */
     StreamBuffer stripeLevelStream;
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append(" kind: ").append(kind);
+      sb.append(" offset: ").append(offset);
+      sb.append(" length: ").append(length);
+      sb.append(" index_offset: ").append(streamIndexOffset);
+      return sb.toString();
+    }
   }
 
   /*
@@ -3584,8 +3612,7 @@ public class RecordReaderImpl implements
         ctx = colCtxs[colRgIx] = new ColumnReadContext(
             colIx, encodings.get(colIx), indexes[colIx]);
         if (DebugUtils.isTraceOrcEnabled()) {
-          LOG.info("Creating context " + colRgIx + " for column " + colIx + " with encoding "
-              + encodings.get(colIx) + " and rowIndex " + indexes[colIx]);
+          LOG.info("Creating context " + colRgIx + " for column " + colIx + ":" + ctx.toString());
         }
       } else {
         ctx = colCtxs[colRgIx];
@@ -3624,10 +3651,6 @@ public class RecordReaderImpl implements
     }
     // Force direct buffers, since we will be decompressing to cache.
     readDiskRanges(file, zcr, stripeOffset, rangesToRead, true);
-    if (DebugUtils.isTraceOrcEnabled()) {
-      LOG.info("Disk ranges after disk read (" + (zcr == null ? "no " : "") + " zero-copy, base"
-          + " offset " + stripeOffset + "): " + stringifyDiskRanges(rangesToRead));
-    }
 
     // 2.1. Separate buffers (relative to stream offset) for each stream from the data we have.
     // TODO: given how we read, we could potentially get rid of this step?
@@ -3651,8 +3674,12 @@ public class RecordReaderImpl implements
       // Create the batch we will use to return data for this RG.
       EncodedColumnBatch<OrcBatchKey> ecb = new EncodedColumnBatch<OrcBatchKey>(
           new OrcBatchKey(fileName, stripeIx, rgIx), colRgs.length, 0);
+      boolean isRGSelected = true;
       for (int colIxMod = 0; colIxMod < colRgs.length; ++colIxMod) {
-        if (colRgs[colIxMod] != null && !colRgs[colIxMod][rgIx]) continue; // RG x col filtered.
+        if (colRgs[colIxMod] != null && !colRgs[colIxMod][rgIx]) {
+          isRGSelected = false;
+          continue;
+        } // RG x col filtered.
         ColumnReadContext ctx = colCtxs[colIxMod];
         RowIndexEntry index = ctx.rowIndex.getEntry(rgIx),
             nextIndex = isLastRg ? null : ctx.rowIndex.getEntry(rgIx + 1);
@@ -3688,7 +3715,9 @@ public class RecordReaderImpl implements
           ecb.setStreamData(colIxMod, streamIx, cb);
         }
       }
-      consumer.consumeData(ecb);
+      if (isRGSelected) {
+        consumer.consumeData(ecb);
+      }
     }
   }
 

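The now-public PositionProviderImpl is how the LLAP column streams seek their readers to a
row-group boundary. A minimal sketch, assuming the RowIndexEntry comes from cached stripe
metadata and dataReader is an IntegerReader over the column's DATA stream:

  // Walk the index positions recorded for this row group, starting at offset 0.
  PositionProvider positions = new RecordReaderImpl.PositionProviderImpl(rowIndexEntry, 0);
  if (isFileCompressed) {
    positions.getNext();       // skip the compressed-block offset; cached data is already uncompressed
  }
  dataReader.seek(positions);  // the reader jumps to the row group's first value
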
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java?rev=1658797&r1=1658796&r2=1658797&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java Tue Feb 10 20:30:24 2015
@@ -21,12 +21,11 @@ import java.io.EOFException;
 import java.io.IOException;
 
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.io.orc.LlapUtils.PresentStreamReadResult;
 
 /**
  * A reader that reads a sequence of integers.
  * */
-class RunLengthIntegerReader implements IntegerReader {
+public class RunLengthIntegerReader implements IntegerReader {
   private final InStream input;
   private final boolean signed;
   private final long[] literals =
@@ -37,7 +36,7 @@ class RunLengthIntegerReader implements
   private boolean repeat = false;
   private SerializationUtils utils;
 
-  RunLengthIntegerReader(InStream input, boolean signed) throws IOException {
+  public RunLengthIntegerReader(InStream input, boolean signed) throws IOException {
     this.input = input;
     this.signed = signed;
     this.utils = new SerializationUtils();

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java?rev=1658797&r1=1658796&r2=1658797&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java Tue Feb 10 20:30:24 2015
@@ -23,12 +23,8 @@ import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.io.orc.LlapUtils.PresentStreamReadResult;
 import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType;
 
 /**
@@ -36,7 +32,7 @@ import org.apache.hadoop.hive.ql.io.orc.
  * {@link RunLengthIntegerWriterV2} for description of various lightweight
  * compression techniques.
  */
-class RunLengthIntegerReaderV2 implements IntegerReader {
+public class RunLengthIntegerReaderV2 implements IntegerReader {
   public static final Log LOG = LogFactory.getLog(RunLengthIntegerReaderV2.class);
 
   private final InStream input;
@@ -49,11 +45,11 @@ class RunLengthIntegerReaderV2 implement
   private final SerializationUtils utils;
   private EncodingType currentEncoding;
 
-  RunLengthIntegerReaderV2(InStream input, boolean signed,
-      Configuration conf) throws IOException {
+  public RunLengthIntegerReaderV2(InStream input, boolean signed,
+      boolean skipCorrupt) throws IOException {
     this.input = input;
     this.signed = signed;
-    this.skipCorrupt = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA);
+    this.skipCorrupt = skipCorrupt;
     this.utils = new SerializationUtils();
   }
 

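With the Configuration dependency dropped, callers resolve the skip-corrupt flag themselves (as
RecordReaderImpl.createIntegerReader now does) and pass a plain boolean. A sketch; lengthStream
and positionProvider are assumed to exist on the caller's side:

  boolean skipCorrupt = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA);
  IntegerReader lengths = new RunLengthIntegerReaderV2(lengthStream, false, skipCorrupt);  // unsigned
  lengths.seek(positionProvider);    // optional: position within a row group first
  long firstLength = lengths.next();
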
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java?rev=1658797&r1=1658796&r2=1658797&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java Tue Feb 10 20:30:24 2015
@@ -24,7 +24,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.math.BigInteger;
 
-final class SerializationUtils {
+public final class SerializationUtils {
 
   private final static int BUFFER_SIZE = 64;
   private final byte[] readBuffer;
@@ -72,7 +72,7 @@ final class SerializationUtils {
     return (result >>> 1) ^ -(result & 1);
   }
 
-  float readFloat(InputStream in) throws IOException {
+  public float readFloat(InputStream in) throws IOException {
     int ser = in.read() | (in.read() << 8) | (in.read() << 16) |
       (in.read() << 24);
     return Float.intBitsToFloat(ser);
@@ -86,7 +86,7 @@ final class SerializationUtils {
     output.write((ser >> 24) & 0xff);
   }
 
-  double readDouble(InputStream in) throws IOException {
+  public double readDouble(InputStream in) throws IOException {
     return Double.longBitsToDouble(readLongLE(in));
   }
 

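readFloat and readDouble are opened up so the LLAP float/double column streams can decode values
directly from an InStream. A sketch, assuming the SerializationUtils constructor is reachable
from the caller's package and dataStream is the column's DATA stream:

  SerializationUtils utils = new SerializationUtils();
  float f = utils.readFloat(dataStream);     // 4 bytes, little-endian IEEE 754
  double d = utils.readDouble(dataStream);   // 8 bytes, little-endian IEEE 754
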
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1658797&r1=1658796&r2=1658797&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Tue Feb 10 20:30:24 2015
@@ -20,10 +20,16 @@ package org.apache.hadoop.hive.ql.io.orc
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.CodedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -70,16 +76,10 @@ import org.apache.hadoop.hive.serde2.typ
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.management.ManagementFactory;
-import java.nio.ByteBuffer;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
 
 /**
  * An ORC file writer. The file is divided into stripes, which is the natural
@@ -94,7 +94,7 @@ import java.util.TreeMap;
  * particular, because the MemoryManager is shared between writers, this class
  * assumes that checkMemory may be called from a separate thread.
  */
-class WriterImpl implements Writer, MemoryManager.Callback {
+public class WriterImpl implements Writer, MemoryManager.Callback {
 
   private static final Log LOG = LogFactory.getLog(WriterImpl.class);
 
@@ -274,7 +274,7 @@ class WriterImpl implements Writer, Memo
     return totalMemoryPool;
   }
 
-  static CompressionCodec createCodec(CompressionKind kind) {
+  public static CompressionCodec createCodec(CompressionKind kind) {
     switch (kind) {
       case NONE:
         return null;


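createCodec is now public so the LLAP metadata path (see OrcFileMetadata above) can materialize a
codec straight from a file's CompressionKind, for example:

  CompressionCodec codec = WriterImpl.createCodec(CompressionKind.ZLIB);   // returns null for NONE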
