hive-commits mailing list archives

From ser...@apache.org
Subject svn commit: r1661297 - in /hive/branches/llap: common/src/java/org/apache/hadoop/hive/common/ llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/ llap-server/src/java/org/apache/hadoop/hive/llap/cache/ llap-server/src/test/org/apache/hadoop/...
Date Sat, 21 Feb 2015 08:01:06 GMT
Author: sershe
Date: Sat Feb 21 08:01:06 2015
New Revision: 1661297

URL: http://svn.apache.org/r1661297
Log:
Change the way DiskRanges are managed, fix decref for the cache, and some more bugfixes

Added:
    hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java
Modified:
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
    hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
    hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java

Added: hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java?rev=1661297&view=auto
==============================================================================
--- hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java (added)
+++ hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java Sat Feb 21 08:01:06 2015
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/** Java's linked list iterator interface is convoluted, and concurrent modification
+ * of the same list by multiple iterators is impossible. Hence, this class.
+ * Java also doesn't support multiple inheritance, so this cannot be done as an "aspect"... */
+public class DiskRangeList extends DiskRange {
+  private static final Log LOG = LogFactory.getLog(DiskRangeList.class);
+  public DiskRangeList prev, next;
+
+  public DiskRangeList(long offset, long end) {
+    super(offset, end);
+  }
+
+  /** Replaces this element with another in the list; returns the new element. */
+  public DiskRangeList replaceSelfWith(DiskRangeList other) {
+    other.prev = this.prev;
+    other.next = this.next;
+    if (this.prev != null) {
+      this.prev.next = other;
+    }
+    if (this.next != null) {
+      this.next.prev = other;
+    }
+    this.next = this.prev = null;
+    return other;
+  }
+
+  /** Inserts an element before current in the list; returns the new element. */
+  public DiskRangeList insertBefore(DiskRangeList other) {
+    other.prev = this.prev;
+    other.next = this;
+    if (this.prev != null) {
+      this.prev.next = other;
+    }
+    this.prev = other;
+    return other;
+  }
+
+  /** Inserts an element after current in the list; returns the new element. */
+  public DiskRangeList insertAfter(DiskRangeList other) {
+    other.next = this.next;
+    other.prev = this;
+    if (this.next != null) {
+      this.next.prev = other;
+    }
+    this.next = other;
+    return other;
+  }
+
+  /** Removes an element after current from the list. */
+  public void removeAfter() {
+    DiskRangeList other = this.next;
+    this.next = other.next;
+    if (this.next != null) {
+      this.next.prev = this;
+    }
+    other.next = other.prev = null;
+  }
+
+  /** Removes the current element from the list. */
+  public void removeSelf() {
+    if (this.prev != null) {
+      this.prev.next = this.next;
+    }
+    if (this.next != null) {
+      this.next.prev = this.prev;
+    }
+    this.next = this.prev = null;
+  }
+
+  /** Splits current element in the list, using DiskRange::slice */
+  public DiskRangeList split(long cOffset) {
+    insertAfter((DiskRangeList)this.slice(cOffset, end));
+    return replaceSelfWith((DiskRangeList)this.slice(offset, cOffset));
+  }
+
+  @VisibleForTesting
+  public int listSize() {
+    int result = 1;
+    DiskRangeList current = this.next;
+    while (current != null) {
+      ++result;
+      current = current.next;
+    }
+    return result;
+  }
+
+  @VisibleForTesting
+  public DiskRangeList[] listToArray() {
+    DiskRangeList[] result = new DiskRangeList[listSize()];
+    int i = 0;
+    DiskRangeList current = this.next;
+    while (current != null) {
+      result[i] = current;
+      ++i;
+      current = current.next;
+    }
+    return result;
+  }
+
+  public static class DiskRangeListCreateHelper {
+    private DiskRangeList tail = null, head;
+    public DiskRangeListCreateHelper() {
+    }
+
+    public DiskRangeList getTail() {
+      return tail;
+    }
+
+    public void addOrMerge(long offset, long end, boolean doMerge, boolean doLogNew) {
+      if (doMerge && tail != null && overlap(tail.offset, tail.end, offset, end)) {
+        tail.offset = Math.min(tail.offset, offset);
+        tail.end = Math.max(tail.end, end);
+      } else {
+        if (doLogNew) {
+          LOG.info("Creating new range; last range (which can include some previous adds) was "
+              + tail);
+        }
+        DiskRangeList node = new DiskRangeList(offset, end);
+        if (tail == null) {
+          head = tail = node;
+        } else {
+          tail = tail.insertAfter(node);
+        }
+      }
+    }
+
+    private static boolean overlap(long leftA, long rightA, long leftB, long rightB) {
+      if (leftA <= leftB) {
+        return rightA >= leftB;
+      }
+      return rightB >= leftA;
+    }
+
+    public DiskRangeList get() {
+      return head;
+    }
+
+    public DiskRangeList extract() {
+      DiskRangeList result = head;
+      head = null;
+      return result;
+    }
+  }
+
+  /**
+   * List in-place mutation helper - a bogus first element that is inserted before list head,
+   * and thus remains constant even if head is replaced with some new range via in-place list
+   * mutation. extract() can be used to obtain the modified list.
+   */
+  public static class DiskRangeListMutateHelper extends DiskRangeList {
+    public DiskRangeListMutateHelper(DiskRangeList head) {
+      super(-1, -1);
+      assert head != null;
+      assert head.prev == null;
+      this.next = head;
+      head.prev = this;
+    }
+
+    public DiskRangeList get() {
+      return next;
+    }
+
+    public DiskRangeList extract() {
+      DiskRangeList result = this.next;
+      assert result != null;
+      this.next = result.prev = null;
+      return result;
+    }
+  }
+}
\ No newline at end of file
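
The new DiskRangeList combines a DiskRange with intrusive prev/next links, so callers can replace, split, and remove nodes in place while holding plain node references. A minimal sketch of how the helpers above compose (class name and offsets are illustrative only, not part of the commit):

    import org.apache.hadoop.hive.common.DiskRangeList;
    import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper;
    import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper;

    public class DiskRangeListSketch {
      public static void main(String[] args) {
        // Build the read list: [0,100) and [100,250) merge into [0,250); [300,400) stays separate.
        DiskRangeListCreateHelper helper = new DiskRangeListCreateHelper();
        helper.addOrMerge(0, 100, true, false);
        helper.addOrMerge(100, 250, true, false);  // overlaps the tail, so it is merged
        helper.addOrMerge(300, 400, true, false);  // disjoint, so a new node is appended
        DiskRangeList head = helper.extract();     // [0,250) -> [300,400)

        // Wrap with a bogus first element so the head can be replaced in place safely.
        DiskRangeListMutateHelper wrapper = new DiskRangeListMutateHelper(head);
        head.replaceSelfWith(new DiskRangeList(0, 250)); // e.g. swap in a cached chunk
        DiskRangeList newHead = wrapper.extract();       // still finds the (new) head node
        System.out.println(newHead.listSize());          // prints 2
      }
    }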

Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java?rev=1661297&r1=1661296&r2=1661297&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java Sat Feb 21 08:01:06 2015
@@ -18,10 +18,10 @@
 
 package org.apache.hadoop.hive.llap.io.api.cache;
 
-import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.common.DiskRangeList;
 
 
 public interface LowLevelCache {
@@ -29,8 +29,9 @@ public interface LowLevelCache {
    * Gets file data for particular offsets. Null entries mean no data.
    * @param file File name; MUST be interned.
    * @param base base offset for the ranges (stripe offset in case of ORC).
+   * @return The resulting range list, with cached buffers interspersed with the remaining disk ranges.
    */
-  void getFileData(String fileName, LinkedList<DiskRange> ranges, long base);
+  DiskRangeList getFileData(String fileName, DiskRangeList range, long baseOffset);
 
   /**
    * Puts file data into cache.
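
With this change the cache both consumes and produces the head of a doubly-linked DiskRangeList instead of mutating a LinkedList in place. A hedged caller sketch (method and variable names are illustrative, imports omitted; it mirrors the readAllDataStreams change further below):

    // Assumes an existing LowLevelCache instance and an interned file name.
    static DiskRangeList lookUpStripe(LowLevelCache cache, String fileName,
        long start, long end, long stripeOffset) {
      DiskRangeList toRead = new DiskRangeList(start, end);       // one uncached range
      toRead = cache.getFileData(fileName, toRead, stripeOffset); // head node may be replaced
      for (DiskRangeList r = toRead; r != null; r = r.next) {
        // Each node is now either a remaining gap to read from disk, or a cached chunk
        // wrapping a locked buffer that must later be released via the cache.
      }
      return toRead;
    }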

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java?rev=1661297&r1=1661296&r2=1661297&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java Sat Feb 21 08:01:06 2015
@@ -29,6 +29,8 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.common.DiskRangeList;
+import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper;
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
@@ -71,22 +73,30 @@ public class LowLevelCacheImpl implement
   }
 
   @Override
-  public void getFileData(String fileName, LinkedList<DiskRange> ranges, long baseOffset) {
+  public DiskRangeList getFileData(String fileName, DiskRangeList ranges, long baseOffset) {
+    if (ranges == null) return null;
     FileCache subCache = cache.get(fileName);
-    if (subCache == null || !subCache.incRef()) return;
+    if (subCache == null || !subCache.incRef()) return ranges;
     try {
-      ListIterator<DiskRange> dr = ranges.listIterator();
-      while (dr.hasNext()) {
-        getOverlappingRanges(baseOffset, dr, subCache.cache);
+      DiskRangeList prev = ranges.prev;
+      if (prev == null) {
+        prev = new DiskRangeListMutateHelper(ranges);
       }
+      DiskRangeList current = ranges;
+      while (current != null) {
+        // Ranges are non-overlapping; the current node may be replaced or split below, so save next in advance.
+        DiskRangeList next = current.next;
+        getOverlappingRanges(baseOffset, current, subCache.cache);
+        current = next;
+      }
+      return prev.next;
     } finally {
       subCache.decRef();
     }
   }
 
-  private void getOverlappingRanges(long baseOffset, ListIterator<DiskRange> drIter,
+  private void getOverlappingRanges(long baseOffset, DiskRangeList currentNotCached,
       ConcurrentSkipListMap<Long, LlapCacheableBuffer> cache) {
-    DiskRange currentNotCached = drIter.next();
     Iterator<Map.Entry<Long, LlapCacheableBuffer>> matches = cache.subMap(
         currentNotCached.offset + baseOffset, currentNotCached.end + baseOffset)
         .entrySet().iterator();
@@ -108,46 +118,46 @@ public class LowLevelCacheImpl implement
       }
       cacheEnd = cacheOffset + buffer.declaredLength;
       CacheChunk currentCached = new CacheChunk(buffer, cacheOffset, cacheEnd);
-      currentNotCached = addCachedBufferToIter(drIter, currentNotCached, currentCached);
+      currentNotCached = addCachedBufferToIter(currentNotCached, currentCached);
       // Now that we've added it into correct position, we can adjust it by base offset.
       currentCached.shiftBy(-baseOffset);
     }
   }
 
-  private DiskRange addCachedBufferToIter(ListIterator<DiskRange> drIter,
-      DiskRange currentNotCached, CacheChunk currentCached) {
+  /**
+   * Adds cached buffer to buffer list.
+   * @param currentNotCached Pointer to the list node where we are inserting.
+   * @param currentCached The cached buffer found for this node, to insert.
+   * @return The new currentNotCached pointer, following the cached buffer insertion.
+   */
+  private DiskRangeList addCachedBufferToIter(
+      DiskRangeList currentNotCached, CacheChunk currentCached) {
     // Both currentNotCached and currentCached already include baseOffset.
     if (currentNotCached.offset == currentCached.offset) {
       if (currentNotCached.end <= currentCached.end) {  // we assume it's always "==" now
         // Replace the entire current DiskRange with new cached range.
-        drIter.set(currentCached);
-        currentNotCached = null;
+        currentNotCached.replaceSelfWith(currentCached);
+        return null;
       } else {
         // Insert the new cache range before the disk range.
         currentNotCached.offset = currentCached.end;
-        drIter.previous();
-        drIter.add(currentCached); 
-        DiskRange dr = drIter.next();
-        assert dr == currentNotCached;
+        currentNotCached.insertBefore(currentCached);
+        return currentNotCached;
       }
     } else {
       assert currentNotCached.offset < currentCached.offset;
       long originalEnd = currentNotCached.end;
       currentNotCached.end = currentCached.offset;
-      drIter.add(currentCached);
+      currentNotCached.insertAfter(currentCached);
       if (originalEnd <= currentCached.end) { // we assume it's always "==" now
-        // We have reached the end of the range and truncated the last non-cached range.
-        currentNotCached = null;
+        return null;  // No more matches expected...
       } else {
         // Insert the new disk range after the cache range. TODO: not strictly necessary yet?
-        currentNotCached = new DiskRange(currentCached.end, originalEnd);
-        drIter.add(currentNotCached);
-        DiskRange dr = drIter.previous();
-        assert dr == currentNotCached;
-        drIter.next();
+        currentNotCached = new DiskRangeList(currentCached.end, originalEnd);
+        currentCached.insertAfter(currentNotCached);
+        return currentNotCached;
       }
     }
-    return currentNotCached;
   }
 
   private boolean lockBuffer(LlapCacheableBuffer buffer) {
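
The three branches of addCachedBufferToIter correspond to the cached chunk covering the whole uncached range, covering its start, or sitting in its middle. A sketch of the middle case using the plain list operations (values are illustrative; it assumes the offset/end fields inherited from DiskRange are directly accessible, as the cache code above relies on):

    // A cached chunk [200,300) is found inside the not-yet-cached range [100,400).
    DiskRangeList notCached = new DiskRangeList(100, 400);
    DiskRangeList cached = new DiskRangeList(200, 300);   // stands in for a CacheChunk
    long originalEnd = notCached.end;
    notCached.end = cached.offset;                        // truncate to [100,200)
    notCached.insertAfter(cached);                        // [100,200) -> [200,300)
    DiskRangeList remainder = new DiskRangeList(cached.end, originalEnd);
    cached.insertAfter(remainder);                        // -> [300,400), the next node to match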

Modified: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java?rev=1661297&r1=1661296&r2=1661297&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java (original)
+++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java Sat Feb 21 08:01:06 2015
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hive.llap.cache;
 
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -31,6 +29,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.common.DiskRangeList;
+import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper;
+import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;
@@ -106,8 +107,8 @@ public class TestLowLevelCacheImpl {
   }
 
   private void verifyCacheGet(LowLevelCacheImpl cache, String fileName, Object... stuff) {
-    LinkedList<DiskRange> input = new LinkedList<DiskRange>();
-    Iterator<DiskRange> iter = null;
+    DiskRangeListCreateHelper list = new DiskRangeListCreateHelper();
+    DiskRangeList iter = null;
     int intCount = 0, lastInt = -1;
     int resultCount = stuff.length;
     for (Object obj : stuff) {
@@ -118,26 +119,24 @@ public class TestLowLevelCacheImpl {
           lastInt = (Integer)obj;
           intCount = 1;
         } else {
-          input.add(new DiskRange(lastInt, (Integer)obj));
+          list.addOrMerge(lastInt, (Integer)obj, true, true);
           intCount = 0;
         }
         continue;
       } else if (intCount >= 0) {
         assertTrue(intCount == 0);
-        assertFalse(input.isEmpty());
         intCount = -1;
-        cache.getFileData(fileName, input, 0);
-        assertEquals(resultCount, input.size());
-        iter = input.iterator();
+        iter = cache.getFileData(fileName, list.get(), 0);
+        assertEquals(resultCount, iter.listSize());
       }
-      assertTrue(iter.hasNext());
-      DiskRange next = iter.next();
+      assertTrue(iter != null);
       if (obj instanceof LlapMemoryBuffer) {
-        assertTrue(next instanceof CacheChunk);
-        assertSame(obj, ((CacheChunk)next).buffer);
+        assertTrue(iter instanceof CacheChunk);
+        assertSame(obj, ((CacheChunk)iter).buffer);
       } else {
-        assertTrue(next.equals(obj));
+        assertTrue(iter.equals(obj));
       }
+      iter = iter.next;
     }
   }
 
@@ -217,25 +216,36 @@ public class TestLowLevelCacheImpl {
             String fileName = isFn1 ? fn1 : fn2;
             int fileIndex = isFn1 ? 1 : 2;
             int count = rdm.nextInt(offsetsToUse);
-            LinkedList<DiskRange> input = new LinkedList<DiskRange>();
-            int[] offsets = new int[count];
-            for (int j = 0; j < count; ++j) {
-              int next = rdm.nextInt(offsetsToUse);
-              input.add(dr(next, next + 1));
-              offsets[j] = next;
-            }
             if (isGet) {
-              cache.getFileData(fileName, input, 0);
+              DiskRangeListCreateHelper list = new DiskRangeListCreateHelper();
+              int[] offsets = new int[count];
+              for (int j = 0; j < count; ++j) {
+                int next = rdm.nextInt(offsetsToUse);
+                list.addOrMerge(next, next + 1, true, false);
+                offsets[j] = next;
+              }
+              DiskRangeList iter = cache.getFileData(fileName, list.get(), 0);
               int j = -1;
-              for (DiskRange dr : input) {
+              while (iter != null) {
                 ++j;
-                if (!(dr instanceof CacheChunk)) continue;
+                if (!(iter instanceof CacheChunk)) {
+                  iter = iter.next;
+                  continue;
+                }
                 ++gets;
-                LlapCacheableBuffer result = (LlapCacheableBuffer)((CacheChunk)dr).buffer;
+                LlapCacheableBuffer result = (LlapCacheableBuffer)((CacheChunk)iter).buffer;
                 assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), result.arenaIndex);
                 cache.releaseBuffer(result);
+                iter = iter.next;
               }
             } else {
+              DiskRange[] ranges = new DiskRange[count];
+              int[] offsets = new int[count];
+              for (int j = 0; j < count; ++j) {
+                int next = rdm.nextInt(offsetsToUse);
+                ranges[j] = dr(next, next + 1);
+                offsets[j] = next;
+              }
               LlapMemoryBuffer[] buffers = new LlapMemoryBuffer[count];
               for (int j = 0; j < offsets.length; ++j) {
                 LlapCacheableBuffer buf = LowLevelCacheImpl.allocateFake();
@@ -243,8 +253,7 @@ public class TestLowLevelCacheImpl {
                 buf.arenaIndex = makeFakeArenaIndex(fileIndex, offsets[j]);
                 buffers[j] = buf;
               }
-              long[] mask = cache.putFileData(
-                  fileName, input.toArray(new DiskRange[count]), buffers, 0);
+              long[] mask = cache.putFileData(fileName, ranges, buffers, 0);
               puts += buffers.length;
               long maskVal = 0;
               if (mask != null) {
@@ -276,18 +285,15 @@ public class TestLowLevelCacheImpl {
       public Integer call() {
         boolean isFirstFile = false;
         Random rdm = new Random(1234 + Thread.currentThread().getId());
-        LinkedList<DiskRange> input = new LinkedList<DiskRange>();
-        DiskRange allOffsets = new DiskRange(0, offsetsToUse + 1);
         int evictions = 0;
         syncThreadStart(cdlIn, cdlOut);
         while (rdmsDone.get() < 3) {
-          input.clear();
-          input.add(allOffsets);
+          DiskRangeList head = new DiskRangeList(0, offsetsToUse + 1);
           isFirstFile = !isFirstFile;
           String fileName = isFirstFile ? fn1 : fn2;
-          cache.getFileData(fileName, input, 0);
-          DiskRange[] results = input.toArray(new DiskRange[input.size()]);
-          int startIndex = rdm.nextInt(input.size()), index = startIndex;
+          head = cache.getFileData(fileName, head, 0);
+          DiskRange[] results = head.listToArray();
+          int startIndex = rdm.nextInt(results.length), index = startIndex;
           LlapCacheableBuffer victim = null;
           do {
             DiskRange r = results[index];
@@ -371,8 +377,8 @@ public class TestLowLevelCacheImpl {
     return fake;
   }
 
-  private DiskRange dr(int from, int to) {
-    return new DiskRange(from, to);
+  private DiskRangeList dr(int from, int to) {
+    return new DiskRangeList(from, to);
   }
 
   private DiskRange[] drs(int... offsets) {

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java?rev=1661297&r1=1661296&r2=1661297&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java Sat Feb 21 08:01:06 2015
@@ -28,6 +28,9 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.common.DiskRangeList;
+import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper;
+import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper;
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
@@ -38,6 +41,7 @@ import org.apache.hadoop.hive.ql.io.orc.
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.Stream;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils.ByteBufferAllocatorPool;
 import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
 
@@ -129,7 +133,7 @@ public class EncodedReaderImpl implement
     public final int streamIndexOffset;
     public final OrcProto.Stream.Kind kind;
     /** Iterators for the buffers; used to maintain position in per-rg reading. */
-    ListIterator<DiskRange> bufferIter;
+    DiskRangeList bufferIter;
     /** Saved stripe-level stream, to reuse for each RG (e.g. dictionaries). */
     StreamBuffer stripeLevelStream;
 
@@ -152,14 +156,12 @@ public class EncodedReaderImpl implement
     // We are also not supposed to call setDone, since we are only part of the operation.
     long stripeOffset = stripe.getOffset();
     // 1. Figure out what we have to read.
-    LinkedList<DiskRange> rangesToRead = new LinkedList<DiskRange>();
     long offset = 0; // Stream offset in relation to the stripe.
     // 1.1. Figure out which columns have a present stream
     boolean[] hasNull = RecordReaderUtils.findPresentStreamsByColumn(streamList, types);
     if (DebugUtils.isTraceOrcEnabled()) {
       LOG.info("The following columns have PRESENT streams: " + DebugUtils.toString(hasNull));
     }
-    DiskRange lastRange = null;
 
     // We assume stream list is sorted by column and that non-data
     // streams do not interleave data streams for the same column.
@@ -168,6 +170,8 @@ public class EncodedReaderImpl implement
     ColumnReadContext[] colCtxs = new ColumnReadContext[colRgs.length];
     boolean[] includedRgs = null;
     boolean isCompressed = (codec != null);
+
+    DiskRangeListCreateHelper listToRead = new DiskRangeListCreateHelper();
     for (OrcProto.Stream stream : streamList) {
       long length = stream.getLength();
       int colIx = stream.getColumn();
@@ -202,71 +206,61 @@ public class EncodedReaderImpl implement
             + ", " + length + ", index position " + indexIx);
       }
       if (includedRgs == null || RecordReaderUtils.isDictionary(streamKind, encodings.get(colIx))) {
-        lastRange = RecordReaderUtils.addEntireStreamToRanges(
-            offset, length, lastRange, rangesToRead);
+        RecordReaderUtils.addEntireStreamToRanges(offset, length, listToRead, true);
         if (DebugUtils.isTraceOrcEnabled()) {
-          LOG.info("Will read whole stream " + streamKind + "; added to " + lastRange);
+          LOG.info("Will read whole stream " + streamKind + "; added to " + listToRead.getTail());
         }
       } else {
-        lastRange = RecordReaderUtils.addRgFilteredStreamToRanges(stream, includedRgs,
+        RecordReaderUtils.addRgFilteredStreamToRanges(stream, includedRgs,
             codec != null, indexes[colIx], encodings.get(colIx), types.get(colIx),
-            bufferSize, hasNull[colIx], offset, length, lastRange, rangesToRead);
+            bufferSize, hasNull[colIx], offset, length, listToRead, true);
       }
       offset += length;
     }
 
     // 2. Now, read all of the ranges from cache or disk.
+    DiskRangeListMutateHelper toRead = new DiskRangeListMutateHelper(listToRead.get());
     if (DebugUtils.isTraceOrcEnabled()) {
-      LOG.info("Resulting disk ranges to read: "
-          + RecordReaderUtils.stringifyDiskRanges(rangesToRead));
+      LOG.info("Resulting disk ranges to read: " + RecordReaderUtils.stringifyDiskRanges(toRead));
     }
     if (cache != null) {
-      cache.getFileData(fileName, rangesToRead, stripeOffset);
+      cache.getFileData(fileName, toRead.next, stripeOffset);
       if (DebugUtils.isTraceOrcEnabled()) {
         LOG.info("Disk ranges after cache (base offset " + stripeOffset
-            + "): " + RecordReaderUtils.stringifyDiskRanges(rangesToRead));
+            + "): " + RecordReaderUtils.stringifyDiskRanges(toRead));
       }
     }
+
     // Force direct buffers if we will be decompressing to direct cache.
-    RecordReaderUtils.readDiskRanges(file, zcr, stripeOffset, rangesToRead, cache.isDirectAlloc());
+    RecordReaderUtils.readDiskRanges(file, zcr, stripeOffset, toRead.next, cache.isDirectAlloc());
 
-    // 2.1. Separate buffers (relative to stream offset) for each stream from the data we have.
-    // TODO: given how we read, we could potentially get rid of this step?
-    for (ColumnReadContext colCtx : colCtxs) {
-      for (int i = 0; i < colCtx.streamCount; ++i) {
-        StreamContext sctx = colCtx.streams[i];
-        List<DiskRange> sb = RecordReaderUtils.getStreamBuffers(
-            rangesToRead, sctx.offset, sctx.length);
-        sctx.bufferIter = sb.listIterator();
-        if (DebugUtils.isTraceOrcEnabled()) {
-          LOG.info("Column " + colCtx.colIx + " stream " + sctx.kind + " at " + sctx.offset + ","
-              + sctx.length + " got ranges (relative to stream) "
-              + RecordReaderUtils.stringifyDiskRanges(sb));
-        }
-      }
+    if (DebugUtils.isTraceOrcEnabled()) {
+      LOG.info("Disk ranges after disk read  (base offset " + stripeOffset
+            + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
     }
 
     // 3. Finally, decompress data, map per RG, and return to caller.
     // We go by RG and not by column because that is how data is processed.
     int rgCount = (int)Math.ceil((double)stripe.getNumberOfRows() / rowIndexStride);
+    DiskRangeList iter = toRead.next; // Keep "toRead" list for future use, don't extract().
     for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
-      boolean isLastRg = rgCount - rgIx - 1 == 0;
+      boolean isLastRg = rgIx == rgCount - 1;
       // Create the batch we will use to return data for this RG.
       EncodedColumnBatch<OrcBatchKey> ecb = new EncodedColumnBatch<OrcBatchKey>(
           new OrcBatchKey(fileName, stripeIx, rgIx), colRgs.length, 0);
       boolean isRGSelected = true;
       for (int colIxMod = 0; colIxMod < colRgs.length; ++colIxMod) {
         if (colRgs[colIxMod] != null && !colRgs[colIxMod][rgIx]) {
+          // RG x col filtered.
           isRGSelected = false;
-          continue;
-        } // RG x col filtered.
+          continue; // TODO#: this would be invalid with HL cache, where RG x col can be excluded.
+        }
         ColumnReadContext ctx = colCtxs[colIxMod];
         RowIndexEntry index = ctx.rowIndex.getEntry(rgIx),
             nextIndex = isLastRg ? null : ctx.rowIndex.getEntry(rgIx + 1);
         ecb.initColumn(colIxMod, ctx.colIx, ctx.streamCount);
         for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
           StreamContext sctx = ctx.streams[streamIx];
-          long absStreamOffset = stripeOffset + sctx.offset;
           StreamBuffer cb = null;
           if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding)) {
             // This stream is for entire stripe and needed for every RG; uncompress once and reuse.
@@ -274,13 +268,26 @@ public class EncodedReaderImpl implement
               LOG.info("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for"
                   + " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length);
             }
-            cb = getStripeLevelStream(absStreamOffset, sctx, cache, isLastRg);
+            if (sctx.stripeLevelStream == null) {
+              sctx.stripeLevelStream = new StreamBuffer(sctx.kind.getNumber());
+              // We will be using this for each RG while also sending RGs to processing.
+              // To avoid buffers being unlocked, run refcount one ahead; we will not increase
+              // it when building the last RG, so each RG processing will decref once, and the
+              // last one will unlock the buffers.
+              sctx.stripeLevelStream.incRef();
+              iter = InStream.uncompressStream(fileName, stripeOffset, iter, sctx.offset,
+                  sctx.offset + sctx.length, zcr, codec, bufferSize, cache, sctx.stripeLevelStream);
+            }
+            if (!isLastRg) {
+              sctx.stripeLevelStream.incRef();
+            }
+            cb = sctx.stripeLevelStream;
           } else {
             // This stream can be separated by RG using index. Let's do that.
-            long cOffset = index.getPositions(sctx.streamIndexOffset),
-                endCOffset = RecordReaderUtils.estimateRgEndOffset(isCompressed, isLastRg,
-                    isLastRg ? sctx.length : nextIndex.getPositions(sctx.streamIndexOffset),
-                    sctx.length, bufferSize);
+            long cOffset = index.getPositions(sctx.streamIndexOffset) + sctx.offset,
+                nextCOffset = isLastRg ? sctx.length : nextIndex.getPositions(sctx.streamIndexOffset),
+                endCOffset = RecordReaderUtils.estimateRgEndOffset(
+                    isCompressed, isLastRg, nextCOffset, sctx.length, bufferSize) + sctx.offset;
             cb = new StreamBuffer(sctx.kind.getNumber());
             cb.incRef();
             if (DebugUtils.isTraceOrcEnabled()) {
@@ -289,8 +296,11 @@ public class EncodedReaderImpl implement
                   + sctx.length + " index position " + sctx.streamIndexOffset + ": compressed ["
                   + cOffset + ", " + endCOffset + ")");
             }
-            InStream.uncompressStream(fileName, absStreamOffset, zcr, sctx.bufferIter,
-                codec, bufferSize, cache, cOffset, endCOffset, cb);
+            boolean isStartOfStream = sctx.bufferIter == null;
+            DiskRangeList range = isStartOfStream ? iter : sctx.bufferIter;
+            DiskRangeList next = InStream.uncompressStream(fileName, stripeOffset, range, cOffset,
+                endCOffset, zcr, codec, bufferSize, cache, cb);
+            sctx.bufferIter = iter = next; // Reset iter just to ensure it's valid
           }
           ecb.setStreamData(colIxMod, streamIx, cb);
         }
@@ -299,31 +309,14 @@ public class EncodedReaderImpl implement
         consumer.consumeData(ecb);
       }
     }
-    // TODO: WE NEED TO DECREF ALL THE CACHE BUFFERS ONCE
-  }
 
-  /**
-   * Reads the entire stream for a column (e.g. a dictionary stream), or gets it from context.
-   * @param isLastRg Whether the stream is being read for last RG in stripe.
-   * @return StreamBuffer that contains the entire stream.
-   */
-  private StreamBuffer getStripeLevelStream(long baseOffset, StreamContext ctx,
-      LowLevelCache cache, boolean isLastRg) throws IOException {
-    if (ctx.stripeLevelStream == null) {
-      ctx.stripeLevelStream = new StreamBuffer(ctx.kind.getNumber());
-      // We will be using this for each RG while also sending RGs to processing.
-      // To avoid buffers being unlocked, run refcount one ahead; we will not increase
-      // it when building the last RG, so each RG processing will decref once, and the
-      // last one will unlock the buffers.
-      ctx.stripeLevelStream.incRef();
-      InStream.uncompressStream(fileName, baseOffset, zcr,
-          ctx.bufferIter, codec, bufferSize, cache, -1, -1, ctx.stripeLevelStream);
-      ctx.bufferIter = null;
-    }
-    if (!isLastRg) {
-      ctx.stripeLevelStream.incRef();
+    DiskRangeList toFree = toRead.next;
+    while (toFree != null) {
+      if (toFree instanceof CacheChunk) {
+        cache.releaseBuffer(((CacheChunk)toFree).buffer);
+      }
+      toFree = toFree.next;
     }
-    return ctx.stripeLevelStream;
   }
 
   @Override
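
The inlined stripe-level stream logic keeps the refcount running one ahead so that per-RG consumers can decref unconditionally. A schematic of the counting in plain Java (values illustrative, not part of the commit):

    int rgCount = 4;                       // row groups sharing one stripe-level stream
    int refCount = 0;
    refCount++;                            // incRef() once when the shared stream is created
    for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
      boolean isLastRg = (rgIx == rgCount - 1);
      if (!isLastRg) {
        refCount++;                        // one extra ref per RG, except the last one
      }
      // ... hand the stream to RG processing, which later calls decRef() exactly once ...
    }
    // refCount == rgCount before the consumers run, so the last decRef() reaches zero
    // and the underlying cache buffers are unlocked exactly once.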

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1661297&r1=1661296&r2=1661297&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Sat Feb 21 08:01:06 2015
@@ -21,12 +21,14 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.common.DiskRangeList;
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.LogLevels;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
@@ -142,7 +144,7 @@ public abstract class InStream extends I
             (desired - curRange.offset) < curRange.getLength()) {
           currentOffset = desired;
           currentRange = i;
-          this.range = curRange.getData();
+          this.range = curRange.getData().duplicate();
           int pos = range.position();
           pos += (int)(desired - curRange.offset); // this is why we duplicate
           this.range.position(pos);
@@ -549,13 +551,22 @@ public abstract class InStream extends I
   /**
    * Uncompresses part of the stream. RGs can overlap, so we cannot just go and decompress
    * and remove what we have returned. We will keep iterator as a "hint" point.
-   * TODO: Java LinkedList and iter have a really stupid interface. Replace with own simple one?
-   * @param zcr
+   * @param fileName File name for cache keys.
+   * @param baseOffset Absolute offset of boundaries and ranges relative to file, for cache keys.
+   * @param start Ordered ranges containing file data. Helpful if they point close to cOffset.
+   * @param cOffset Start offset to decompress.
+   * @param endCOffset End offset to decompress; estimate, partial CBs will be ignored.
+   * @param zcr Zero-copy reader, if any, to release discarded buffers.
+   * @param codec Compression codec.
+   * @param bufferSize Compressed buffer (CB) size.
+   * @param cache Low-level cache to cache new data.
+   * @param streamBuffer Stream buffer, to add the results.
+   * @return Last buffer cached during decompression. Cache buffers are never removed from
+   *         the master list, so they are safe to keep as iterators for various streams.
    */
-  public static void uncompressStream(String fileName, long baseOffset,
-      ZeroCopyReaderShim zcr, ListIterator<DiskRange> ranges,
-      CompressionCodec codec, int bufferSize, LowLevelCache cache,
-      long cOffset, long endCOffset, StreamBuffer streamBuffer)
+  public static DiskRangeList uncompressStream(String fileName, long baseOffset,
+      DiskRangeList start, long cOffset, long endCOffset, ZeroCopyReaderShim zcr,
+      CompressionCodec codec, int bufferSize, LowLevelCache cache, StreamBuffer streamBuffer)
           throws IOException {
     streamBuffer.cacheBuffers = new ArrayList<LlapMemoryBuffer>();
     List<ProcCacheChunk> toDecompress = null;
@@ -564,20 +575,20 @@ public abstract class InStream extends I
     // 1. Find our bearings in the stream. Normally, iter will already point either to where we
     // want to be, or just before. However, RGs can overlap due to encoding, so we may have
     // to return to a previous block.
-    DiskRange current = findCompressedPosition(ranges, cOffset);
+    DiskRangeList current = findCompressedPosition(start, cOffset);
     if (DebugUtils.isTraceOrcEnabled()) {
       LOG.info("Starting uncompressStream for [" + cOffset + "," + endCOffset + ") at " + current);
     }
 
     // 2. Go thru the blocks; add stuff to results and prepare the decompression work (see below).
-    if (cOffset >= 0 && cOffset != current.offset) {
-      // We adjust offsets as we decompress, we expect to decompress sequentially, and we cache and
-      // decompress entire CBs (obviously). Therefore the offset in the next DiskRange should
-      // always be the start offset of a CB. TODO: what about at start?
-      throw new AssertionError("Unexpected offset - for " + cOffset + ", got " + current.offset);
+    if (cOffset > current.offset) {
+      // Target compression block is in the middle of the range; slice the range in two.
+      current = current.split(cOffset).next;
     }
     long currentCOffset = cOffset;
+    DiskRangeList lastCached = null;
     while (true) {
+      DiskRangeList next = null;
       if (current instanceof CacheChunk) {
         // 2a. This is a cached compression buffer, add as is.
         CacheChunk cc = (CacheChunk)current;
@@ -587,6 +598,8 @@ public abstract class InStream extends I
         if (DebugUtils.isTraceOrcEnabled()) {
           LOG.info("Adding an already-uncompressed buffer " + cc.buffer);
         }
+        lastCached = current;
+        next = current.next;
       } else {
         // 2b. This is a compressed buffer. We need to uncompress it; the buffer can comprise
         // several disk ranges, so we might need to combine them.
@@ -596,18 +609,21 @@ public abstract class InStream extends I
           toRelease = (zcr == null) ? null : new ArrayList<ByteBuffer>();
         }
         long originalOffset = bc.offset;
-        int compressedBytesConsumed = addOneCompressionBuffer(bc, ranges, zcr, bufferSize,
+        next = addOneCompressionBuffer(bc, zcr, bufferSize,
             cache, streamBuffer.cacheBuffers, toDecompress, toRelease);
-        if (compressedBytesConsumed == -1) {
-          // endCOffset is an estimate; we have a partially-read compression block, ignore it
-          break;
+        if (next != null) {
+          currentCOffset = next.offset;
+          lastCached = next.prev;
+          // addOne... always adds one CC and returns next range after it
+          assert lastCached instanceof CacheChunk;
+        } else {
+          currentCOffset = originalOffset;
         }
-        currentCOffset = originalOffset + compressedBytesConsumed;
       }
-      if ((endCOffset >= 0 && currentCOffset >= endCOffset) || !ranges.hasNext()) {
+      if ((endCOffset >= 0 && currentCOffset >= endCOffset) || next == null) {
         break;
       }
-      current = ranges.next();
+      current = next;
     }
 
     // 3. Allocate the buffers, prepare cache keys.
@@ -615,7 +631,7 @@ public abstract class InStream extends I
     // data and some unallocated membufs for decompression. toDecompress contains all the work we
     // need to do, and each item points to one of the membufs in cacheBuffers as target. The iter
     // has also been adjusted to point to these buffers instead of compressed data for the ranges.
-    if (toDecompress == null) return; // Nothing to decompress.
+    if (toDecompress == null) return lastCached; // Nothing to decompress.
 
     LlapMemoryBuffer[] targetBuffers = new LlapMemoryBuffer[toDecompress.size()];
     DiskRange[] cacheKeys = new DiskRange[toDecompress.size()];
@@ -650,36 +666,23 @@ public abstract class InStream extends I
 
     // 6. Finally, put data to cache.
     cache.putFileData(fileName, cacheKeys, targetBuffers, baseOffset);
+    return lastCached;
   }
 
 
   /** Finds compressed offset in a stream and makes sure iter points to its position.
      This may be necessary for obscure combinations of compression and encoding boundaries. */
-  private static DiskRange findCompressedPosition(
-      ListIterator<DiskRange> ranges, long cOffset) {
-    if (cOffset < 0) return ranges.next();
-    DiskRange current = null;
-    boolean doCallNext = false;
-    if (ranges.hasNext()) {
-      current = ranges.next();
-    } else if (ranges.hasPrevious()) {
-      current = ranges.previous();
-      doCallNext = true;
-    }
+  private static DiskRangeList findCompressedPosition(
+      DiskRangeList ranges, long cOffset) {
+    if (cOffset < 0) return ranges;
     // We expect the offset to be valid TODO: rather, validate
-    while (current.end <= cOffset) {
-      current = ranges.next();
-      doCallNext = false;
-    }
-    while (current.offset > cOffset) {
-      current = ranges.previous();
-      doCallNext = true;
-    }
-    if (doCallNext) {
-      // TODO: WTF?
-      ranges.next(); // We called previous, make sure next is the real next and not current.
+    while (ranges.end <= cOffset) {
+      ranges = ranges.next;
     }
-    return current;
+    while (ranges.offset > cOffset) {
+      ranges = ranges.prev;
+    }
+    return ranges;
   }
 
 
@@ -695,9 +698,8 @@ public abstract class InStream extends I
    * @param toRelease The list of buffers to release to zcr because they are no longer in use.
    * @return The total number of compressed bytes consumed.
    */
-  private static int addOneCompressionBuffer(BufferChunk current,
-      ListIterator<DiskRange> ranges, ZeroCopyReaderShim zcr, int bufferSize,
-      LowLevelCache cache, List<LlapMemoryBuffer> cacheBuffers,
+  private static DiskRangeList addOneCompressionBuffer(BufferChunk current, ZeroCopyReaderShim zcr,
+      int bufferSize, LowLevelCache cache, List<LlapMemoryBuffer> cacheBuffers,
       List<ProcCacheChunk> toDecompress, List<ByteBuffer> toRelease) throws IOException {
     ByteBuffer slice = null;
     ByteBuffer compressed = current.chunk;
@@ -721,19 +723,15 @@ public abstract class InStream extends I
       // Simple case - CB fits entirely in the disk range.
       slice = compressed.slice();
       slice.limit(chunkLength);
-      addOneCompressionBlockByteBuffer(slice, isUncompressed, cbStartOffset, cbEndOffset,
-          chunkLength, ranges, current, cache, toDecompress, cacheBuffers);
-      if (DebugUtils.isTraceOrcEnabled()) {
-        LOG.info("Adjusting " + current + " to consume " + consumedLength);
-      }
-      current.offset += consumedLength;
+      DiskRangeList next = addOneCompressionBlockByteBuffer(slice, isUncompressed, cbStartOffset,
+          cbEndOffset, chunkLength, consumedLength, current, cache, toDecompress, cacheBuffers);
       if (compressed.remaining() <= 0 && zcr != null) {
         toRelease.add(compressed);
       }
-      return consumedLength;
+      return next;
     }
-    if (current.end < cbEndOffset && !ranges.hasNext()) {
-      return -1; // This is impossible to read from this chunk.
+    if (current.end < cbEndOffset && current.next == null) {
+      return null; // This is impossible to read from this chunk.
     }
 
     // TODO: we could remove extra copy for isUncompressed case by copying directly to cache.
@@ -742,10 +740,11 @@ public abstract class InStream extends I
     int remaining = chunkLength - compressed.remaining();
     int originalPos = compressed.position();
     copy.put(compressed);
-    ranges.remove();
     if (DebugUtils.isTraceOrcEnabled()) {
       LOG.info("Removing " + current + " from ranges");
     }
+    DiskRangeList next = current.next;
+    current.removeSelf();
     if (zcr != null) {
       if (originalPos == 0) {
         zcr.releaseBuffer(compressed); // We copied the entire buffer.
@@ -754,40 +753,40 @@ public abstract class InStream extends I
       }
     }
 
-    DiskRange nextRange = null;
-    while (ranges.hasNext()) {
-      nextRange = ranges.next();
-      if (!(nextRange instanceof BufferChunk)) {
+    while (next != null) {
+      if (!(next instanceof BufferChunk)) {
         throw new IOException("Trying to extend compressed block into uncompressed block");
       }
-      compressed = nextRange.getData();
+      compressed = next.getData();
       if (compressed.remaining() >= remaining) {
         // This is the last range for this compression block. Yay!
         slice = compressed.slice();
         slice.limit(remaining);
         copy.put(slice);
-        addOneCompressionBlockByteBuffer(copy, isUncompressed, cbStartOffset,
-            cbEndOffset, remaining, ranges, current, cache, toDecompress, cacheBuffers);
+        next = addOneCompressionBlockByteBuffer(copy, isUncompressed, cbStartOffset, cbEndOffset,
+            remaining, remaining, (BufferChunk)next, cache, toDecompress, cacheBuffers);
         if (DebugUtils.isTraceOrcEnabled()) {
-          LOG.info("Adjusting " + nextRange + " to consume " + remaining);
+          LOG.info("Adjusting " + next + " to consume " + remaining);
         }
-        nextRange.offset += remaining;
         if (compressed.remaining() <= 0 && zcr != null) {
           zcr.releaseBuffer(compressed); // We copied the entire buffer.
         }
-        return consumedLength;
+        return next;
       }
       remaining -= compressed.remaining();
       copy.put(compressed);
       if (zcr != null) {
         zcr.releaseBuffer(compressed); // We copied the entire buffer.
       }
+      DiskRangeList tmp = next;
       if (DebugUtils.isTraceOrcEnabled()) {
-        LOG.info("Removing " + nextRange + " from ranges");
+        LOG.info("Removing " + tmp + " from ranges");
       }
-      ranges.remove();
+      next = next.next;
+      tmp.removeSelf();
     }
-    return -1; // This is impossible to read from this chunk.
+    return null; // This is impossible to read from this chunk.
+    // TODO: double-check this is valid; we just did a bunch of changes to the list.
   }
 
   /**
@@ -797,17 +796,18 @@ public abstract class InStream extends I
    * @param cbStartOffset Compressed start offset of the fCB.
    * @param cbEndOffset Compressed end offset of the fCB.
    * @param lastRange The buffer from which the last (or all) bytes of fCB come.
-   * @param lastPartLength The number of bytes consumed from lastRange into fCB.
+   * @param lastPartChunkLength The number of compressed bytes consumed from last *chunk* into fullCompressionBlock.
+   * @param lastPartConsumedLength The number of compressed bytes consumed from last *range* into fullCompressionBlock.
+   *                               Can be different from lastPartChunkLength due to header.
    * @param ranges The iterator of all compressed ranges for the stream, pointing at lastRange.
    * @param lastChunk 
    * @param toDecompress See addOneCompressionBuffer.
    * @param cacheBuffers See addOneCompressionBuffer.
    */
-  private static void addOneCompressionBlockByteBuffer(ByteBuffer fullCompressionBlock,
-      boolean isUncompressed, long cbStartOffset, long cbEndOffset, int lastPartLength,
-      ListIterator<DiskRange> ranges, BufferChunk lastChunk,
-      LowLevelCache cache, List<ProcCacheChunk> toDecompress,
-      List<LlapMemoryBuffer> cacheBuffers) {
+  private static DiskRangeList addOneCompressionBlockByteBuffer(ByteBuffer fullCompressionBlock,
+      boolean isUncompressed, long cbStartOffset, long cbEndOffset, int lastPartChunkLength,
+      int lastPartConsumedLength, BufferChunk lastChunk, LowLevelCache cache,
+      List<ProcCacheChunk> toDecompress, List<LlapMemoryBuffer> cacheBuffers) {
     // Prepare future cache buffer.
     LlapMemoryBuffer futureAlloc = cache.createUnallocated();
     // Add it to result in order we are processing.
@@ -817,21 +817,27 @@ public abstract class InStream extends I
         cbStartOffset, cbEndOffset, !isUncompressed, fullCompressionBlock, futureAlloc);
     toDecompress.add(cc);
     // Adjust the compression block position.
-    lastChunk.chunk.position(lastChunk.chunk.position() + lastPartLength);
+    if (DebugUtils.isTraceOrcEnabled()) {
+      LOG.info("Adjusting " + lastChunk + " to consume " + lastPartChunkLength
+          + " compressed / " + lastPartConsumedLength + " total bytes");
+    }
+    lastChunk.chunk.position(lastChunk.chunk.position() + lastPartChunkLength);
+    lastChunk.offset += lastPartConsumedLength;
     // Finally, put it in the ranges list for future use (if shared between RGs).
     // Before anyone else accesses it, it would have been allocated and decompressed locally.
     if (lastChunk.chunk.remaining() <= 0) {
       if (DebugUtils.isTraceOrcEnabled()) {
         LOG.info("Replacing " + lastChunk + " with " + cc + " in the buffers");
       }
-      ranges.set(cc);
+      assert lastChunk.offset == lastChunk.end;
+      lastChunk.replaceSelfWith(cc);
+      return cc.next;
     } else {
-      DiskRange before = ranges.previous();
       if (DebugUtils.isTraceOrcEnabled()) {
-        LOG.info("Adding " + cc + " before " + before + " in the buffers");
+        LOG.info("Adding " + cc + " before " + lastChunk + " in the buffers");
       }
-      ranges.add(cc);
-      // At this point, next() should return before, which is the 2nd part of the split buffer.
+      lastChunk.insertBefore(cc);
+      return lastChunk;
     }
   }
 }
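
findCompressedPosition now walks the linked ranges directly from a hint node: forward while the hint ends before the target offset, backward if an overlapping RG already moved it past the target. A hedged restatement as a standalone helper, to make the scan explicit (helper name is illustrative):

    static DiskRangeList findPosition(DiskRangeList hint, long cOffset) {
      if (cOffset < 0) return hint;          // "no offset": start from the hint itself
      DiskRangeList current = hint;
      while (current.end <= cOffset) {       // hint is behind the target: walk forward
        current = current.next;
      }
      while (current.offset > cOffset) {     // hint overshot (RGs can overlap): walk back
        current = current.prev;
      }
      return current;                        // the range containing cOffset
    }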

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1661297&r1=1661296&r2=1661297&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Sat Feb 21 08:01:06 2015
@@ -43,6 +43,8 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.common.DiskRangeList;
+import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
@@ -103,7 +105,7 @@ public class RecordReaderImpl implements
   private long rowCountInStripe = 0;
   private final Map<StreamName, InStream> streams =
       new HashMap<StreamName, InStream>();
-  List<DiskRange> bufferChunks = new ArrayList<DiskRange>(0);
+  DiskRangeList bufferChunks = null;
   private final TreeReader reader;
   private final OrcProto.RowIndex[] indexes;
   private final OrcProto.BloomFilterIndex[] bloomFilterIndices;
@@ -2956,15 +2958,17 @@ public class RecordReaderImpl implements
     for(InStream is: streams.values()) {
       is.close();
     }
-    if(bufferChunks != null) {
+    if (bufferChunks != null) {
       if (zcr != null) {
-        for (DiskRange range : bufferChunks) {
+        DiskRangeList range = bufferChunks;
+        while (range != null) {
           if (range instanceof BufferChunk) {
             zcr.releaseBuffer(((BufferChunk)range).chunk);
           }
+          range = range.next;
         }
       }
-      bufferChunks.clear();
+      bufferChunks = null;
     }
     streams.clear();
   }
@@ -3019,18 +3023,15 @@ public class RecordReaderImpl implements
     return stripe;
   }
 
-  private void readAllDataStreams(StripeInformation stripe
-                                  ) throws IOException {
+  private void readAllDataStreams(StripeInformation stripe) throws IOException {
     long start = stripe.getIndexLength();
     long end = start + stripe.getDataLength();
     // explicitly trigger 1 big read
-    LinkedList<DiskRange> rangesToRead = Lists.newLinkedList();
-    rangesToRead.add(new DiskRange(start, end));
+    DiskRangeList toRead = new DiskRangeList(start, end);
     if (this.cache != null) {
-      cache.getFileData(fileName, rangesToRead, stripe.getOffset());
+      toRead = cache.getFileData(fileName, toRead, stripe.getOffset());
     }
-    RecordReaderUtils.readDiskRanges(file, zcr, stripe.getOffset(), rangesToRead, false);
-    bufferChunks = rangesToRead;
+    bufferChunks = RecordReaderUtils.readDiskRanges(file, zcr, stripe.getOffset(), toRead, false);
     List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
     createStreams(
         streamDescriptions, bufferChunks, null, codec, bufferSize, streams, cache);
@@ -3041,7 +3042,7 @@ public class RecordReaderImpl implements
    * The sections of stripe that we have read.
    * This might not match diskRange - 1 disk range can be multiple buffer chunks, depending on DFS block boundaries.
    */
-  public static class BufferChunk extends DiskRange {
+  public static class BufferChunk extends DiskRangeList {
     final ByteBuffer chunk;
 
     BufferChunk(ByteBuffer chunk, long offset) {
@@ -3065,8 +3066,8 @@ public class RecordReaderImpl implements
     public DiskRange slice(long offset, long end) {
       assert offset <= end && offset >= this.offset && end <= this.end;
       ByteBuffer sliceBuf = chunk.slice();
-      int newPos = chunk.position() + (int)(offset - this.offset);
-      int newLimit = chunk.limit() - chunk.position() - (int)(this.end - end);
+      int newPos = (int)(offset - this.offset);
+      int newLimit = newPos + (int)(end - offset);
       sliceBuf.position(newPos);
       sliceBuf.limit(newLimit);
       return new BufferChunk(sliceBuf, offset);
@@ -3078,7 +3079,7 @@ public class RecordReaderImpl implements
     }
   }
 
-  public static class CacheChunk extends DiskRange {
+  public static class CacheChunk extends DiskRangeList {
     public LlapMemoryBuffer buffer;
 
     public CacheChunk(LlapMemoryBuffer buffer, long offset, long end) {
@@ -3115,7 +3116,7 @@ public class RecordReaderImpl implements
    * @param compressionSize the compression block size
    * @return the list of disk ranges that will be loaded
    */
-  static LinkedList<DiskRange> planReadPartialDataStreams
+  static DiskRangeList planReadPartialDataStreams
       (List<OrcProto.Stream> streamList,
        OrcProto.RowIndex[] indexes,
        boolean[] includedColumns,
@@ -3123,12 +3124,12 @@ public class RecordReaderImpl implements
        boolean isCompressed,
        List<OrcProto.ColumnEncoding> encodings,
        List<OrcProto.Type> types,
-       int compressionSize) {
-    LinkedList<DiskRange> result = new LinkedList<DiskRange>();
+       int compressionSize,
+       boolean doMergeBuffers) {
     long offset = 0;
     // figure out which columns have a present stream
     boolean[] hasNull = RecordReaderUtils.findPresentStreamsByColumn(streamList, types);
-    DiskRange lastRange = null;
+    DiskRangeListCreateHelper list = new DiskRangeListCreateHelper();
     for (OrcProto.Stream stream : streamList) {
       long length = stream.getLength();
       int column = stream.getColumn();
@@ -3140,16 +3141,16 @@ public class RecordReaderImpl implements
         // if we aren't filtering or it is a dictionary, load it.
         if (includedRowGroups == null
             || RecordReaderUtils.isDictionary(streamKind, encodings.get(column))) {
-          lastRange = RecordReaderUtils.addEntireStreamToRanges(offset, length, lastRange, result);
+          RecordReaderUtils.addEntireStreamToRanges(offset, length, list, doMergeBuffers);
         } else {
-          lastRange = RecordReaderUtils.addRgFilteredStreamToRanges(stream, includedRowGroups,
+          RecordReaderUtils.addRgFilteredStreamToRanges(stream, includedRowGroups,
               isCompressed, indexes[column], encodings.get(column), types.get(column),
-              compressionSize, hasNull[column], offset, length, lastRange, result);
+              compressionSize, hasNull[column], offset, length, list, doMergeBuffers);
         }
       }
       offset += length;
     }
-    return result;
+    return list.extract();
   }
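
planReadPartialDataStreams() now delegates range bookkeeping to DiskRangeListCreateHelper instead of threading a lastRange pointer through the helpers. Judging from the call sites above and the expectations added to TestRecordReaderImpl further down, addOrMerge(start, end, doMergeBuffers, ...) appends a new node when doMergeBuffers is false, and coalesces the range into the current tail when doMergeBuffers is true and the two touch or overlap; extract() hands back the head, or null if nothing was added. A simplified stand-in to make that behavior concrete (not the real helper; the fourth addOrMerge argument is not interpreted here):

final class RangeListBuilder {                    // sketch of the observed behavior only
  private DiskRangeList head, tail;

  void addOrMerge(long offset, long end, boolean doMerge) {
    if (doMerge && tail != null && offset <= tail.end) {
      tail.end = Math.max(tail.end, end);         // touches/overlaps the tail: grow it
    } else {
      DiskRangeList node = new DiskRangeList(offset, end);
      if (tail == null) {
        head = tail = node;
      } else {
        tail = tail.insertAfter(node);            // insertAfter returns the inserted node
      }
    }
  }

  DiskRangeList extract() {
    return head;                                  // null when no ranges were added
  }
}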
 
   /**
@@ -3157,24 +3158,21 @@ public class RecordReaderImpl implements
    * assumes that the ranges are sorted.
    * @param ranges the list of disk ranges to merge
    */
-  static void mergeDiskRanges(List<DiskRange> ranges) {
-    DiskRange prev = null;
-    for(int i=0; i < ranges.size(); ++i) {
-      DiskRange current = ranges.get(i);
-      if (prev != null && RecordReaderUtils.overlap(prev.offset, prev.end,
-          current.offset, current.end)) {
-        prev.offset = Math.min(prev.offset, current.offset);
-        prev.end = Math.max(prev.end, current.end);
-        ranges.remove(i);
-        i -= 1;
+  static void mergeDiskRanges(DiskRangeList range) {
+    while (range != null && range.next != null) {
+      DiskRangeList next = range.next;
+      if (RecordReaderUtils.overlap(range.offset, range.end, next.offset, next.end)) {
+        range.offset = Math.min(range.offset, next.offset);
+        range.end = Math.max(range.end, next.end);
+        range.removeAfter();
       } else {
-        prev = current;
+        range = next;
       }
     }
   }
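
A worked example of the rewritten in-place merge, using the diskRanges(...) helper added to TestRecordReaderImpl later in this diff:

DiskRangeList list = diskRanges(100, 200, 150, 300, 400, 500);
RecordReaderImpl.mergeDiskRanges(list);
// [100, 200) overlaps [150, 300), so the first node is widened to [100, 300)
// and the second is unlinked with removeAfter(); [400, 500) is untouched.
// The list is now equivalent to diskRanges(100, 300, 400, 500).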
 
   void createStreams(List<OrcProto.Stream> streamDescriptions,
-                            List<DiskRange> ranges,
+                            DiskRangeList ranges,
                             boolean[] includeColumn,
                             CompressionCodec codec,
                             int bufferSize,
@@ -3205,21 +3203,19 @@ public class RecordReaderImpl implements
 
   private void readPartialDataStreams(StripeInformation stripe) throws IOException {
     List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
-    LinkedList<DiskRange> rangesToRead =
-        planReadPartialDataStreams(streamList,
+    DiskRangeList toRead = planReadPartialDataStreams(streamList,
             indexes, included, includedRowGroups, codec != null,
-            stripeFooter.getColumnsList(), types, bufferSize);
+            stripeFooter.getColumnsList(), types, bufferSize, true);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("chunks = " + RecordReaderUtils.stringifyDiskRanges(rangesToRead));
+      LOG.debug("chunks = " + RecordReaderUtils.stringifyDiskRanges(toRead));
     }
-    mergeDiskRanges(rangesToRead);
+    mergeDiskRanges(toRead);
     if (this.cache != null) {
-      cache.getFileData(fileName, rangesToRead, stripe.getOffset());
+      toRead = cache.getFileData(fileName, toRead, stripe.getOffset());
     }
-    RecordReaderUtils.readDiskRanges(file, zcr, stripe.getOffset(), rangesToRead, false);
-    bufferChunks = rangesToRead;
+    bufferChunks = RecordReaderUtils.readDiskRanges(file, zcr, stripe.getOffset(), toRead, false);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("merge = " + RecordReaderUtils.stringifyDiskRanges(rangesToRead));
+      LOG.debug("merge = " + RecordReaderUtils.stringifyDiskRanges(bufferChunks));
     }
 
     createStreams(streamList, bufferChunks, included, codec, bufferSize, streams, cache);

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java?rev=1661297&r1=1661296&r2=1661297&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java Sat Feb 21 08:01:06 2015
@@ -21,16 +21,16 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.ListIterator;
 import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hive.common.DiskRange;
-import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.common.DiskRangeList;
+import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper;
+import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.HadoopShims.ByteBufferPoolShim;
@@ -68,24 +68,15 @@ public class RecordReaderUtils {
     return rightB >= leftA;
   }
 
-
-  static DiskRange addEntireStreamToRanges(long offset, long length,
-      DiskRange lastRange, LinkedList<DiskRange> result) {
-    long end = offset + length;
-    if (lastRange != null && overlap(lastRange.offset, lastRange.end, offset, end)) {
-      lastRange.offset = Math.min(lastRange.offset, offset);
-      lastRange.end = Math.max(lastRange.end, end);
-    } else {
-      lastRange = new DiskRange(offset, end);
-      result.add(lastRange);
-    }
-    return lastRange;
+  static void addEntireStreamToRanges(
+      long offset, long length, DiskRangeListCreateHelper list, boolean doMergeBuffers) {
+    list.addOrMerge(offset, offset + length, doMergeBuffers, false);
   }
 
-  static DiskRange addRgFilteredStreamToRanges(OrcProto.Stream stream,
+  static void addRgFilteredStreamToRanges(OrcProto.Stream stream,
       boolean[] includedRowGroups, boolean isCompressed, OrcProto.RowIndex index,
       OrcProto.ColumnEncoding encoding, OrcProto.Type type, int compressionSize, boolean hasNull,
-      long offset, long length, DiskRange lastRange, LinkedList<DiskRange> result) {
+      long offset, long length, DiskRangeListCreateHelper list, boolean doMergeBuffers) {
     for (int group = 0; group < includedRowGroups.length; ++group) {
       if (!includedRowGroups[group]) continue;
       int posn = getIndexPosition(
@@ -98,19 +89,8 @@ public class RecordReaderUtils {
       start += offset;
       long end = offset + estimateRgEndOffset(
           isCompressed, isLast, nextGroupOffset, length, compressionSize);
-      if (lastRange != null && overlap(lastRange.offset, lastRange.end, start, end)) {
-        lastRange.offset = Math.min(lastRange.offset, start);
-        lastRange.end = Math.max(lastRange.end, end);
-      } else {
-        if (DebugUtils.isTraceOrcEnabled()) {
-          RecordReaderImpl.LOG.info("Creating new range for RG read; last range (which can "
-              + "include some previous RGs) was " + lastRange);
-        }
-        lastRange = new DiskRange(start, end);
-        result.add(lastRange);
-      }
+      list.addOrMerge(start, end, doMergeBuffers, true);
     }
-    return lastRange;
   }
 
   static long estimateRgEndOffset(boolean isCompressed, boolean isLast,
@@ -218,14 +198,17 @@ public class RecordReaderUtils {
    * @param ranges ranges to stringify
    * @return the resulting string
    */
-  static String stringifyDiskRanges(List<DiskRange> ranges) {
+  static String stringifyDiskRanges(DiskRangeList range) {
     StringBuilder buffer = new StringBuilder();
     buffer.append("[");
-    for(int i=0; i < ranges.size(); ++i) {
-      if (i != 0) {
+    boolean isFirst = true;
+    while (range != null) {
+      if (!isFirst) {
         buffer.append(", ");
       }
-      buffer.append(ranges.get(i).toString());
+      isFirst = false;
+      buffer.append(range.toString());
+      range = range.next;
     }
     buffer.append("]");
     return buffer.toString();
@@ -240,15 +223,21 @@ public class RecordReaderUtils {
    *    ranges
    * @throws IOException
    */
-  static void readDiskRanges(FSDataInputStream file,
+  static DiskRangeList readDiskRanges(FSDataInputStream file,
                                  ZeroCopyReaderShim zcr,
                                  long base,
-                                 LinkedList<DiskRange> ranges,
+                                 DiskRangeList range,
                                  boolean doForceDirect) throws IOException {
-    ListIterator<DiskRange> rangeIter = ranges.listIterator();
-    while (rangeIter.hasNext()) {
-      DiskRange range = rangeIter.next();
-      if (range.hasData()) continue;
+    if (range == null) return null;
+    DiskRangeList prev = range.prev;
+    if (prev == null) {
+      prev = new DiskRangeListMutateHelper(range);
+    }
+    while (range != null) {
+      if (range.hasData()) {
+        range = range.next;
+        continue;
+      }
       int len = (int) (range.end - range.offset);
       long off = range.offset;
       file.seek(base + off);
@@ -258,11 +247,12 @@ public class RecordReaderUtils {
           ByteBuffer partial = zcr.readBuffer(len, false);
           BufferChunk bc = new BufferChunk(partial, off);
           if (!hasReplaced) {
-            rangeIter.set(bc);
+            range.replaceSelfWith(bc);
             hasReplaced = true;
           } else {
-            rangeIter.add(bc);
+            range.insertAfter(bc);
           }
+          range = bc;
           int read = partial.remaining();
           len -= read;
           off += read;
@@ -282,24 +272,29 @@ public class RecordReaderUtils {
           directBuf.put(buffer);
         }
         directBuf.position(0);
-        rangeIter.set(new BufferChunk(directBuf, range.offset));
+        range = range.replaceSelfWith(new BufferChunk(directBuf, range.offset));
       } else {
         byte[] buffer = new byte[len];
         file.readFully(buffer, 0, buffer.length);
-        rangeIter.set(new BufferChunk(ByteBuffer.wrap(buffer), range.offset));
+        range = range.replaceSelfWith(new BufferChunk(ByteBuffer.wrap(buffer), range.offset));
       }
+      range = range.next;
     }
+    return prev.next;
   }
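
The prev bookkeeping at the top of readDiskRanges() is what lets it return a valid head even when the first node gets swapped out: if the incoming range has no predecessor, a DiskRangeListMutateHelper is linked in front of it, so prev.next at the end is always the current first element. A minimal sketch of the idea, assuming the helper simply installs itself as prev of the node it wraps (which is what the usage above implies):

DiskRangeList head = new DiskRangeList(0, 100);
DiskRangeList sentinel = new DiskRangeListMutateHelper(head);
// Say the data for [0, 100) was just read and the placeholder node is replaced:
head.replaceSelfWith(new BufferChunk(ByteBuffer.wrap(new byte[100]), 0));
// 'head' is no longer in the list; the live chain starts at sentinel.next.
DiskRangeList newHead = sentinel.next;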
 
 
-  static List<DiskRange> getStreamBuffers(List<DiskRange> ranges, long offset, long length) {
+  static List<DiskRange> getStreamBuffers(DiskRangeList range, long offset, long length) {
     // This assumes sorted ranges (as do many other parts of ORC code).
     ArrayList<DiskRange> buffers = new ArrayList<DiskRange>();
     long streamEnd = offset + length;
     boolean inRange = false;
-    for (DiskRange range : ranges) {
+    while (range != null) {
       if (!inRange) {
-        if (range.end <= offset) continue; // Skip until we are in range.
+        if (range.end <= offset) {
+          range = range.next;
+          continue; // Skip until we are in range.
+        }
         inRange = true;
         if (range.offset < offset) {
           // Partial first buffer, add a slice of it.
@@ -307,6 +302,7 @@ public class RecordReaderUtils {
           partial.shiftBy(-offset);
           buffers.add(partial);
           if (range.end >= streamEnd) break; // Partial first buffer is also partial last buffer.
+          range = range.next;
           continue;
         }
       } else if (range.offset >= streamEnd) {
@@ -326,6 +322,7 @@ public class RecordReaderUtils {
       full.shiftBy(-offset);
       buffers.add(full);
       if (range.end == streamEnd) break;
+      range = range.next;
     }
     return buffers;
   }

Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java?rev=1661297&r1=1661296&r2=1661297&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java Sat Feb 21 08:01:06 2015
@@ -20,8 +20,7 @@ package org.apache.hadoop.hive.ql.io.orc
 
 import static junit.framework.Assert.assertEquals;
 import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -36,7 +35,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.common.DiskRangeList;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.io.filters.BloomFilter;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.Location;
@@ -788,21 +787,22 @@ public class TestRecordReaderImpl {
     assertTrue(!RecordReaderUtils.overlap(0, 10, 11, 12));
   }
 
-  private static List<DiskRange> diskRanges(Integer... points) {
-    List<DiskRange> result =
-        new ArrayList<DiskRange>();
-    for(int i=0; i < points.length; i += 2) {
-      result.add(new DiskRange(points[i], points[i+1]));
+  private static DiskRangeList diskRanges(Integer... points) {
+    DiskRangeList head = null, tail = null;
+    for(int i = 0; i < points.length; i += 2) {
+      DiskRangeList range = new DiskRangeList(points[i], points[i+1]);
+      if (tail == null) {
+        head = tail = range;
+      } else {
+        tail = tail.insertAfter(range);
+      }
     }
-    return result;
+    return head;
   }
 
   @Test
   public void testMergeDiskRanges() throws Exception {
-    List<DiskRange> list = diskRanges();
-    RecordReaderImpl.mergeDiskRanges(list);
-    assertThat(list, is(diskRanges()));
-    list = diskRanges(100, 200, 300, 400, 500, 600);
+    DiskRangeList list = diskRanges(100, 200, 300, 400, 500, 600);
     RecordReaderImpl.mergeDiskRanges(list);
     assertThat(list, is(diskRanges(100, 200, 300, 400, 500, 600)));
     list = diskRanges(100, 200, 150, 300, 400, 500);
@@ -879,7 +879,7 @@ public class TestRecordReaderImpl {
 
   @Test
   public void testPartialPlan() throws Exception {
-    List<DiskRange> result;
+    DiskRangeList result;
 
     // set the streams
     List<OrcProto.Stream> streams = new ArrayList<OrcProto.Stream>();
@@ -948,30 +948,41 @@ public class TestRecordReaderImpl {
 
     // filter by rows and groups
     result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
-        columns, rowGroups, false, encodings, types, 32768);
+        columns, rowGroups, false, encodings, types, 32768, false);
     assertThat(result, is(diskRanges(0, 1000, 100, 1000, 400, 1000,
         1000, 11000 + RecordReaderUtils.WORST_UNCOMPRESSED_SLOP,
         11000, 21000 + RecordReaderUtils.WORST_UNCOMPRESSED_SLOP,
         41000, 51000 + RecordReaderUtils.WORST_UNCOMPRESSED_SLOP)));
+    result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
+        columns, rowGroups, false, encodings, types, 32768, true);
+    assertThat(result, is(diskRanges(0, 21000 + RecordReaderUtils.WORST_UNCOMPRESSED_SLOP,
+        41000, 51000 + RecordReaderUtils.WORST_UNCOMPRESSED_SLOP)));
 
     // if we read no rows, don't read any bytes
     rowGroups = new boolean[]{false, false, false, false, false, false};
     result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
-        columns, rowGroups, false, encodings, types, 32768);
-    assertThat(result, is(diskRanges()));
+        columns, rowGroups, false, encodings, types, 32768, false);
+    assertNull(result);
 
     // all rows, but only columns 0 and 2.
     rowGroups = null;
     columns = new boolean[]{true, false, true};
     result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
-        columns, null, false, encodings, types, 32768);
+        columns, null, false, encodings, types, 32768, false);
     assertThat(result, is(diskRanges(100000, 102000, 102000, 200000)));
+    result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
+        columns, null, false, encodings, types, 32768, true);
+    assertThat(result, is(diskRanges(100000, 200000)));
 
     rowGroups = new boolean[]{false, true, false, false, false, false};
     indexes[2] = indexes[1];
     indexes[1] = null;
     result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
-        columns, rowGroups, false, encodings, types, 32768);
+        columns, rowGroups, false, encodings, types, 32768, false);
+    assertThat(result, is(diskRanges(100100, 102000,
+        112000, 122000 + RecordReaderUtils.WORST_UNCOMPRESSED_SLOP)));
+    result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
+        columns, rowGroups, false, encodings, types, 32768, true);
     assertThat(result, is(diskRanges(100100, 102000,
         112000, 122000 + RecordReaderUtils.WORST_UNCOMPRESSED_SLOP)));
 
@@ -979,7 +990,11 @@ public class TestRecordReaderImpl {
     indexes[1] = indexes[2];
     columns = new boolean[]{true, true, true};
     result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
-        columns, rowGroups, false, encodings, types, 32768);
+        columns, rowGroups, false, encodings, types, 32768, false);
+    assertThat(result, is(diskRanges(500, 1000, 51000, 100000, 100500, 102000,
+        152000, 200000)));
+    result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
+        columns, rowGroups, false, encodings, types, 32768, true);
     assertThat(result, is(diskRanges(500, 1000, 51000, 100000, 100500, 102000,
         152000, 200000)));
   }
@@ -987,7 +1002,7 @@ public class TestRecordReaderImpl {
 
   @Test
   public void testPartialPlanCompressed() throws Exception {
-    List<DiskRange> result;
+    DiskRangeList result;
 
     // set the streams
     List<OrcProto.Stream> streams = new ArrayList<OrcProto.Stream>();
@@ -1056,20 +1071,20 @@ public class TestRecordReaderImpl {
 
     // filter by rows and groups
     result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
-        columns, rowGroups, true, encodings, types, 32768);
+        columns, rowGroups, true, encodings, types, 32768, false);
     assertThat(result, is(diskRanges(0, 1000, 100, 1000,
         400, 1000, 1000, 11000+(2*32771),
         11000, 21000+(2*32771), 41000, 100000)));
 
     rowGroups = new boolean[]{false, false, false, false, false, true};
     result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
-        columns, rowGroups, true, encodings, types, 32768);
+        columns, rowGroups, true, encodings, types, 32768, false);
     assertThat(result, is(diskRanges(500, 1000, 51000, 100000)));
   }
 
   @Test
   public void testPartialPlanString() throws Exception {
-    List<DiskRange> result;
+    DiskRangeList result;
 
     // set the streams
     List<OrcProto.Stream> streams = new ArrayList<OrcProto.Stream>();
@@ -1144,7 +1159,7 @@ public class TestRecordReaderImpl {
 
     // filter by rows and groups
     result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
-        columns, rowGroups, false, encodings, types, 32768);
+        columns, rowGroups, false, encodings, types, 32768, false);
     assertThat(result, is(diskRanges(100, 1000, 400, 1000, 500, 1000,
         11000, 21000 + RecordReaderUtils.WORST_UNCOMPRESSED_SLOP,
         41000, 51000 + RecordReaderUtils.WORST_UNCOMPRESSED_SLOP,


