Subject: svn commit: r1657456 - in /hive/branches/llap: common/src/java/org/apache/hadoop/hive/common/DiskRange.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
Date: Thu, 05 Feb 2015 01:14:51 -0000
To: commits@hive.apache.org
From: sershe@apache.org

Author: sershe
Date: Thu Feb  5 01:14:51 2015
New Revision: 1657456

URL: http://svn.apache.org/r1657456
Log:
HIVE-9418p4 : Index reading and more bugfixing (mostly from trunk port)

Modified:
    hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java

Modified: hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java?rev=1657456&r1=1657455&r2=1657456&view=diff
==============================================================================
--- hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java (original)
+++ hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java Thu Feb  5 01:14:51 2015
@@ -69,4 +69,9 @@ public class DiskRange {
   public ByteBuffer getData() {
     throw new UnsupportedOperationException();
   }
+
+  public void shiftBy(long baseOffset) {
+    offset -= baseOffset;
+    end -= baseOffset;
+  }
 }
\ No newline at end of file
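The new shiftBy mutator rebases a range from file coordinates to stream-relative coordinates. A minimal standalone sketch of that arithmetic (the two fields mirror the patch; the constructor and main are illustrative only):

    // Standalone sketch of the new DiskRange.shiftBy: rebases a file-relative
    // range into stream-relative coordinates.
    public class ShiftByDemo {
      static class Range {
        long offset; // first byte covered
        long end;    // one past the last byte covered

        Range(long offset, long end) {
          this.offset = offset;
          this.end = end;
        }

        void shiftBy(long baseOffset) { // same arithmetic as the patch
          offset -= baseOffset;
          end -= baseOffset;
        }
      }

      public static void main(String[] args) {
        Range r = new Range(1_000, 1_500); // file bytes 1000..1500
        r.shiftBy(900);                    // the stream starts at file offset 900
        System.out.println(r.offset + ".." + r.end); // prints 100..600
      }
    }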
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1657456&r1=1657455&r2=1657456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Thu Feb  5 01:14:51 2015
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.llap.io.ap
 import com.google.common.annotations.VisibleForTesting;
 
 abstract class InStream extends InputStream {
+  private static final Log LOG = LogFactory.getLog(InStream.class);
 
   private static class UncompressedStream extends InStream {
@@ -328,15 +329,14 @@ abstract class InStream extends InputStr
       currentOffset += compressed.remaining();
       len -= compressed.remaining();
       copy.put(compressed);
+      ListIterator<DiskRange> iter = bytes.listIterator(currentRange);
-      while (len > 0 && (++currentRange) < bytes.size()) {
+      while (len > 0 && iter.hasNext()) {
+        ++currentRange;
         if (LOG.isDebugEnabled()) {
           LOG.debug(String.format("Read slow-path, >1 cross block reads with %s",
               this.toString()));
         }
-        DiskRange range = bytes.get(currentRange);
-        if (!(range instanceof BufferChunk)) {
-          throw new IOException("Trying to extend compressed block into uncompressed block");
-        }
+        DiskRange range = iter.next();
         compressed = range.getData().duplicate();
         if (compressed.remaining() >= len) {
           slice = compressed.slice();
@@ -358,8 +358,8 @@ abstract class InStream extends InputStr
     }
 
     private void seek(long desired) throws IOException {
-      for(int i = 0; i < bytes.size(); ++i) {
-        DiskRange range = bytes.get(i);
+      int i = 0;
+      for (DiskRange range : bytes) {
         if (range.offset <= desired && desired < range.end) {
           currentRange = i;
           if (range instanceof BufferChunk) {
@@ -379,6 +379,7 @@ abstract class InStream extends InputStr
           }
           currentOffset = desired;
           return;
         }
+        ++i;
       }
       // if they are seeking to the precise end, go ahead and let them go there
       int segments = bytes.size();
@@ -406,13 +407,14 @@ abstract class InStream extends InputStr
 
     private String rangeString() {
       StringBuilder builder = new StringBuilder();
-      for(int i=0; i < bytes.size(); ++i) {
+      int i = 0;
+      for (DiskRange range : bytes) {
         if (i != 0) {
           builder.append("; ");
         }
-        DiskRange range = bytes.get(i);
         builder.append(" range " + i + " = " + range.offset
             + " to " + (range.end - range.offset));
+        ++i;
       }
       return builder.toString();
     }
@@ -537,7 +539,6 @@ abstract class InStream extends InputStr
         // This is a compressed buffer. We need to uncompress it; the buffer can comprise
         // several disk ranges, so we might need to combine them.
         BufferChunk bc = (BufferChunk)current;
-        // TODO#: DOUBLE check the iterator state.
         if (toDecompress == null) {
           toDecompress = new ArrayList();
           toRelease = (zcr == null) ? null : new ArrayList();
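The switch from indexed get() to a ListIterator matters when the range list is a linked list: each get(i) is a fresh O(n) walk, while the iterator advances in O(1). A small self-contained illustration of List.listIterator(int) semantics, separate from the ORC code itself:

    import java.util.Arrays;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.ListIterator;

    // Illustrative only: listIterator(int) starts so that the first next()
    // returns the element at that index, and each step is O(1) even on a
    // linked list, unlike repeated get(i) which re-walks the list every call.
    public class ListIteratorDemo {
      public static void main(String[] args) {
        List<String> ranges = new LinkedList<>(Arrays.asList("r0", "r1", "r2", "r3"));
        ListIterator<String> iter = ranges.listIterator(2); // positioned before "r2"
        while (iter.hasNext()) {
          System.out.println(iter.next()); // prints r2, then r3
        }
      }
    }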
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1657456&r1=1657455&r2=1657456&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Thu Feb  5 01:14:51 2015
@@ -99,7 +99,7 @@ public class RecordReaderImpl implements
   private final String fileName;
   private final FSDataInputStream file;
   private final long firstRow;
-  protected final List<StripeInformation> stripes =
+  private final List<StripeInformation> stripes =
       new ArrayList<StripeInformation>();
   private OrcProto.StripeFooter stripeFooter;
   private final long totalRowCount;
@@ -109,7 +109,7 @@ public class RecordReaderImpl implements
   private final boolean[] included;
   private final long rowIndexStride;
   private long rowInStripe = 0;
-  protected int currentStripe = -1;
+  private int currentStripe = -1;
   private long rowBaseInStripe = 0;
   private long rowCountInStripe = 0;
   private final Map<StreamName, InStream> streams =
@@ -319,7 +319,7 @@ public class RecordReaderImpl implements
   private abstract static class TreeReader {
     protected final Path path;
     protected final int columnId;
-    protected BitFieldReader present = null;
+    private BitFieldReader present = null;
     protected boolean valuePresent = false;
     protected final Configuration conf;
@@ -2279,7 +2279,7 @@ public class RecordReaderImpl implements
     }
   }
 
-  protected OrcProto.StripeFooter readStripeFooter(StripeInformation stripe
+  OrcProto.StripeFooter readStripeFooter(StripeInformation stripe
       ) throws IOException {
     long offset = stripe.getOffset() + stripe.getIndexLength()
         + stripe.getDataLength();
@@ -2685,6 +2685,7 @@ public class RecordReaderImpl implements
   private void readStripe() throws IOException {
     StripeInformation stripe = beginReadStripe();
     includedRowGroups = pickRowGroups();
+
     // move forward to the first unskipped row
     if (includedRowGroups != null) {
       while (rowInStripe < rowCountInStripe &&
@@ -2692,6 +2693,7 @@ public class RecordReaderImpl implements
         rowInStripe = Math.min(rowCountInStripe, rowInStripe + rowIndexStride);
       }
     }
+
     // if we haven't skipped the whole stripe, read the data
     if (rowInStripe < rowCountInStripe) {
       // if we aren't projecting columns or filtering rows, just read it all
@@ -2771,10 +2773,10 @@ public class RecordReaderImpl implements
     public DiskRange slice(long offset, long end) {
       assert offset <= end && offset >= this.offset && end <= this.end;
       ByteBuffer sliceBuf = chunk.slice();
-      int newPos = (int)(offset - this.offset);
-      int newLen = chunk.limit() - chunk.position() - (int)(this.end - end);
+      int newPos = chunk.position() + (int)(offset - this.offset);
+      int newLimit = chunk.limit() - chunk.position() - (int)(this.end - end);
       sliceBuf.position(newPos);
-      sliceBuf.limit(newPos + newLen);
+      sliceBuf.limit(newLimit);
       return new BufferChunk(sliceBuf, offset);
     }
@@ -2818,6 +2820,7 @@ public class RecordReaderImpl implements
     }
   }
 
+
   private static final int BYTE_STREAM_POSITIONS = 1;
   private static final int RUN_LENGTH_BYTE_POSITIONS =
       BYTE_STREAM_POSITIONS + 1;
@@ -2902,6 +2905,7 @@ public class RecordReaderImpl implements
    */
   static boolean isDictionary(OrcProto.Stream.Kind kind,
       OrcProto.ColumnEncoding encoding) {
+    assert kind != OrcProto.Stream.Kind.DICTIONARY_COUNT;
     OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind();
     return kind == OrcProto.Stream.Kind.DICTIONARY_DATA ||
         (kind == OrcProto.Stream.Kind.LENGTH &&
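The slice() change above is about ByteBuffer coordinate systems: ByteBuffer.slice() rebases index 0 to the buffer's current position, whereas duplicate() keeps the original indexing, so position math must pick one convention and stick to it. A short demo of the difference (illustrative only, not the ORC code):

    import java.nio.ByteBuffer;

    // slice() vs. duplicate(): the same underlying byte, addressed two ways.
    public class SliceCoordinatesDemo {
      public static void main(String[] args) {
        ByteBuffer chunk = ByteBuffer.allocate(10);
        for (byte i = 0; i < 10; ++i) {
          chunk.put(i);
        }
        chunk.position(4); // live window is bytes 4..8
        chunk.limit(9);

        ByteBuffer sliced = chunk.slice();     // index 0 == original index 4
        ByteBuffer duped = chunk.duplicate();  // index 4 == original index 4

        System.out.println(sliced.get(0)); // 4 (window-relative indexing)
        System.out.println(duped.get(4));  // 4 (buffer-absolute indexing)
      }
    }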
@@ -2989,22 +2993,12 @@ public class RecordReaderImpl implements
         encoding.getKind(), type.getKind(), stream.getKind(), isCompressed, hasNull);
     long start = index.getEntry(group).getPositions(posn);
     final long nextGroupOffset;
-    if (group < includedRowGroups.length - 1) {
-      nextGroupOffset = index.getEntry(group + 1).getPositions(posn);
-    } else {
-      nextGroupOffset = length;
-    }
-    // figure out the worst case last location
+    boolean isLast = group == (includedRowGroups.length - 1);
+    nextGroupOffset = isLast ? length : index.getEntry(group + 1).getPositions(posn);
-    // if adjacent groups have the same compressed block offset then stretch the slop
-    // by factor of 2 to safely accommodate the next compression block.
-    // One for the current compression block and another for the next compression block.
-    final long slop = isCompressed ? 2 * (OutStream.HEADER_SIZE + compressionSize)
-        : WORST_UNCOMPRESSED_SLOP;
-    long end = (group == includedRowGroups.length - 1) ? length : Math.min(length,
-        nextGroupOffset + slop);
     start += offset;
-    end += offset;
+    long end = offset + estimateRgEndOffset(
+        isCompressed, isLast, nextGroupOffset, length, compressionSize);
     if (lastRange != null && overlap(lastRange.offset, lastRange.end, start, end)) {
       lastRange.offset = Math.min(lastRange.offset, start);
       lastRange.end = Math.max(lastRange.end, end);
@@ -3016,6 +3010,16 @@ public class RecordReaderImpl implements
     return lastRange;
   }
 
+  private static long estimateRgEndOffset(boolean isCompressed, boolean isLast,
+      long nextGroupOffset, long streamLength, int bufferSize) {
+    // figure out the worst case last location
+    // if adjacent groups have the same compressed block offset then stretch the slop
+    // by factor of 2 to safely accommodate the next compression block.
+    // One for the current compression block and another for the next compression block.
+    long slop = isCompressed ? 2 * (OutStream.HEADER_SIZE + bufferSize) : WORST_UNCOMPRESSED_SLOP;
+    return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop);
+  }
+
   /**
    * Update the disk ranges to collapse adjacent or overlapping ranges. It
    * assumes that the ranges are sorted.
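The extracted estimateRgEndOffset pads the next group's recorded start offset with "slop" because a row group's last compressed block can extend past that offset. A standalone rendition with assumed constants (ORC's compressed-block header is 3 bytes, but the real HEADER_SIZE and WORST_UNCOMPRESSED_SLOP are defined in OutStream and RecordReaderImpl respectively):

    // Worked example of the slop math; constant values are assumptions.
    public class SlopDemo {
      static final int HEADER_SIZE = 3;                 // assumed
      static final int WORST_UNCOMPRESSED_SLOP = 4096;  // assumed placeholder

      static long estimateRgEndOffset(boolean isCompressed, boolean isLast,
          long nextGroupOffset, long streamLength, int bufferSize) {
        // Two compression blocks of slop: the next RG may start mid-block, and an
        // adjacent RG sharing the same block offset needs one more full block.
        long slop = isCompressed ? 2 * (HEADER_SIZE + bufferSize) : WORST_UNCOMPRESSED_SLOP;
        return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop);
      }

      public static void main(String[] args) {
        // 256KB buffer: slop = 2 * (3 + 262144) = 524294 bytes past the next RG start.
        System.out.println(estimateRgEndOffset(true, false, 1_000_000, 10_000_000, 262_144));
        // prints 1524294; for the last group it is simply the stream length:
        System.out.println(estimateRgEndOffset(true, true, 1_000_000, 10_000_000, 262_144));
      }
    }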
@@ -3128,7 +3132,7 @@ public class RecordReaderImpl implements
         streamOffset += streamDesc.getLength();
         continue;
       }
-      List<DiskRange> buffers = getStreamBuffers(ranges, streamOffset, streamDesc);
+      List<DiskRange> buffers = getStreamBuffers(ranges, streamOffset, streamDesc.getLength());
       StreamName name = new StreamName(column, streamDesc.getKind());
       streams.put(name, InStream.create(fileName, name.toString(), buffers,
           streamDesc.getLength(), codec, bufferSize, cache));
@@ -3136,32 +3140,40 @@ public class RecordReaderImpl implements
     }
   }
 
-  private List<DiskRange> getStreamBuffers(List<DiskRange> ranges,
-      long streamOffset, OrcProto.Stream streamDesc) {
+  private List<DiskRange> getStreamBuffers(List<DiskRange> ranges, long offset, long length) {
     // This assumes sorted ranges (as do many other parts of ORC code.
     ArrayList<DiskRange> buffers = new ArrayList<DiskRange>();
-    long streamEnd = streamOffset + streamDesc.getLength();
+    long streamEnd = offset + length;
     boolean inRange = false;
-    for (int i = 0; i < ranges.size(); ++i) {
-      DiskRange range = ranges.get(i);
-      boolean isLast = range.end >= streamEnd;
+    for (DiskRange range : ranges) {
       if (!inRange) {
-        if (range.end >= streamOffset) continue; // Skip until we are in range.
+        if (range.end <= offset) continue; // Skip until we are in range.
         inRange = true;
-        if (range.offset < streamOffset) {
+        if (range.offset < offset) {
           // Partial first buffer, add a slice of it.
-          buffers.add(range.slice(Math.max(range.offset, streamOffset),
-              Math.min(streamEnd, range.end)));
-          if (isLast) break; // Partial first buffer is also partial last buffer.
+          DiskRange partial = range.slice(offset, Math.min(streamEnd, range.end));
+          partial.shiftBy(offset);
+          buffers.add(partial);
+          if (range.end >= streamEnd) break; // Partial first buffer is also partial last buffer.
           continue;
         }
+      } else if (range.offset >= streamEnd) {
+        break;
       }
       if (range.end > streamEnd) {
         // Partial last buffer (may also be the first buffer), add a slice of it.
-        buffers.add(range.slice(range.offset, Math.min(streamEnd, range.end)));
+        DiskRange partial = range.slice(range.offset, streamEnd);
+        partial.shiftBy(offset);
+        buffers.add(partial);
         break;
       }
-      buffers.add(range); // Normal buffer.
+      // Buffer that belongs entirely to one stream.
+      // TODO: ideally we would want to reuse the object and remove it from the list, but we cannot
+      //       because bufferChunks is also used by clearStreams for zcr. Create a useless dup.
+      DiskRange full = range.slice(range.offset, range.end);
+      full.shiftBy(offset);
+      buffers.add(full);
+      if (range.end == streamEnd) break;
     }
     return buffers;
   }
@@ -3362,7 +3374,7 @@ public class RecordReaderImpl implements
       throw new IllegalArgumentException("Seek after the end of reader range");
   }
 
-  protected OrcProto.RowIndex[] readRowIndex(
+  OrcProto.RowIndex[] readRowIndex(
       int stripeIndex, boolean[] included) throws IOException {
     long offset = stripes.get(stripeIndex).getOffset();
     OrcProto.StripeFooter stripeFooter;
@@ -3434,30 +3446,46 @@ public class RecordReaderImpl implements
     beginReadStripe();
   }
 
-  private static class ColumnReadContext {
+  /** Helper context for each column being read */
+  private static final class ColumnReadContext {
     public ColumnReadContext(long offset, int colIx, ColumnEncoding encoding, RowIndex rowIndex) {
       this.encoding = encoding;
       this.rowIndex = rowIndex;
       this.colIx = colIx;
     }
 
     public static final int MAX_STREAMS = OrcProto.Stream.Kind.ROW_INDEX_VALUE;
-    final long[] streamOffsets = new long[MAX_STREAMS];
-    final OrcProto.Stream[] streams = new OrcProto.Stream[MAX_STREAMS];
-    List<DiskRange>[] buffers;
-    ListIterator<DiskRange>[] bufferIters;
-    StreamBuffer[] stripeLevelStreams;
+    /** The number of streams that are part of this column. */
+    int streamCount = 0;
+    final StreamContext[] streams = new StreamContext[MAX_STREAMS];
+    /** Column encoding. */
     final ColumnEncoding encoding;
+    /** Column rowindex. */
     final OrcProto.RowIndex rowIndex;
+    /** Column index in the file. */
     final int colIx;
-    int streamCount = 0;
 
-    public void addStream(long offset, OrcProto.Stream stream) {
-      streams[streamCount] = stream;
-      streamOffsets[streamCount] = offset;
-      ++streamCount;
+    public void addStream(long offset, OrcProto.Stream stream, int indexIx) {
+      streams[streamCount++] = new StreamContext(stream, offset, indexIx);
     }
   }
 
+  private static final class StreamContext {
+    public StreamContext(OrcProto.Stream stream, long streamOffset, int streamIndexOffset) {
+      this.kind = stream.getKind();
+      this.length = stream.getLength();
+      this.offset = streamOffset;
+      this.streamIndexOffset = streamIndexOffset;
+    }
+
+    /** Offsets of each stream in the column. */
+    public final long offset, length;
+    public final int streamIndexOffset;
+    public final OrcProto.Stream.Kind kind;
+    /** Iterators for the buffers; used to maintain position in per-rg reading. */
+    ListIterator<DiskRange> bufferIter;
+    /** Saved stripe-level stream, to reuse for each RG (e.g. dictionaries). */
+    StreamBuffer stripeLevelStream;
+  }
+
   @Override
   public void readEncodedColumns(int stripeIx, boolean[] includes, boolean[][] colRgs,
       LowLevelCache cache, Consumer<EncodedColumnBatch<OrcBatchKey>> consumer) throws IOException {
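The rewritten getStreamBuffers cuts each file-level disk range to the stream's [offset, offset + length) window and then calls shiftBy(offset), so the buffers a StreamContext iterates over are stream-relative. Worked numbers in a compact sketch (the two-field Range type stands in for the real DiskRange):

    // Illustrative numbers for the getStreamBuffers contract.
    public class StreamBuffersDemo {
      static final class Range {
        long offset, end;
        Range(long offset, long end) { this.offset = offset; this.end = end; }
        Range slice(long off, long end) { return new Range(off, end); }
        void shiftBy(long base) { offset -= base; end -= base; }
      }

      public static void main(String[] args) {
        long streamOffset = 1_000, streamLength = 500; // stream covers 1000..1500
        Range onDisk = new Range(900, 1_200);          // read range overlaps the stream start
        // Partial first buffer: clip to the stream window, then rebase.
        Range partial = onDisk.slice(streamOffset,
            Math.min(streamOffset + streamLength, onDisk.end));
        partial.shiftBy(streamOffset);
        // The stream sees its first 200 bytes at stream-relative 0..200.
        System.out.println(partial.offset + ".." + partial.end); // prints 0..200
      }
    }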
@@ -3479,6 +3507,7 @@ public class RecordReaderImpl implements
     int colRgIx = -1, lastColIx = -1;
     ColumnReadContext[] colCtxs = new ColumnReadContext[colRgs.length];
     boolean[] includedRgs = null;
+    boolean isCompressed = (codec != null);
     for (OrcProto.Stream stream : streamList) {
       long length = stream.getLength();
       int colIx = stream.getColumn();
@@ -3487,14 +3516,18 @@ public class RecordReaderImpl implements
         offset += length;
         continue;
       }
+      ColumnReadContext ctx = colCtxs[colRgIx];
       if (lastColIx != colIx) {
+        assert ctx == null;
         ++colRgIx;
         lastColIx = colIx;
         includedRgs = colRgs[colRgIx];
-        colCtxs[colRgIx] = new ColumnReadContext(
+        ctx = colCtxs[colRgIx] = new ColumnReadContext(
             offset, colIx, encodings.get(colIx), indexes[colIx]);
       }
-      colCtxs[colRgIx].addStream(offset, stream);
+      int indexIx = getIndexPosition(ctx.encoding.getKind(),
+          types.get(colIx).getKind(), streamKind, isCompressed, hasNull[colIx]);
+      colCtxs[colRgIx].addStream(offset, stream, indexIx);
       if (includedRgs == null || isDictionary(streamKind, encodings.get(colIx))) {
         lastRange = addEntireStreamToResult(offset, length, lastRange, rangesToRead);
       } else {
@@ -3518,11 +3551,9 @@ public class RecordReaderImpl implements
     // Separate buffers for each stream from the data we have.
     // TODO: given how we read, we could potentially get rid of this step?
     for (ColumnReadContext colCtx : colCtxs) {
-      colCtx.buffers = new List[colCtx.streamCount];
       for (int i = 0; i < colCtx.streamCount; ++i) {
-        colCtx.buffers[i] = getStreamBuffers(
-            rangesToRead, colCtx.streamOffsets[i], colCtx.streams[i]);
-        colCtx.bufferIters[i] = colCtx.buffers[i].listIterator();
+        StreamContext sctx = colCtx.streams[i];
+        sctx.bufferIter = getStreamBuffers(rangesToRead, sctx.offset, sctx.length).listIterator();
       }
     }
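The per-RG loop in the next hunk visits ceil(rowCountInStripe / rowIndexStride) row groups, and its rewritten predicate rgCount - rgIx - 1 == 0 is equivalent to rgIx == rgCount - 1. A quick check of that math with made-up values:

    // Row-group count math: the last group holds whatever remains.
    public class RowGroupCountDemo {
      public static void main(String[] args) {
        long rowCountInStripe = 25_000;
        long rowIndexStride = 10_000;
        int rgCount = (int) Math.ceil((double) rowCountInStripe / rowIndexStride);
        System.out.println(rgCount); // 3: two full groups of 10k rows, one of 5k
        for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
          boolean isLastRg = rgCount - rgIx - 1 == 0; // same as rgIx == rgCount - 1
          System.out.println("rg " + rgIx + " isLastRg=" + isLastRg);
        }
      }
    }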
@@ -3530,50 +3561,32 @@ public class RecordReaderImpl implements
     // We go by RG and not by column because that is how data is processed.
     int rgCount = (int)Math.ceil((double)rowCountInStripe / rowIndexStride);
     for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
-      boolean isLastRg = rgIx == rgCount - 1;
+      boolean isLastRg = rgCount - rgIx - 1 == 0;
+      // Create the batch we will use to return data for this RG.
       EncodedColumnBatch<OrcBatchKey> ecb = new EncodedColumnBatch<OrcBatchKey>(
           new OrcBatchKey(fileName, stripeIx, rgIx), colRgs.length, 0);
       for (int colIxMod = 0; colIxMod < colRgs.length; ++colIxMod) {
         if (colRgs[colIxMod] != null && !colRgs[colIxMod][rgIx]) continue; // RG x col filtered.
         ColumnReadContext ctx = colCtxs[colIxMod];
-        RowIndexEntry index = indexes[ctx.colIx].getEntry(rgIx);
+        RowIndexEntry index = ctx.rowIndex.getEntry(rgIx),
+            nextIndex = isLastRg ? null : ctx.rowIndex.getEntry(rgIx + 1);
         ecb.initColumn(colIxMod, ctx.colIx, ctx.streamCount);
         for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
-          OrcProto.Stream stream = ctx.streams[streamIx];
+          StreamContext sctx = ctx.streams[streamIx];
           StreamBuffer cb = null;
-          if (isStripeLevelStream(stream.getKind(), ctx.encoding.getKind())) {
+          if (isDictionary(sctx.kind, ctx.encoding)) {
             // This stream is for entire stripe and needed for every RG; uncompress once and reuse.
-            if (ctx.stripeLevelStreams == null) {
-              ctx.stripeLevelStreams = new StreamBuffer[ctx.streamCount];
-            }
-            cb = ctx.stripeLevelStreams[streamIx];
-            if (cb == null) {
-              cb = ctx.stripeLevelStreams[streamIx] = new StreamBuffer();
-              // We will be using this for each RG while also sending RGs to processing.
-              // To avoid buffers being unlocked, run refcount one ahead; we will not increase
-              // it when building the last RG, so each RG processing will decref once, and the
-              // last one will unlock the buffers.
-              cb.incRef();
-              InStream.uncompressStream(fileName, zcr, ctx.bufferIters[streamIx],
-                  codec, bufferSize, cache, -1, -1, cb);
-              ctx.buffers[streamIx] = null;
-            }
-            if (!isLastRg) {
-              cb.incRef();
-            }
+            cb = getStripeLevelStream(sctx, cache, isLastRg);
           } else {
             // This stream can be separated by RG using index. Let's do that.
-            // TODO#: determine start offset, end offset from index; nexts can be end of stream.
-            // Either see getIndexPosition or
-            // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC#LanguageManualORC-ColumnEncodings
-            long cOffset = 0, nextCOffset = 0, nextNextCOffset = 0;
-            int nextUcOffset = 0;
+            long cOffset = index.getPositions(sctx.streamIndexOffset),
+                endCOffset = estimateRgEndOffset(isCompressed, isLastRg, isLastRg
+                    ? sctx.length : nextIndex.getPositions(sctx.streamIndexOffset),
+                    sctx.length, bufferSize);
             cb = new StreamBuffer();
             cb.incRef();
-            long startCOffset = cOffset;
-            long endCOffset = (nextUcOffset == 0) ? nextCOffset : nextNextCOffset;
-            InStream.uncompressStream(fileName, zcr, ctx.bufferIters[streamIx],
-                codec, bufferSize, cache, startCOffset, endCOffset, cb);
+            InStream.uncompressStream(fileName, zcr, sctx.bufferIter,
+                codec, bufferSize, cache, cOffset, endCOffset, cb);
           }
           ecb.setStreamData(colIxMod, streamIx, cb);
         }
@@ -3584,12 +3597,23 @@ public class RecordReaderImpl implements
     throw new UnsupportedOperationException("not implemented");
   }
 
-  private boolean isStripeLevelStream(Kind kind, ColumnEncoding.Kind encoding) {
-    return kind == OrcProto.Stream.Kind.DICTIONARY_DATA
-        || kind == OrcProto.Stream.Kind.DICTIONARY_COUNT
-        || (kind == OrcProto.Stream.Kind.LENGTH
-        && (encoding == ColumnEncoding.Kind.DICTIONARY
-        || encoding == ColumnEncoding.Kind.DICTIONARY_V2));
+  private StreamBuffer getStripeLevelStream(
+      StreamContext ctx, LowLevelCache cache, boolean isLastRg) throws IOException {
+    if (ctx.stripeLevelStream == null) {
+      ctx.stripeLevelStream = new StreamBuffer();
+      // We will be using this for each RG while also sending RGs to processing.
+      // To avoid buffers being unlocked, run refcount one ahead; we will not increase
+      // it when building the last RG, so each RG processing will decref once, and the
+      // last one will unlock the buffers.
+      ctx.stripeLevelStream.incRef();
+      InStream.uncompressStream(
+          fileName, zcr, ctx.bufferIter, codec, bufferSize, cache, -1, -1, ctx.stripeLevelStream);
+      ctx.bufferIter = null;
+    }
+    if (!isLastRg) {
+      ctx.stripeLevelStream.incRef();
+    }
+    return ctx.stripeLevelStream;
   }
 
   @Override
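The refcounting comment in getStripeLevelStream describes a "run one ahead" scheme: the creating pass holds one reference and skips the increment for the last row group, so the final consumer's decref is the one that actually releases the buffers. A minimal model of that invariant (illustrative only; the real StreamBuffer API is not shown here):

    import java.util.concurrent.atomic.AtomicInteger;

    // Run the refcount one ahead of the consumers and the last decref releases.
    public class RefCountAheadDemo {
      static final class SharedBuffer {
        private final AtomicInteger refs = new AtomicInteger(0);
        void incRef() { refs.incrementAndGet(); }
        void decRef() {
          if (refs.decrementAndGet() == 0) {
            System.out.println("buffers released");
          }
        }
      }

      public static void main(String[] args) {
        int rgCount = 3;
        SharedBuffer stripeLevel = new SharedBuffer();
        stripeLevel.incRef(); // creation reference runs one ahead
        for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
          boolean isLastRg = rgIx == rgCount - 1;
          if (!isLastRg) {
            stripeLevel.incRef(); // every RG except the last adds a reference
          }
        }
        for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
          stripeLevel.decRef(); // the third decref prints "buffers released"
        }
      }
    }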