hive-commits mailing list archives

From ser...@apache.org
Subject svn commit: r1656977 - in /hive/branches/llap: llap-client/src/java/org/apache/hadoop/hive/llap/io/api/ llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/ llap-server/src/java/org/apache/hadoop/hive/llap/cache/ llap-server/src/java/org/apac...
Date Wed, 04 Feb 2015 02:30:55 GMT
Author: sershe
Date: Wed Feb  4 02:30:54 2015
New Revision: 1656977

URL: http://svn.apache.org/r1656977
Log:
HIVE-9418p3 : Part of the encoded data production pipeline - missing index and bugfixing

Added:
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumnBatch.java
Removed:
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java
Modified:
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataProducer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
    hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java

Added: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumnBatch.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumnBatch.java?rev=1656977&view=auto
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumnBatch.java (added)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumnBatch.java Wed Feb  4 02:30:54 2015
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io.api;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
+import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
+
+public class EncodedColumnBatch<BatchKey> {
+  // TODO: temporary class. Will be filled in when reading (ORC) is implemented. Need to balance
+  //       generality, and ability to not copy data from underlying low-level cached buffers.
+  public static class StreamBuffer {
+    // Decoder knows which stream this belongs to, and each buffer is a compression block,
+    // so it can figure out the offsets from metadata.
+    public List<LlapMemoryBuffer> cacheBuffers;
+
+    // StreamBuffer can be reused for many RGs (e.g. dictionary case). To avoid locking every
+    // LlapMemoryBuffer 500 times, have a separate refcount on StreamBuffer itself.
+    public AtomicInteger refCount = new AtomicInteger(0);
+    public void incRef() {
+      refCount.incrementAndGet();
+    }
+    public int decRef() {
+      int i = refCount.decrementAndGet();
+      assert i >= 0;
+      return i;
+    }
+  }
+
+  public BatchKey batchKey;
+  public StreamBuffer[][] columnData;
+  public int[] columnIxs;
+  public int colsRemaining = 0;
+
+  public EncodedColumnBatch(BatchKey batchKey, int columnCount, int colsRemaining) {
+    this.batchKey = batchKey;
+    this.columnData = new StreamBuffer[columnCount][];
+    this.columnIxs = new int[columnCount];
+    this.colsRemaining = colsRemaining;
+  }
+
+  public void merge(EncodedColumnBatch<BatchKey> other) {
+    // TODO: this may be called when high-level cache produces several columns and IO produces
+    //       several columns. So, for now this will never be called. Need to merge by columnIx-s.
+    throw new UnsupportedOperationException();
+  }
+
+  public void initColumn(int colIxMod, int colIx, int streamCount) {
+    columnIxs[colIxMod] = colIx;
+    columnData[colIxMod] = new StreamBuffer[streamCount];
+  }
+
+  public void setStreamData(int colIxMod, int streamIx, StreamBuffer sb) {
+    columnData[colIxMod][streamIx] = sb;
+  }
+
+  public void setAllStreams(int colIxMod, int colIx, StreamBuffer[] sbs) {
+    columnIxs[colIxMod] = colIx;
+    columnData[colIxMod] = sbs;
+  }
+}
\ No newline at end of file
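
For illustration only (not part of this commit): a minimal sketch of how a producer might populate the new EncodedColumnBatch and how a consumer would release its stream buffers afterwards. The String batch key, the column indices and the two-stream layout are purely illustrative, as are the helper method names.

import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;

// Hypothetical producer side: build a batch for two columns, two streams each.
static EncodedColumnBatch<String> buildBatch() {
  EncodedColumnBatch<String> batch =
      new EncodedColumnBatch<String>("file:stripe0:rg0", 2, 2);
  for (int colIxMod = 0; colIxMod < 2; ++colIxMod) {
    int colIx = colIxMod + 1;               // file-level column index
    batch.initColumn(colIxMod, colIx, 2);   // e.g. PRESENT and DATA streams
    for (int streamIx = 0; streamIx < 2; ++streamIx) {
      StreamBuffer sb = new StreamBuffer();
      sb.incRef();                          // held until the consumer is done with it
      batch.setStreamData(colIxMod, streamIx, sb);
    }
  }
  return batch;
}

// Hypothetical consumer side: after decoding, release each stream; the last decRef() returns 0.
static void releaseBatch(EncodedColumnBatch<String> batch) {
  for (StreamBuffer[] streams : batch.columnData) {
    if (streams == null) continue;          // column not initialized
    for (StreamBuffer sb : streams) {
      if (sb.decRef() == 0) {
        // return sb.cacheBuffers to the low-level cache here
      }
    }
  }
}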

Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java?rev=1656977&r1=1656976&r2=1656977&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java Wed Feb  4 02:30:54 2015
@@ -53,4 +53,5 @@ public interface LowLevelCache {
 
   LlapMemoryBuffer createUnallocated();
 
+  void notifyReused(LlapMemoryBuffer buffer);
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java?rev=1656977&r1=1656976&r2=1656977&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java Wed Feb  4 02:30:54 2015
@@ -18,10 +18,10 @@
 
 package org.apache.hadoop.hive.llap.cache;
 
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
 
 /** Dummy interface for now, might be different. */
 public interface Cache<CacheKey> {
-  public StreamBuffer cacheOrGet(CacheKey key, StreamBuffer value);
-  public StreamBuffer get(CacheKey key);
+  public StreamBuffer[] cacheOrGet(CacheKey key, StreamBuffer[] value);
+  public StreamBuffer[] get(CacheKey key);
 }
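
For illustration only (not part of this commit): a minimal sketch of the cacheOrGet contract that the callers below rely on with the new StreamBuffer[] values - if the key is already cached, the previously cached array wins and is returned, otherwise the caller's array is stored and returned. The class name is hypothetical.

package org.apache.hadoop.hive.llap.cache;

import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;

public class SimpleCache<CacheKey> implements Cache<CacheKey> {
  private final ConcurrentHashMap<CacheKey, StreamBuffer[]> map =
      new ConcurrentHashMap<CacheKey, StreamBuffer[]>();

  @Override
  public StreamBuffer[] cacheOrGet(CacheKey key, StreamBuffer[] value) {
    StreamBuffer[] prev = map.putIfAbsent(key, value);
    return (prev == null) ? value : prev; // existing entry wins; the caller releases its own buffers
  }

  @Override
  public StreamBuffer[] get(CacheKey key) {
    return map.get(key); // a real implementation would also lock/refcount the buffers here
  }
}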

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java?rev=1656977&r1=1656976&r2=1656977&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java Wed Feb  4 02:30:54 2015
@@ -403,4 +403,10 @@ public class LowLevelCacheImpl implement
   public LlapMemoryBuffer createUnallocated() {
     return new LlapCacheableBuffer();
   }
+
+  @Override
+  public void notifyReused(LlapMemoryBuffer buffer) {
+    int newVal = ((LlapCacheableBuffer)buffer).incRef();
+    assert newVal > 1;
+  }
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java?rev=1656977&r1=1656976&r2=1656977&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java Wed Feb  4 02:30:54 2015
@@ -18,16 +18,16 @@
 
 package org.apache.hadoop.hive.llap.cache;
 
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
 
 public class NoopCache<CacheKey> implements Cache<CacheKey> {
   @Override
-  public StreamBuffer cacheOrGet(CacheKey key, StreamBuffer value) {
+  public StreamBuffer[] cacheOrGet(CacheKey key, StreamBuffer[] value) {
     return value;
   }
 
   @Override
-  public StreamBuffer get(CacheKey key) {
+  public StreamBuffer[] get(CacheKey key) {
     return null;  // TODO: ensure real implementation increases refcount
   }
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java?rev=1656977&r1=1656976&r2=1656977&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java Wed Feb  4 02:30:54 2015
@@ -26,8 +26,8 @@ import java.util.concurrent.ExecutorServ
 
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.io.encoded.EncodedDataProducer;
 import org.apache.hadoop.hive.llap.io.encoded.EncodedDataReader;
@@ -42,22 +42,12 @@ public abstract class ColumnVectorProduc
     this.executor = executor;
   }
 
-  // TODO#: Given how ORC reads data, it should return this and not separate columns.
-  static class EncodedColumnBatch {
-    public EncodedColumnBatch(int colCount) {
-      columnDatas = new StreamBuffer[colCount][];
-      columnsRemaining = colCount;
-    }
-    public StreamBuffer[][] columnDatas;
-    public int columnsRemaining;
-  }
-
   private class EncodedDataConsumer implements ConsumerFeedback<ColumnVectorBatch>,
-      Consumer<EncodedColumn<BatchKey>> {
+      Consumer<EncodedColumnBatch<BatchKey>> {
     private volatile boolean isStopped = false;
     // TODO: use array, precreate array based on metadata first? Works for ORC. For now keep dumb.
-    private final HashMap<BatchKey, EncodedColumnBatch> pendingData =
-        new HashMap<BatchKey, EncodedColumnBatch>();
+    private final HashMap<BatchKey, EncodedColumnBatch<BatchKey>> pendingData =
+        new HashMap<BatchKey, EncodedColumnBatch<BatchKey>>();
     private ConsumerFeedback<StreamBuffer> upstreamFeedback;
     private final Consumer<ColumnVectorBatch> downstreamConsumer;
     private final int colCount;
@@ -72,32 +62,31 @@ public abstract class ColumnVectorProduc
     }
 
     @Override
-    public void consumeData(EncodedColumn<BatchKey> data) {
-      EncodedColumnBatch targetBatch = null;
+    public void consumeData(EncodedColumnBatch<BatchKey> data) {
+      EncodedColumnBatch<BatchKey> targetBatch = null;
       boolean localIsStopped = false;
       synchronized (pendingData) {
         localIsStopped = isStopped;
         if (!localIsStopped) {
           targetBatch = pendingData.get(data.batchKey);
           if (targetBatch == null) {
-            targetBatch = new EncodedColumnBatch(colCount);
-            pendingData.put(data.batchKey, targetBatch);
+            pendingData.put(data.batchKey, data);
+            return;
           }
         }
       }
       if (localIsStopped) {
-        returnProcessed(data.streamData);
+        returnProcessed(data.columnData);
         return;
       }
 
       int colsRemaining = -1;
       synchronized (targetBatch) {
         // Check if we are stopped and the batch was already cleaned.
-        localIsStopped = (targetBatch.columnDatas == null);
+        localIsStopped = (targetBatch.columnData == null);
         if (!localIsStopped) {
-          targetBatch.columnDatas[data.columnIndex] = data.streamData;
-          colsRemaining = --targetBatch.columnsRemaining;
-          if (0 == colsRemaining) {
+          targetBatch.merge(data);
+          if (0 == targetBatch.colsRemaining) {
             synchronized (pendingData) {
               targetBatch = isStopped ? null : pendingData.remove(data.batchKey);
             }
@@ -108,21 +97,20 @@ public abstract class ColumnVectorProduc
         }
       }
       if (localIsStopped) {
-        returnProcessed(data.streamData);
+        returnProcessed(data.columnData);
         return;
       }
       if (0 == colsRemaining) {
-        ColumnVectorProducer.this.decodeBatch(data.batchKey, targetBatch, downstreamConsumer);
+        ColumnVectorProducer.this.decodeBatch(targetBatch, downstreamConsumer);
         // Batch has been decoded; unlock the buffers in cache
-        for (StreamBuffer[] columnData : targetBatch.columnDatas) {
-          returnProcessed(columnData);
-        }
+        returnProcessed(targetBatch.columnData);
       }
     }
 
-    private void returnProcessed(StreamBuffer[] data) {
-      for (StreamBuffer sb : data) {
-        if (sb.decRef() == 0) {
+    private void returnProcessed(StreamBuffer[][] data) {
+      for (StreamBuffer[] sbs : data) {
+        for (StreamBuffer sb : sbs) {
+          if (sb.decRef() != 0) continue;
           upstreamFeedback.returnData(sb);
         }
       }
@@ -151,8 +139,8 @@ public abstract class ColumnVectorProduc
     }
 
     private void dicardPendingData(boolean isStopped) {
-      List<StreamBuffer> dataToDiscard = new ArrayList<StreamBuffer>(pendingData.size() * colCount);
-      List<EncodedColumnBatch> batches = new ArrayList<EncodedColumnBatch>(pendingData.size());
+      List<EncodedColumnBatch<BatchKey>> batches = new ArrayList<EncodedColumnBatch<BatchKey>>(
+          pendingData.size());
       synchronized (pendingData) {
         if (isStopped) {
           this.isStopped = true;
@@ -160,14 +148,15 @@ public abstract class ColumnVectorProduc
         batches.addAll(pendingData.values());
         pendingData.clear();
       }
-      for (EncodedColumnBatch batch : batches) {
+      List<StreamBuffer> dataToDiscard = new ArrayList<StreamBuffer>(batches.size() * colCount * 2);
+      for (EncodedColumnBatch<BatchKey> batch : batches) {
         synchronized (batch) {
-          for (StreamBuffer[] bb : batch.columnDatas) {
+          for (StreamBuffer[] bb : batch.columnData) {
             for (StreamBuffer b : bb) {
               dataToDiscard.add(b);
             }
           }
-          batch.columnDatas = null;
+          batch.columnData = null;
         }
       }
       for (StreamBuffer data : dataToDiscard) {
@@ -211,13 +200,6 @@ public abstract class ColumnVectorProduc
     // Get the source of encoded data.
     EncodedDataProducer<BatchKey> edp = getEncodedDataProducer();
     // Then, get the specific reader of encoded data out of the producer.
-    /*
-[ERROR] reason: actual argument 
-org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer<BatchKey>.EncodedDataConsumer
-cannot be converted to 
-org.apache.hadoop.hive.llap.Consumer<
-  org.apache.hadoop.hive.llap.io.api.EncodedColumn<
-    org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey>> by method invocation conversion     * */
     EncodedDataReader<BatchKey> reader = edp.getReader(split, columnIds, sarg, columnNames, edc);
     // Set the encoded data reader as upstream feedback for encoded data consumer, and start.
     edc.init(reader);
@@ -227,6 +209,6 @@ org.apache.hadoop.hive.llap.Consumer<
 
   protected abstract EncodedDataProducer<BatchKey> getEncodedDataProducer();
 
-  protected abstract void decodeBatch(BatchKey batchKey, EncodedColumnBatch batch,
+  protected abstract void decodeBatch(EncodedColumnBatch<BatchKey> batch,
       Consumer<ColumnVectorBatch> downstreamConsumer);
 }
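
The consumeData() path above now relies on EncodedColumnBatch.merge(), which this commit leaves as UnsupportedOperationException. For illustration only, one hedged way to merge by column index, assuming both batches for a batchKey share the same column layout and that colsRemaining counts unfilled columns:

// Hypothetical merge-by-columnIx sketch (not in this commit); assumes no column is
// produced by both the high-level cache and IO for the same batch.
public void merge(EncodedColumnBatch<BatchKey> other) {
  for (int i = 0; i < other.columnData.length; ++i) {
    if (other.columnData[i] == null) continue; // nothing produced for this slot
    assert columnData[i] == null;              // no duplicates between cache and IO
    columnData[i] = other.columnData[i];
    columnIxs[i] = other.columnIxs[i];
    --colsRemaining;                           // one more column is complete
  }
}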

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java?rev=1656977&r1=1656976&r2=1656977&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java Wed Feb  4 02:30:54 2015
@@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorServ
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.Consumer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 import org.apache.hadoop.hive.llap.io.encoded.EncodedDataProducer;
@@ -42,8 +43,8 @@ public class OrcColumnVectorProducer ext
   }
 
   @Override
-  protected void decodeBatch(OrcBatchKey batchKey,
-      EncodedColumnBatch batch, Consumer<ColumnVectorBatch> downstreamConsumer) {
+  protected void decodeBatch(EncodedColumnBatch<OrcBatchKey> batch,
+      Consumer<ColumnVectorBatch> downstreamConsumer) {
     throw new UnsupportedOperationException("not implemented");
     // TODO:  HERE decode EncodedColumn-s into ColumnVector-s
     //        sarg columns first, apply sarg, then decode others if needed; can cols skip values?

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataProducer.java?rev=1656977&r1=1656976&r2=1656977&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataProducer.java Wed Feb  4 02:30:54 2015
@@ -21,12 +21,11 @@ package org.apache.hadoop.hive.llap.io.e
 import java.util.List;
 
 import org.apache.hadoop.hive.llap.Consumer;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
-import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.mapred.InputSplit;
 
 public interface EncodedDataProducer<BatchKey> {
   EncodedDataReader<BatchKey> getReader(InputSplit split, List<Integer> columnIds,
-      SearchArgument sarg, String[] columnNames, Consumer<EncodedColumn<BatchKey>> consumer);
+      SearchArgument sarg, String[] columnNames, Consumer<EncodedColumnBatch<BatchKey>> consumer);
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java?rev=1656977&r1=1656976&r2=1656977&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java Wed Feb  4 02:30:54 2015
@@ -21,7 +21,7 @@ package org.apache.hadoop.hive.llap.io.e
 import java.util.concurrent.Callable;
 
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
 
 /**
  * Interface for encoded data readers to implement.

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java?rev=1656977&r1=1656976&r2=1656977&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java Wed Feb  4 02:30:54 2015
@@ -30,8 +30,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.cache.Cache;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
@@ -59,12 +59,12 @@ public class OrcEncodedDataProducer impl
   private final LowLevelCache lowLevelCache;
 
   private class OrcEncodedDataReader implements EncodedDataReader<OrcBatchKey>,
-    Consumer<EncodedColumn<OrcBatchKey>> {
+    Consumer<EncodedColumnBatch<OrcBatchKey>> {
     private final FileSplit split;
     private List<Integer> columnIds;
     private final SearchArgument sarg;
     private final String[] columnNames;
-    private final Consumer<EncodedColumn<OrcBatchKey>> consumer;
+    private final Consumer<EncodedColumnBatch<OrcBatchKey>> consumer;
 
 
     // Read state.
@@ -79,7 +79,7 @@ public class OrcEncodedDataProducer impl
     private boolean isStopped = false, isPaused = false;
 
     public OrcEncodedDataReader(InputSplit split, List<Integer> columnIds,
-        SearchArgument sarg, String[] columnNames, Consumer<EncodedColumn<OrcBatchKey>> consumer) {
+        SearchArgument sarg, String[] columnNames, Consumer<EncodedColumnBatch<OrcBatchKey>> consumer) {
       this.split = (FileSplit)split;
       this.internedFilePath = this.split.getPath().toString().intern();
       this.columnIds = columnIds;
@@ -128,6 +128,7 @@ public class OrcEncodedDataProducer impl
           columnIds.add(i);
         }
       }
+
       // Then, determine which stripes to read based on the split.
       determineStripesToRead(metadata.getStripes());
       if (readState.length == 0) {
@@ -135,52 +136,44 @@ public class OrcEncodedDataProducer impl
         return null; // No data to read.
       }
       int stride = metadata.getRowIndexStride();
-      ArrayList<OrcStripeMetadata> stripesMetadata = null;
+      ArrayList<OrcStripeMetadata> stripeMetadatas = null;
       boolean[] globalIncludes = OrcInputFormat.genIncludedColumns(
           metadata.getTypes(), columnIds, true);
       RecordReader[] stripeReaders = new RecordReader[readState.length];
       if (sarg != null && stride != 0) {
         // If SARG is present, get relevant stripe metadata from cache or readers.
-        stripesMetadata = readStripesMetadata(metadata, globalIncludes, stripeReaders);
+        stripeMetadatas = readStripesMetadata(metadata, globalIncludes, stripeReaders);
       }
 
       // Now, apply SARG if any; w/o sarg, this will just initialize readState.
       determineRgsToRead(metadata.getStripes(), metadata.getTypes(),
-          globalIncludes, stride, stripesMetadata);
+          globalIncludes, stride, stripeMetadatas);
       if (isStopped) return null;
       // Get data from high-level cache; if some cols are fully in cache, this will also
       // give us the modified list of columns to read for every stripe (null means all).
       List<Integer>[] stripeColsToRead = produceDataFromCache(metadata.getStripes(), stride);
       // readState has been modified for column x rgs that were fetched from cache.
 
-      // Then, create the readers for each stripe and prepare to read.
+      // Then, create the readers for each stripe and prepare to read. We will create each
+      // reader with the global column list and then separately pass stripe-specific includes
+      // below, so it can create the batches to return based on the former but only read the latter.
       for (int stripeIxMod = 0; stripeIxMod < stripeReaders.length; ++stripeIxMod) {
-        List<Integer> colsToRead = stripeColsToRead == null ? null : stripeColsToRead[stripeIxMod];
         RecordReader stripeReader = stripeReaders[stripeIxMod];
-        if (colsToRead == null) {
-          colsToRead = columnIds;
-        } else if (colsToRead.isEmpty()) {
+        if (stripeColsToRead != null && stripeColsToRead[stripeIxMod].isEmpty()) {
           if (stripeReader != null) {
             stripeReader.close();
             stripeReaders[stripeIxMod] = null;
           }
           continue; // All the data for this stripe was in cache.
-        } else if (stripeReader != null) {
-          // We have created the reader to read stripe metadata with all includes.
-          // We will now recreate the reader with narrower included columns (due to cache).
-          stripeReader.close();
-          stripeReader = null;
         }
 
         if (stripeReader != null) continue; // We already have a reader.
         // Create RecordReader that will be used to read only this stripe.
         StripeInformation si = metadata.getStripes().get(stripeIxFrom + stripeIxMod);
-        boolean[] stripeIncludes = OrcInputFormat.genIncludedColumns(
-            metadata.getTypes(), colsToRead, true);
         if (orcReader == null) {
           orcReader = createOrcReader(split);
         }
-        stripeReader = orcReader.rows(si.getOffset(), si.getLength(), stripeIncludes);
+        stripeReader = orcReader.rows(si.getOffset(), si.getLength(), globalIncludes);
         stripeReader.prepareEncodedColumnRead();
         stripeReaders[stripeIxMod] = stripeReader;
       }
@@ -192,42 +185,50 @@ public class OrcEncodedDataProducer impl
         RecordReader stripeReader = stripeReaders[stripeIxMod];
         if (stripeReader == null) continue; // No need to read this stripe, see above.
         List<Integer> colsToRead = stripeColsToRead == null ? null : stripeColsToRead[stripeIxMod];
-        if (colsToRead == null) {
-          colsToRead = columnIds;
-        }
+        boolean[] stripeIncludes = null;
         boolean[][] colRgs = readState[stripeIxMod];
-        if (colsToRead != null && colsToRead.size() != colRgs.length) {
-          // We are reading subset of the original columns, remove unnecessary bitmasks.
+        if (colsToRead == null || colsToRead.size() == colRgs.length) {
+          colsToRead = columnIds;
+          stripeIncludes = globalIncludes;
+        } else {
+          // We are reading subset of the original columns, remove unnecessary bitmasks/etc.
+          // This will never happen w/o high-level cache.
+          stripeIncludes = OrcInputFormat.genIncludedColumns(metadata.getTypes(), colsToRead, true);
           boolean[][] colRgs2 = new boolean[colsToRead.size()][];
           for (int i = 0, i2 = -1; i < colRgs.length; ++i) {
             if (colRgs[i] == null) continue;
-            colRgs2[++i2] = colRgs[i];
+            colRgs2[i2] = colRgs[i];
+            ++i2;
           }
           colRgs = colRgs2;
         }
 
-        // Get stripe metadata. We might have read it earlier for RG filtering.
+        // Get stripe metadata from cache or reader. We might have read it before for RG filtering.
         OrcStripeMetadata stripeMetadata;
         int stripeIx = stripeIxMod + stripeIxFrom;
-        if (stripesMetadata != null) {
-          stripeMetadata = stripesMetadata.get(stripeIxMod);
+        if (stripeMetadatas != null) {
+          stripeMetadata = stripeMetadatas.get(stripeIxMod);
         } else {
           stripeKey.stripeIx = stripeIx;
           stripeMetadata = metadataCache.getStripeMetadata(stripeKey);
           if (stripeMetadata == null) {
-            stripeMetadata = new OrcStripeMetadata(stripeReader, stripeKey.stripeIx);
+            stripeMetadata = new OrcStripeMetadata(
+                stripeReader, stripeKey.stripeIx, stripeIncludes);
             metadataCache.putStripeMetadata(stripeKey, stripeMetadata);
             stripeKey = new OrcBatchKey(internedFilePath, -1, 0);
           }
         }
-        stripeReader.setRowIndex(stripeMetadata.getRowIndexes());
+        // Set stripe metadata externally in the reader.
+        stripeReader.setMetadata(stripeMetadata.getRowIndexes(),
+            stripeMetadata.getEncodings(), stripeMetadata.getStreams());
 
         // In case if we have high-level cache, we will intercept the data and add it there;
         // otherwise just pass the data directly to the consumer.
-        Consumer<EncodedColumn<OrcBatchKey>> consumer = (cache == null) ? this.consumer : this;
+        Consumer<EncodedColumnBatch<OrcBatchKey>> consumer = (cache == null) ? this.consumer : this;
         // This is where I/O happens. This is a sync call that will feed data to the consumer.
         try {
-          stripeReader.readEncodedColumns(stripeIx, colRgs, lowLevelCache, consumer);
+          stripeReader.readEncodedColumns(
+              stripeIx, stripeIncludes, colRgs, lowLevelCache, consumer);
         } catch (Throwable t) {
           consumer.setError(t);
         }
@@ -257,7 +258,7 @@ public class OrcEncodedDataProducer impl
           StripeInformation si = metadata.getStripes().get(stripeKey.stripeIx);
           stripeReaders[stripeIxMod] = orcReader.rows(si.getOffset(), si.getLength(), globalInc);
           stripeReaders[stripeIxMod].prepareEncodedColumnRead();
-          value = new OrcStripeMetadata(stripeReaders[stripeIxMod], stripeKey.stripeIx);
+          value = new OrcStripeMetadata(stripeReaders[stripeIxMod], stripeKey.stripeIx, globalInc);
           metadataCache.putStripeMetadata(stripeKey, value);
           // Create new key object to reuse for gets; we've used the old one to put in cache.
           stripeKey = new OrcBatchKey(internedFilePath, 0, 0);
@@ -357,50 +358,49 @@ public class OrcEncodedDataProducer impl
       for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
         key.stripeIx = stripeIxFrom + stripeIxMod;
         boolean[][] cols = readState[stripeIxMod];
-        // TODO## at self-CR, see that colIx business here was not screwed up
-        for (int colIxMod = 0; colIxMod < cols.length; ++colIxMod) {
-          boolean[] readMask = cols[colIxMod];
-          key.colIx = columnIds.get(colIxMod);
-          // Assume first all RGs will be in cache; calculate or get the RG count.
-          boolean areAllRgsInCache = true;
-          int rgCount = readMask != null ? readMask.length
-              : getRgCount(stripes.get(key.stripeIx), rowIndexStride);
-          for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
-            if (readMask != null && !readMask[rgIx]) continue; // RG eliminated by SARG
-            key.rgIx = rgIx;
-            StreamBuffer cached = cache.get(key);
+        boolean[] isMissingAnyRgs = new boolean[cols.length];
+        int totalRgCount = getRgCount(stripes.get(key.stripeIx), rowIndexStride);
+        for (int rgIx = 0; rgIx < totalRgCount; ++rgIx) {
+          EncodedColumnBatch<OrcBatchKey> col = new EncodedColumnBatch<OrcBatchKey>(
+              new OrcBatchKey(internedFilePath, key.stripeIx, rgIx), cols.length, cols.length);
+          boolean hasAnyCached = false;
+          key.rgIx = rgIx;
+          for (int colIxMod = 0; colIxMod < cols.length; ++colIxMod) {
+            boolean[] readMask = cols[colIxMod];
+            // Check if RG is eliminated by SARG
+            if (readMask != null && (readMask.length <= rgIx || !readMask[rgIx])) continue;
+            key.colIx = columnIds.get(colIxMod);
+            StreamBuffer[] cached = cache.get(key);
             if (cached == null) {
-              areAllRgsInCache = false;
+              isMissingAnyRgs[colIxMod] = true;
               continue;
             }
-            // RG was in cache; send it over to the consumer.
-            // TODO: pool of EncodedColumn-s objects. Someone will need to return them though.
-            EncodedColumn<OrcBatchKey> col = null;
-            // TODO# new EncodedColumn<OrcBatchKey>(key.copyToPureBatchKey(), key.colIx, cached);
-            consumer.consumeData(col);
+            col.setAllStreams(colIxMod, key.colIx, cached);
+            hasAnyCached = true;
             if (readMask == null) {
               // We were going to read all RGs, but now that some were in cache, allocate the mask.
-              cols[colIxMod] = readMask = new boolean[rgCount];
+              cols[colIxMod] = readMask = new boolean[totalRgCount];
               Arrays.fill(readMask, true);
             }
             readMask[rgIx] = false; // Got from cache, don't read from disk.
           }
-          boolean hasExplicitColList = stripeColsNotInCache[stripeIxMod] != null;
-          if (areAllRgsInCache) {
-            if (!hasExplicitColList) {
-              // All rgs for this stripe x column were fetched from cache. If this is the first
-              // such column, create custom, smaller list of columns to fetch later for this
-              // stripe (default is all the columns originally requested). Add all previous
-              // columns, need to fetch them since this is the first column.
-              stripeColsNotInCache[stripeIxMod] = new ArrayList<Integer>(cols.length);
-              if (stripeIxMod > 0) {
-                stripeColsNotInCache[stripeIxMod].addAll(columnIds.subList(0, colIxMod));
-              }
+          if (hasAnyCached) {
+            consumer.consumeData(col);
+          }
+        }
+        boolean makeStripeColList = false; // By default assume we'll fetch all original columns.
+        for (int colIxMod = 0; colIxMod < cols.length; ++colIxMod) {
+          if (isMissingAnyRgs[colIxMod]) {
+            if (makeStripeColList) {
+              stripeColsNotInCache[stripeIxMod].add(columnIds.get(colIxMod));
+            }
+          } else if (!makeStripeColList) {
+            // Some columns were fully in cache. Make a per-stripe col list, add previous columns.
+            makeStripeColList = true;
+            stripeColsNotInCache[stripeIxMod] = new ArrayList<Integer>(cols.length - 1);
+            for (int i = 0; i < colIxMod; ++i) {
+              stripeColsNotInCache[stripeIxMod].add(columnIds.get(i));
             }
-          } else if (hasExplicitColList) {
-            // Only a subset of original columnIds need to be fetched for this stripe;
-            // add the current one to this sublist.
-            stripeColsNotInCache[stripeIxMod].add(columnIds.get(colIxMod));
           }
         }
       }
@@ -413,18 +413,21 @@ public class OrcEncodedDataProducer impl
     }
 
     @Override
-    public void consumeData(EncodedColumn<OrcBatchKey> data) {
+    public void consumeData(EncodedColumnBatch<OrcBatchKey> data) {
       // Store object in cache; create new key object - cannot be reused.
       assert cache != null;
-      OrcCacheKey key = new OrcCacheKey(data.batchKey, data.columnIndex);
-      // TODO#: change type of cache and restore this
-      /*
-      StreamBuffer cached = cache.cacheOrGet(key, data.columnData);
-      if (data.streamData != cached) {
-        lowLevelCache.releaseBuffers(data.columnData.cacheBuffers);
-        data.columnData = cached;
+      for (int i = 0; i < data.columnData.length; ++i) {
+        OrcCacheKey key = new OrcCacheKey(data.batchKey, data.columnIxs[i]);
+        StreamBuffer[] toCache = data.columnData[i];
+        StreamBuffer[] cached = cache.cacheOrGet(key, toCache);
+        if (toCache != cached) {
+          for (StreamBuffer sb : toCache) {
+            if (sb.decRef() != 0) continue;
+            lowLevelCache.releaseBuffers(sb.cacheBuffers);
+          }
+          data.columnData[i] = cached;
+        }
       }
-      */
       consumer.consumeData(data);
     }
 
@@ -455,7 +458,7 @@ public class OrcEncodedDataProducer impl
 
   @Override
   public EncodedDataReader<OrcBatchKey> getReader(InputSplit split, List<Integer> columnIds,
-      SearchArgument sarg, String[] columnNames, Consumer<EncodedColumn<OrcBatchKey>> consumer) {
+      SearchArgument sarg, String[] columnNames, Consumer<EncodedColumnBatch<OrcBatchKey>> consumer) {
     return new OrcEncodedDataReader(split, columnIds, sarg, columnNames, consumer);
   }
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java?rev=1656977&r1=1656976&r2=1656977&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java Wed Feb  4 02:30:54 2015
@@ -18,23 +18,47 @@
 package org.apache.hadoop.hive.llap.io.metadata;
 
 import java.io.IOException;
+import java.util.List;
 
-import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Stream;
 import org.apache.hadoop.hive.ql.io.orc.RecordReader;
 
 public class OrcStripeMetadata {
-  // TODO#: add encoding and stream list
-  OrcProto.RowIndex[] rowIndexes;
-
-  public OrcStripeMetadata(RecordReader reader, int stripeIx) throws IOException {
-    rowIndexes = reader.getCurrentRowIndexEntries();
+  List<ColumnEncoding> encodings;
+  List<Stream> streams;
+  RowIndex[] rowIndexes;
+
+  public OrcStripeMetadata(
+      RecordReader reader, int stripeIx, boolean[] includes) throws IOException {
+    rowIndexes = reader.getCurrentRowIndexEntries(includes);
+    streams = reader.getCurrentStreams();
+    encodings = reader.getCurrentColumnEncodings();
   }
 
-  public OrcProto.RowIndex[] getRowIndexes() {
+  public RowIndex[] getRowIndexes() {
     return rowIndexes;
   }
 
-  public void setRowIndexes(OrcProto.RowIndex[] rowIndexes) {
+  public void setRowIndexes(RowIndex[] rowIndexes) {
     this.rowIndexes = rowIndexes;
   }
+
+  public List<ColumnEncoding> getEncodings() {
+    return encodings;
+  }
+
+  public void setEncodings(List<ColumnEncoding> encodings) {
+    this.encodings = encodings;
+  }
+
+  public List<Stream> getStreams() {
+    return streams;
+  }
+
+  public void setStreams(List<Stream> streams) {
+    this.streams = streams;
+  }
+
 }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java?rev=1656977&r1=1656976&r2=1656977&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java Wed Feb  4 02:30:54 2015
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.llap.Consumer;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java?rev=1656977&r1=1656976&r2=1656977&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java Wed Feb  4 02:30:54 2015
@@ -148,7 +148,7 @@ public final class FileDump {
           System.out.println(buf);
         }
         if (rowIndexCols != null) {
-          RowIndex[] indices = rows.readRowIndex(stripeIx);
+          RowIndex[] indices = rows.readRowIndex(stripeIx, null);
           for (int col : rowIndexCols) {
             StringBuilder buf = new StringBuilder();
             buf.append("    Row group index column ").append(col).append(":");

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1656977&r1=1656976&r2=1656977&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Wed Feb  4 02:30:54 2015
@@ -30,12 +30,12 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hive.common.DiskRange;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
+import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.sun.tools.javac.code.Attribute.Array;
 
 abstract class InStream extends InputStream {
   private static final Log LOG = LogFactory.getLog(InStream.class);
@@ -484,11 +484,12 @@ abstract class InStream extends InputStr
   }
 
   private static class ProcCacheChunk extends CacheChunk {
-    public ProcCacheChunk(long cbStartOffset, long cbEndOffset,
-        boolean isCompressed, ByteBuffer originalData, LlapMemoryBuffer targetBuffer) {
+    public ProcCacheChunk(long cbStartOffset, long cbEndOffset, boolean isCompressed,
+        ByteBuffer originalData, LlapMemoryBuffer targetBuffer) {
       super(targetBuffer, cbStartOffset, cbEndOffset);
       this.isCompressed = isCompressed;
       this.originalData = originalData;
+      setReused(); // This block is immediately used by code that does the decompression.
     }
 
     boolean isCompressed;
@@ -499,15 +500,16 @@ abstract class InStream extends InputStr
    * Uncompresses part of the stream. RGs can overlap, so we cannot just go and decompress
    * and remove what we have returned. We will keep iterator as a "hint" point.
    * TODO: Java LinkedList and iter have a really stupid interface. Replace with own simple one?
+   * @param zcr
    */
   public static void uncompressStream(String fileName,
-      ListIterator<DiskRange> ranges,
+      ZeroCopyReaderShim zcr, ListIterator<DiskRange> ranges,
       CompressionCodec codec, int bufferSize, LowLevelCache cache,
       long cOffset, long endCOffset, StreamBuffer colBuffer)
           throws IOException {
-    // TODO#: accpount for coffsets being -1 after finishing the normal methods.
     colBuffer.cacheBuffers = new ArrayList<LlapMemoryBuffer>();
-    List<ProcCacheChunk> toDecompress = new ArrayList<ProcCacheChunk>();
+    List<ProcCacheChunk> toDecompress = null;
+    List<ByteBuffer> toRelease = null;
 
     // Find our bearings in the stream. Normally, iter will already point either to where we
     // want to be, or just before. However, RGs can overlap due to encoding, so we may have
@@ -515,13 +517,20 @@ abstract class InStream extends InputStr
     DiskRange current = findCompressedPosition(ranges, cOffset);
 
     // Go thru the blocks; add stuff to results and prepare the decompression work (see below).
-    int nextCbOffset = (cOffset >= 0) ? (int)(cOffset - current.offset) : -1;
+    if (cOffset >= 0 && cOffset != current.offset) {
+      // We adjust offsets as we decompress, we expect to decompress sequentially, and we cache and
+      // decompress entire CBs (obviously). Therefore the offset in the next DiskRange should
+      // always be the start offset of a CB. TODO: what about at start?
+      throw new AssertionError("Unexpected offset - for " + cOffset + ", got " + current.offset);
+    }
     long currentCOffset = cOffset;
     while (true) {
       if (current instanceof CacheChunk) {
         // This is a cached compression buffer, add as is.
-        if (nextCbOffset > 0) throw new AssertionError("Compressed offset in the middle of cb");
         CacheChunk cc = (CacheChunk)current;
+        if (cc.setReused()) {
+          cache.notifyReused(cc.buffer);
+        }
         colBuffer.cacheBuffers.add(cc.buffer);
         currentCOffset = cc.end;
       } else {
@@ -529,11 +538,14 @@ abstract class InStream extends InputStr
         // several disk ranges, so we might need to combine them.
         BufferChunk bc = (BufferChunk)current;
         // TODO#: DOUBLE check the iterator state.
-        int chunkLength = addOneCompressionBuffer(bc, ranges, bufferSize,
-            cache, colBuffer.cacheBuffers, toDecompress, nextCbOffset);
+        if (toDecompress == null) {
+          toDecompress = new ArrayList<ProcCacheChunk>();
+          toRelease = (zcr == null) ? null : new ArrayList<ByteBuffer>();
+        }
+        int chunkLength = addOneCompressionBuffer(bc, ranges, zcr, bufferSize,
+            cache, colBuffer.cacheBuffers, toDecompress, toRelease);
         currentCOffset = bc.offset + chunkLength;
       }
-      nextCbOffset = -1;
       if ((endCOffset >= 0 && currentCOffset >= endCOffset) || !ranges.hasNext()) {
         break;
       }
@@ -544,12 +556,14 @@ abstract class InStream extends InputStr
     // data and some unallocated membufs for decompression. toDecompress contains all the work we
     // need to do, and each item points to one of the membufs in cacheBuffers as target. The iter
     // has also been adjusted to point to these buffers instead of compressed data for the ranges.
-    // Allocate the buffers, prepare cache kets.
+    // Allocate the buffers, prepare cache keys.
+    if (toDecompress == null) return; // Nothing to decompress.
+
     LlapMemoryBuffer[] targetBuffers = new LlapMemoryBuffer[toDecompress.size()];
     DiskRange[] cacheKeys = new DiskRange[toDecompress.size()];
     int ix = 0;
     for (ProcCacheChunk chunk : toDecompress) {
-      cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store DiskRange.
+      cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these.
       targetBuffers[ix] = chunk.buffer;
       ++ix;
     }
@@ -562,7 +576,15 @@ abstract class InStream extends InputStr
       } else {
         chunk.buffer.byteBuffer.put(chunk.originalData); // Copy uncompressed data to cache.
       }
-      chunk.originalData = null; // TODO#: are we supposed to release this to zcr in some cases
+      chunk.originalData = null;
+    }
+
+    // Release original compressed buffers to zero-copy reader if needed.
+    if (toRelease != null) {
+      assert zcr != null;
+      for (ByteBuffer buf : toRelease) {
+        zcr.releaseBuffer(buf);
+      }
     }
 
     // Finally, put data to cache.
@@ -598,15 +620,11 @@ abstract class InStream extends InputStr
 
 
   private static int addOneCompressionBuffer(BufferChunk current,
-      ListIterator<DiskRange> ranges, int bufferSize,
+      ListIterator<DiskRange> ranges, ZeroCopyReaderShim zcr, int bufferSize,
       LowLevelCache cache, List<LlapMemoryBuffer> cacheBuffers,
-      List<ProcCacheChunk> toDecompress, int nextCbOffsetExpected) throws IOException {
-    // TODO#: HERE
+      List<ProcCacheChunk> toDecompress, List<ByteBuffer> toRelease) throws IOException {
     ByteBuffer slice = null;
     ByteBuffer compressed = current.chunk;
-    if (nextCbOffsetExpected >= 0 && nextCbOffsetExpected != compressed.position()) {
-      throw new AssertionError("We don't know what we are doing anymore");
-    }
     long cbStartOffset = current.offset + compressed.position();
     int b0 = compressed.get() & 0xff;
     int b1 = compressed.get() & 0xff;
@@ -621,8 +639,12 @@ abstract class InStream extends InputStr
       // Simple case - CB fits entirely in the disk range.
       slice = compressed.slice();
       slice.limit(chunkLength);
-      addOneCompressionBlockByteBuffer(slice, isUncompressed, ranges, cache, compressed,
-          cbStartOffset, chunkLength, toDecompress, cacheBuffers);
+      addOneCompressionBlockByteBuffer(slice, isUncompressed, cbStartOffset,
+          chunkLength, cache, compressed, ranges, toDecompress, cacheBuffers);
+      current.offset += chunkLength;
+      if (compressed.remaining() <= 0 && zcr != null) {
+        toRelease.add(compressed);
+      }
       return chunkLength;
     }
 
@@ -632,6 +654,13 @@ abstract class InStream extends InputStr
     int remaining = chunkLength - compressed.remaining();
     copy.put(compressed);
     ranges.remove();
+    if (zcr != null) {
+      if (compressed.position() == 0) {
+        zcr.releaseBuffer(compressed); // We copied the entire buffer.
+      } else {
+        toRelease.add(compressed); // There might be slices depending on this buffer.
+      }
+    }
 
     while (ranges.hasNext()) {
       DiskRange range = ranges.next();
@@ -643,22 +672,28 @@ abstract class InStream extends InputStr
         slice = compressed.slice();
         slice.limit(remaining);
         copy.put(slice);
-        addOneCompressionBlockByteBuffer(copy, isUncompressed, ranges, cache, compressed,
-            cbStartOffset, chunkLength, toDecompress, cacheBuffers);
+        addOneCompressionBlockByteBuffer(copy, isUncompressed, cbStartOffset,
+            chunkLength, cache, compressed, ranges, toDecompress, cacheBuffers);
+        range.offset += chunkLength;
+        if (compressed.remaining() <= 0 && zcr != null) {
+          zcr.releaseBuffer(compressed); // We copied the entire buffer.
+        }
         return chunkLength;
       }
       remaining -= compressed.remaining();
       copy.put(compressed);
+      if (zcr != null) {
+        zcr.releaseBuffer(compressed); // We copied the entire buffer.
+      }
       ranges.remove();
     }
     throw new IOException("EOF in while trying to read "
         + chunkLength + " bytes at " + cbStartOffset);
   }
 
-  private static void addOneCompressionBlockByteBuffer(
-      ByteBuffer data, boolean isUncompressed,
-      ListIterator<DiskRange> ranges, LowLevelCache cache,
-      ByteBuffer compressed, long cbStartOffset, int chunkLength,
+  private static void addOneCompressionBlockByteBuffer(ByteBuffer fullSourceBlock,
+      boolean isUncompressed, long cbStartOffset, int chunkLength,
+      LowLevelCache cache, ByteBuffer currentRangeData, ListIterator<DiskRange> ranges,
       List<ProcCacheChunk> toDecompress, List<LlapMemoryBuffer> cacheBuffers) {
     // Prepare future cache buffer.
     LlapMemoryBuffer futureAlloc = cache.createUnallocated();
@@ -667,13 +702,13 @@ abstract class InStream extends InputStr
     // Add it to the list of work to decompress.
     long cbEndOffset = cbStartOffset + chunkLength;
     ProcCacheChunk cc = new ProcCacheChunk(
-        cbStartOffset, cbEndOffset, !isUncompressed, data, futureAlloc);
+        cbStartOffset, cbEndOffset, !isUncompressed, fullSourceBlock, futureAlloc);
     toDecompress.add(cc);
     // Adjust the compression block position.
-    compressed.position(compressed.position() + chunkLength);
+    currentRangeData.position(currentRangeData.position() + chunkLength);
     // Finally, put it in the ranges list for future use (if shared between RGs).
     // Before anyone else accesses it, it would have been allocated and decompressed locally.
-    if (compressed.remaining() <= 0) {
+    if (currentRangeData.remaining() <= 0) {
       ranges.set(cc);
     } else {
       ranges.previous();

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java?rev=1656977&r1=1656976&r2=1656977&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java Wed Feb  4 02:30:54 2015
@@ -25,11 +25,10 @@ import org.apache.hadoop.hive.ql.io.orc.
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.llap.Consumer;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 
-
 /**
  * A row-by-row iterator for ORC files.
  */
@@ -89,24 +88,24 @@ public interface RecordReader {
 
   // TODO: maybe all of this should be moved to LLAP-specific class
   /**
-   * TODO: this API is subject to change; on one hand, external code should control the threading
-   *       aspects, with ORC method returning one EncodedColumn as it will; on the other, it's
-   *       more efficient for ORC to read stripe at once, apply RG-level sarg, etc., and thus
-   *       return many EncodedColumn-s.
    *  TODO: assumes the reader is for one stripe, otherwise the signature makes no sense.
    *        Also has no columns passed, because that is in ctor.
+   * @param stripeIncludes Includes to use for this call. This method ignores reader's includes.
    * @param colRgs What RGs are to be read. Has # of elements equal to the number of
    *               included columns; then each boolean is rgCount long.
    * @param cache Cache to get/put data and allocate memory.
    * @param consumer Consumer to pass the results too.
    * @throws IOException
    */
-  void readEncodedColumns(int stripeIx, boolean[][] colRgs,
-      LowLevelCache cache, Consumer<EncodedColumn<OrcBatchKey>> consumer) throws IOException;
+  void readEncodedColumns(int stripeIx, boolean[] stripeIncludes, boolean[][] colRgs,
+      LowLevelCache cache, Consumer<EncodedColumnBatch<OrcBatchKey>> consumer) throws IOException;
 
-  RowIndex[] getCurrentRowIndexEntries() throws IOException;
+  RowIndex[] getCurrentRowIndexEntries(boolean[] included) throws IOException;
 
   List<ColumnEncoding> getCurrentColumnEncodings() throws IOException;
 
-  void setRowIndex(RowIndex[] rowIndex);
+  List<OrcProto.Stream> getCurrentStreams() throws IOException;
+
+  void setMetadata(RowIndex[] index,
+      List<ColumnEncoding> encodings, List<OrcProto.Stream> streams);
 }

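To make the reworked signature easier to follow, here is a small, self-contained sketch of the colRgs shape the javadoc describes. The stripe size and stride below are made up for illustration; the rgCount formula matches the one used later in RecordReaderImpl (ceil(rowCountInStripe / rowIndexStride)), and the null-vs-boolean[] convention matches the filtering check in readEncodedColumns.

// Illustrative sketch only; numbers are hypothetical.
public class ColRgsShapeSketch {
  public static void main(String[] args) {
    long rowCountInStripe = 250_000;   // hypothetical stripe row count
    int rowIndexStride = 10_000;       // a typical ORC row index stride
    int rgCount = (int) Math.ceil((double) rowCountInStripe / rowIndexStride); // 25 row groups

    int includedColumnCount = 3;       // pretend 3 columns are included for this read
    boolean[][] colRgs = new boolean[includedColumnCount][];
    for (int i = 0; i < includedColumnCount; ++i) {
      colRgs[i] = new boolean[rgCount]; // one flag per row group for this column
    }
    colRgs[1] = null;                  // null means "read all row groups" for that column
    colRgs[0][7] = true;               // e.g. only RG 7 of column 0 survived the SARG

    System.out.println("rgCount = " + rgCount);
  }
}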
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1656977&r1=1656976&r2=1656977&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Wed Feb  4 02:30:54 2015
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.io.orc
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_ZEROCOPY;
 
 import java.io.EOFException;
-import java.io.FileDescriptor;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -48,8 +47,8 @@ import org.apache.hadoop.hive.common.Dis
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.Consumer;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
@@ -64,6 +63,7 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Stream;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.Stream.Kind;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -117,6 +117,8 @@ public class RecordReaderImpl implements
   List<DiskRange> bufferChunks = new ArrayList<DiskRange>(0);
   private final TreeReader reader;
   private final OrcProto.RowIndex[] indexes;
+  private List<OrcProto.ColumnEncoding> encodings;
+  private List<OrcProto.Stream> streamList;
   private final SargApplier sargApp;
   // an array about which row groups aren't skipped
   private boolean[] includedRowGroups = null;
@@ -2654,22 +2656,17 @@ public class RecordReaderImpl implements
     if (sargApp == null) {
       return null;
     }
-    readRowIndex(currentStripe);
+    readRowIndex(currentStripe, included);
     return sargApp.pickRowGroups(stripes.get(currentStripe), indexes);
   }
 
-  @Override
-  public List<ColumnEncoding> getCurrentColumnEncodings() throws IOException {
-    return stripeFooter.getColumnsList();
-  }
-
   private void clearStreams() throws IOException {
     // explicit close of all streams to de-ref ByteBuffers
     for(InStream is: streams.values()) {
       is.close();
     }
     if(bufferChunks != null) {
-      if(zcr != null) {
+      if (zcr != null) {
         for (DiskRange range : bufferChunks) {
           if (range instanceof BufferChunk) {
             zcr.releaseBuffer(((BufferChunk)range).chunk);
@@ -2789,6 +2786,15 @@ public class RecordReaderImpl implements
 
   public static class CacheChunk extends DiskRange {
     public LlapMemoryBuffer buffer;
+    /** When we get a memory buffer from the cache (or allocate one and put it there), it is locked
+     * for us once. All is well if we unlock it once; but if we use the same buffer for 2+ RGs, we
+     * need to incRef again, or track our own separate refcount so we don't unlock more than once.
+     * We do the former. Every time we get or allocate, we put the buffer in one cache chunk that
+     * is later used by all future lookups that happen to land within this DiskRange. When they
+     * call "setReused", they get back the previous value. If this range has not been used for any
+     * RG yet, we don't need to notify the cache; if it's called more than once, we are re-using
+     * the buffer and will incRef.
+     */
+    private boolean isReused = false;
 
     public CacheChunk(LlapMemoryBuffer buffer, long offset, long end) {
       super(offset, end);
@@ -2804,6 +2810,12 @@ public class RecordReaderImpl implements
     public ByteBuffer getData() {
       return buffer.byteBuffer;
     }
+
+    public boolean setReused() {
+      boolean result = isReused;
+      isReused = true;
+      return result;
+    }
   }
 
   private static final int BYTE_STREAM_POSITIONS = 1;
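A self-contained toy illustrating the setReused() contract from the comment above. Chunk and FakeCache are simplified stand-ins, not the actual CacheChunk/LowLevelCache classes, and the "one lock on first fetch" starting state is taken from the comment rather than from cache code shown here.

public class ReuseSketch {
  static class Chunk {
    private boolean isReused = false;
    boolean setReused() { boolean prev = isReused; isReused = true; return prev; }
  }
  static class FakeCache {
    int refCount = 1; // the buffer is locked once when first fetched or allocated
    void incRef() { ++refCount; }
  }

  public static void main(String[] args) {
    Chunk chunk = new Chunk();
    FakeCache cache = new FakeCache();
    // Three row groups happen to land in the same disk range / buffer:
    for (int rg = 0; rg < 3; ++rg) {
      if (chunk.setReused()) {
        cache.incRef(); // every use after the first takes one more ref
      }
    }
    // refCount is now 3: one unlock per consuming RG releases the buffer exactly once overall.
    System.out.println(cache.refCount);
  }
}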
@@ -2973,7 +2985,6 @@ public class RecordReaderImpl implements
       long offset, long length, DiskRange lastRange, LinkedList<DiskRange> result) {
     for (int group = 0; group < includedRowGroups.length; ++group) {
       if (!includedRowGroups[group]) continue;
-      // TODO#: this code is relevant
       int posn = getIndexPosition(
           encoding.getKind(), type.getKind(), stream.getKind(), isCompressed, hasNull);
       long start = index.getEntry(group).getPositions(posn);
@@ -3351,18 +3362,8 @@ public class RecordReaderImpl implements
     throw new IllegalArgumentException("Seek after the end of reader range");
   }
 
-  @Override
-  public OrcProto.RowIndex[] getCurrentRowIndexEntries() throws IOException {
-    return readRowIndex(currentStripe);
-  }
-
-  @Override
-  public void setRowIndex(OrcProto.RowIndex[] rowIndex) {
-    assert rowIndex.length == indexes.length;
-    System.arraycopy(rowIndex, 0, indexes, 0, rowIndex.length);
-  }
-
-  protected OrcProto.RowIndex[] readRowIndex(int stripeIndex) throws IOException {
+  protected OrcProto.RowIndex[] readRowIndex(
+      int stripeIndex, boolean[] included) throws IOException {
     long offset = stripes.get(stripeIndex).getOffset();
     OrcProto.StripeFooter stripeFooter;
     OrcProto.RowIndex[] indexes;
@@ -3419,7 +3420,7 @@ public class RecordReaderImpl implements
       currentStripe = rightStripe;
       readStripe();
     }
-    readRowIndex(currentStripe);
+    readRowIndex(currentStripe, included);
 
     // if we aren't to the right row yet, advance in the stripe.
     advanceToNextRow(reader, rowNumber, true);
@@ -3458,14 +3459,15 @@ public class RecordReaderImpl implements
   }
 
   @Override
-  public void readEncodedColumns(int stripeIx, boolean[][] colRgs, LowLevelCache cache,
-      Consumer<EncodedColumn<OrcBatchKey>> consumer) throws IOException {
+  public void readEncodedColumns(int stripeIx, boolean[] includes, boolean[][] colRgs,
+      LowLevelCache cache, Consumer<EncodedColumnBatch<OrcBatchKey>> consumer) throws IOException {
     // Note: for now we don't have to setError here, caller will setError if we throw.
     // We are also not supposed to call setDone, since we are only part of the operation.
     StripeInformation stripe = stripes.get(currentStripe);
-    // TODO## GET FROM METADATA? same for indexes, remove set... method
-    List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
-    List<ColumnEncoding> encodings = stripeFooter.getColumnsList();
+    List<OrcProto.Stream> streamList =
+        this.streamList != null ? this.streamList : stripeFooter.getStreamsList();
+    List<ColumnEncoding> encodings =
+        this.encodings != null ? this.encodings : stripeFooter.getColumnsList();
     LinkedList<DiskRange> rangesToRead = new LinkedList<DiskRange>();
     long offset = 0;
     // Figure out which columns have a present stream
@@ -3526,59 +3528,56 @@ public class RecordReaderImpl implements
 
     // Finally, decompress data, map per RG, and return to caller.
     // We go by RG and not by column because that is how data is processed.
-    // TODO# We could build RG x all cols batches here cheaper and avoid building them on higher
-    //       level (except for HL cache data that higher level would add). Esp. useful before we
-    //       implement high-level cache. We could even alloc one return object and not per column!
     int rgCount = (int)Math.ceil((double)rowCountInStripe / rowIndexStride);
     for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
       boolean isLastRg = rgIx == rgCount - 1;
-      OrcBatchKey bk = new OrcBatchKey(fileName, stripeIx, rgIx);
+      EncodedColumnBatch<OrcBatchKey> ecb = new EncodedColumnBatch<OrcBatchKey>(
+          new OrcBatchKey(fileName, stripeIx, rgIx), colRgs.length, 0);
       for (int colIxMod = 0; colIxMod < colRgs.length; ++colIxMod) {
         if (colRgs[colIxMod] != null && !colRgs[colIxMod][rgIx]) continue; // RG x col filtered.
         ColumnReadContext ctx = colCtxs[colIxMod];
-        EncodedColumn<OrcBatchKey> encodedColumn = new EncodedColumn<OrcBatchKey>(
-            bk, ctx.colIx, ctx.streamCount);
         RowIndexEntry index = indexes[ctx.colIx].getEntry(rgIx);
+        ecb.initColumn(colIxMod, ctx.colIx, ctx.streamCount);
         for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
           OrcProto.Stream stream = ctx.streams[streamIx];
+          StreamBuffer cb = null;
           if (isStripeLevelStream(stream.getKind(), ctx.encoding.getKind())) {
             // This stream is for entire stripe and needed for every RG; uncompress once and reuse.
             if (ctx.stripeLevelStreams == null) {
               ctx.stripeLevelStreams = new StreamBuffer[ctx.streamCount];
             }
-            StreamBuffer cb = ctx.stripeLevelStreams[streamIx];
+            cb = ctx.stripeLevelStreams[streamIx];
             if (cb == null) {
-              long streamOffset = ctx.streamOffsets[streamIx];
-              cb = ctx.stripeLevelStreams[streamIx] = new StreamBuffer(0, -1);
+              cb = ctx.stripeLevelStreams[streamIx] = new StreamBuffer();
               // We will be using this for each RG while also sending RGs to processing.
               // To avoid buffers being unlocked, run refcount one ahead; we will not increase
               // it when building the last RG, so each RG processing will decref once, and the
-              // last one will unlock the buffers. Cheaper than locking the buffers 500 times.
+              // last one will unlock the buffers.
               cb.incRef();
-              InStream.uncompressStream(fileName, ctx.bufferIters[streamIx],
+              InStream.uncompressStream(fileName, zcr, ctx.bufferIters[streamIx],
                   codec, bufferSize, cache, -1, -1, cb);
               ctx.buffers[streamIx] = null;
             }
             if (!isLastRg) {
               cb.incRef();
             }
-            encodedColumn.streamData[streamIx] = cb;
           } else {
             // This stream can be separated by RG using index. Let's do that.
             // TODO#: determine start offset, end offset from index; nexts can be end of stream.
+            //        Either see getIndexPosition or
+            //        https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC#LanguageManualORC-ColumnEncodings
             long cOffset = 0, nextCOffset = 0, nextNextCOffset = 0;
-            int ucOffset = 0, nextUcOffset = 0;
-            StreamBuffer cb = new StreamBuffer(0, -1);
+            int nextUcOffset = 0;
+            cb = new StreamBuffer();
             cb.incRef();
-            cb.firstOffset = ucOffset; // We go by compression block, so this is always true.
             long startCOffset = cOffset;
             long endCOffset = (nextUcOffset == 0) ? nextCOffset : nextNextCOffset;
-            // TODO#: HERE
-            InStream.uncompressStream(fileName,
-                ctx.bufferIters[streamIx], codec, bufferSize, cache, startCOffset, endCOffset, cb);
+            InStream.uncompressStream(fileName, zcr, ctx.bufferIters[streamIx],
+                codec, bufferSize, cache, startCOffset, endCOffset, cb);
           }
+          ecb.setStreamData(colIxMod, streamIx, cb);
         }
-        consumer.consumeData(encodedColumn);
+        consumer.consumeData(ecb);
       }
     }
 
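The stripe-level-stream branch above keeps the refcount "one ahead": the buffer takes one ref when it is first uncompressed, one more for every row group except the last, so each row group's single decref accounts for the shared buffer and the last one releases it. Below is a toy version of that counting, with a stand-in StreamBuf instead of the real StreamBuffer and a simplified unlock flag standing in for whatever release actually happens.

public class SharedStreamRefSketch {
  static class StreamBuf {
    int refCount = 0;
    boolean unlocked = false;
    void incRef() { ++refCount; }
    void decRef() { if (--refCount == 0) unlocked = true; }
  }

  public static void main(String[] args) {
    int rgCount = 4;
    StreamBuf cb = new StreamBuf();
    cb.incRef(); // taken once when the stream is first uncompressed ("one ahead")
    for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
      boolean isLastRg = rgIx == rgCount - 1;
      if (!isLastRg) {
        cb.incRef(); // every RG except the last adds a ref before being handed out
      }
      // ... the RG is sent to processing, which later calls decRef() exactly once ...
    }
    // Simulate each RG's consumer releasing its reference:
    for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
      cb.decRef();
    }
    System.out.println(cb.unlocked); // true: the last decRef releases the shared buffer
  }
}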
@@ -3593,86 +3592,28 @@ public class RecordReaderImpl implements
         || encoding == ColumnEncoding.Kind.DICTIONARY_V2));
   }
 
-  /* Old prototype code to read stripes one column at a time, with limited output space.
-  /**
-   * Iterator-like context to read ORC as a sequence of column x stripe "cells".
-   * TODO: for this to actually be an iterator-like thing, we need to clone nested reader state.
-   *       As of now, we advance parent's shared column readers separately, which would cause
-   *       other calls (e.g. nextBatch) to break once nextColumnStripe is called. Currently,
-   *       it is always called alone, so this is ok; context is merely a convenience class.
-   * /
-  private static class ColumnReadContext {
-    public ColumnReadContext(StructTreeReader reader) {
-      StructTreeReader structReader = (StructTreeReader)reader;
-      readers = new TreeReader[structReader.getReaderCount()];
-      for (int i = 0; i < readers.length; ++i) {
-        readers[i] = structReader.getColumnReader(i);
-      }
-    }
-    /** Readers for each separate column; no nulls, just the columns being read. * /
-    private final TreeReader[] readers;
-    /** Remembered row offset after a partial read of one column from stripe. * /
-    private long rowInStripe = 0;
-    /** Next column to be read (index into readers). * /
-    private int columnIx = 0;
-    /** Remaining row count for current stripe; same for every column, so don't recompute. * /
-    private long remainingToReadFromStart = -1;
-    /** Whether the next call will be the first for this column x stripe. TODO: derive? * /
-    private boolean firstCall = true;
+  @Override
+  public List<ColumnEncoding> getCurrentColumnEncodings() throws IOException {
+    return stripeFooter.getColumnsList();
   }
 
   @Override
-  public Object prepareColumnRead() {
-    return new ColumnReadContext((StructTreeReader)this.reader);
+  public OrcProto.RowIndex[] getCurrentRowIndexEntries(boolean[] included) throws IOException {
+    return readRowIndex(currentStripe, included);
   }
 
   @Override
-  public void readNextColumnStripe(Object ctxObj) throws IOException {
-    ColumnReadContext ctx = (ColumnReadContext)ctxObj;
-    if (rowInStripe >= rowCountInStripe) {
-      assert ctx.columnIx == 0;
-      currentStripe += 1;
-      readStripe();
-    }
-    long rowInStripeGlobal = rowInStripe; // Remember the global state.
-    rowInStripe = ctx.rowInStripe;
-    if (ctx.columnIx == 0 && ctx.firstCall) {
-      // We are starting a new stripe - remember the number of rows to read (same for all cols).
-      // Doesn't take into account space remaining in ChunkWriter.
-      ctx.remainingToReadFromStart = computeBatchSize(Long.MAX_VALUE);
-    }
-    long remainingToRead =
-        ctx.firstCall ? ctx.remainingToReadFromStart : computeBatchSize(Long.MAX_VALUE);
-    TreeReader columnReader = ctx.readers[ctx.columnIx];
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Calling nextChunk for " + remainingToRead);
-    }
-    long rowsRead = (read data was here)
-    assert rowsRead <= remainingToRead;
-    rowInStripe += rowsRead;
-    boolean doneWithColumnStripe = (rowsRead == remainingToRead); // always true for stripes
-    ctx.firstCall = doneWithColumnStripe; // If we are not done, there will be more calls.
-    if (!doneWithColumnStripe) {
-      // Note that we are only advancing the reader for the current column.
-      boolean hasRows = advanceToNextRow(columnReader, rowInStripe + rowBaseInStripe, false);
-      ctx.rowInStripe = rowInStripe; // Remember the current value for next call.
-      if (!hasRows) {
-        throw new AssertionError("No rows after advance; read "
-            + rowsRead + " out of " + remainingToRead);
-      }
-    } else {
-      // Done with some column + stripe.
-      ++ctx.columnIx;
-      if (ctx.columnIx == ctx.readers.length) {
-        // Done with the last column in this stripe; advance the global rowInStripe.
-        ctx.columnIx = 0;
-        ctx.rowInStripe = rowInStripeGlobal = rowInStripe;
-      } else {
-        // Revert the state back to start of stripe.
-        ctx.rowInStripe = rowInStripeGlobal;
-      }
-    }
-    rowInStripe = rowInStripeGlobal; // Restore global state.
-    return !doneWithColumnStripe;
-  }*/
+  public List<Stream> getCurrentStreams() throws IOException {
+    return stripeFooter.getStreamsList();
+  }
+
+  @Override
+  public void setMetadata(
+      RowIndex[] index, List<ColumnEncoding> encodings, List<Stream> streams) {
+    assert index.length == indexes.length;
+    System.arraycopy(index, 0, indexes, 0, index.length);
+    this.streamList = streams;
+    this.encodings = encodings;
+  }
+
 }

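The new accessors plus setMetadata() let stripe metadata read once (by another reader, or held in a metadata cache) be handed to a reader for the same stripe instead of re-reading the stripe footer and index streams. A minimal sketch, assuming the caller already has two readers positioned on the same stripe; the class and method names here are hypothetical wrappers around the interface methods added in this patch.

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hive.ql.io.orc.OrcProto;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
import org.apache.hadoop.hive.ql.io.orc.RecordReader;

public class MetadataHandoffSketch {
  /**
   * Captures one reader's current-stripe metadata and primes another reader with it.
   * How the two readers are obtained, and that they point at the same stripe, is
   * assumed by the caller.
   */
  static void handOff(RecordReader source, RecordReader target, boolean[] includes)
      throws IOException {
    RowIndex[] index = source.getCurrentRowIndexEntries(includes);
    List<ColumnEncoding> encodings = source.getCurrentColumnEncodings();
    List<OrcProto.Stream> streams = source.getCurrentStreams();
    target.setMetadata(index, encodings, streams);
  }
}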
Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java?rev=1656977&r1=1656976&r2=1656977&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java Wed Feb  4 02:30:54 2015
@@ -672,7 +672,7 @@ public class TestOrcFile {
     assertEquals(5000, ((StringColumnStatistics)ss3.getColumnStatistics()[2]).getSum());
 
     RecordReaderImpl recordReader = (RecordReaderImpl) reader.rows();
-    OrcProto.RowIndex[] index = recordReader.readRowIndex(0);
+    OrcProto.RowIndex[] index = recordReader.readRowIndex(0, null);
     assertEquals(3, index.length);
     List<OrcProto.RowIndexEntry> items = index[1].getEntryList();
     assertEquals(1, items.size());
@@ -682,7 +682,7 @@ public class TestOrcFile {
     assertEquals(0, items.get(0).getPositions(2));
     assertEquals(1, 
                  items.get(0).getStatistics().getIntStatistics().getMinimum());
-    index = recordReader.readRowIndex(1);
+    index = recordReader.readRowIndex(1, null);
     assertEquals(3, index.length);
     items = index[1].getEntryList();
     assertEquals(2, 


