From: hashutosh@apache.org
To: commits@hive.apache.org
Subject: svn commit: r1543947 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc: InStream.java RecordReaderImpl.java
Date: Wed, 20 Nov 2013 21:06:05 -0000

Author: hashutosh
Date: Wed Nov 20 21:06:05 2013
New Revision: 1543947

URL: http://svn.apache.org/r1543947
Log:
HIVE-5663 : Refactor ORC RecordReader to operate on direct & wrapped ByteBuffers (Gopal V via Ashutosh Chauhan)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
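The patch below swaps the byte[]-plus-cursor bookkeeping (offsetInRange/limitInRange, offsetInCompressed/limitInCompressed) for ByteBuffer views, so the same read path works over array-backed and direct buffers. As a rough illustration of that access-pattern change (an illustrative sketch only, not code from this commit; the class and variable names below are made up):

import java.nio.ByteBuffer;

public class ByteBufferCursorSketch {
  public static void main(String[] args) {
    ByteBuffer backing = ByteBuffer.wrap(new byte[]{10, 20, 30, 40, 50});

    // Old style: pull out the backing array and track offset/limit by hand.
    // This only works for array-backed buffers, never for direct ones.
    byte[] arr = backing.array();
    int offsetInRange = backing.arrayOffset() + backing.position();
    int limitInRange = backing.arrayOffset() + backing.limit();
    int oldStyle = 0xff & arr[offsetInRange++];

    // New style: duplicate() gives an independent position/limit over the same
    // bytes, so remaining()/get() replace the manual bookkeeping and the code
    // no longer cares whether the buffer is heap-backed or direct.
    ByteBuffer range = backing.duplicate();
    int newStyle = 0xff & range.get();

    System.out.println(oldStyle == newStyle);          // true
    System.out.println(limitInRange - offsetInRange);  // 4
    System.out.println(range.remaining());             // 4
  }
}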
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1543947&r1=1543946&r2=1543947&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Wed Nov 20 21:06:05 2013
@@ -29,10 +29,8 @@ abstract class InStream extends InputStr
     private final long[] offsets;
     private final long length;
     private long currentOffset;
-    private byte[] range;
+    private ByteBuffer range;
     private int currentRange;
-    private int offsetInRange;
-    private int limitInRange;
 
     public UncompressedStream(String name, ByteBuffer[] input, long[] offsets,
                               long length) {
@@ -41,42 +39,39 @@ abstract class InStream extends InputStr
       this.offsets = offsets;
       this.length = length;
       currentRange = 0;
-      offsetInRange = 0;
-      limitInRange = 0;
       currentOffset = 0;
     }
 
     @Override
     public int read() {
-      if (offsetInRange >= limitInRange) {
+      if (range == null || range.remaining() == 0) {
         if (currentOffset == length) {
           return -1;
         }
         seek(currentOffset);
       }
       currentOffset += 1;
-      return 0xff & range[offsetInRange++];
+      return 0xff & range.get();
     }
 
     @Override
     public int read(byte[] data, int offset, int length) {
-      if (offsetInRange >= limitInRange) {
+      if (range == null || range.remaining() == 0) {
         if (currentOffset == this.length) {
           return -1;
         }
         seek(currentOffset);
       }
-      int actualLength = Math.min(length, limitInRange - offsetInRange);
-      System.arraycopy(range, offsetInRange, data, offset, actualLength);
-      offsetInRange += actualLength;
+      int actualLength = Math.min(length, range.remaining());
+      range.get(data, offset, actualLength);
       currentOffset += actualLength;
       return actualLength;
     }
 
     @Override
     public int available() {
-      if (offsetInRange < limitInRange) {
-        return limitInRange - offsetInRange;
+      if (range != null && range.remaining() > 0) {
+        return range.remaining();
       }
       return (int) (length - currentOffset);
     }
@@ -85,6 +80,10 @@ abstract class InStream extends InputStr
     public void close() {
       currentRange = bytes.length;
       currentOffset = length;
+      // explicit de-ref of bytes[]
+      for(int i = 0; i < bytes.length; i++) {
+        bytes[i] = null;
+      }
     }
 
     @Override
@@ -98,10 +97,10 @@ abstract class InStream extends InputStr
             desired - offsets[i] < bytes[i].remaining()) {
           currentOffset = desired;
           currentRange = i;
-          this.range = bytes[i].array();
-          offsetInRange = bytes[i].arrayOffset() + bytes[i].position();
-          limitInRange = bytes[i].arrayOffset() + bytes[i].limit();
-          offsetInRange += desired - offsets[i];
+          this.range = bytes[i].duplicate();
+          int pos = range.position();
+          pos += (int)(desired - offsets[i]); // this is why we duplicate
+          this.range.position(pos);
           return;
         }
       }
@@ -113,7 +112,7 @@ abstract class InStream extends InputStr
     public String toString() {
       return "uncompressed stream " + name + " position: " + currentOffset +
           " length: " + length + " range: " + currentRange +
-          " offset: " + offsetInRange + " limit: " + limitInRange;
+          " offset: " + (range == null ? 0 : range.position()) + " limit: " + (range == null ? 0 : range.limit());
     }
   }
 
@@ -123,14 +122,13 @@ abstract class InStream extends InputStr
     private final long[] offsets;
     private final int bufferSize;
    private final long length;
-    private ByteBuffer uncompressed = null;
+    private ByteBuffer uncompressed;
     private final CompressionCodec codec;
-    private byte[] compressed = null;
+    private ByteBuffer compressed;
     private long currentOffset;
     private int currentRange;
-    private int offsetInCompressed;
-    private int limitInCompressed;
     private boolean isUncompressedOriginal;
+    private boolean isDirect = false;
 
     public CompressedStream(String name, ByteBuffer[] input,
                             long[] offsets, long length,
@@ -140,49 +138,59 @@ abstract class InStream extends InputStr
       this.name = name;
       this.codec = codec;
       this.length = length;
+      if(this.length > 0) {
+        isDirect = this.bytes[0].isDirect();
+      }
       this.offsets = offsets;
       this.bufferSize = bufferSize;
       currentOffset = 0;
       currentRange = 0;
-      offsetInCompressed = 0;
-      limitInCompressed = 0;
+    }
+
+    private ByteBuffer allocateBuffer(int size) {
+      // TODO: use the same pool as the ORC readers
+      if(isDirect == true) {
+        return ByteBuffer.allocateDirect(size);
+      } else {
+        return ByteBuffer.allocate(size);
+      }
     }
 
     private void readHeader() throws IOException {
-      if (compressed == null || offsetInCompressed >= limitInCompressed) {
+      if (compressed == null || compressed.remaining() <= 0) {
        seek(currentOffset);
      }
-      if (limitInCompressed - offsetInCompressed > OutStream.HEADER_SIZE) {
-        int chunkLength = ((0xff & compressed[offsetInCompressed + 2]) << 15) |
-          ((0xff & compressed[offsetInCompressed + 1]) << 7) |
-          ((0xff & compressed[offsetInCompressed]) >> 1);
+      if (compressed.remaining() > OutStream.HEADER_SIZE) {
+        int b0 = compressed.get() & 0xff;
+        int b1 = compressed.get() & 0xff;
+        int b2 = compressed.get() & 0xff;
+        boolean isOriginal = (b0 & 0x01) == 1;
+        int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
+
        if (chunkLength > bufferSize) {
          throw new IllegalArgumentException("Buffer size too small. size = " +
              bufferSize + " needed = " + chunkLength);
size = " + bufferSize + " needed = " + chunkLength); } - boolean isOriginal = (compressed[offsetInCompressed] & 0x01) == 1; - offsetInCompressed += OutStream.HEADER_SIZE; + // read 3 bytes, which should be equal to OutStream.HEADER_SIZE always + assert OutStream.HEADER_SIZE == 3 : "The Orc HEADER_SIZE must be the same in OutStream and InStream"; + currentOffset += OutStream.HEADER_SIZE; + + ByteBuffer slice = this.slice(chunkLength); + if (isOriginal) { + uncompressed = slice; isUncompressedOriginal = true; - uncompressed = bytes[currentRange].duplicate(); - uncompressed.position(offsetInCompressed - - bytes[currentRange].arrayOffset()); - uncompressed.limit(offsetInCompressed + chunkLength); } else { if (isUncompressedOriginal) { - uncompressed = ByteBuffer.allocate(bufferSize); + uncompressed = allocateBuffer(bufferSize); isUncompressedOriginal = false; } else if (uncompressed == null) { - uncompressed = ByteBuffer.allocate(bufferSize); + uncompressed = allocateBuffer(bufferSize); } else { uncompressed.clear(); } - codec.decompress(ByteBuffer.wrap(compressed, offsetInCompressed, - chunkLength), - uncompressed); + codec.decompress(slice, uncompressed); } - offsetInCompressed += chunkLength; - currentOffset += chunkLength + OutStream.HEADER_SIZE; } else { throw new IllegalStateException("Can't read header at " + this); } @@ -208,10 +216,7 @@ abstract class InStream extends InputStr readHeader(); } int actualLength = Math.min(length, uncompressed.remaining()); - System.arraycopy(uncompressed.array(), - uncompressed.arrayOffset() + uncompressed.position(), data, - offset, actualLength); - uncompressed.position(uncompressed.position() + actualLength); + uncompressed.get(data, offset, actualLength); return actualLength; } @@ -229,10 +234,12 @@ abstract class InStream extends InputStr @Override public void close() { uncompressed = null; + compressed = null; currentRange = bytes.length; - offsetInCompressed = 0; - limitInCompressed = 0; currentOffset = length; + for(int i = 0; i < bytes.length; i++) { + bytes[i] = null; + } } @Override @@ -249,16 +256,62 @@ abstract class InStream extends InputStr } } + /* slices a read only contigous buffer of chunkLength */ + private ByteBuffer slice(int chunkLength) throws IOException { + int len = chunkLength; + final long oldOffset = currentOffset; + ByteBuffer slice; + if (compressed.remaining() >= len) { + slice = compressed.slice(); + // simple case + slice.limit(len); + currentOffset += len; + compressed.position(compressed.position() + len); + return slice; + } else if (currentRange >= (bytes.length - 1)) { + // nothing has been modified yet + throw new IOException("EOF in " + this + " while trying to read " + + chunkLength + " bytes"); + } + + // we need to consolidate 2 or more buffers into 1 + // first clear out compressed buffers + ByteBuffer copy = allocateBuffer(chunkLength); + currentOffset += compressed.remaining(); + len -= compressed.remaining(); + copy.put(compressed); + + while (len > 0 && (++currentRange) < bytes.length) { + compressed = bytes[currentRange].duplicate(); + if (compressed.remaining() >= len) { + slice = compressed.slice(); + slice.limit(len); + copy.put(slice); + currentOffset += len; + compressed.position(compressed.position() + len); + return copy; + } + currentOffset += compressed.remaining(); + len -= compressed.remaining(); + copy.put(compressed); + } + + // restore offsets for exception clarity + seek(oldOffset); + throw new IOException("EOF in " + this + " while trying to read " + + chunkLength + " bytes"); + } + 
    private void seek(long desired) throws IOException {
      for(int i = 0; i < bytes.length; ++i) {
        if (offsets[i] <= desired &&
            desired - offsets[i] < bytes[i].remaining()) {
          currentRange = i;
-          compressed = bytes[i].array();
-          offsetInCompressed = (int) (bytes[i].arrayOffset() +
-              bytes[i].position() + (desired - offsets[i]));
+          compressed = bytes[i].duplicate();
+          int pos = compressed.position();
+          pos += (int)(desired - offsets[i]);
+          compressed.position(pos);
          currentOffset = desired;
-          limitInCompressed = bytes[i].arrayOffset() + bytes[i].limit();
          return;
        }
      }
@@ -267,11 +320,9 @@ abstract class InStream extends InputStr
      if (segments != 0 &&
          desired == offsets[segments - 1] + bytes[segments - 1].remaining()) {
        currentRange = segments - 1;
-        compressed = bytes[currentRange].array();
-        offsetInCompressed = bytes[currentRange].arrayOffset() +
-            bytes[currentRange].limit();
+        compressed = bytes[currentRange].duplicate();
+        compressed.position(compressed.limit());
        currentOffset = desired;
-        limitInCompressed = offsetInCompressed;
        return;
      }
      throw new IOException("Seek outside of data in " + this + " to " +
@@ -294,7 +345,7 @@ abstract class InStream extends InputStr
    public String toString() {
      return "compressed stream " + name + " position: " + currentOffset +
          " length: " + length + " range: " + currentRange +
-          " offset: " + offsetInCompressed + " limit: " + limitInCompressed +
+          " offset: " + (compressed == null ? 0 : compressed.position()) + " limit: " + (compressed == null ? 0 : compressed.limit()) +
          rangeString() +
          (uncompressed == null ? "" :
            " uncompressed: " + uncompressed.position() + " to " +
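The new CompressedStream.slice(int) above has a zero-copy fast path when the current buffer already holds the whole compression chunk. A minimal sketch of just that fast path (illustrative only, not code from this commit; the names are made up and the multi-buffer consolidation branch is omitted):

import java.nio.ByteBuffer;

public class SliceSketch {
  // Carve a chunkLength-byte window out of the current buffer without copying,
  // then advance the parent's position past the window, as slice() does in the
  // simple case.
  static ByteBuffer sliceWindow(ByteBuffer compressed, int chunkLength) {
    ByteBuffer slice = compressed.slice();   // shares the underlying bytes
    slice.limit(chunkLength);                // restrict the view to the chunk
    compressed.position(compressed.position() + chunkLength);  // consume it
    return slice;
  }

  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5, 6});
    ByteBuffer window = sliceWindow(buf, 4);
    System.out.println(window.remaining() + " " + buf.remaining());  // prints "4 2"
  }
}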
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1543947&r1=1543946&r2=1543947&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Wed Nov 20 21:06:05 2013
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -75,6 +76,7 @@ class RecordReaderImpl implements Record
   private long rowCountInStripe = 0;
   private final Map<StreamName, InStream> streams =
       new HashMap<StreamName, InStream>();
+  List<BufferChunk> bufferChunks = new ArrayList<BufferChunk>(0);
   private final TreeReader reader;
   private final OrcProto.RowIndex[] indexes;
   private final SearchArgument sarg;
@@ -125,6 +127,7 @@ class RecordReaderImpl implements Record
         rows += stripe.getNumberOfRows();
       }
     }
+
     firstRow = skippedRows;
     totalRowCount = rows;
     reader = createTreeReader(path, 0, types, included);
@@ -2176,6 +2179,17 @@ class RecordReaderImpl implements Record
     return null;
   }
 
+  private void clearStreams() throws IOException {
+    // explicit close of all streams to de-ref ByteBuffers
+    for(InStream is: streams.values()) {
+      is.close();
+    }
+    if(bufferChunks != null) {
+      bufferChunks.clear();
+    }
+    streams.clear();
+  }
+
   /**
    * Read the current stripe into memory.
    * @throws IOException
@@ -2183,7 +2197,7 @@ class RecordReaderImpl implements Record
   private void readStripe() throws IOException {
     StripeInformation stripe = stripes.get(currentStripe);
     stripeFooter = readStripeFooter(stripe);
-    streams.clear();
+    clearStreams();
     // setup the position in the stripe
     rowCountInStripe = stripe.getNumberOfRows();
     rowInStripe = 0;
@@ -2223,28 +2237,17 @@ class RecordReaderImpl implements Record
 
   private void readAllDataStreams(StripeInformation stripe
                                   ) throws IOException {
-    byte[] buffer =
-      new byte[(int) (stripe.getDataLength())];
-    file.seek(stripe.getOffset() + stripe.getIndexLength());
-    file.readFully(buffer, 0, buffer.length);
-    int sectionOffset = 0;
-    for(OrcProto.Stream section: stripeFooter.getStreamsList()) {
-      if (StreamName.getArea(section.getKind()) == StreamName.Area.DATA) {
-        int sectionLength = (int) section.getLength();
-        ByteBuffer sectionBuffer = ByteBuffer.wrap(buffer, sectionOffset,
-            sectionLength);
-        StreamName name = new StreamName(section.getColumn(),
-            section.getKind());
-        streams.put(name,
-            InStream.create(name.toString(), new ByteBuffer[]{sectionBuffer},
-                new long[]{0}, sectionLength, codec, bufferSize));
-        sectionOffset += sectionLength;
-      }
-    }
+    long start = stripe.getIndexLength();
+    long end = start + stripe.getDataLength();
+    // explicitly trigger 1 big read
+    DiskRange[] ranges = new DiskRange[]{new DiskRange(start, end)};
+    bufferChunks = readDiskRanges(file, stripe.getOffset(), Arrays.asList(ranges));
+    List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
+    createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams);
   }
 
   /**
-   * The secionts of stripe that we need to read.
+   * The sections of stripe that we need to read.
    */
   static class DiskRange {
     /** the first address we need to read. */
@@ -2275,6 +2278,30 @@ class RecordReaderImpl implements Record
     }
   }
 
+  /**
+   * The sections of stripe that we have read.
+   * This might not match diskRange - 1 disk range can be multiple buffer chunks, depending on DFS block boundaries.
+   */
+  static class BufferChunk {
+    final ByteBuffer chunk;
+    /** the first address we need to read. */
+    final long offset;
+    /** end of the buffer **/
+    final long end;
+
+    BufferChunk(ByteBuffer chunk, long offset) {
+      this.offset = offset;
+      this.chunk = chunk;
+      end = offset + chunk.remaining();
+    }
+
+    @Override
+    public final String toString() {
+      return "range start: " + offset + " size: " + chunk.remaining() + " type: " +
+          (chunk.isDirect() ? "direct" : "array-backed");
"direct" : "array-backed"); + } + } + private static final int BYTE_STREAM_POSITIONS = 1; private static final int RUN_LENGTH_BYTE_POSITIONS = BYTE_STREAM_POSITIONS + 1; @@ -2460,17 +2487,17 @@ class RecordReaderImpl implements Record * ranges * @throws IOException */ - static byte[][] readDiskRanges(FSDataInputStream file, + List readDiskRanges(FSDataInputStream file, long base, List ranges) throws IOException { - byte[][] result = new byte[ranges.size()][]; - int i = 0; + ArrayList result = new ArrayList(ranges.size()); for(DiskRange range: ranges) { int len = (int) (range.end - range.offset); - result[i] = new byte[len]; - file.seek(base + range.offset); - file.readFully(result[i]); - i += 1; + long off = range.offset; + file.seek(base + off); + byte[] buffer = new byte[len]; + file.readFully(buffer, 0, buffer.length); + result.add(new BufferChunk(ByteBuffer.wrap(buffer), range.offset)); } return result; } @@ -2509,8 +2536,7 @@ class RecordReaderImpl implements Record } static void createStreams(List streamDescriptions, - List ranges, - byte[][] bytes, + List ranges, boolean[] includeColumn, CompressionCodec codec, int bufferSize, @@ -2519,13 +2545,13 @@ class RecordReaderImpl implements Record long offset = 0; for(OrcProto.Stream streamDesc: streamDescriptions) { int column = streamDesc.getColumn(); - if (includeColumn[column] && + if ((includeColumn == null || includeColumn[column]) && StreamName.getArea(streamDesc.getKind()) == StreamName.Area.DATA) { long length = streamDesc.getLength(); int first = -1; int last = -2; - for(int i=0; i < bytes.length; ++i) { - DiskRange range = ranges.get(i); + for(int i=0; i < ranges.size(); ++i) { + BufferChunk range = ranges.get(i); if (overlap(offset, offset+length, range.offset, range.end)) { if (first == -1) { first = i; @@ -2536,12 +2562,24 @@ class RecordReaderImpl implements Record ByteBuffer[] buffers = new ByteBuffer[last - first + 1]; long[] offsets = new long[last - first + 1]; for(int i=0; i < buffers.length; ++i) { - DiskRange range = ranges.get(i + first); + BufferChunk range = ranges.get(i + first); long start = Math.max(range.offset, offset); long end = Math.min(range.end, offset+length); - buffers[i] = ByteBuffer.wrap(bytes[first + i], - Math.max(0, (int) (offset - range.offset)), (int) (end - start)); - offsets[i] = Math.max(0, range.offset - offset); + buffers[i] = range.chunk.slice(); + assert range.chunk.position() == 0; // otherwise we'll mix up positions + /* + * buffers are positioned in-wards if the offset > range.offset + * offsets[i] == range.offset - offset, except if offset > range.offset + */ + if(offset > range.offset) { + buffers[i].position((int)(offset - range.offset)); + buffers[i].limit((int)(end - range.offset)); + offsets[i] = 0; + } else { + buffers[i].position(0); + buffers[i].limit((int)(end - range.offset)); + offsets[i] = (range.offset - offset); + } } StreamName name = new StreamName(column, streamDesc.getKind()); streams.put(name, InStream.create(name.toString(), buffers, offsets, @@ -2565,8 +2603,8 @@ class RecordReaderImpl implements Record if (LOG.isDebugEnabled()) { LOG.debug("merge = " + stringifyDiskRanges(chunks)); } - byte[][] bytes = readDiskRanges(file, stripe.getOffset(), chunks); - createStreams(streamList, chunks, bytes, included, codec, bufferSize, + bufferChunks = readDiskRanges(file, stripe.getOffset(), chunks); + createStreams(streamList, bufferChunks, included, codec, bufferSize, streams); } @@ -2666,6 +2704,7 @@ class RecordReaderImpl implements Record @Override public void close() 
+    clearStreams();
    file.close();
  }
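On the RecordReaderImpl side, readDiskRanges() now returns BufferChunk objects instead of raw byte[][] slots: each DiskRange is read once into a buffer, wrapped as a ByteBuffer, and kept together with its stripe-relative offset. A minimal sketch of that pairing (illustrative only, not code from this commit; the seek/readFully I/O against FSDataInputStream is elided):

import java.nio.ByteBuffer;

public class BufferChunkSketch {
  static final class BufferChunk {
    final ByteBuffer chunk;  // the bytes read for one disk range
    final long offset;       // first stripe-relative address covered
    final long end;          // offset + chunk.remaining()

    BufferChunk(ByteBuffer chunk, long offset) {
      this.chunk = chunk;
      this.offset = offset;
      this.end = offset + chunk.remaining();
    }
  }

  public static void main(String[] args) {
    byte[] buffer = new byte[128];  // stand-in for the bytes file.readFully(...) produced
    BufferChunk bc = new BufferChunk(ByteBuffer.wrap(buffer), 1024L);
    System.out.println(bc.offset + ".." + bc.end);  // prints "1024..1152"
  }
}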