hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ser...@apache.org
Subject svn commit: r1656595 [1/2] - in /hive/branches/llap: llap-client/src/java/org/apache/hadoop/hive/llap/io/api/ llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/ llap-server/src/java/org/apache/hadoop/hive/llap/cache/ llap-server/src/java/or...
Date Mon, 02 Feb 2015 22:28:16 GMT
Author: sershe
Date: Mon Feb  2 22:28:15 2015
New Revision: 1656595

URL: http://svn.apache.org/r1656595
Log:
HIVE-9418p2 : Part of the encoded data production pipeline (incomplete, only to allow parallel work)

Added:
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
Removed:
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadata.java
Modified:
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/VectorReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataProducer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataLoader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/RecordReader.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
    hive/branches/llap/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java

Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java Mon Feb  2 22:28:15 2015
@@ -18,24 +18,42 @@
 
 package org.apache.hadoop.hive.llap.io.api;
 
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
 
 public class EncodedColumn<BatchKey> {
   // TODO: temporary class. Will be filled in when reading (ORC) is implemented. Need to balance
   //       generality, and ability to not copy data from underlying low-level cached buffers.
-  public static class ColumnBuffer {
+  public static class StreamBuffer {
+    public StreamBuffer(int firstOffset, int lastLength) {
+      this.firstOffset = firstOffset;
+      this.lastLength = lastLength;
+    }
     // TODO: given how ORC will allocate, it might make sense to share array between all
     //       returned encodedColumn-s, and store index and length in the array.
-    public LlapMemoryBuffer[] cacheBuffers;
+    public List<LlapMemoryBuffer> cacheBuffers;
     public int firstOffset, lastLength;
+    // StreamBuffer can be reused for many RGs (e.g. dictionary case). To avoid locking every
+    // LlapMemoryBuffer 500 times, have a separate refcount on StreamBuffer itself.
+    public AtomicInteger refCount = new AtomicInteger(0);
+    public void incRef() {
+      refCount.incrementAndGet();
+    }
+    public int decRef() {
+      return refCount.decrementAndGet();
+    }
   }
-  public EncodedColumn(BatchKey batchKey, int columnIndex, ColumnBuffer columnData) {
+  public EncodedColumn(BatchKey batchKey, int columnIndex, int streamCount) {
     this.batchKey = batchKey;
     this.columnIndex = columnIndex;
-    this.columnData = columnData;
+    this.streamData = new StreamBuffer[streamCount];
+    this.streamKind = new int[streamCount];
   }
 
   public BatchKey batchKey;
   public int columnIndex;
-  public ColumnBuffer columnData;
+  public StreamBuffer[] streamData;
+  public int[] streamKind; // TODO: can decoder infer this from metadata?
 }
\ No newline at end of file

Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java Mon Feb  2 22:28:15 2015
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.llap.io.api.cache;
 
 import java.util.LinkedList;
+import java.util.List;
 
 import org.apache.hadoop.hive.common.DiskRange;
 
@@ -48,6 +49,8 @@ public interface LowLevelCache {
    */
   void allocateMultiple(LlapMemoryBuffer[] dest, int size);
 
-  void releaseBuffers(LlapMemoryBuffer[] cacheBuffers);
+  void releaseBuffers(List<LlapMemoryBuffer> cacheBuffers);
+
+  LlapMemoryBuffer createUnallocated();
 
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java Mon Feb  2 22:28:15 2015
@@ -18,10 +18,10 @@
 
 package org.apache.hadoop.hive.llap.cache;
 
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
 
 /** Dummy interface for now, might be different. */
 public interface Cache<CacheKey> {
-  public ColumnBuffer cacheOrGet(CacheKey key, ColumnBuffer value);
-  public ColumnBuffer get(CacheKey key);
+  public StreamBuffer cacheOrGet(CacheKey key, StreamBuffer value);
+  public StreamBuffer get(CacheKey key);
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java Mon Feb  2 22:28:15 2015
@@ -121,8 +121,7 @@ public class LowLevelCacheImpl implement
     if (currentNotCached.offset == currentCached.offset) {
       if (currentNotCached.end <= currentCached.end) {  // we assume it's always "==" now
         // Replace the entire current DiskRange with new cached range.
-        drIter.remove();
-        drIter.add(currentCached);
+        drIter.set(currentCached);
         currentNotCached = null;
       } else {
         // Insert the new cache range before the disk range.
@@ -251,9 +250,9 @@ public class LowLevelCacheImpl implement
   }
 
   @Override
-  public void releaseBuffers(LlapMemoryBuffer[] cacheBuffers) {
-    for (int i = 0; i < cacheBuffers.length; ++i) {
-      releaseBufferInternal((LlapCacheableBuffer)cacheBuffers[i]);
+  public void releaseBuffers(List<LlapMemoryBuffer> cacheBuffers) {
+    for (LlapMemoryBuffer b : cacheBuffers) {
+      releaseBufferInternal((LlapCacheableBuffer)b);
     }
   }
 
@@ -399,4 +398,9 @@ public class LowLevelCacheImpl implement
       }
     }
   }
+
+  @Override
+  public LlapMemoryBuffer createUnallocated() {
+    return new LlapCacheableBuffer();
+  }
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java Mon Feb  2 22:28:15 2015
@@ -165,7 +165,6 @@ public class LowLevelLrfuCachePolicy ext
           continue;
         }
         // Update the state to removed-from-list, so that parallel notifyUnlock doesn't modify us.
-        // TODO#: double check this is valid!
         nextCandidate.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE;
         evicted += nextCandidate.byteBuffer.remaining();
       }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java Mon Feb  2 22:28:15 2015
@@ -18,16 +18,16 @@
 
 package org.apache.hadoop.hive.llap.cache;
 
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
 
 public class NoopCache<CacheKey> implements Cache<CacheKey> {
   @Override
-  public ColumnBuffer cacheOrGet(CacheKey key, ColumnBuffer value) {
+  public StreamBuffer cacheOrGet(CacheKey key, StreamBuffer value) {
     return value;
   }
 
   @Override
-  public ColumnBuffer get(CacheKey key) {
+  public StreamBuffer get(CacheKey key) {
     return null;  // TODO: ensure real implementation increases refcount
   }
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java Mon Feb  2 22:28:15 2015
@@ -68,8 +68,9 @@ public class LlapInputFormat
       if (includedCols.isEmpty()) {
         includedCols = null; // Also means read all columns? WTF?
       }
-      VectorReader reader = llapIo.getReader(
-          fileSplit, includedCols, SearchArgumentFactory.createFromConf(job));
+      VectorReader reader = llapIo.getReader(fileSplit, includedCols,
+          SearchArgumentFactory.createFromConf(job),
+          ColumnProjectionUtils.getReadColumnNames(job));
       return new LlapRecordReader(reader, job, fileSplit);
     } catch (Exception ex) {
       throw new IOException(ex);

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java Mon Feb  2 22:28:15 2015
@@ -70,8 +70,9 @@ public class LlapIoImpl implements LlapI
     this.cvp = new OrcColumnVectorProducer(threadPool, edp, conf);
   }
 
-  VectorReader getReader(InputSplit split, List<Integer> columnIds, SearchArgument sarg) {
-    return new VectorReader(split, columnIds, sarg, cvp);
+  VectorReader getReader(InputSplit split,
+      List<Integer> columnIds, SearchArgument sarg, String[] columnNames) {
+    return new VectorReader(split, columnIds, sarg, columnNames, cvp);
   }
 
   @Override

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/VectorReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/VectorReader.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/VectorReader.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/VectorReader.java Mon Feb  2 22:28:15 2015
@@ -36,6 +36,7 @@ public class VectorReader implements Con
   private final InputSplit split;
   private final List<Integer> columnIds;
   private final SearchArgument sarg;
+  private final String[] columnNames;
   private final ColumnVectorProducer<?> cvp;
 
   private final LinkedList<ColumnVectorBatch> pendingData = new LinkedList<ColumnVectorBatch>();
@@ -45,10 +46,11 @@ public class VectorReader implements Con
   private ConsumerFeedback<ColumnVectorBatch> feedback;
 
   public VectorReader(InputSplit split, List<Integer> columnIds, SearchArgument sarg,
-      ColumnVectorProducer<?> cvp) {
+      String[] columnNames, ColumnVectorProducer<?> cvp) {
     this.split = split;
     this.columnIds = columnIds;
     this.sarg = sarg;
+    this.columnNames = columnNames;
     this.cvp = cvp;
   }
 
@@ -56,7 +58,7 @@ public class VectorReader implements Con
     // TODO: if some collection is needed, return previous ColumnVectorBatch here
     ColumnVectorBatch current = null;
     if (feedback == null) {
-      feedback = cvp.read(split, columnIds, sarg, this);
+      feedback = cvp.read(split, columnIds, sarg, columnNames, this);
     }
     if (isClosed) {
       throw new AssertionError("next called after close");

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java Mon Feb  2 22:28:15 2015
@@ -27,7 +27,7 @@ import java.util.concurrent.ExecutorServ
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.io.encoded.EncodedDataProducer;
 import org.apache.hadoop.hive.llap.io.encoded.EncodedDataReader;
@@ -42,12 +42,13 @@ public abstract class ColumnVectorProduc
     this.executor = executor;
   }
 
+  // TODO#: Given how ORC reads data, it should return this and not separate columns.
   static class EncodedColumnBatch {
     public EncodedColumnBatch(int colCount) {
-      columnDatas = new ColumnBuffer[colCount];
+      columnDatas = new StreamBuffer[colCount][];
       columnsRemaining = colCount;
     }
-    public ColumnBuffer[] columnDatas;
+    public StreamBuffer[][] columnDatas;
     public int columnsRemaining;
   }
 
@@ -57,7 +58,7 @@ public abstract class ColumnVectorProduc
     // TODO: use array, precreate array based on metadata first? Works for ORC. For now keep dumb.
     private final HashMap<BatchKey, EncodedColumnBatch> pendingData =
         new HashMap<BatchKey, EncodedColumnBatch>();
-    private ConsumerFeedback<ColumnBuffer> upstreamFeedback;
+    private ConsumerFeedback<StreamBuffer> upstreamFeedback;
     private final Consumer<ColumnVectorBatch> downstreamConsumer;
     private final int colCount;
 
@@ -66,7 +67,7 @@ public abstract class ColumnVectorProduc
       this.colCount = colCount;
     }
 
-    public void init(ConsumerFeedback<ColumnBuffer> upstreamFeedback) {
+    public void init(ConsumerFeedback<StreamBuffer> upstreamFeedback) {
       this.upstreamFeedback = upstreamFeedback;
     }
 
@@ -85,7 +86,7 @@ public abstract class ColumnVectorProduc
         }
       }
       if (localIsStopped) {
-        upstreamFeedback.returnData(data.columnData);
+        returnProcessed(data.streamData);
         return;
       }
 
@@ -94,7 +95,7 @@ public abstract class ColumnVectorProduc
         // Check if we are stopped and the batch was already cleaned.
         localIsStopped = (targetBatch.columnDatas == null);
         if (!localIsStopped) {
-          targetBatch.columnDatas[data.columnIndex] = data.columnData;
+          targetBatch.columnDatas[data.columnIndex] = data.streamData;
           colsRemaining = --targetBatch.columnsRemaining;
           if (0 == colsRemaining) {
             synchronized (pendingData) {
@@ -107,14 +108,22 @@ public abstract class ColumnVectorProduc
         }
       }
       if (localIsStopped) {
-        upstreamFeedback.returnData(data.columnData);
+        returnProcessed(data.streamData);
         return;
       }
       if (0 == colsRemaining) {
         ColumnVectorProducer.this.decodeBatch(data.batchKey, targetBatch, downstreamConsumer);
         // Batch has been decoded; unlock the buffers in cache
-        for (ColumnBuffer cb : targetBatch.columnDatas) {
-          upstreamFeedback.returnData(cb);
+        for (StreamBuffer[] columnData : targetBatch.columnDatas) {
+          returnProcessed(columnData);
+        }
+      }
+    }
+
+    private void returnProcessed(StreamBuffer[] data) {
+      for (StreamBuffer sb : data) {
+        if (sb.decRef() == 0) {
+          upstreamFeedback.returnData(sb);
         }
       }
     }
@@ -142,7 +151,7 @@ public abstract class ColumnVectorProduc
     }
 
     private void dicardPendingData(boolean isStopped) {
-      List<ColumnBuffer> dataToDiscard = new ArrayList<ColumnBuffer>(pendingData.size() * colCount);
+      List<StreamBuffer> dataToDiscard = new ArrayList<StreamBuffer>(pendingData.size() * colCount);
       List<EncodedColumnBatch> batches = new ArrayList<EncodedColumnBatch>(pendingData.size());
       synchronized (pendingData) {
         if (isStopped) {
@@ -153,14 +162,18 @@ public abstract class ColumnVectorProduc
       }
       for (EncodedColumnBatch batch : batches) {
         synchronized (batch) {
-          for (ColumnBuffer b : batch.columnDatas) {
-            dataToDiscard.add(b);
+          for (StreamBuffer[] bb : batch.columnDatas) {
+            for (StreamBuffer b : bb) {
+              dataToDiscard.add(b);
+            }
           }
           batch.columnDatas = null;
         }
       }
-      for (ColumnBuffer data : dataToDiscard) {
-        upstreamFeedback.returnData(data);
+      for (StreamBuffer data : dataToDiscard) {
+        if (data.decRef() == 0) {
+          upstreamFeedback.returnData(data);
+        }
       }
     }
 
@@ -191,13 +204,21 @@ public abstract class ColumnVectorProduc
    * @throws IOException 
    */
   public ConsumerFeedback<ColumnVectorBatch> read(InputSplit split, List<Integer> columnIds,
-      SearchArgument sarg, Consumer<ColumnVectorBatch> consumer) throws IOException {
+      SearchArgument sarg, String[] columnNames, Consumer<ColumnVectorBatch> consumer)
+          throws IOException {
     // Create the consumer of encoded data; it will coordinate decoding to CVBs.
     EncodedDataConsumer edc = new EncodedDataConsumer(consumer, columnIds.size());
     // Get the source of encoded data.
     EncodedDataProducer<BatchKey> edp = getEncodedDataProducer();
     // Then, get the specific reader of encoded data out of the producer.
-    EncodedDataReader<BatchKey> reader = edp.getReader(split, columnIds, sarg, edc);
+    /*
+[ERROR] reason: actual argument 
+org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer<BatchKey>.EncodedDataConsumer
+cannot be converted to 
+org.apache.hadoop.hive.llap.Consumer<
+  org.apache.hadoop.hive.llap.io.api.EncodedColumn<
+    org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey>> by method invocation conversion     * */
+    EncodedDataReader<BatchKey> reader = edp.getReader(split, columnIds, sarg, columnNames, edc);
     // Set the encoded data reader as upstream feedback for encoded data consumer, and start.
     edc.init(reader);
     executor.submit(reader);

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataProducer.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataProducer.java Mon Feb  2 22:28:15 2015
@@ -22,10 +22,11 @@ import java.util.List;
 
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
+import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.mapred.InputSplit;
 
 public interface EncodedDataProducer<BatchKey> {
-  public EncodedDataReader<BatchKey> getReader(InputSplit split, List<Integer> columnIds,
-      SearchArgument sarg, Consumer<EncodedColumn<BatchKey>> consumer);
+  EncodedDataReader<BatchKey> getReader(InputSplit split, List<Integer> columnIds,
+      SearchArgument sarg, String[] columnNames, Consumer<EncodedColumn<BatchKey>> consumer);
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java Mon Feb  2 22:28:15 2015
@@ -21,7 +21,7 @@ package org.apache.hadoop.hive.llap.io.e
 import java.util.concurrent.Callable;
 
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
 
 /**
  * Interface for encoded data readers to implement.
@@ -29,5 +29,5 @@ import org.apache.hadoop.hive.llap.io.ap
  * The final threading design will probably change.
  */
 public interface EncodedDataReader<BatchKey>
-  extends ConsumerFeedback<ColumnBuffer>, Callable<Void> {
+  extends ConsumerFeedback<StreamBuffer>, Callable<Void> {
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java Mon Feb  2 22:28:15 2015
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.llap.io.e
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -30,15 +31,18 @@ import org.apache.hadoop.hive.llap.Consu
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.cache.Cache;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcCacheKey;
+import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
 import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
+import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.SargApplier;
 import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.io.orc.RecordReader;
 import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
@@ -49,7 +53,7 @@ import org.apache.hadoop.mapred.InputSpl
 public class OrcEncodedDataProducer implements EncodedDataProducer<OrcBatchKey> {
   private FileSystem cachedFs = null;
   private Configuration conf;
-  private OrcMetadataCache metadataCache;
+  private final OrcMetadataCache metadataCache;
   // TODO: it makes zero sense to have both at the same time and duplicate data. Add "cache mode".
   private final Cache<OrcCacheKey> cache;
   private final LowLevelCache lowLevelCache;
@@ -59,24 +63,23 @@ public class OrcEncodedDataProducer impl
     private final FileSplit split;
     private List<Integer> columnIds;
     private final SearchArgument sarg;
+    private final String[] columnNames;
     private final Consumer<EncodedColumn<OrcBatchKey>> consumer;
 
 
     // Read state.
-    private int stripeIxFrom, stripeIxTo;
+    private int stripeIxFrom;
     private Reader orcReader;
     private final String internedFilePath;
     /**
-     * readState[stripeIx'][colIx'] - bitmask (as long array) of rg-s that are done.
-     * Bitmasks are all well-known size so we don't bother with BitSets and such.
-     * Each long has natural bit indexes used, so rightmost bits are filled first.
+     * readState[stripeIx'][colIx'] => boolean array (could be a bitmask) of rg-s that need to be
+     * read. Contains only stripes that are read, and only columns included. null => read all RGs.
      */
-    private long[][][] readState;
-    private int[] rgsPerStripe = null;
+    private boolean[][][] readState;
     private boolean isStopped = false, isPaused = false;
 
     public OrcEncodedDataReader(InputSplit split, List<Integer> columnIds,
-        SearchArgument sarg, Consumer<EncodedColumn<OrcBatchKey>> consumer) {
+        SearchArgument sarg, String[] columnNames, Consumer<EncodedColumn<OrcBatchKey>> consumer) {
       this.split = (FileSplit)split;
       this.internedFilePath = this.split.getPath().toString().intern();
       this.columnIds = columnIds;
@@ -84,6 +87,7 @@ public class OrcEncodedDataProducer impl
         Collections.sort(this.columnIds);
       }
       this.sarg = sarg;
+      this.columnNames = columnNames;
       this.consumer = consumer;
     }
 
@@ -109,57 +113,128 @@ public class OrcEncodedDataProducer impl
     public Void call() throws IOException {
       LlapIoImpl.LOG.info("Processing split for " + internedFilePath);
       if (isStopped) return null;
-      List<StripeInformation> stripes = metadataCache.getStripes(internedFilePath);
-      List<Type> types = metadataCache.getTypes(internedFilePath);
       orcReader = null;
-      if (stripes == null || types == null) {
+      // Get FILE metadata from cache, or create the reader and read it.
+      OrcFileMetadata metadata = metadataCache.getFileMetadata(internedFilePath);
+      if (metadata == null) {
         orcReader = createOrcReader(split);
-        stripes = metadataCache.getStripes(internedFilePath);
-        types = metadataCache.getTypes(internedFilePath);
+        metadata = new OrcFileMetadata(orcReader);
+        metadataCache.putFileMetadata(internedFilePath, metadata);
       }
 
       if (columnIds == null) {
-        columnIds = new ArrayList<Integer>(types.size());
-        for (int i = 1; i < types.size(); ++i) {
+        columnIds = new ArrayList<Integer>(metadata.getTypes().size());
+        for (int i = 1; i < metadata.getTypes().size(); ++i) {
           columnIds.add(i);
         }
       }
-      determineWhatToRead(stripes);
+      // Then, determine which stripes to read based on the split.
+      determineStripesToRead(metadata.getStripes());
+      if (readState.length == 0) {
+        consumer.setDone();
+        return null; // No data to read.
+      }
+      int stride = metadata.getRowIndexStride();
+      ArrayList<OrcStripeMetadata> stripesMetadata = null;
+      boolean[] globalIncludes = OrcInputFormat.genIncludedColumns(
+          metadata.getTypes(), columnIds, true);
+      RecordReader[] stripeReaders = new RecordReader[readState.length];
+      if (sarg != null && stride != 0) {
+        // If SARG is present, get relevant stripe metadata from cache or readers.
+        stripesMetadata = readStripesMetadata(metadata, globalIncludes, stripeReaders);
+      }
+
+      // Now, apply SARG if any; w/o sarg, this will just initialize readState.
+      determineRgsToRead(metadata.getStripes(), metadata.getTypes(),
+          globalIncludes, stride, stripesMetadata);
       if (isStopped) return null;
-      List<Integer>[] stripeColsToRead = produceDataFromCache();
-      // readState now contains some 1s for column x rgs that were fetched from cache.
-      // TODO: I/O threadpool would be here (or below); for now, linear
-      for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
+      // Get data from high-level cache; if some cols are fully in cache, this will also
+      // give us the modified list of columns to read for every stripe (null means all).
+      List<Integer>[] stripeColsToRead = produceDataFromCache(metadata.getStripes(), stride);
+      // readState has been modified for column x rgs that were fetched from cache.
+
+      // Then, create the readers for each stripe and prepare to read.
+      for (int stripeIxMod = 0; stripeIxMod < stripeReaders.length; ++stripeIxMod) {
+        List<Integer> colsToRead = stripeColsToRead == null ? null : stripeColsToRead[stripeIxMod];
+        RecordReader stripeReader = stripeReaders[stripeIxMod];
+        if (colsToRead == null) {
+          colsToRead = columnIds;
+        } else if (colsToRead.isEmpty()) {
+          if (stripeReader != null) {
+            stripeReader.close();
+            stripeReaders[stripeIxMod] = null;
+          }
+          continue; // All the data for this stripe was in cache.
+        } else if (stripeReader != null) {
+          // We have created the reader to read stripe metadata with all includes.
+          // We will now recreate the reader with narrower included columns (due to cache).
+          stripeReader.close();
+          stripeReader = null;
+        }
+
+        if (stripeReader != null) continue; // We already have a reader.
+        // Create RecordReader that will be used to read only this stripe.
+        StripeInformation si = metadata.getStripes().get(stripeIxFrom + stripeIxMod);
+        boolean[] stripeIncludes = OrcInputFormat.genIncludedColumns(
+            metadata.getTypes(), colsToRead, true);
+        if (orcReader == null) {
+          orcReader = createOrcReader(split);
+        }
+        stripeReader = orcReader.rows(si.getOffset(), si.getLength(), stripeIncludes);
+        stripeReader.prepareEncodedColumnRead();
+        stripeReaders[stripeIxMod] = stripeReader;
+      }
+
+      // We now have one reader per stripe that needs to be read. Read.
+      // TODO: I/O threadpool would be here - one thread per stripe; for now, linear.
+      OrcBatchKey stripeKey = new OrcBatchKey(internedFilePath, -1, 0);
+      for (int stripeIxMod = 0; stripeIxMod < stripeReaders.length; ++stripeIxMod) {
+        RecordReader stripeReader = stripeReaders[stripeIxMod];
+        if (stripeReader == null) continue; // No need to read this stripe, see above.
         List<Integer> colsToRead = stripeColsToRead == null ? null : stripeColsToRead[stripeIxMod];
-        long[][] colRgs = readState[stripeIxMod];
         if (colsToRead == null) {
           colsToRead = columnIds;
         }
-        if (colsToRead.isEmpty()) continue; // All the data for this stripe was in cache.
-        if (colsToRead.size() != colRgs.length) {
+        boolean[][] colRgs = readState[stripeIxMod];
+        if (colsToRead != null && colsToRead.size() != colRgs.length) {
           // We are reading subset of the original columns, remove unnecessary bitmasks.
-          long[][] colRgs2 = new long[colsToRead.size()][];
+          boolean[][] colRgs2 = new boolean[colsToRead.size()][];
           for (int i = 0, i2 = -1; i < colRgs.length; ++i) {
             if (colRgs[i] == null) continue;
             colRgs2[++i2] = colRgs[i];
           }
           colRgs = colRgs2;
         }
-        int stripeIx = stripeIxFrom + stripeIxMod;
-        StripeInformation si = stripes.get(stripeIx);
-        int rgCount = rgsPerStripe[stripeIxMod];
-        boolean[] includes = OrcInputFormat.genIncludedColumns(types, colsToRead, true);
-        if (orcReader == null) {
-          orcReader = createOrcReader(split);
+
+        // Get stripe metadata. We might have read it earlier for RG filtering.
+        OrcStripeMetadata stripeMetadata;
+        int stripeIx = stripeIxMod + stripeIxFrom;
+        if (stripesMetadata != null) {
+          stripeMetadata = stripesMetadata.get(stripeIxMod);
+        } else {
+          stripeKey.stripeIx = stripeIx;
+          stripeMetadata = metadataCache.getStripeMetadata(stripeKey);
+          if (stripeMetadata == null) {
+            stripeMetadata = new OrcStripeMetadata(stripeReader, stripeKey.stripeIx);
+            metadataCache.putStripeMetadata(stripeKey, stripeMetadata);
+            stripeKey = new OrcBatchKey(internedFilePath, -1, 0);
+          }
         }
-        RecordReader stripeReader = orcReader.rows(si.getOffset(), si.getLength(), includes);
+        stripeReader.setRowIndex(stripeMetadata.getRowIndexes());
+
         // In case if we have high-level cache, we will intercept the data and add it there;
         // otherwise just pass the data directly to the consumer.
         Consumer<EncodedColumn<OrcBatchKey>> consumer = (cache == null) ? this.consumer : this;
-        stripeReader.readEncodedColumns(colRgs, rgCount, consumer, lowLevelCache);
+        // This is where I/O happens. This is a sync call that will feed data to the consumer.
+        try {
+          stripeReader.readEncodedColumns(stripeIx, colRgs, lowLevelCache, consumer);
+        } catch (Throwable t) {
+          consumer.setError(t);
+        }
         stripeReader.close();
       }
 
+      // Done with all the things.
       consumer.setDone();
       if (DebugUtils.isTraceMttEnabled()) {
         LlapIoImpl.LOG.info("done processing " + split);
@@ -167,16 +242,72 @@ public class OrcEncodedDataProducer impl
       return null;
     }
 
+    private ArrayList<OrcStripeMetadata> readStripesMetadata(OrcFileMetadata metadata,
+        boolean[] globalInc, RecordReader[] stripeReaders) throws IOException {
+      ArrayList<OrcStripeMetadata> result = new ArrayList<OrcStripeMetadata>(stripeReaders.length);
+      OrcBatchKey stripeKey = new OrcBatchKey(internedFilePath, 0, 0);
+      for (int stripeIxMod = 0; stripeIxMod < stripeReaders.length; ++stripeIxMod) {
+        stripeKey.stripeIx = stripeIxMod + stripeIxFrom;
+        OrcStripeMetadata value = metadataCache.getStripeMetadata(stripeKey);
+        if (value == null) {
+          // Metadata not present in cache - get it from the reader and put in cache.
+          if (orcReader == null) {
+            orcReader = createOrcReader(split);
+          }
+          StripeInformation si = metadata.getStripes().get(stripeKey.stripeIx);
+          stripeReaders[stripeIxMod] = orcReader.rows(si.getOffset(), si.getLength(), globalInc);
+          stripeReaders[stripeIxMod].prepareEncodedColumnRead();
+          value = new OrcStripeMetadata(stripeReaders[stripeIxMod], stripeKey.stripeIx);
+          metadataCache.putStripeMetadata(stripeKey, value);
+          // Create new key object to reuse for gets; we've used the old one to put in cache.
+          stripeKey = new OrcBatchKey(internedFilePath, 0, 0);
+        }
+        result.add(value);
+      }
+      return result;
+    }
+
     @Override
-    public void returnData(ColumnBuffer data) {
+    public void returnData(StreamBuffer data) {
       lowLevelCache.releaseBuffers(data.cacheBuffers);
     }
 
-    private void determineWhatToRead(List<StripeInformation> stripes) {
-      // The unit of caching for ORC is (stripe x column) (see OrcBatchKey).
+    private void determineRgsToRead(List<StripeInformation> stripes, List<Type> types,
+        boolean[] globalIncludes, int rowIndexStride, ArrayList<OrcStripeMetadata> metadata)
+            throws IOException {
+      SargApplier sargApp = null;
+      if (sarg != null) {
+        String[] colNamesForSarg = OrcInputFormat.getSargColumnNames(
+            columnNames, types, globalIncludes, OrcInputFormat.isOriginal(orcReader));
+        sargApp = new SargApplier(sarg, colNamesForSarg, rowIndexStride);
+      }
+      // readState should have been initialized by this time with an empty array.
+      for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
+        int originalStripeIx = stripeIxMod + stripeIxFrom;
+        StripeInformation stripe = stripes.get(originalStripeIx);
+        int rgCount = getRgCount(stripe, rowIndexStride);
+        boolean[] rgsToRead = null;
+        if (sargApp != null) {
+          rgsToRead = sargApp.pickRowGroups(stripe, metadata.get(stripeIxMod).getRowIndexes());
+        }
+        assert rgsToRead == null || rgsToRead.length == rgCount;
+        readState[stripeIxMod] = new boolean[columnIds.size()][];
+        for (int j = 0; j < columnIds.size(); ++j) {
+          readState[stripeIxMod][j] = (rgsToRead == null) ? null :
+            Arrays.copyOf(rgsToRead, rgsToRead.length);
+        }
+      }
+    }
+
+    private int getRgCount(StripeInformation stripe, int rowIndexStride) {
+      return (int)Math.ceil((double)stripe.getNumberOfRows() / rowIndexStride);
+    }
+
+    public void determineStripesToRead(List<StripeInformation> stripes) {
+      // The unit of caching for ORC is (rg x column) (see OrcBatchKey).
       long offset = split.getStart(), maxOffset = offset + split.getLength();
-      stripeIxFrom = stripeIxTo = -1;
-      int stripeIx = 0;
+      stripeIxFrom = -1;
+      int stripeIxTo = -1;
       if (LlapIoImpl.LOG.isDebugEnabled()) {
         String tmp = "FileSplit {" + split.getStart() + ", " + split.getLength() + "}; stripes ";
         for (StripeInformation stripe : stripes) {
@@ -185,7 +316,7 @@ public class OrcEncodedDataProducer impl
         LlapIoImpl.LOG.debug(tmp);
       }
 
-      List<Integer> stripeRgCounts = new ArrayList<Integer>(stripes.size());
+      int stripeIx = 0;
       for (StripeInformation stripe : stripes) {
         long stripeStart = stripe.getOffset();
         if (offset > stripeStart) continue;
@@ -204,9 +335,6 @@ public class OrcEncodedDataProducer impl
           stripeIxTo = stripeIx;
           break;
         }
-        int rgCount = (int)Math.ceil(
-            (double)stripe.getNumberOfRows() / orcReader.getRowIndexStride());
-        stripeRgCounts.add(rgCount);
         ++stripeIx;
       }
       if (stripeIxTo == -1) {
@@ -215,54 +343,51 @@ public class OrcEncodedDataProducer impl
         }
         stripeIxTo = stripeIx;
       }
-      readState = new long[stripeRgCounts.size()][][];
-      for (int i = 0; i < stripeRgCounts.size(); ++i) {
-        int bitmaskSize = align64(stripeRgCounts.get(i)) >>> 6;
-        readState[i] = new long[columnIds.size()][];
-        for (int j = 0; j < columnIds.size(); ++j) {
-          readState[i][j] = new long[bitmaskSize];
-        }
-      }
-      // TODO: HERE, we need to apply sargs and mark RGs that are filtered as 1s
-      rgsPerStripe = new int[stripeRgCounts.size()];
-      for (int i = 0; i < rgsPerStripe.length; ++i) {
-         rgsPerStripe[i] = stripeRgCounts.get(i);
-      }
+      readState = new boolean[stripeIxTo - stripeIxFrom][][];
     }
 
     // TODO: split by stripe? we do everything by stripe, and it might be faster
-    private List<Integer>[] produceDataFromCache() {
+    private List<Integer>[] produceDataFromCache(
+        List<StripeInformation> stripes, int rowIndexStride) {
       if (cache == null) return null;
       OrcCacheKey key = new OrcCacheKey(internedFilePath, -1, -1, -1);
+      // For each stripe, keep a list of columns that are not fully in cache (null => all of them).
       @SuppressWarnings("unchecked") // No generics arrays - "J" in "Java" stands for "joke".
       List<Integer>[] stripeColsNotInCache = new List[readState.length];
       for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
         key.stripeIx = stripeIxFrom + stripeIxMod;
-        long[][] cols = readState[stripeIxMod];
-        int rgCount = rgsPerStripe[stripeIxMod];
+        boolean[][] cols = readState[stripeIxMod];
+        // TODO## at self-CR, see that colIx business here was not screwed up
         for (int colIxMod = 0; colIxMod < cols.length; ++colIxMod) {
+          boolean[] readMask = cols[colIxMod];
           key.colIx = columnIds.get(colIxMod);
-          long[] doneMask = cols[colIxMod];
+          // Assume first all RGs will be in cache; calculate or get the RG count.
           boolean areAllRgsInCache = true;
+          int rgCount = readMask != null ? readMask.length
+              : getRgCount(stripes.get(key.stripeIx), rowIndexStride);
           for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
-            int maskIndex = rgIx >>> 6, maskBit = 1 << (rgIx & 63);
-            if ((doneMask[maskIndex] & maskBit) != 0) continue; // RG eliminated by SARG
+            if (readMask != null && !readMask[rgIx]) continue; // RG eliminated by SARG
             key.rgIx = rgIx;
-            ColumnBuffer cached = cache.get(key);
+            StreamBuffer cached = cache.get(key);
             if (cached == null) {
               areAllRgsInCache = false;
               continue;
             }
+            // RG was in cache; send it over to the consumer.
             // TODO: pool of EncodedColumn-s objects. Someone will need to return them though.
-            EncodedColumn<OrcBatchKey> col = new EncodedColumn<OrcBatchKey>(
-                key.copyToPureBatchKey(), key.colIx, cached);
+            EncodedColumn<OrcBatchKey> col = null;
+            // TODO# new EncodedColumn<OrcBatchKey>(key.copyToPureBatchKey(), key.colIx, cached);
             consumer.consumeData(col);
-            doneMask[maskIndex] = doneMask[maskIndex] | maskBit;
+            if (readMask == null) {
+              // We were going to read all RGs, but now that some were in cache, allocate the mask.
+              cols[colIxMod] = readMask = new boolean[rgCount];
+              Arrays.fill(readMask, true);
+            }
+            readMask[rgIx] = false; // Got from cache, don't read from disk.
           }
-          boolean hasFetchList = stripeColsNotInCache[stripeIxMod] != null;
+          boolean hasExplicitColList = stripeColsNotInCache[stripeIxMod] != null;
           if (areAllRgsInCache) {
-            cols[colIxMod] = null; // No need for bitmask, all rgs are done.
-            if (!hasFetchList) {
+            if (!hasExplicitColList) {
               // All rgs for this stripe x column were fetched from cache. If this is the first
               // such column, create custom, smaller list of columns to fetch later for this
               // stripe (default is all the columns originally requested). Add all previous
@@ -272,7 +397,7 @@ public class OrcEncodedDataProducer impl
                 stripeColsNotInCache[stripeIxMod].addAll(columnIds.subList(0, colIxMod));
               }
             }
-          } else if (hasFetchList) {
+          } else if (hasExplicitColList) {
             // Only a subset of original columnIds need to be fetched for this stripe;
             // add the current one to this sublist.
             stripeColsNotInCache[stripeIxMod].add(columnIds.get(colIxMod));
@@ -292,11 +417,14 @@ public class OrcEncodedDataProducer impl
       // Store object in cache; create new key object - cannot be reused.
       assert cache != null;
       OrcCacheKey key = new OrcCacheKey(data.batchKey, data.columnIndex);
-      ColumnBuffer cached = cache.cacheOrGet(key, data.columnData);
-      if (data.columnData != cached) {
+      // TODO#: change type of cache and restore this
+      /*
+      StreamBuffer cached = cache.cacheOrGet(key, data.columnData);
+      if (data.streamData != cached) {
         lowLevelCache.releaseBuffers(data.columnData.cacheBuffers);
         data.columnData = cached;
       }
+      */
       consumer.consumeData(data);
     }
 
@@ -312,16 +440,9 @@ public class OrcEncodedDataProducer impl
     if ("pfile".equals(path.toUri().getScheme())) {
       fs = path.getFileSystem(conf); // Cannot use cached FS due to hive tests' proxy FS.
     }
-    if (metadataCache == null) {
-      metadataCache = new OrcMetadataCache(cachedFs, path, conf);
-    }
     return OrcFile.createReader(path, OrcFile.readerOptions(conf).filesystem(fs));
   }
 
-  private static int align64(int number) {
-    return ((number + 63) & ~63);
-  }
-
   public OrcEncodedDataProducer(LowLevelCache lowLevelCache, Cache<OrcCacheKey> cache,
       Configuration conf) throws IOException {
     // We assume all splits will come from the same FS.
@@ -329,12 +450,12 @@ public class OrcEncodedDataProducer impl
     this.cache = cache;
     this.lowLevelCache = lowLevelCache;
     this.conf = conf;
-    this.metadataCache = null;
+    this.metadataCache = new OrcMetadataCache();
   }
 
   @Override
   public EncodedDataReader<OrcBatchKey> getReader(InputSplit split, List<Integer> columnIds,
-      SearchArgument sarg, Consumer<EncodedColumn<OrcBatchKey>> consumer) {
-    return new OrcEncodedDataReader(split, columnIds, sarg, consumer);
+      SearchArgument sarg, String[] columnNames, Consumer<EncodedColumn<OrcBatchKey>> consumer) {
+    return new OrcEncodedDataReader(split, columnIds, sarg, columnNames, consumer);
   }
 }

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java?rev=1656595&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java Mon Feb  2 22:28:15 2015
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io.metadata;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
+
+public class OrcFileMetadata {
+  private CompressionKind compressionKind;
+  private int compressionBufferSize;
+  private List<OrcProto.Type> types;
+  private List<StripeInformation> stripes;
+  private int rowIndexStride;
+
+  public OrcFileMetadata(Reader reader) {
+    setCompressionKind(reader.getCompression());
+    setCompressionBufferSize(reader.getCompressionSize());
+    setStripes(reader.getStripes());
+    setTypes(reader.getTypes());
+    setRowIndexStride(reader.getRowIndexStride());
+  }
+
+  public List<StripeInformation> getStripes() {
+    return stripes;
+  }
+
+  public void setStripes(List<StripeInformation> stripes) {
+    this.stripes = stripes;
+  }
+
+  public CompressionKind getCompressionKind() {
+    return compressionKind;
+  }
+
+  public void setCompressionKind(CompressionKind compressionKind) {
+    this.compressionKind = compressionKind;
+  }
+
+  public int getCompressionBufferSize() {
+    return compressionBufferSize;
+  }
+
+  public void setCompressionBufferSize(int compressionBufferSize) {
+    this.compressionBufferSize = compressionBufferSize;
+  }
+
+  public List<OrcProto.Type> getTypes() {
+    return types;
+  }
+
+  public void setTypes(List<OrcProto.Type> types) {
+    this.types = types;
+  }
+
+  public int getRowIndexStride() {
+    return rowIndexStride;
+  }
+
+  public void setRowIndexStride(int rowIndexStride) {
+    this.rowIndexStride = rowIndexStride;
+  }
+}

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java Mon Feb  2 22:28:15 2015
@@ -19,79 +19,48 @@
 package org.apache.hadoop.hive.llap.io.metadata;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.concurrent.ExecutionException;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto;
-import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
+import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 
 /**
  * ORC-specific metadata cache.
+ * TODO: should be merged with main cache somehow if we find this takes too much memory
  */
 public class OrcMetadataCache {
   private static final int DEFAULT_CACHE_ACCESS_CONCURRENCY = 10;
-  private static final int DEFAULT_MAX_CACHE_ENTRIES = 100;
-  private static Cache<String, OrcMetadata> METADATA;
+  private static final int DEFAULT_MAX_FILE_ENTRIES = 1000;
+  private static final int DEFAULT_MAX_STRIPE_ENTRIES = 10000;
+  private static Cache<String, OrcFileMetadata> METADATA;
+  private static Cache<OrcBatchKey, OrcStripeMetadata> STRIPE_METADATA;
 
   static {
     METADATA = CacheBuilder.newBuilder()
         .concurrencyLevel(DEFAULT_CACHE_ACCESS_CONCURRENCY)
-        .maximumSize(DEFAULT_MAX_CACHE_ENTRIES)
+        .maximumSize(DEFAULT_MAX_FILE_ENTRIES)
+        .build();
+    STRIPE_METADATA = CacheBuilder.newBuilder()
+        .concurrencyLevel(DEFAULT_CACHE_ACCESS_CONCURRENCY)
+        .maximumSize(DEFAULT_MAX_STRIPE_ENTRIES)
         .build();
-  }
-
-  private Path path;
-  private OrcMetadataLoader loader;
-
-  public OrcMetadataCache(FileSystem fs, Path path, Configuration conf) {
-    this.path = path;
-    this.loader = new OrcMetadataLoader(fs, path, conf);
-  }
-
-  public CompressionKind getCompression(String pathString) throws IOException {
-    try {
-      return METADATA.get(pathString, loader).getCompressionKind();
-    } catch (ExecutionException e) {
-      throw new IOException("Unable to load orc metadata for " + path.toString(), e);
     }
-  }
 
-  public int getCompressionBufferSize(String pathString) throws IOException {
-    try {
-      return METADATA.get(pathString, loader).getCompressionBufferSize();
-    } catch (ExecutionException e) {
-      throw new IOException("Unable to load orc metadata for " + path.toString(), e);
-    }
+  public void putFileMetadata(String filePath, OrcFileMetadata metaData) {
+    METADATA.put(filePath, metaData);
   }
 
-  public List<OrcProto.Type> getTypes(String pathString) throws IOException {
-    try {
-      return METADATA.get(pathString, loader).getTypes();
-    } catch (ExecutionException e) {
-      throw new IOException("Unable to load orc metadata for " + path.toString(), e);
-    }
+  public void putStripeMetadata(OrcBatchKey stripeKey, OrcStripeMetadata metaData) {
+    STRIPE_METADATA.put(stripeKey, metaData);
   }
 
-  public List<StripeInformation> getStripes(String pathString) throws IOException {
-    try {
-      return METADATA.get(pathString, loader).getStripes();
-    } catch (ExecutionException e) {
-      throw new IOException("Unable to load orc metadata for " + path.toString(), e);
-    }
+  public OrcStripeMetadata getStripeMetadata(OrcBatchKey stripeKey) throws IOException {
+    return STRIPE_METADATA.getIfPresent(stripeKey);
   }
 
-  //  public boolean[] getIncludedRowGroups(String pathString, SearchArgument sarg, int stripeIdx) throws IOException {
-  //    try {
-  //      return METADATA.get(pathString, loader).getStripeToRowIndexEntries();
-  //    } catch (ExecutionException e) {
-  //      throw new IOException("Unable to load orc metadata for " + path.toString(), e);
-  //    }
-  //  }
+  public OrcFileMetadata getFileMetadata(String pathString) throws IOException {
+    return METADATA.getIfPresent(pathString);
+  }
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataLoader.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataLoader.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataLoader.java Mon Feb  2 22:28:15 2015
@@ -18,50 +18,21 @@
 
 package org.apache.hadoop.hive.llap.io.metadata;
 
-import static org.apache.hadoop.hive.ql.io.orc.OrcFile.readerOptions;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.concurrent.Callable;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.llap.io.orc.OrcFile;
 import org.apache.hadoop.hive.llap.io.orc.Reader;
-import org.apache.hadoop.hive.llap.io.orc.RecordReader;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto;
-import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
-
-public class OrcMetadataLoader implements Callable<OrcMetadata> {
-  private FileSystem fs;
-  private Path path;
-  private Configuration conf;
-
-  public OrcMetadataLoader(FileSystem fs, Path path, Configuration conf) {
-    this.fs = fs;
-    this.path = path;
-    this.conf = conf;
+
+// TODO: this class is pointless
+public class OrcMetadataLoader implements Callable<OrcFileMetadata> {
+  private Reader reader;
+
+  public OrcMetadataLoader(Reader reader) {
+    this.reader = reader;
   }
 
   @Override
-  public OrcMetadata call() throws Exception {
-    Reader reader = OrcFile.createLLAPReader(path, readerOptions(conf).filesystem(fs));
-    OrcMetadata orcMetadata = new OrcMetadata();
-    orcMetadata.setCompressionKind(reader.getCompression());
-    orcMetadata.setCompressionBufferSize(reader.getCompressionSize());
-    List<StripeInformation> stripes = reader.getStripes();
-    orcMetadata.setStripes(stripes);
-    Map<Integer, List<OrcProto.ColumnEncoding>> stripeColEnc = new HashMap<Integer, List<OrcProto.ColumnEncoding>>();
-    Map<Integer, OrcProto.RowIndex[]> stripeRowIndices = new HashMap<Integer, OrcProto.RowIndex[]>();
-    RecordReader rows = reader.rows();
-    for (int i = 0; i < stripes.size(); i++) {
-      stripeColEnc.put(i, rows.getColumnEncodings(i));
-      stripeRowIndices.put(i, rows.getRowIndexEntries(i));
-    }
-    orcMetadata.setStripeToColEncodings(stripeColEnc);
-    orcMetadata.setStripeToRowIndexEntries(stripeRowIndices);
-    return orcMetadata;
+  public OrcFileMetadata call() throws Exception {
+    return new OrcFileMetadata(reader);
   }
 }

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java?rev=1656595&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java Mon Feb  2 22:28:15 2015
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.metadata;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+
+public class OrcStripeMetadata {
+  // TODO#: add encoding and stream list
+  OrcProto.RowIndex[] rowIndexes;
+
+  public OrcStripeMetadata(RecordReader reader, int stripeIx) throws IOException {
+    rowIndexes = reader.getCurrentRowIndexEntries();
+  }
+
+  public OrcProto.RowIndex[] getRowIndexes() {
+    return rowIndexes;
+  }
+
+  public void setRowIndexes(OrcProto.RowIndex[] rowIndexes) {
+    this.rowIndexes = rowIndexes;
+  }
+}

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java Mon Feb  2 22:28:15 2015
@@ -29,6 +29,8 @@ import org.apache.hadoop.hive.llap.io.ap
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.orc.*;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
 import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 
@@ -45,24 +47,6 @@ public class LLAPRecordReaderImpl extend
   }
 
   @Override
-  public OrcProto.RowIndex[] getRowIndexEntries(int stripeIdx) throws IOException {
-    return readRowIndex(stripeIdx);
-  }
-
-  @Override
-  public List<OrcProto.ColumnEncoding> getColumnEncodings(int stripeIdx) throws IOException {
-    StripeInformation si = stripes.get(stripeIdx);
-    OrcProto.StripeFooter sf = readStripeFooter(si);
-    return sf.getColumnsList();
-  }
-
-  @Override
-  public boolean[] getIncludedRowGroups(int stripeIdx) throws IOException {
-    currentStripe = stripeIdx;
-    return pickRowGroups();
-  }
-
-  @Override
   public boolean hasNext() throws IOException {
     return false;
   }
@@ -96,10 +80,4 @@ public class LLAPRecordReaderImpl extend
   public void seekToRow(long rowCount) throws IOException {
 
   }
-
-  @Override
-  public void readEncodedColumns(long[][] colRgs, int rgCount,
-      Consumer<EncodedColumn<OrcBatchKey>> consumer, LowLevelCache cache) {
-    throw new UnsupportedOperationException("not implemented");
-  }
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/RecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/RecordReader.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/RecordReader.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/RecordReader.java Mon Feb  2 22:28:15 2015
@@ -26,27 +26,4 @@ import org.apache.hadoop.hive.ql.io.orc.
  *
  */
 public interface RecordReader extends org.apache.hadoop.hive.ql.io.orc.RecordReader {
-  /**
-   * Return all row index entries for the specified stripe index.
-   *
-   * @param stripeIdx - stripe index within orc file
-   * @return - all row index entries
-   */
-  OrcProto.RowIndex[] getRowIndexEntries(int stripeIdx) throws IOException;
-
-  /**
-   * Return column encodings of all columns for the specified stripe index.
-   *
-   * @param stripeIdx - stripe index within orc file
-   * @return - column encodings of all columns
-   */
-  List<OrcProto.ColumnEncoding> getColumnEncodings(int stripeIdx) throws IOException;
-
-  /**
-   * Return the row groups that satisfy the SARG condition for the specified stripe index.
-   *
-   * @param stripeIdx - stripe index within orc file
-   * @return - row groups qualifying the SARG
-   */
-  boolean[] getIncludedRowGroups(int stripeIdx) throws IOException;
 }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Mon Feb  2 22:28:15 2015
@@ -21,17 +21,21 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.ListIterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.common.DiskRange;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.sun.tools.javac.code.Attribute.Array;
 
 abstract class InStream extends InputStream {
   private static final Log LOG = LogFactory.getLog(InStream.class);
@@ -125,6 +129,15 @@ abstract class InStream extends InputStr
     }
   }
 
+  private static ByteBuffer allocateBuffer(int size, boolean isDirect) {
+    // TODO: use the same pool as the ORC readers
+    if (isDirect) {
+      return ByteBuffer.allocateDirect(size);
+    } else {
+      return ByteBuffer.allocate(size);
+    }
+  }
+
   private static class CompressedStream extends InStream {
     private final String fileName;
     private final String name;
@@ -154,15 +167,6 @@ abstract class InStream extends InputStr
       this.cache = cache;
     }
 
-    private ByteBuffer allocateBuffer(int size, boolean isDirect) {
-      // TODO: use the same pool as the ORC readers
-      if (isDirect) {
-        return ByteBuffer.allocateDirect(size);
-      } else {
-        return ByteBuffer.allocate(size);
-      }
-    }
-
     // TODO: This should not be used for main path.
     private final LlapMemoryBuffer[] singleAllocDest = new LlapMemoryBuffer[1];
     private void allocateForUncompressed(int size, boolean isDirect) {
@@ -295,7 +299,7 @@ abstract class InStream extends InputStr
     }
 
     /* slices a read only contiguous buffer of chunkLength */
-      private ByteBuffer slice(int chunkLength) throws IOException {
+    private ByteBuffer slice(int chunkLength) throws IOException {
       int len = chunkLength;
       final long oldOffset = currentOffset;
       ByteBuffer slice;
@@ -478,4 +482,204 @@ abstract class InStream extends InputStr
       return new CompressedStream(fileName, name, input, length, codec, bufferSize, cache);
     }
   }
+
+  private static class ProcCacheChunk extends CacheChunk {
+    public ProcCacheChunk(long cbStartOffset, long cbEndOffset,
+        boolean isCompressed, ByteBuffer originalData, LlapMemoryBuffer targetBuffer) {
+      super(targetBuffer, cbStartOffset, cbEndOffset);
+      this.isCompressed = isCompressed;
+      this.originalData = originalData;
+    }
+
+    boolean isCompressed;
+    ByteBuffer originalData = null;
+  }
+
+  /**
+   * Uncompresses part of the stream. RGs can overlap, so we cannot just go and decompress
+   * and remove what we have returned. We will keep iterator as a "hint" point.
+   * TODO: Java LinkedList and iter have a really stupid interface. Replace with own simple one?
+   */
+  public static void uncompressStream(String fileName,
+      ListIterator<DiskRange> ranges,
+      CompressionCodec codec, int bufferSize, LowLevelCache cache,
+      long cOffset, long endCOffset, StreamBuffer colBuffer)
+          throws IOException {
+    // TODO#: accpount for coffsets being -1 after finishing the normal methods.
+    colBuffer.cacheBuffers = new ArrayList<LlapMemoryBuffer>();
+    List<ProcCacheChunk> toDecompress = new ArrayList<ProcCacheChunk>();
+
+    // Find our bearings in the stream. Normally, iter will already point either to where we
+    // want to be, or just before. However, RGs can overlap due to encoding, so we may have
+    // to return to a previous block.
+    DiskRange current = findCompressedPosition(ranges, cOffset);
+
+    // Go thru the blocks; add stuff to results and prepare the decompression work (see below).
+    int nextCbOffset = (cOffset >= 0) ? (int)(cOffset - current.offset) : -1;
+    long currentCOffset = cOffset;
+    while (true) {
+      if (current instanceof CacheChunk) {
+        // This is a cached compression buffer, add as is.
+        if (nextCbOffset > 0) throw new AssertionError("Compressed offset in the middle of cb");
+        CacheChunk cc = (CacheChunk)current;
+        colBuffer.cacheBuffers.add(cc.buffer);
+        currentCOffset = cc.end;
+      } else {
+        // This is a compressed buffer. We need to uncompress it; the buffer can comprise
+        // several disk ranges, so we might need to combine them.
+        BufferChunk bc = (BufferChunk)current;
+        // TODO#: DOUBLE check the iterator state.
+        int chunkLength = addOneCompressionBuffer(bc, ranges, bufferSize,
+            cache, colBuffer.cacheBuffers, toDecompress, nextCbOffset);
+        currentCOffset = bc.offset + chunkLength;
+      }
+      nextCbOffset = -1;
+      if ((endCOffset >= 0 && currentCOffset >= endCOffset) || !ranges.hasNext()) {
+        break;
+      }
+      current = ranges.next();
+    }
+
+    // At this point, we have read all the CBs we need to read. cacheBuffers contains some cache
+    // data and some unallocated membufs for decompression. toDecompress contains all the work we
+    // need to do, and each item points to one of the membufs in cacheBuffers as target. The iter
+    // has also been adjusted to point to these buffers instead of compressed data for the ranges.
+    // Allocate the buffers, prepare cache kets.
+    LlapMemoryBuffer[] targetBuffers = new LlapMemoryBuffer[toDecompress.size()];
+    DiskRange[] cacheKeys = new DiskRange[toDecompress.size()];
+    int ix = 0;
+    for (ProcCacheChunk chunk : toDecompress) {
+      cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store DiskRange.
+      targetBuffers[ix] = chunk.buffer;
+      ++ix;
+    }
+    cache.allocateMultiple(targetBuffers, bufferSize);
+
+    // Now decompress (or copy) the data into cache buffers.
+    for (ProcCacheChunk chunk : toDecompress) {
+      if (chunk.isCompressed) {
+        codec.decompress(chunk.originalData, chunk.buffer.byteBuffer);
+      } else {
+        chunk.buffer.byteBuffer.put(chunk.originalData); // Copy uncompressed data to cache.
+      }
+      chunk.originalData = null; // TODO#: are we supposed to release this to zcr in some cases
+    }
+
+    // Finally, put data to cache.
+    cache.putFileData(fileName, cacheKeys, targetBuffers);
+  }
+
+  private static DiskRange findCompressedPosition(
+      ListIterator<DiskRange> ranges, long cOffset) {
+    if (cOffset < 0) return ranges.next();
+    DiskRange current = null;
+    boolean doCallNext = false;
+    if (ranges.hasNext()) {
+      current = ranges.next();
+    } else if (ranges.hasPrevious()) {
+      current = ranges.previous();
+      doCallNext = true;
+    }
+    // We expect the offset to be valid TODO: rather, validate
+    while (current.end <= cOffset) {
+      current = ranges.next();
+      doCallNext = false;
+    }
+    while (current.offset > cOffset) {
+      current = ranges.previous();
+      doCallNext = true;
+    }
+    if (doCallNext) {
+      // TODO: WTF?
+      ranges.next(); // We called previous, make sure next is the real next and not current.
+    }
+    return current;
+  }
+
+
+  private static int addOneCompressionBuffer(BufferChunk current,
+      ListIterator<DiskRange> ranges, int bufferSize,
+      LowLevelCache cache, List<LlapMemoryBuffer> cacheBuffers,
+      List<ProcCacheChunk> toDecompress, int nextCbOffsetExpected) throws IOException {
+    // TODO#: HERE
+    ByteBuffer slice = null;
+    ByteBuffer compressed = current.chunk;
+    if (nextCbOffsetExpected >= 0 && nextCbOffsetExpected != compressed.position()) {
+      throw new AssertionError("We don't know what we are doing anymore");
+    }
+    long cbStartOffset = current.offset + compressed.position();
+    int b0 = compressed.get() & 0xff;
+    int b1 = compressed.get() & 0xff;
+    int b2 = compressed.get() & 0xff;
+    int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
+    if (chunkLength > bufferSize) {
+      throw new IllegalArgumentException("Buffer size too small. size = " +
+          bufferSize + " needed = " + chunkLength);
+    }
+    boolean isUncompressed = ((b0 & 0x01) == 1);
+    if (compressed.remaining() >= chunkLength) {
+      // Simple case - CB fits entirely in the disk range.
+      slice = compressed.slice();
+      slice.limit(chunkLength);
+      addOneCompressionBlockByteBuffer(slice, isUncompressed, ranges, cache, compressed,
+          cbStartOffset, chunkLength, toDecompress, cacheBuffers);
+      return chunkLength;
+    }
+
+    // TODO: we could remove extra copy for isUncompressed case.
+    // We need to consolidate 2 or more buffers into one to decompress.
+    ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
+    int remaining = chunkLength - compressed.remaining();
+    copy.put(compressed);
+    ranges.remove();
+
+    while (ranges.hasNext()) {
+      DiskRange range = ranges.next();
+      if (!(range instanceof BufferChunk)) {
+        throw new IOException("Trying to extend compressed block into uncompressed block");
+      }
+      compressed = range.getData();
+      if (compressed.remaining() >= remaining) {
+        slice = compressed.slice();
+        slice.limit(remaining);
+        copy.put(slice);
+        addOneCompressionBlockByteBuffer(copy, isUncompressed, ranges, cache, compressed,
+            cbStartOffset, chunkLength, toDecompress, cacheBuffers);
+        return chunkLength;
+      }
+      remaining -= compressed.remaining();
+      copy.put(compressed);
+      ranges.remove();
+    }
+    throw new IOException("EOF in while trying to read "
+        + chunkLength + " bytes at " + cbStartOffset);
+  }
+
+  private static void addOneCompressionBlockByteBuffer(
+      ByteBuffer data, boolean isUncompressed,
+      ListIterator<DiskRange> ranges, LowLevelCache cache,
+      ByteBuffer compressed, long cbStartOffset, int chunkLength,
+      List<ProcCacheChunk> toDecompress, List<LlapMemoryBuffer> cacheBuffers) {
+    // Prepare future cache buffer.
+    LlapMemoryBuffer futureAlloc = cache.createUnallocated();
+    // Add it to result in order we are processing.
+    cacheBuffers.add(futureAlloc);
+    // Add it to the list of work to decompress.
+    long cbEndOffset = cbStartOffset + chunkLength;
+    ProcCacheChunk cc = new ProcCacheChunk(
+        cbStartOffset, cbEndOffset, !isUncompressed, data, futureAlloc);
+    toDecompress.add(cc);
+    // Adjust the compression block position.
+    compressed.position(compressed.position() + chunkLength);
+    // Finally, put it in the ranges list for future use (if shared between RGs).
+    // Before anyone else accesses it, it would have been allocated and decompressed locally.
+    if (compressed.remaining() <= 0) {
+      ranges.set(cc);
+    } else {
+      ranges.previous();
+      ranges.add(cc);
+      ranges.next(); // TODO: This is really stupid.
+    }
+  }
+
 }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Mon Feb  2 22:28:15 2015
@@ -219,14 +219,17 @@ public class OrcInputFormat  implements
                                                   long offset, long length
                                                   ) throws IOException {
     Reader.Options options = new Reader.Options().range(offset, length);
-    boolean isOriginal =
-        !file.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME);
+    boolean isOriginal = isOriginal(file);
     List<OrcProto.Type> types = file.getTypes();
     options.include(genIncludedColumns(types, conf, isOriginal));
     setSearchArgument(options, types, conf, isOriginal);
     return file.rowsOptions(options);
   }
 
+  public static boolean isOriginal(Reader file) {
+    return !file.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME);
+  }
+
   /**
    * Recurse down into a type subtree turning on all of the sub-columns.
    * @param types the types of the file
@@ -278,6 +281,21 @@ public class OrcInputFormat  implements
     }
   }
 
+  public static String[] getSargColumnNames(String[] originalColumnNames,
+      List<OrcProto.Type> types, boolean[] includedColumns, boolean isOriginal) {
+    int rootColumn = getRootColumn(isOriginal);
+    String[] columnNames = new String[types.size() - rootColumn];
+    int i = 0;
+    for(int columnId: types.get(rootColumn).getSubtypesList()) {
+      if (includedColumns == null || includedColumns[columnId - rootColumn]) {
+        // this is guaranteed to be positive because types only have children
+        // ids greater than their own id.
+        columnNames[columnId - rootColumn] = originalColumnNames[i++];
+      }
+    }
+    return columnNames;
+  }
+
   static void setSearchArgument(Reader.Options options,
                                 List<OrcProto.Type> types,
                                 Configuration conf,
@@ -296,19 +314,8 @@ public class OrcInputFormat  implements
     }
 
     LOG.info("ORC pushdown predicate: " + sarg);
-    int rootColumn = getRootColumn(isOriginal);
-    String[] neededColumnNames = columnNamesString.split(",");
-    String[] columnNames = new String[types.size() - rootColumn];
-    boolean[] includedColumns = options.getInclude();
-    int i = 0;
-    for(int columnId: types.get(rootColumn).getSubtypesList()) {
-      if (includedColumns == null || includedColumns[columnId - rootColumn]) {
-        // this is guaranteed to be positive because types only have children
-        // ids greater than their own id.
-        columnNames[columnId - rootColumn] = neededColumnNames[i++];
-      }
-    }
-    options.searchArgument(sarg, columnNames);
+    options.searchArgument(sarg, getSargColumnNames(
+        columnNamesString.split(","), types, options.getInclude(), isOriginal));
   }
 
   @Override

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java Mon Feb  2 22:28:15 2015
@@ -18,8 +18,11 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
@@ -82,6 +85,9 @@ public interface RecordReader {
    */
   void seekToRow(long rowCount) throws IOException;
 
+  void prepareEncodedColumnRead() throws IOException;
+
+  // TODO: maybe all of this should be moved to LLAP-specific class
   /**
    * TODO: this API is subject to change; on one hand, external code should control the threading
    *       aspects, with ORC method returning one EncodedColumn as it will; on the other, it's
@@ -89,13 +95,18 @@ public interface RecordReader {
    *       return many EncodedColumn-s.
    *  TODO: assumes the reader is for one stripe, otherwise the signature makes no sense.
    *        Also has no columns passed, because that is in ctor.
-   * @param colRgs Bitmasks of what RGs are to be read. Has # of elements equal to the number of
-   *               included columns; then each bitmask is rgCount bits long; 0 means "need to read"
-   * @param rgCount The length of bitmasks in colRgs.
-   * @param sarg Sarg to apply additional filtering to RGs.
+   * @param colRgs What RGs are to be read. Has # of elements equal to the number of
+   *               included columns; then each boolean is rgCount long.
+   * @param cache Cache to get/put data and allocate memory.
    * @param consumer Consumer to pass the results too.
-   * @param allocator Allocator to allocate memory.
+   * @throws IOException
    */
-  void readEncodedColumns(long[][] colRgs, int rgCount,
-      Consumer<EncodedColumn<OrcBatchKey>> consumer, LowLevelCache cache);
+  void readEncodedColumns(int stripeIx, boolean[][] colRgs,
+      LowLevelCache cache, Consumer<EncodedColumn<OrcBatchKey>> consumer) throws IOException;
+
+  RowIndex[] getCurrentRowIndexEntries() throws IOException;
+
+  List<ColumnEncoding> getCurrentColumnEncodings() throws IOException;
+
+  void setRowIndex(RowIndex[] rowIndex);
 }



Mime
View raw message