hive-commits mailing list archives

From ser...@apache.org
Subject svn commit: r1657456 - in /hive/branches/llap: common/src/java/org/apache/hadoop/hive/common/DiskRange.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
Date Thu, 05 Feb 2015 01:14:51 GMT
Author: sershe
Date: Thu Feb  5 01:14:51 2015
New Revision: 1657456

URL: http://svn.apache.org/r1657456
Log:
HIVE-9418p4 : Index reading and more bugfixing (mostly from trunk port)

Modified:
    hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java

Modified: hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java?rev=1657456&r1=1657455&r2=1657456&view=diff
==============================================================================
--- hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java (original)
+++ hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java Thu Feb  5 01:14:51 2015
@@ -69,4 +69,9 @@ public class DiskRange {
   public ByteBuffer getData() {
     throw new UnsupportedOperationException();
   }
+
+  public void shiftBy(long baseOffset) {
+    offset -= baseOffset;
+    end -= baseOffset;
+  }
 }
\ No newline at end of file
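
A minimal, self-contained sketch of what the new shiftBy helper is for: rebasing a range from file-absolute to stream-relative coordinates. DiskRange is reduced to its two fields here; the field names and shiftBy body match the patch, everything else is illustrative.

    public class ShiftByDemo {
      static class DiskRange {
        long offset; // file-absolute start, inclusive
        long end;    // file-absolute end, exclusive
        DiskRange(long offset, long end) { this.offset = offset; this.end = end; }
        // The method this commit adds: rebase both bounds by a base offset.
        void shiftBy(long baseOffset) {
          offset -= baseOffset;
          end -= baseOffset;
        }
      }

      public static void main(String[] args) {
        // A range covering file bytes [1000, 1500) of a stream that starts at
        // file offset 800 becomes [200, 700) in stream-relative coordinates.
        DiskRange r = new DiskRange(1000, 1500);
        r.shiftBy(800);
        System.out.println(r.offset + ".." + r.end); // prints 200..700
      }
    }
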

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1657456&r1=1657455&r2=1657456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Thu Feb  5 01:14:51 2015
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.llap.io.ap
 import com.google.common.annotations.VisibleForTesting;
 
 abstract class InStream extends InputStream {
+
   private static final Log LOG = LogFactory.getLog(InStream.class);
 
   private static class UncompressedStream extends InStream {
@@ -328,15 +329,14 @@ abstract class InStream extends InputStr
       currentOffset += compressed.remaining();
       len -= compressed.remaining();
       copy.put(compressed);
+      ListIterator<DiskRange> iter = bytes.listIterator(currentRange);
 
-      while (len > 0 && (++currentRange) < bytes.size()) {
+      while (len > 0 && iter.hasNext()) {
+        ++currentRange;
         if (LOG.isDebugEnabled()) {
           LOG.debug(String.format("Read slow-path, >1 cross block reads with %s", this.toString()));
         }
-        DiskRange range = bytes.get(currentRange);
-        if (!(range instanceof BufferChunk)) {
-          throw new IOException("Trying to extend compressed block into uncompressed block");
-        }
+        DiskRange range = iter.next();
         compressed = range.getData().duplicate();
         if (compressed.remaining() >= len) {
           slice = compressed.slice();
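
The hunk above swaps indexed bytes.get(...) calls for a ListIterator, so the loop no longer assumes cheap random access and can resume from a saved position. A sketch of that pattern, independent of the Hive classes:

    import java.util.LinkedList;
    import java.util.List;
    import java.util.ListIterator;

    public class ListIteratorDemo {
      public static void main(String[] args) {
        List<String> bytes = new LinkedList<>(List.of("r0", "r1", "r2", "r3"));
        int currentRange = 1; // r0 was already consumed before the loop
        // Resume from a known index with listIterator(int), advance with next();
        // on a linked list this avoids the O(n) cost of each get(i) call.
        ListIterator<String> iter = bytes.listIterator(currentRange);
        while (iter.hasNext()) {
          ++currentRange;            // keep the index in sync, as the patch does
          String range = iter.next();
          System.out.println("consuming " + range);
        }
      }
    }
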
@@ -358,8 +358,8 @@ abstract class InStream extends InputStr
     }
 
     private void seek(long desired) throws IOException {
-      for(int i = 0; i < bytes.size(); ++i) {
-        DiskRange range = bytes.get(i);
+      int i = 0;
+      for (DiskRange range : bytes) {
         if (range.offset <= desired && desired < range.end) {
           currentRange = i;
           if (range instanceof BufferChunk) {
@@ -379,6 +379,7 @@ abstract class InStream extends InputStr
           currentOffset = desired;
           return;
         }
+        ++i;
       }
       // if they are seeking to the precise end, go ahead and let them go there
       int segments = bytes.size();
@@ -406,13 +407,14 @@ abstract class InStream extends InputStr
 
     private String rangeString() {
       StringBuilder builder = new StringBuilder();
-      for(int i=0; i < bytes.size(); ++i) {
+      int i = 0;
+      for (DiskRange range : bytes) {
         if (i != 0) {
           builder.append("; ");
         }
-        DiskRange range = bytes.get(i);
         builder.append(" range " + i + " = " + range.offset
             + " to " + (range.end - range.offset));
+        ++i;
       }
       return builder.toString();
     }
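
Both seek() and rangeString() now walk the list with an enhanced for loop plus a manually maintained counter. A self-contained sketch of the seek() variant, using a hypothetical Range stand-in for DiskRange:

    import java.util.List;

    public class SeekDemo {
      static final class Range {
        final long offset, end; // half-open [offset, end), as in DiskRange
        Range(long offset, long end) { this.offset = offset; this.end = end; }
      }

      // Find the index of the range containing 'desired', walking the list
      // with an enhanced for loop and a manual counter as the patch does.
      static int findRange(List<Range> bytes, long desired) {
        int i = 0;
        for (Range range : bytes) {
          if (range.offset <= desired && desired < range.end) {
            return i; // becomes currentRange in the real seek()
          }
          ++i;
        }
        return -1; // the real seek() throws "Seek outside of data" instead
      }

      public static void main(String[] args) {
        List<Range> bytes = List.of(new Range(0, 100), new Range(100, 250));
        System.out.println(findRange(bytes, 150)); // prints 1
      }
    }
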
@@ -537,7 +539,6 @@ abstract class InStream extends InputStr
         // This is a compressed buffer. We need to uncompress it; the buffer can comprise
         // several disk ranges, so we might need to combine them.
         BufferChunk bc = (BufferChunk)current;
-        // TODO#: DOUBLE check the iterator state.
         if (toDecompress == null) {
           toDecompress = new ArrayList<ProcCacheChunk>();
           toRelease = (zcr == null) ? null : new ArrayList<ByteBuffer>();

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1657456&r1=1657455&r2=1657456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Thu Feb  5 01:14:51 2015
@@ -99,7 +99,7 @@ public class RecordReaderImpl implements
   private final String fileName;
   private final FSDataInputStream file;
   private final long firstRow;
-  protected final List<StripeInformation> stripes =
+  private final List<StripeInformation> stripes =
     new ArrayList<StripeInformation>();
   private OrcProto.StripeFooter stripeFooter;
   private final long totalRowCount;
@@ -109,7 +109,7 @@ public class RecordReaderImpl implements
   private final boolean[] included;
   private final long rowIndexStride;
   private long rowInStripe = 0;
-  protected int currentStripe = -1;
+  private int currentStripe = -1;
   private long rowBaseInStripe = 0;
   private long rowCountInStripe = 0;
   private final Map<StreamName, InStream> streams =
@@ -319,7 +319,7 @@ public class RecordReaderImpl implements
   private abstract static class TreeReader {
     protected final Path path;
     protected final int columnId;
-    protected BitFieldReader present = null;
+    private BitFieldReader present = null;
     protected boolean valuePresent = false;
     protected final Configuration conf;
 
@@ -2279,7 +2279,7 @@ public class RecordReaderImpl implements
     }
   }
 
-  protected OrcProto.StripeFooter readStripeFooter(StripeInformation stripe
+  OrcProto.StripeFooter readStripeFooter(StripeInformation stripe
                                          ) throws IOException {
     long offset = stripe.getOffset() + stripe.getIndexLength() +
         stripe.getDataLength();
@@ -2685,6 +2685,7 @@ public class RecordReaderImpl implements
   private void readStripe() throws IOException {
     StripeInformation stripe = beginReadStripe();
     includedRowGroups = pickRowGroups();
+
     // move forward to the first unskipped row
     if (includedRowGroups != null) {
       while (rowInStripe < rowCountInStripe &&
@@ -2692,6 +2693,7 @@ public class RecordReaderImpl implements
         rowInStripe = Math.min(rowCountInStripe, rowInStripe + rowIndexStride);
       }
     }
+
     // if we haven't skipped the whole stripe, read the data
     if (rowInStripe < rowCountInStripe) {
       // if we aren't projecting columns or filtering rows, just read it all
@@ -2771,10 +2773,10 @@ public class RecordReaderImpl implements
     public DiskRange slice(long offset, long end) {
       assert offset <= end && offset >= this.offset && end <= this.end;
       ByteBuffer sliceBuf = chunk.slice();
-      int newPos = (int)(offset - this.offset);
-      int newLen = chunk.limit() - chunk.position() - (int)(this.end - end);
+      int newPos = chunk.position() + (int)(offset - this.offset);
+      int newLimit = chunk.limit() - chunk.position() - (int)(this.end - end);
       sliceBuf.position(newPos);
-      sliceBuf.limit(newPos + newLen);
+      sliceBuf.limit(newLimit);
       return new BufferChunk(sliceBuf, offset);
     }
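
The corrected position/limit arithmetic above has to account for how ByteBuffer.slice() rebases coordinates: the slice starts at the parent's current position, with its own position at 0 and capacity equal to the parent's remaining(). A small demonstration of that rebasing, independent of the Hive classes:

    import java.nio.ByteBuffer;

    public class SliceDemo {
      public static void main(String[] args) {
        ByteBuffer chunk = ByteBuffer.allocate(100);
        chunk.position(10).limit(90);

        ByteBuffer sliceBuf = chunk.slice();
        // The slice is rebased to the parent's position: its own position is
        // 0 and its capacity equals the parent's remaining() bytes.
        System.out.println(sliceBuf.position()); // 0
        System.out.println(sliceBuf.capacity()); // 80

        // Narrowing to a sub-window is then done with position/limit values
        // relative to that rebased origin.
        sliceBuf.position(5).limit(40);
        System.out.println(sliceBuf.remaining()); // 35
      }
    }
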
 
@@ -2818,6 +2820,7 @@ public class RecordReaderImpl implements
     }
   }
 
+
   private static final int BYTE_STREAM_POSITIONS = 1;
   private static final int RUN_LENGTH_BYTE_POSITIONS =
       BYTE_STREAM_POSITIONS + 1;
@@ -2902,6 +2905,7 @@ public class RecordReaderImpl implements
    */
   static boolean isDictionary(OrcProto.Stream.Kind kind,
                               OrcProto.ColumnEncoding encoding) {
+    assert kind != OrcProto.Stream.Kind.DICTIONARY_COUNT;
     OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind();
     return kind == OrcProto.Stream.Kind.DICTIONARY_DATA ||
       (kind == OrcProto.Stream.Kind.LENGTH &&
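
A self-contained restatement of the predicate the new assert guards; Kind and EncodingKind here are simplified stand-ins for the OrcProto enums, and DICTIONARY_COUNT is excluded because it is asserted never to reach this check:

    public class IsDictionaryDemo {
      enum Kind { PRESENT, DATA, LENGTH, DICTIONARY_DATA, DICTIONARY_COUNT }
      enum EncodingKind { DIRECT, DICTIONARY, DIRECT_V2, DICTIONARY_V2 }

      // DICTIONARY_DATA is always a stripe-level dictionary stream; LENGTH is
      // one only under the dictionary encodings.
      static boolean isDictionary(Kind kind, EncodingKind encodingKind) {
        assert kind != Kind.DICTIONARY_COUNT;
        return kind == Kind.DICTIONARY_DATA
            || (kind == Kind.LENGTH
                && (encodingKind == EncodingKind.DICTIONARY
                    || encodingKind == EncodingKind.DICTIONARY_V2));
      }

      public static void main(String[] args) {
        System.out.println(isDictionary(Kind.DICTIONARY_DATA, EncodingKind.DICTIONARY)); // true
        System.out.println(isDictionary(Kind.LENGTH, EncodingKind.DIRECT_V2));           // false
      }
    }
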
@@ -2989,22 +2993,12 @@ public class RecordReaderImpl implements
           encoding.getKind(), type.getKind(), stream.getKind(), isCompressed, hasNull);
       long start = index.getEntry(group).getPositions(posn);
       final long nextGroupOffset;
-      if (group < includedRowGroups.length - 1) {
-        nextGroupOffset = index.getEntry(group + 1).getPositions(posn);
-      } else {
-        nextGroupOffset = length;
-      }
-      // figure out the worst case last location
+      boolean isLast = group == (includedRowGroups.length - 1);
+      nextGroupOffset = isLast ? length : index.getEntry(group + 1).getPositions(posn);
 
-      // if adjacent groups have the same compressed block offset then stretch the slop
-      // by factor of 2 to safely accommodate the next compression block.
-      // One for the current compression block and another for the next compression block.
-      final long slop = isCompressed ? 2 * (OutStream.HEADER_SIZE + compressionSize)
-          : WORST_UNCOMPRESSED_SLOP;
-      long end = (group == includedRowGroups.length - 1) ? length : Math.min(length,
-          nextGroupOffset + slop);
       start += offset;
-      end += offset;
+      long end = offset + estimateRgEndOffset(
+          isCompressed, isLast, nextGroupOffset, length, compressionSize);
      if (lastRange != null && overlap(lastRange.offset, lastRange.end, start, end)) {
         lastRange.offset = Math.min(lastRange.offset, start);
         lastRange.end = Math.max(lastRange.end, end);
@@ -3016,6 +3010,16 @@ public class RecordReaderImpl implements
     return lastRange;
   }
 
+  private static long estimateRgEndOffset(boolean isCompressed, boolean isLast,
+      long nextGroupOffset, long streamLength, int bufferSize) {
+    // figure out the worst case last location
+    // if adjacent groups have the same compressed block offset then stretch the slop
+    // by factor of 2 to safely accommodate the next compression block.
+    // One for the current compression block and another for the next compression block.
+    long slop = isCompressed ? 2 * (OutStream.HEADER_SIZE + bufferSize) : WORST_UNCOMPRESSED_SLOP;
+    return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop);
+  }
+
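
A restatement of the extracted helper with concrete numbers; HEADER_SIZE and WORST_UNCOMPRESSED_SLOP below are assumed values for illustration, not necessarily the exact Hive constants:

    public class SlopDemo {
      static final int HEADER_SIZE = 3;                // assumed value
      static final long WORST_UNCOMPRESSED_SLOP = 512; // assumed value

      static long estimateRgEndOffset(boolean isCompressed, boolean isLast,
          long nextGroupOffset, long streamLength, int bufferSize) {
        // Two compression blocks of slop: one for the current block and one
        // for the next, in case adjacent groups share a block offset.
        long slop = isCompressed ? 2 * (HEADER_SIZE + bufferSize) : WORST_UNCOMPRESSED_SLOP;
        return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop);
      }

      public static void main(String[] args) {
        // 256KB compression buffers: slop = 2 * (3 + 262144) = 524294 bytes.
        System.out.println(estimateRgEndOffset(true, false, 1_000_000, 10_000_000, 262_144));
        // -> 1524294; a last group always reads to the end of the stream:
        System.out.println(estimateRgEndOffset(true, true, 1_000_000, 10_000_000, 262_144));
        // -> 10000000
      }
    }
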
   /**
    * Update the disk ranges to collapse adjacent or overlapping ranges. It
    * assumes that the ranges are sorted.
@@ -3128,7 +3132,7 @@ public class RecordReaderImpl implements
         streamOffset += streamDesc.getLength();
         continue;
       }
-      List<DiskRange> buffers = getStreamBuffers(ranges, streamOffset, streamDesc);
+      List<DiskRange> buffers = getStreamBuffers(ranges, streamOffset, streamDesc.getLength());
       StreamName name = new StreamName(column, streamDesc.getKind());
       streams.put(name, InStream.create(fileName, name.toString(), buffers,
           streamDesc.getLength(), codec, bufferSize, cache));
@@ -3136,32 +3140,40 @@ public class RecordReaderImpl implements
     }
   }
 
-  private List<DiskRange> getStreamBuffers(List<DiskRange> ranges,
-      long streamOffset, OrcProto.Stream streamDesc) {
+  private List<DiskRange> getStreamBuffers(List<DiskRange> ranges, long offset, long length) {
     // This assumes sorted ranges (as do many other parts of the ORC code).
     ArrayList<DiskRange> buffers = new ArrayList<DiskRange>();
-    long streamEnd = streamOffset + streamDesc.getLength();
+    long streamEnd = offset + length;
     boolean inRange = false;
-    for (int i = 0; i < ranges.size(); ++i) {
-      DiskRange range = ranges.get(i);
-      boolean isLast = range.end >= streamEnd;
+    for (DiskRange range : ranges) {
       if (!inRange) {
-        if (range.end >= streamOffset) continue; // Skip until we are in range.
+        if (range.end <= offset) continue; // Skip until we are in range.
         inRange = true;
-        if (range.offset < streamOffset) {
+        if (range.offset < offset) {
           // Partial first buffer, add a slice of it.
-          buffers.add(range.slice(Math.max(range.offset, streamOffset),
-              Math.min(streamEnd, range.end)));
-          if (isLast) break; // Partial first buffer is also partial last buffer.
+          DiskRange partial = range.slice(offset, Math.min(streamEnd, range.end));
+          partial.shiftBy(offset);
+          buffers.add(partial);
+        if (range.end >= streamEnd) break; // Partial first buffer is also partial last buffer.
           continue;
         }
+      } else if (range.offset >= streamEnd) {
+        break;
       }
       if (range.end > streamEnd) {
         // Partial last buffer (may also be the first buffer), add a slice of it.
-        buffers.add(range.slice(range.offset, Math.min(streamEnd, range.end)));
+        DiskRange partial = range.slice(range.offset, streamEnd);
+        partial.shiftBy(offset);
+        buffers.add(partial);
         break;
       }
-      buffers.add(range); // Normal buffer.
+      // Buffer that belongs entirely to one stream.
+      // TODO: ideally we would want to reuse the object and remove it from the list, but we cannot
+      //       because bufferChunks is also used by clearStreams for zcr. Create a useless dup.
+      DiskRange full = range.slice(range.offset, range.end);
+      full.shiftBy(offset);
+      buffers.add(full);
+      if (range.end == streamEnd) break;
     }
     return buffers;
   }
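
An interval-only sketch of the reworked getStreamBuffers: clip each sorted range to the stream window [offset, offset + length) and rebase every clipped piece to stream-relative coordinates, as the real code does with DiskRange.slice(...) followed by the new shiftBy(offset). Range is a simplified stand-in:

    import java.util.ArrayList;
    import java.util.List;

    public class StreamBuffersDemo {
      static final class Range {
        final long offset, end;
        Range(long offset, long end) { this.offset = offset; this.end = end; }
        @Override public String toString() { return "[" + offset + ", " + end + ")"; }
      }

      static List<Range> getStreamBuffers(List<Range> ranges, long offset, long length) {
        long streamEnd = offset + length;
        List<Range> buffers = new ArrayList<>();
        for (Range r : ranges) {
          if (r.end <= offset) continue;    // entirely before the stream
          if (r.offset >= streamEnd) break; // entirely after it; input is sorted
          long from = Math.max(r.offset, offset), to = Math.min(r.end, streamEnd);
          buffers.add(new Range(from - offset, to - offset)); // slice + shiftBy
        }
        return buffers;
      }

      public static void main(String[] args) {
        List<Range> ranges = List.of(new Range(0, 150), new Range(150, 400));
        // A stream occupying file bytes [100, 300):
        System.out.println(getStreamBuffers(ranges, 100, 200)); // [[0, 50), [50, 200)]
      }
    }
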
@@ -3362,7 +3374,7 @@ public class RecordReaderImpl implements
     throw new IllegalArgumentException("Seek after the end of reader range");
   }
 
-  protected OrcProto.RowIndex[] readRowIndex(
+  OrcProto.RowIndex[] readRowIndex(
       int stripeIndex, boolean[] included) throws IOException {
     long offset = stripes.get(stripeIndex).getOffset();
     OrcProto.StripeFooter stripeFooter;
@@ -3434,30 +3446,46 @@ public class RecordReaderImpl implements
     beginReadStripe();
   }
 
-  private static class ColumnReadContext {
+  /** Helper context for each column being read */
+  private static final class ColumnReadContext {
     public ColumnReadContext(long offset, int colIx, ColumnEncoding encoding, RowIndex rowIndex) {
       this.encoding = encoding;
       this.rowIndex = rowIndex;
       this.colIx = colIx;
     }
     public static final int MAX_STREAMS = OrcProto.Stream.Kind.ROW_INDEX_VALUE;
-    final long[] streamOffsets = new long[MAX_STREAMS];
-    final OrcProto.Stream[] streams = new OrcProto.Stream[MAX_STREAMS];
-    List<DiskRange>[] buffers;
-    ListIterator<DiskRange>[] bufferIters;
-    StreamBuffer[] stripeLevelStreams;
+    /** The number of streams that are part of this column. */
+    int streamCount = 0;
+    final StreamContext[] streams = new StreamContext[MAX_STREAMS];
+    /** Column encoding. */
     final ColumnEncoding encoding;
+    /** Column rowindex. */
     final OrcProto.RowIndex rowIndex;
+    /** Column index in the file. */
     final int colIx;
-    int streamCount = 0;
 
-    public void addStream(long offset, OrcProto.Stream stream) {
-      streams[streamCount] = stream;
-      streamOffsets[streamCount] = offset;
-      ++streamCount;
+    public void addStream(long offset, OrcProto.Stream stream, int indexIx) {
+      streams[++streamCount] = new StreamContext(stream, offset, indexIx);
     }
   }
 
+  private static final class StreamContext {
+    public StreamContext(OrcProto.Stream stream, long streamOffset, int streamIndexOffset) {
+      this.kind = stream.getKind();
+      this.length = stream.getLength();
+      this.offset = streamOffset;
+      this.streamIndexOffset = streamIndexOffset;
+    }
+    /** Offsets of each stream in the column. */
+    public final long offset, length;
+    public final int streamIndexOffset;
+    public final OrcProto.Stream.Kind kind;
+    /** Iterators for the buffers; used to maintain position in per-rg reading. */
+    ListIterator<DiskRange> bufferIter;
+    /** Saved stripe-level stream, to reuse for each RG (e.g. dictionaries). */
+    StreamBuffer stripeLevelStream;
+  }
+
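
The restructuring above replaces several parallel arrays in ColumnReadContext with a single array of per-stream state objects. A minimal illustration of the shape of that change (names simplified, not the Hive classes):

    public class StreamStateDemo {
      // Before this hunk, ColumnReadContext held parallel arrays that had to
      // stay in sync: streamOffsets[], streams[], buffers[], bufferIters[],
      // stripeLevelStreams[]. After, there is one state object per stream:
      static final class StreamState {
        final long offset, length;
        final int streamIndexOffset;
        StreamState(long offset, long length, int streamIndexOffset) {
          this.offset = offset;
          this.length = length;
          this.streamIndexOffset = streamIndexOffset;
        }
      }

      public static void main(String[] args) {
        StreamState[] streams = new StreamState[4];
        streams[0] = new StreamState(0, 4096, 0);
        System.out.println(streams[0].offset + ".." + streams[0].length);
      }
    }
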
   @Override
   public void readEncodedColumns(int stripeIx, boolean[] includes,  boolean[][] colRgs,
       LowLevelCache cache, Consumer<EncodedColumnBatch<OrcBatchKey>> consumer) throws IOException {
@@ -3479,6 +3507,7 @@ public class RecordReaderImpl implements
     int colRgIx = -1, lastColIx = -1;
     ColumnReadContext[] colCtxs = new ColumnReadContext[colRgs.length];
     boolean[] includedRgs = null;
+    boolean isCompressed = (codec != null);
     for (OrcProto.Stream stream : streamList) {
       long length = stream.getLength();
       int colIx = stream.getColumn();
@@ -3487,14 +3516,18 @@ public class RecordReaderImpl implements
         offset += length;
         continue;
       }
+      ColumnReadContext ctx = colCtxs[colRgIx];
       if (lastColIx != colIx) {
+        assert ctx == null;
         ++colRgIx;
         lastColIx = colIx;
         includedRgs = colRgs[colRgIx];
-        colCtxs[colRgIx] = new ColumnReadContext(
+        ctx = colCtxs[colRgIx] = new ColumnReadContext(
             offset, colIx, encodings.get(colIx), indexes[colIx]);
       }
-      colCtxs[colRgIx].addStream(offset, stream);
+      int indexIx = getIndexPosition(ctx.encoding.getKind(),
+          types.get(colIx).getKind(), streamKind, isCompressed, hasNull[colIx]);
+      colCtxs[colRgIx].addStream(offset, stream, indexIx);
       if (includedRgs == null || isDictionary(streamKind, encodings.get(colIx))) {
         lastRange = addEntireStreamToResult(offset, length, lastRange, rangesToRead);
       } else {
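
getIndexPosition (existing code, not shown in this diff) maps a stream to the slot where its seek positions begin inside each RowIndexEntry's flat positions list; that slot is what StreamContext.streamIndexOffset stores. A sketch with made-up positions; the layout shown is illustrative, not the exact ORC index encoding:

    public class IndexOffsetDemo {
      public static void main(String[] args) {
        // Hypothetical positions for one row group of a compressed column:
        // [present: block offset, uncompressed offset, bit offset,
        //  data:    block offset, uncompressed offset]
        long[] positions = { 0, 128, 3, 4096, 17 };
        int presentIndexOffset = 0; // PRESENT stream's positions start at slot 0
        int dataIndexOffset = 3;    // DATA stream's positions start at slot 3
        System.out.println("present starts at block " + positions[presentIndexOffset]);
        System.out.println("data starts at block " + positions[dataIndexOffset]);
      }
    }
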
@@ -3518,11 +3551,9 @@ public class RecordReaderImpl implements
     // Separate buffers for each stream from the data we have.
     // TODO: given how we read, we could potentially get rid of this step?
     for (ColumnReadContext colCtx : colCtxs) {
-      colCtx.buffers = new List[colCtx.streamCount];
       for (int i = 0; i < colCtx.streamCount; ++i) {
-        colCtx.buffers[i] = getStreamBuffers(
-            rangesToRead, colCtx.streamOffsets[i], colCtx.streams[i]);
-        colCtx.bufferIters[i] = colCtx.buffers[i].listIterator();
+        StreamContext sctx = colCtx.streams[i];
+        sctx.bufferIter = getStreamBuffers(rangesToRead, sctx.offset, sctx.length).listIterator();
       }
     }
 
@@ -3530,50 +3561,32 @@ public class RecordReaderImpl implements
     // We go by RG and not by column because that is how data is processed.
     int rgCount = (int)Math.ceil((double)rowCountInStripe / rowIndexStride);
     for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
-      boolean isLastRg = rgIx == rgCount - 1;
+      boolean isLastRg = rgCount - rgIx - 1 == 0;
+      // Create the batch we will use to return data for this RG.
       EncodedColumnBatch<OrcBatchKey> ecb = new EncodedColumnBatch<OrcBatchKey>(
           new OrcBatchKey(fileName, stripeIx, rgIx), colRgs.length, 0);
       for (int colIxMod = 0; colIxMod < colRgs.length; ++colIxMod) {
        if (colRgs[colIxMod] != null && !colRgs[colIxMod][rgIx]) continue; // RG x col filtered.
         ColumnReadContext ctx = colCtxs[colIxMod];
-        RowIndexEntry index = indexes[ctx.colIx].getEntry(rgIx);
+        RowIndexEntry index = ctx.rowIndex.getEntry(rgIx),
+            nextIndex = isLastRg ? null : ctx.rowIndex.getEntry(rgIx + 1);
         ecb.initColumn(colIxMod, ctx.colIx, ctx.streamCount);
         for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
-          OrcProto.Stream stream = ctx.streams[streamIx];
+          StreamContext sctx = ctx.streams[streamIx];
           StreamBuffer cb = null;
-          if (isStripeLevelStream(stream.getKind(), ctx.encoding.getKind())) {
+          if (isDictionary(sctx.kind, ctx.encoding)) {
             // This stream is for entire stripe and needed for every RG; uncompress once and reuse.
-            if (ctx.stripeLevelStreams == null) {
-              ctx.stripeLevelStreams = new StreamBuffer[ctx.streamCount];
-            }
-            cb = ctx.stripeLevelStreams[streamIx];
-            if (cb == null) {
-              cb = ctx.stripeLevelStreams[streamIx] = new StreamBuffer();
-              // We will be using this for each RG while also sending RGs to processing.
-              // To avoid buffers being unlocked, run refcount one ahead; we will not increase
-              // it when building the last RG, so each RG processing will decref once, and the
-              // last one will unlock the buffers.
-              cb.incRef();
-              InStream.uncompressStream(fileName, zcr, ctx.bufferIters[streamIx],
-                  codec, bufferSize, cache, -1, -1, cb);
-              ctx.buffers[streamIx] = null;
-            }
-            if (!isLastRg) {
-              cb.incRef();
-            }
+            cb = getStripeLevelStream(sctx, cache, isLastRg);
           } else {
             // This stream can be separated by RG using index. Let's do that.
-            // TODO#: determine start offset, end offset from index; nexts can be end of stream.
-            //        Either see getIndexPosition or
-            //        https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC#LanguageManualORC-ColumnEncodings
-            long cOffset = 0, nextCOffset = 0, nextNextCOffset = 0;
-            int nextUcOffset = 0;
+            long cOffset = index.getPositions(sctx.streamIndexOffset),
+                endCOffset = estimateRgEndOffset(isCompressed, isLastRg, isLastRg
+                    ? sctx.length : nextIndex.getPositions(sctx.streamIndexOffset),
+                    sctx.length, bufferSize);
             cb = new StreamBuffer();
             cb.incRef();
-            long startCOffset = cOffset;
-            long endCOffset = (nextUcOffset == 0) ? nextCOffset : nextNextCOffset;
-            InStream.uncompressStream(fileName, zcr, ctx.bufferIters[streamIx],
-                codec, bufferSize, cache, startCOffset, endCOffset, cb);
+            InStream.uncompressStream(fileName, zcr, sctx.bufferIter,
+                codec, bufferSize, cache, cOffset, endCOffset, cb);
           }
           ecb.setStreamData(colIxMod, streamIx, cb);
         }
@@ -3584,12 +3597,23 @@ public class RecordReaderImpl implements
     throw new UnsupportedOperationException("not implemented");
   }
 
-  private boolean isStripeLevelStream(Kind kind, ColumnEncoding.Kind encoding) {
-    return kind == OrcProto.Stream.Kind.DICTIONARY_DATA
-        || kind == OrcProto.Stream.Kind.DICTIONARY_COUNT
-        || (kind == OrcProto.Stream.Kind.LENGTH 
-        && (encoding == ColumnEncoding.Kind.DICTIONARY
-        || encoding == ColumnEncoding.Kind.DICTIONARY_V2));
+  private StreamBuffer getStripeLevelStream(
+      StreamContext ctx, LowLevelCache cache, boolean isLastRg) throws IOException {
+    if (ctx.stripeLevelStream == null) {
+      ctx.stripeLevelStream = new StreamBuffer();
+      // We will be using this for each RG while also sending RGs to processing.
+      // To avoid buffers being unlocked, run refcount one ahead; we will not increase
+      // it when building the last RG, so each RG processing will decref once, and the
+      // last one will unlock the buffers.
+      ctx.stripeLevelStream.incRef();
+      InStream.uncompressStream(
+          fileName, zcr, ctx.bufferIter, codec, bufferSize, cache, -1, -1, ctx.stripeLevelStream);
+      ctx.bufferIter = null;
+    }
+    if (!isLastRg) {
+      ctx.stripeLevelStream.incRef();
+    }
+    return ctx.stripeLevelStream;
   }
 
   @Override


