Subject: svn commit: r1514438 [1/3] - in /hive/trunk: ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/io/ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ ql/src/java/org/apache/hadoo...
Date: Thu, 15 Aug 2013 19:05:36 -0000
To: commits@hive.apache.org
From: gunther@apache.org

Author: gunther
Date: Thu Aug 15 19:05:35 2013
New Revision: 1514438

URL: http://svn.apache.org/r1514438
Log:
HIVE-4246: Implement predicate pushdown for ORC (Owen O'Malley via Gunther Hagleitner)

Added:
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgument.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInStream.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRunLengthByteReader.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRunLengthIntegerReader.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestSearchArgumentImpl.java
    hive/trunk/ql/src/test/results/compiler/plan/case_sensitivity.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/cast1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby3.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby4.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby5.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby6.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input20.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input3.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input4.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input5.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input6.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input7.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input8.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input9.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input_part1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input_testsequencefile.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input_testxpath.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input_testxpath2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join3.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join4.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join5.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join6.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join7.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join8.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample3.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample4.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample5.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample6.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample7.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/subq.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/udf1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/udf4.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/udf6.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/udf_case.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/udf_when.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/union.q.xml
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Thu Aug 15 19:05:35 2013
@@ -18,6 
+18,7 @@ package org.apache.hadoop.hive.ql.exec; +import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; @@ -241,6 +242,7 @@ public class TableScanOperator extends O // and 2) it will fail some join and union queries if this is added forcibly // into tableScanDesc java.util.ArrayList neededColumnIDs; + List neededColumns; public void setNeededColumnIDs(java.util.ArrayList orign_columns) { neededColumnIDs = orign_columns; @@ -250,6 +252,14 @@ public class TableScanOperator extends O return neededColumnIDs; } + public void setNeededColumns(List columnNames) { + neededColumns = columnNames; + } + + public List getNeededColumns() { + return neededColumns; + } + @Override public OperatorType getType() { return OperatorType.TABLESCAN; Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1514438&r1=1514437&r2=1514438&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Thu Aug 15 19:05:35 2013 @@ -428,6 +428,8 @@ public class HiveInputFormat= limitInRange) { + if (currentOffset == length) { + return -1; + } + seek(currentOffset); } - return 0xff & array[offset++]; + currentOffset += 1; + return 0xff & range[offsetInRange++]; } @Override public int read(byte[] data, int offset, int length) { - if (this.offset == limit) { - return -1; + if (offsetInRange >= limitInRange) { + if (currentOffset == this.length) { + return -1; + } + seek(currentOffset); } - int actualLength = Math.min(length, limit - this.offset); - System.arraycopy(array, this.offset, data, offset, actualLength); - this.offset += actualLength; + int actualLength = Math.min(length, limitInRange - offsetInRange); + System.arraycopy(range, offsetInRange, data, offset, actualLength); + offsetInRange += actualLength; + currentOffset += actualLength; return actualLength; } @Override public int available() { - return limit - offset; + if (offsetInRange < limitInRange) { + return limitInRange - offsetInRange; + } + return (int) (length - currentOffset); } @Override public void close() { - array = null; - offset = 0; + currentRange = bytes.length; + currentOffset = length; } @Override public void seek(PositionProvider index) throws IOException { - offset = base + (int) index.getNext(); + seek(index.getNext()); + } + + public void seek(long desired) { + for(int i = 0; i < bytes.length; ++i) { + if (offsets[i] <= desired && + desired - offsets[i] < bytes[i].remaining()) { + currentOffset = desired; + currentRange = i; + this.range = bytes[i].array(); + offsetInRange = bytes[i].arrayOffset() + bytes[i].position(); + limitInRange = bytes[i].arrayOffset() + bytes[i].limit(); + offsetInRange += desired - offsets[i]; + return; + } + } + throw new IllegalArgumentException("Seek in " + name + " to " + + desired + " is outside of the data"); } @Override public String toString() { - return "uncompressed stream " + name + " base: " + base + - " offset: " + offset + " limit: " + limit; + return "uncompressed stream " + name + " position: " + currentOffset + + " length: " + length + " range: " + currentRange + + " offset: " + offsetInRange + " limit: " + limitInRange; } } private static class CompressedStream extends InStream { private final String name; - private 
byte[] array; + private final ByteBuffer[] bytes; + private final long[] offsets; private final int bufferSize; + private final long length; private ByteBuffer uncompressed = null; private final CompressionCodec codec; - private int offset; - private final int base; - private final int limit; + private byte[] compressed = null; + private long currentOffset; + private int currentRange; + private int offsetInCompressed; + private int limitInCompressed; private boolean isUncompressedOriginal; - public CompressedStream(String name, ByteBuffer input, + public CompressedStream(String name, ByteBuffer[] input, + long[] offsets, long length, CompressionCodec codec, int bufferSize ) { - this.array = input.array(); + this.bytes = input; this.name = name; this.codec = codec; + this.length = length; + this.offsets = offsets; this.bufferSize = bufferSize; - base = input.arrayOffset() + input.position(); - offset = base; - limit = input.arrayOffset() + input.limit(); + currentOffset = 0; + currentRange = 0; + offsetInCompressed = 0; + limitInCompressed = 0; } private void readHeader() throws IOException { - if (limit - offset > OutStream.HEADER_SIZE) { - int chunkLength = ((0xff & array[offset + 2]) << 15) | - ((0xff & array[offset + 1]) << 7) | ((0xff & array[offset]) >> 1); + if (compressed == null || offsetInCompressed >= limitInCompressed) { + seek(currentOffset); + } + if (limitInCompressed - offsetInCompressed > OutStream.HEADER_SIZE) { + int chunkLength = ((0xff & compressed[offsetInCompressed + 2]) << 15) | + ((0xff & compressed[offsetInCompressed + 1]) << 7) | + ((0xff & compressed[offsetInCompressed]) >> 1); if (chunkLength > bufferSize) { throw new IllegalArgumentException("Buffer size too small. size = " + bufferSize + " needed = " + chunkLength); } - boolean isOriginal = (array[offset] & 0x01) == 1; - offset += OutStream.HEADER_SIZE; + boolean isOriginal = (compressed[offsetInCompressed] & 0x01) == 1; + offsetInCompressed += OutStream.HEADER_SIZE; if (isOriginal) { isUncompressedOriginal = true; - uncompressed = ByteBuffer.wrap(array, offset, chunkLength); + uncompressed = bytes[currentRange].duplicate(); + uncompressed.position(offsetInCompressed - + bytes[currentRange].arrayOffset()); + uncompressed.limit(offsetInCompressed + chunkLength); } else { if (isUncompressedOriginal) { uncompressed = ByteBuffer.allocate(bufferSize); @@ -125,19 +177,21 @@ abstract class InStream extends InputStr } else { uncompressed.clear(); } - codec.decompress(ByteBuffer.wrap(array, offset, chunkLength), + codec.decompress(ByteBuffer.wrap(compressed, offsetInCompressed, + chunkLength), uncompressed); } - offset += chunkLength; + offsetInCompressed += chunkLength; + currentOffset += chunkLength + OutStream.HEADER_SIZE; } else { - throw new IllegalStateException("Can't read header"); + throw new IllegalStateException("Can't read header at " + this); } } @Override public int read() throws IOException { if (uncompressed == null || uncompressed.remaining() == 0) { - if (offset == limit) { + if (currentOffset == length) { return -1; } readHeader(); @@ -148,7 +202,7 @@ abstract class InStream extends InputStr @Override public int read(byte[] data, int offset, int length) throws IOException { if (uncompressed == null || uncompressed.remaining() == 0) { - if (this.offset == this.limit) { + if (currentOffset == this.length) { return -1; } readHeader(); @@ -164,7 +218,7 @@ abstract class InStream extends InputStr @Override public int available() throws IOException { if (uncompressed == null || uncompressed.remaining() == 
0) { - if (offset == limit) { + if (currentOffset == length) { return 0; } readHeader(); @@ -174,27 +228,74 @@ abstract class InStream extends InputStr @Override public void close() { - array = null; uncompressed = null; - offset = 0; + currentRange = bytes.length; + offsetInCompressed = 0; + limitInCompressed = 0; + currentOffset = length; } @Override public void seek(PositionProvider index) throws IOException { - offset = base + (int) index.getNext(); - int uncompBytes = (int) index.getNext(); - if (uncompBytes != 0) { + seek(index.getNext()); + long uncompressedBytes = index.getNext(); + if (uncompressedBytes != 0) { readHeader(); - uncompressed.position(uncompressed.position() + uncompBytes); + uncompressed.position(uncompressed.position() + + (int) uncompressedBytes); } else if (uncompressed != null) { + // mark the uncompressed buffer as done uncompressed.position(uncompressed.limit()); } } + private void seek(long desired) throws IOException { + for(int i = 0; i < bytes.length; ++i) { + if (offsets[i] <= desired && + desired - offsets[i] < bytes[i].remaining()) { + currentRange = i; + compressed = bytes[i].array(); + offsetInCompressed = (int) (bytes[i].arrayOffset() + + bytes[i].position() + (desired - offsets[i])); + currentOffset = desired; + limitInCompressed = bytes[i].arrayOffset() + bytes[i].limit(); + return; + } + } + // if they are seeking to the precise end, go ahead and let them go there + int segments = bytes.length; + if (segments != 0 && + desired == offsets[segments - 1] + bytes[segments - 1].remaining()) { + currentRange = segments - 1; + compressed = bytes[currentRange].array(); + offsetInCompressed = bytes[currentRange].arrayOffset() + + bytes[currentRange].limit(); + currentOffset = desired; + limitInCompressed = offsetInCompressed; + return; + } + throw new IOException("Seek outside of data in " + this + " to " + + desired); + } + + private String rangeString() { + StringBuilder builder = new StringBuilder(); + for(int i=0; i < offsets.length; ++i) { + if (i != 0) { + builder.append("; "); + } + builder.append(" range " + i + " = " + offsets[i] + " to " + + bytes[i].remaining()); + } + return builder.toString(); + } + @Override public String toString() { - return "compressed stream " + name + " base: " + base + - " offset: " + offset + " limit: " + limit + + return "compressed stream " + name + " position: " + currentOffset + + " length: " + length + " range: " + currentRange + + " offset: " + offsetInCompressed + " limit: " + limitInCompressed + + rangeString() + (uncompressed == null ? "" : " uncompressed: " + uncompressed.position() + " to " + uncompressed.limit()); @@ -203,14 +304,29 @@ abstract class InStream extends InputStr public abstract void seek(PositionProvider index) throws IOException; + /** + * Create an input stream from a list of buffers. 
+ * @param name the name of the stream + * @param input the list of ranges of bytes for the stream + * @param offsets a list of offsets (the same length as input) that must + * contain the first offset of the each set of bytes in input + * @param length the length in bytes of the stream + * @param codec the compression codec + * @param bufferSize the compression buffer size + * @return an input stream + * @throws IOException + */ public static InStream create(String name, - ByteBuffer input, + ByteBuffer[] input, + long[] offsets, + long length, CompressionCodec codec, int bufferSize) throws IOException { if (codec == null) { - return new UncompressedStream(name, input); + return new UncompressedStream(name, input, offsets, length); } else { - return new CompressedStream(name, input, codec, bufferSize); + return new CompressedStream(name, input, offsets, length, codec, + bufferSize); } } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1514438&r1=1514437&r2=1514438&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Thu Aug 15 19:05:35 2013 @@ -18,13 +18,22 @@ package org.apache.hadoop.hive.ql.io.orc; +import com.google.common.collect.Lists; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.InputFormatChecker; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileSplit; @@ -43,6 +52,8 @@ import java.util.List; public class OrcInputFormat extends FileInputFormat implements InputFormatChecker { + private static final Log LOG = LogFactory.getLog(OrcInputFormat.class); + private static class OrcRecordReader implements RecordReader { private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader; @@ -53,14 +64,38 @@ public class OrcInputFormat extends Fil OrcRecordReader(Reader file, Configuration conf, long offset, long length) throws IOException { - this.reader = file.rows(offset, length, - findIncludedColumns(file.getTypes(), conf)); + String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR); + String columnNamesString = + conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR); + String[] columnNames = null; + SearchArgument sarg = null; List types = file.getTypes(); if (types.size() == 0) { numColumns = 0; } else { numColumns = types.get(0).getSubtypesCount(); } + columnNames = new String[types.size()]; + LOG.info("included column ids = " + + conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "null")); + LOG.info("included columns names = " + + 
conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "null")); + boolean[] includeColumn = findIncludedColumns(types, conf); + if (serializedPushdown != null && columnNamesString != null) { + sarg = SearchArgument.FACTORY.create + (Utilities.deserializeExpression(serializedPushdown, conf)); + LOG.info("ORC pushdown predicate: " + sarg); + String[] neededColumnNames = columnNamesString.split(","); + int i = 0; + for(int columnId: types.get(0).getSubtypesList()) { + if (includeColumn[columnId]) { + columnNames[columnId] = neededColumnNames[i++]; + } + } + } else { + LOG.info("No ORC pushdown predicate"); + } + this.reader = file.rows(offset, length,includeColumn, sarg, columnNames); this.offset = offset; this.length = length; } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java?rev=1514438&r1=1514437&r2=1514438&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java Thu Aug 15 19:05:35 2013 @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.ql.io.orc; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; @@ -38,6 +40,9 @@ import java.util.Properties; * It transparently passes the object to/from the ORC file reader/writer. */ public class OrcSerde implements SerDe { + + private static final Log LOG = LogFactory.getLog(OrcSerde.class); + private final OrcSerdeRow row = new OrcSerdeRow(); private ObjectInspector inspector = null; @@ -129,4 +134,5 @@ public class OrcSerde implements SerDe { public SerDeStats getSerDeStats() { return null; } + } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java?rev=1514438&r1=1514437&r2=1514438&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java Thu Aug 15 19:05:35 2013 @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.io.orc; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import java.io.IOException; @@ -118,8 +119,26 @@ public interface Reader { * @param include true for each column that should be included * @return a new RecordReader that will read the specified rows. * @throws IOException + * @deprecated */ RecordReader rows(long offset, long length, boolean[] include) throws IOException; + /** + * Create a RecordReader that will read a section of a file. It starts reading + * at the first stripe after the offset and continues to the stripe that + * starts at offset + length. It also accepts a list of columns to read and a + * search argument. + * @param offset the minimum offset of the first stripe to read + * @param length the distance from offset of the first address to stop reading + * at + * @param include true for each column that should be included + * @param sarg a search argument that limits the rows that should be read. 
+ * @param neededColumns the names of the included columns + * @return the record reader for the rows + */ + RecordReader rows(long offset, long length, + boolean[] include, SearchArgument sarg, + String[] neededColumns) throws IOException; + } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1514438&r1=1514437&r2=1514438&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Thu Aug 15 19:05:35 2013 @@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.Text; @@ -307,7 +308,8 @@ final class ReaderImpl implements Reader buffer.position(psOffset - footerSize); buffer.limit(psOffset); } - InputStream instream = InStream.create("footer", buffer, codec, bufferSize); + InputStream instream = InStream.create("footer", new ByteBuffer[]{buffer}, + new long[]{0L}, footerSize, codec, bufferSize); footer = OrcProto.Footer.parseFrom(instream); inspector = OrcStruct.createObjectInspector(0, footer.getTypesList()); file.close(); @@ -315,15 +317,22 @@ final class ReaderImpl implements Reader @Override public RecordReader rows(boolean[] include) throws IOException { - return rows(0, Long.MAX_VALUE, include); + return rows(0, Long.MAX_VALUE, include, null, null); } @Override public RecordReader rows(long offset, long length, boolean[] include ) throws IOException { + return rows(offset, length, include, null, null); + } + + @Override + public RecordReader rows(long offset, long length, boolean[] include, + SearchArgument sarg, String[] columnNames + ) throws IOException { return new RecordReaderImpl(this.getStripes(), fileSystem, path, offset, - length, footer.getTypesList(), codec, bufferSize, - include, footer.getRowIndexStride()); + length, footer.getTypesList(), codec, bufferSize, + include, footer.getRowIndexStride(), sarg, columnNames); } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1514438&r1=1514437&r2=1514438&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Thu Aug 15 19:05:35 2013 @@ -27,12 +27,18 @@ import java.util.HashMap; import java.util.List; import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import 
org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; @@ -45,6 +51,9 @@ import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text; class RecordReaderImpl implements RecordReader { + + private static final Log LOG = LogFactory.getLog(RecordReaderImpl.class); + private final FSDataInputStream file; private final long firstRow; private final List stripes = @@ -52,17 +61,25 @@ class RecordReaderImpl implements Record private OrcProto.StripeFooter stripeFooter; private final long totalRowCount; private final CompressionCodec codec; + private final List types; private final int bufferSize; private final boolean[] included; private final long rowIndexStride; private long rowInStripe = 0; - private int currentStripe = 0; + private int currentStripe = -1; private long rowBaseInStripe = 0; private long rowCountInStripe = 0; private final Map streams = new HashMap(); private final TreeReader reader; private final OrcProto.RowIndex[] indexes; + private final SearchArgument sarg; + // the leaf predicates for the sarg + private final List sargLeaves; + // an array the same length as the sargLeaves that map them to column ids + private final int[] filterColumns; + // an array about which row groups aren't skipped + private boolean[] includedRowGroups = null; RecordReaderImpl(Iterable stripes, FileSystem fileSystem, @@ -72,12 +89,27 @@ class RecordReaderImpl implements Record CompressionCodec codec, int bufferSize, boolean[] included, - long strideRate + long strideRate, + SearchArgument sarg, + String[] columnNames ) throws IOException { this.file = fileSystem.open(path); this.codec = codec; + this.types = types; this.bufferSize = bufferSize; this.included = included; + this.sarg = sarg; + if (sarg != null) { + sargLeaves = sarg.getLeaves(); + filterColumns = new int[sargLeaves.size()]; + for(int i=0; i < filterColumns.length; ++i) { + String colName = sargLeaves.get(i).getColumnName(); + filterColumns[i] = findColumns(columnNames, colName); + } + } else { + sargLeaves = null; + filterColumns = null; + } long rows = 0; long skippedRows = 0; for(StripeInformation stripe: stripes) { @@ -94,9 +126,17 @@ class RecordReaderImpl implements Record reader = createTreeReader(path, 0, types, included); indexes = new OrcProto.RowIndex[types.size()]; rowIndexStride = strideRate; - if (this.stripes.size() > 0) { - readStripe(); + advanceToNextRow(0L); + } + + private static int findColumns(String[] columnNames, + String columnName) { + for(int i=0; i < columnNames.length; ++i) { + if (columnName.equals(columnNames[i])) { + return i; + } } + return -1; } private static final class PositionProviderImpl implements PositionProvider { @@ -1418,113 +1458,707 @@ class RecordReaderImpl implements Record ByteBuffer tailBuf = ByteBuffer.allocate(tailLength); file.seek(offset); file.readFully(tailBuf.array(), tailBuf.arrayOffset(), tailLength); - return OrcProto.StripeFooter.parseFrom(InStream.create("footer", tailBuf, - codec, bufferSize)); + return OrcProto.StripeFooter.parseFrom(InStream.create("footer", + new ByteBuffer[]{tailBuf}, new long[]{0}, tailLength, codec, + bufferSize)); } - private void readStripe() throws IOException { - StripeInformation stripe = stripes.get(currentStripe); - stripeFooter = readStripeFooter(stripe); - long offset = stripe.getOffset(); - streams.clear(); + static enum Location { + BEFORE, MIN, MIDDLE, MAX, AFTER 
+ } - // if we aren't projecting columns, just read the whole stripe - if (included == null) { - byte[] buffer = - new byte[(int) (stripe.getDataLength())]; - file.seek(offset + stripe.getIndexLength()); - file.readFully(buffer, 0, buffer.length); - int sectionOffset = 0; - for(OrcProto.Stream section: stripeFooter.getStreamsList()) { - if (StreamName.getArea(section.getKind()) == StreamName.Area.DATA) { - int sectionLength = (int) section.getLength(); - ByteBuffer sectionBuffer = ByteBuffer.wrap(buffer, sectionOffset, - sectionLength); - StreamName name = new StreamName(section.getColumn(), - section.getKind()); - streams.put(name, - InStream.create(name.toString(), sectionBuffer, codec, - bufferSize)); - sectionOffset += sectionLength; - } + /** + * Given a point and min and max, determine if the point is before, at the + * min, in the middle, at the max, or after the range. + * @param point the point to test + * @param min the minimum point + * @param max the maximum point + * @param the type of the comparision + * @return the location of the point + */ + static Location compareToRange(Comparable point, T min, T max) { + int minCompare = point.compareTo(min); + if (minCompare < 0) { + return Location.BEFORE; + } else if (minCompare == 0) { + return Location.MIN; + } + int maxCompare = point.compareTo(max); + if (maxCompare > 0) { + return Location.AFTER; + } else if (maxCompare == 0) { + return Location.MAX; + } + return Location.MIDDLE; + } + + /** + * Get the minimum value out of an index entry. + * @param index the index entry + * @return the object for the minimum value or null if there isn't one + */ + static Object getMin(OrcProto.ColumnStatistics index) { + if (index.hasIntStatistics()) { + OrcProto.IntegerStatistics stat = index.getIntStatistics(); + if (stat.hasMinimum()) { + return stat.getMinimum(); } - } else { - List streamList = stripeFooter.getStreamsList(); - // the index of the current section - int currentSection = 0; - while (currentSection < streamList.size() && - StreamName.getArea(streamList.get(currentSection).getKind()) != - StreamName.Area.DATA) { - currentSection += 1; - } - // byte position of the current section relative to the stripe start - long sectionOffset = stripe.getIndexLength(); - while (currentSection < streamList.size()) { - int bytes = 0; - - // find the first section that shouldn't be read - int excluded=currentSection; - while (excluded < streamList.size() && - included[streamList.get(excluded).getColumn()]) { - bytes += streamList.get(excluded).getLength(); - excluded += 1; - } - - // actually read the bytes as a big chunk - if (bytes != 0) { - byte[] buffer = new byte[bytes]; - file.seek(offset + sectionOffset); - file.readFully(buffer, 0, bytes); - sectionOffset += bytes; - - // create the streams for the sections we just read - bytes = 0; - while (currentSection < excluded) { - OrcProto.Stream section = streamList.get(currentSection); - StreamName name = - new StreamName(section.getColumn(), section.getKind()); - this.streams.put(name, - InStream.create(name.toString(), - ByteBuffer.wrap(buffer, bytes, - (int) section.getLength()), codec, bufferSize)); - currentSection += 1; - bytes += section.getLength(); + } + if (index.hasStringStatistics()) { + OrcProto.StringStatistics stat = index.getStringStatistics(); + if (stat.hasMinimum()) { + return stat.getMinimum(); + } + } + if (index.hasDoubleStatistics()) { + OrcProto.DoubleStatistics stat = index.getDoubleStatistics(); + if (stat.hasMinimum()) { + return stat.getMinimum(); + } + } + return 
null; + } + + /** + * Get the maximum value out of an index entry. + * @param index the index entry + * @return the object for the maximum value or null if there isn't one + */ + static Object getMax(OrcProto.ColumnStatistics index) { + if (index.hasIntStatistics()) { + OrcProto.IntegerStatistics stat = index.getIntStatistics(); + if (stat.hasMaximum()) { + return stat.getMaximum(); + } + } + if (index.hasStringStatistics()) { + OrcProto.StringStatistics stat = index.getStringStatistics(); + if (stat.hasMaximum()) { + return stat.getMaximum(); + } + } + if (index.hasDoubleStatistics()) { + OrcProto.DoubleStatistics stat = index.getDoubleStatistics(); + if (stat.hasMaximum()) { + return stat.getMaximum(); + } + } + return null; + } + + /** + * Evaluate a predicate with respect to the statistics from the column + * that is referenced in the predicate. + * @param index the statistics for the column mentioned in the predicate + * @param predicate the leaf predicate we need to evaluation + * @return the set of truth values that may be returned for the given + * predicate. + */ + static TruthValue evaluatePredicate(OrcProto.ColumnStatistics index, + PredicateLeaf predicate) { + Object minValue = getMin(index); + // if we didn't have any values, everything must have been null + if (minValue == null) { + if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) { + return TruthValue.YES; + } else { + return TruthValue.NULL; + } + } + Object maxValue = getMax(index); + Location loc; + switch (predicate.getOperator()) { + case NULL_SAFE_EQUALS: + loc = compareToRange((Comparable) predicate.getLiteral(), + minValue, maxValue); + if (loc == Location.BEFORE || loc == Location.AFTER) { + return TruthValue.NO; + } else { + return TruthValue.YES_NO; + } + case EQUALS: + loc = compareToRange((Comparable) predicate.getLiteral(), + minValue, maxValue); + if (minValue.equals(maxValue) && loc == Location.MIN) { + return TruthValue.YES_NULL; + } else if (loc == Location.BEFORE || loc == Location.AFTER) { + return TruthValue.NO_NULL; + } else { + return TruthValue.YES_NO_NULL; + } + case LESS_THAN: + loc = compareToRange((Comparable) predicate.getLiteral(), + minValue, maxValue); + if (loc == Location.AFTER) { + return TruthValue.YES_NULL; + } else if (loc == Location.BEFORE || loc == Location.MIN) { + return TruthValue.NO_NULL; + } else { + return TruthValue.YES_NO_NULL; + } + case LESS_THAN_EQUALS: + loc = compareToRange((Comparable) predicate.getLiteral(), + minValue, maxValue); + if (loc == Location.AFTER || loc == Location.MAX) { + return TruthValue.YES_NULL; + } else if (loc == Location.BEFORE) { + return TruthValue.NO_NULL; + } else { + return TruthValue.YES_NO_NULL; + } + case IN: + if (minValue.equals(maxValue)) { + // for a single value, look through to see if that value is in the + // set + for(Object arg: predicate.getLiteralList()) { + loc = compareToRange((Comparable) arg, minValue, maxValue); + if (loc == Location.MIN) { + return TruthValue.YES_NULL; + } + } + return TruthValue.NO_NULL; + } else { + // are all of the values outside of the range? 
+ for(Object arg: predicate.getLiteralList()) { + loc = compareToRange((Comparable) arg, minValue, maxValue); + if (loc == Location.MIN || loc == Location.MIDDLE || + loc == Location.MAX) { + return TruthValue.YES_NO_NULL; + } } + return TruthValue.NO_NULL; } - - // skip forward until we get back to a section that we need - while (currentSection < streamList.size() && - !included[streamList.get(currentSection).getColumn()]) { - sectionOffset += streamList.get(currentSection).getLength(); - currentSection += 1; + case BETWEEN: + List args = predicate.getLiteralList(); + loc = compareToRange((Comparable) args.get(0), minValue, maxValue); + if (loc == Location.BEFORE || loc == Location.MIN) { + Location loc2 = compareToRange((Comparable) args.get(1), minValue, + maxValue); + if (loc2 == Location.AFTER || loc2 == Location.MAX) { + return TruthValue.YES_NULL; + } else if (loc2 == Location.BEFORE) { + return TruthValue.NO_NULL; + } else { + return TruthValue.YES_NO_NULL; + } + } else if (loc == Location.AFTER) { + return TruthValue.NO_NULL; + } else { + return TruthValue.YES_NO_NULL; } + case IS_NULL: + return TruthValue.YES_NO; + default: + return TruthValue.YES_NO_NULL; + } + } + + /** + * Pick the row groups that we need to load from the current stripe. + * @return an array with a boolean for each row group or null if all of the + * row groups must be read. + * @throws IOException + */ + private boolean[] pickRowGroups() throws IOException { + // if we don't have a sarg or indexes, we read everything + if (sarg == null || rowIndexStride == 0) { + return null; + } + readRowIndex(); + long rowsInStripe = stripes.get(currentStripe).getNumberOfRows(); + int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) / + rowIndexStride); + boolean[] result = new boolean[groupsInStripe]; + TruthValue[] leafValues = new TruthValue[sargLeaves.size()]; + for(int rowGroup=0; rowGroup < result.length; ++rowGroup) { + for(int pred=0; pred < leafValues.length; ++pred) { + OrcProto.ColumnStatistics stats = + indexes[filterColumns[pred]].getEntry(rowGroup).getStatistics(); + leafValues[pred] = evaluatePredicate(stats, sargLeaves.get(pred)); + if (LOG.isDebugEnabled()) { + LOG.debug("Stats = " + stats); + LOG.debug("Setting " + sargLeaves.get(pred) + " to " + + leafValues[pred]); + } + } + result[rowGroup] = sarg.evaluate(leafValues).isNotNeeded(); + if (LOG.isDebugEnabled()) { + LOG.debug("Row group " + (rowIndexStride * rowGroup) + " to " + + (rowIndexStride * (rowGroup+1) - 1) + " is " + + (result[rowGroup] ? "" : "not ") + "included."); } } - reader.startStripe(streams, stripeFooter.getColumnsList()); - rowInStripe = 0; + + // if we found something to skip, use the array. otherwise, return null. + for(boolean b: result) { + if (!b) { + return result; + } + } + return null; + } + + /** + * Read the current stripe into memory. 
+ * @throws IOException + */ + private void readStripe() throws IOException { + StripeInformation stripe = stripes.get(currentStripe); + stripeFooter = readStripeFooter(stripe); + streams.clear(); + // setup the position in the stripe rowCountInStripe = stripe.getNumberOfRows(); + rowInStripe = 0; rowBaseInStripe = 0; for(int i=0; i < currentStripe; ++i) { rowBaseInStripe += stripes.get(i).getNumberOfRows(); } + // reset all of the indexes for(int i=0; i < indexes.length; ++i) { indexes[i] = null; } + includedRowGroups = pickRowGroups(); + + // move forward to the first unskipped row + if (includedRowGroups != null) { + while (rowInStripe < rowCountInStripe && + !includedRowGroups[(int) (rowInStripe / rowIndexStride)]) { + rowInStripe = Math.min(rowCountInStripe, rowInStripe + rowIndexStride); + } + } + + // if we haven't skipped the whole stripe, read the data + if (rowInStripe < rowCountInStripe) { + // if we aren't projecting columns or filtering rows, just read it all + if (included == null && includedRowGroups == null) { + readAllDataStreams(stripe); + } else { + readPartialDataStreams(stripe); + } + reader.startStripe(streams, stripeFooter.getColumnsList()); + // if we skipped the first row group, move the pointers forward + if (rowInStripe != 0) { + seekToRowEntry((int) (rowInStripe / rowIndexStride)); + } + } + } + + private void readAllDataStreams(StripeInformation stripe + ) throws IOException { + byte[] buffer = + new byte[(int) (stripe.getDataLength())]; + file.seek(stripe.getOffset() + stripe.getIndexLength()); + file.readFully(buffer, 0, buffer.length); + int sectionOffset = 0; + for(OrcProto.Stream section: stripeFooter.getStreamsList()) { + if (StreamName.getArea(section.getKind()) == StreamName.Area.DATA) { + int sectionLength = (int) section.getLength(); + ByteBuffer sectionBuffer = ByteBuffer.wrap(buffer, sectionOffset, + sectionLength); + StreamName name = new StreamName(section.getColumn(), + section.getKind()); + streams.put(name, + InStream.create(name.toString(), new ByteBuffer[]{sectionBuffer}, + new long[]{0}, sectionLength, codec, bufferSize)); + sectionOffset += sectionLength; + } + } + } + + /** + * The secionts of stripe that we need to read. + */ + static class DiskRange { + /** the first address we need to read. */ + long offset; + /** the first address afterwards. */ + long end; + + DiskRange(long offset, long end) { + this.offset = offset; + this.end = end; + if (end < offset) { + throw new IllegalArgumentException("invalid range " + this); + } + } + + @Override + public boolean equals(Object other) { + if (other == null || other.getClass() != getClass()) { + return false; + } + DiskRange otherR = (DiskRange) other; + return otherR.offset == offset && otherR.end == end; + } + + @Override + public String toString() { + return "range start: " + offset + " end: " + end; + } + } + + private static final int BYTE_STREAM_POSITIONS = 1; + private static final int RUN_LENGTH_BYTE_POSITIONS = + BYTE_STREAM_POSITIONS + 1; + private static final int BITFIELD_POSITIONS = RUN_LENGTH_BYTE_POSITIONS + 1; + private static final int RUN_LENGTH_INT_POSITIONS = + BYTE_STREAM_POSITIONS + 1; + + /** + * Get the offset in the index positions for the column that the given + * stream starts. + * @param encoding the encoding of the column + * @param type the type of the column + * @param stream the kind of the stream + * @param isCompressed is the file compressed + * @param hasNulls does the column have a PRESENT stream? 
+ * @return the number of positions that will be used for that stream + */ + static int getIndexPosition(OrcProto.ColumnEncoding.Kind encoding, + OrcProto.Type.Kind type, + OrcProto.Stream.Kind stream, + boolean isCompressed, + boolean hasNulls) { + if (stream == OrcProto.Stream.Kind.PRESENT) { + return 0; + } + int compressionValue = isCompressed ? 1 : 0; + int base = hasNulls ? (BITFIELD_POSITIONS + compressionValue) : 0; + switch (type) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case STRUCT: + case MAP: + case LIST: + case UNION: + return base; + case STRING: + if (encoding == OrcProto.ColumnEncoding.Kind.DICTIONARY || + encoding == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) { + return base; + } else { + if (stream == OrcProto.Stream.Kind.DATA) { + return base; + } else { + return base + BYTE_STREAM_POSITIONS + compressionValue; + } + } + case BINARY: + if (stream == OrcProto.Stream.Kind.DATA) { + return base; + } + return base + BYTE_STREAM_POSITIONS + compressionValue; + case DECIMAL: + if (stream == OrcProto.Stream.Kind.DATA) { + return base; + } + return base + BYTE_STREAM_POSITIONS + compressionValue; + case TIMESTAMP: + if (stream == OrcProto.Stream.Kind.DATA) { + return base; + } + return base + RUN_LENGTH_INT_POSITIONS + compressionValue; + default: + throw new IllegalArgumentException("Unknown type " + type); + } + } + + // for uncompressed streams, what is the most overlap with the following set + // of rows (long vint literal group). + static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512; + + /** + * Is this stream part of a dictionary? + * @return is this part of a dictionary? + */ + static boolean isDictionary(OrcProto.Stream.Kind kind, + OrcProto.ColumnEncoding encoding) { + OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind(); + return kind == OrcProto.Stream.Kind.DICTIONARY_DATA || + (kind == OrcProto.Stream.Kind.LENGTH && + (encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY || + encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2)); + } + + /** + * Plan the ranges of the file that we need to read given the list of + * columns and row groups. + * @param streamList the list of streams avaiable + * @param indexes the indexes that have been loaded + * @param includedColumns which columns are needed + * @param includedRowGroups which row groups are needed + * @param isCompressed does the file have generic compression + * @param encodings the encodings for each column + * @param types the types of the columns + * @param compressionSize the compression block size + * @return the list of disk ranges that will be loaded + */ + static List planReadPartialDataStreams + (List streamList, + OrcProto.RowIndex[] indexes, + boolean[] includedColumns, + boolean[] includedRowGroups, + boolean isCompressed, + List encodings, + List types, + int compressionSize) { + List result = new ArrayList(); + long offset = 0; + // figure out which columns have a present stream + boolean[] hasNull = new boolean[types.size()]; + for(OrcProto.Stream stream: streamList) { + if (stream.getKind() == OrcProto.Stream.Kind.PRESENT) { + hasNull[stream.getColumn()] = true; + } + } + for(OrcProto.Stream stream: streamList) { + long length = stream.getLength(); + int column = stream.getColumn(); + OrcProto.Stream.Kind streamKind = stream.getKind(); + if (StreamName.getArea(streamKind) == StreamName.Area.DATA && + includedColumns[column]) { + // if we aren't filtering or it is a dictionary, load it. 
+ if (includedRowGroups == null || + isDictionary(streamKind, encodings.get(column))) { + result.add(new DiskRange(offset, offset + length)); + } else { + for(int group=0; group < includedRowGroups.length; ++group) { + if (includedRowGroups[group]) { + int posn = getIndexPosition(encodings.get(column).getKind(), + types.get(column).getKind(), stream.getKind(), isCompressed, + hasNull[column]); + long start = indexes[column].getEntry(group).getPositions(posn); + // figure out the worst case last location + long end = (group == includedRowGroups.length - 1) ? + length : Math.min(length, + indexes[column].getEntry(group + 1) + .getPositions(posn) + + (isCompressed ? + (OutStream.HEADER_SIZE + + compressionSize) : + WORST_UNCOMPRESSED_SLOP)); + result.add(new DiskRange(offset + start, offset + end)); + } + } + } + } + offset += length; + } + return result; + } + + /** + * Update the disk ranges to collapse adjacent or overlapping ranges. It + * assumes that the ranges are sorted. + * @param ranges the list of disk ranges to merge + */ + static void mergeDiskRanges(List ranges) { + DiskRange prev = null; + for(int i=0; i < ranges.size(); ++i) { + DiskRange current = ranges.get(i); + if (prev != null && overlap(prev.offset, prev.end, + current.offset, current.end)) { + prev.offset = Math.min(prev.offset, current.offset); + prev.end = Math.max(prev.end, current.end); + ranges.remove(i); + i -= 1; + } else { + prev = current; + } + } + } + + /** + * Read the list of ranges from the file. + * @param file the file to read + * @param base the base of the stripe + * @param ranges the disk ranges within the stripe to read + * @return the bytes read for each disk range, which is the same length as + * ranges + * @throws IOException + */ + static byte[][] readDiskRanges(FSDataInputStream file, + long base, + List ranges) throws IOException { + byte[][] result = new byte[ranges.size()][]; + int i = 0; + for(DiskRange range: ranges) { + int len = (int) (range.end - range.offset); + result[i] = new byte[len]; + file.seek(base + range.offset); + file.readFully(result[i]); + i += 1; + } + return result; + } + + /** + * Does region A overlap region B? The end points are inclusive on both sides. + * @param leftA A's left point + * @param rightA A's right point + * @param leftB B's left point + * @param rightB B's right point + * @return Does region A overlap region B? + */ + static boolean overlap(long leftA, long rightA, long leftB, long rightB) { + if (leftA <= leftB) { + return rightA >= leftB; + } + return rightB >= leftA; + } + + /** + * Build a string representation of a list of disk ranges. 
+ * @param ranges ranges to stringify + * @return the resulting string + */ + static String stringifyDiskRanges(List ranges) { + StringBuilder buffer = new StringBuilder(); + buffer.append("["); + for(int i=0; i < ranges.size(); ++i) { + if (i != 0) { + buffer.append(", "); + } + buffer.append(ranges.get(i).toString()); + } + buffer.append("]"); + return buffer.toString(); + } + + static void createStreams(List streamDescriptions, + List ranges, + byte[][] bytes, + boolean[] includeColumn, + CompressionCodec codec, + int bufferSize, + Map streams + ) throws IOException { + long offset = 0; + for(OrcProto.Stream streamDesc: streamDescriptions) { + int column = streamDesc.getColumn(); + if (includeColumn[column] && + StreamName.getArea(streamDesc.getKind()) == StreamName.Area.DATA) { + long length = streamDesc.getLength(); + int first = -1; + int last = -2; + for(int i=0; i < bytes.length; ++i) { + DiskRange range = ranges.get(i); + if (overlap(offset, offset+length, range.offset, range.end)) { + if (first == -1) { + first = i; + } + last = i; + } + } + ByteBuffer[] buffers = new ByteBuffer[last - first + 1]; + long[] offsets = new long[last - first + 1]; + for(int i=0; i < buffers.length; ++i) { + DiskRange range = ranges.get(i + first); + long start = Math.max(range.offset, offset); + long end = Math.min(range.end, offset+length); + buffers[i] = ByteBuffer.wrap(bytes[first + i], + Math.max(0, (int) (offset - range.offset)), (int) (end - start)); + offsets[i] = Math.max(0, range.offset - offset); + } + StreamName name = new StreamName(column, streamDesc.getKind()); + streams.put(name, InStream.create(name.toString(), buffers, offsets, + length, codec, bufferSize)); + } + offset += streamDesc.getLength(); + } + } + + private void readPartialDataStreams(StripeInformation stripe + ) throws IOException { + List streamList = stripeFooter.getStreamsList(); + List chunks = + planReadPartialDataStreams(streamList, + indexes, included, includedRowGroups, codec != null, + stripeFooter.getColumnsList(), types, bufferSize); + if (LOG.isDebugEnabled()) { + LOG.debug("chunks = " + stringifyDiskRanges(chunks)); + } + mergeDiskRanges(chunks); + if (LOG.isDebugEnabled()) { + LOG.debug("merge = " + stringifyDiskRanges(chunks)); + } + byte[][] bytes = readDiskRanges(file, stripe.getOffset(), chunks); + createStreams(streamList, chunks, bytes, included, codec, bufferSize, + streams); } @Override public boolean hasNext() throws IOException { - return rowInStripe < rowCountInStripe || currentStripe < stripes.size() - 1; + return rowInStripe < rowCountInStripe; } - @Override - public Object next(Object previous) throws IOException { - if (rowInStripe >= rowCountInStripe) { + /** + * Read the next stripe until we find a row that we don't skip. + * @throws IOException + */ + private void advanceStripe() throws IOException { + rowInStripe = rowCountInStripe; + while (rowInStripe >= rowCountInStripe && + currentStripe < stripes.size() - 1) { currentStripe += 1; readStripe(); } + } + + /** + * Skip over rows that we aren't selecting, so that the next row is + * one that we will read. 
+ * @param nextRow the row we want to go to + * @throws IOException + */ + private void advanceToNextRow(long nextRow) throws IOException { + long nextRowInStripe = nextRow - rowBaseInStripe; + // check for row skipping + if (rowIndexStride != 0 && + includedRowGroups != null && + nextRowInStripe < rowCountInStripe) { + int rowGroup = (int) (nextRowInStripe / rowIndexStride); + if (!includedRowGroups[rowGroup]) { + while (rowGroup < includedRowGroups.length && + !includedRowGroups[rowGroup]) { + rowGroup += 1; + } + // if we are off the end of the stripe, just move stripes + if (rowGroup >= includedRowGroups.length) { + advanceStripe(); + return; + } + nextRowInStripe = Math.min(rowCountInStripe, rowGroup * rowIndexStride); + } + } + if (nextRowInStripe < rowCountInStripe) { + if (nextRowInStripe != rowInStripe) { + if (rowIndexStride != 0) { + int rowGroup = (int) (nextRowInStripe / rowIndexStride); + seekToRowEntry(rowGroup); + reader.skipRows(nextRowInStripe - rowGroup * rowIndexStride); + } else { + reader.skipRows(nextRowInStripe - rowInStripe); + } + rowInStripe = nextRowInStripe; + } + } else { + advanceStripe(); + } + } + + @Override + public Object next(Object previous) throws IOException { + Object result = reader.next(previous); + // find the next row rowInStripe += 1; - return reader.next(previous); + advanceToNextRow(rowInStripe + rowBaseInStripe); + return result; } @Override @@ -1548,14 +2182,6 @@ class RecordReaderImpl implements Record } private int findStripe(long rowNumber) { - if (rowNumber < 0) { - throw new IllegalArgumentException("Seek to a negative row number " + - rowNumber); - } else if (rowNumber < firstRow) { - throw new IllegalArgumentException("Seek before reader range " + - rowNumber); - } - rowNumber -= firstRow; for(int i=0; i < stripes.size(); i++) { StripeInformation stripe = stripes.get(i); if (stripe.getNumberOfRows() > rowNumber) { @@ -1576,7 +2202,8 @@ class RecordReaderImpl implements Record file.seek(offset); file.readFully(buffer); indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index", - ByteBuffer.wrap(buffer), codec, bufferSize)); + new ByteBuffer[] {ByteBuffer.wrap(buffer)}, new long[]{0}, + stream.getLength(), codec, bufferSize)); } } offset += stream.getLength(); @@ -1596,19 +2223,25 @@ class RecordReaderImpl implements Record @Override public void seekToRow(long rowNumber) throws IOException { + if (rowNumber < 0) { + throw new IllegalArgumentException("Seek to a negative row number " + + rowNumber); + } else if (rowNumber < firstRow) { + throw new IllegalArgumentException("Seek before reader range " + + rowNumber); + } + // convert to our internal form (rows from the beginning of slice) + rowNumber -= firstRow; + + // move to the right stripe int rightStripe = findStripe(rowNumber); if (rightStripe != currentStripe) { currentStripe = rightStripe; readStripe(); } readRowIndex(); - rowInStripe = rowNumber - rowBaseInStripe - firstRow; - if (rowIndexStride != 0) { - long entry = rowInStripe / rowIndexStride; - seekToRowEntry((int) entry); - reader.skipRows(rowInStripe - entry * rowIndexStride); - } else { - reader.skipRows(rowInStripe); - } + + // if we aren't to the right row yet, advanance in the stripe. 
+ advanceToNextRow(rowNumber); } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java?rev=1514438&r1=1514437&r2=1514438&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java Thu Aug 15 19:05:35 2013 @@ -57,7 +57,7 @@ class RunLengthByteReader { while (bytes < numLiterals) { int result = input.read(literals, bytes, numLiterals - bytes); if (result == -1) { - throw new EOFException("Reading RLE byte literal got EOF"); + throw new EOFException("Reading RLE byte literal got EOF in " + this); } bytes += result; } @@ -108,4 +108,10 @@ class RunLengthByteReader { items -= consume; } } + + @Override + public String toString() { + return "byte rle " + (repeat ? "repeat" : "literal") + " used: " + + used + "/" + numLiterals + " from " + input; + } } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgument.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgument.java?rev=1514438&r1=1514437&r2=1514438&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgument.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgument.java Thu Aug 15 19:05:35 2013 @@ -134,6 +134,21 @@ public interface SearchArgument { throw new IllegalArgumentException("Unknown value: " + this); } } + + /** + * Does the RecordReader need to include this set of records? 
+ * @return true unless none of the rows qualify + */ + public boolean isNotNeeded() { + switch (this) { + case NO: + case NULL: + case NO_NULL: + return false; + default: + return true; + } + } } /** Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java?rev=1514438&r1=1514437&r2=1514438&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java Thu Aug 15 19:05:35 2013 @@ -36,16 +36,12 @@ import org.apache.hadoop.hive.ql.udf.gen import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotNull; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; -import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Deque; import java.util.HashMap; @@ -57,7 +53,7 @@ import java.util.Map; */ final class SearchArgumentImpl implements SearchArgument { - private static final class PredicateLeafImpl implements PredicateLeaf { + static final class PredicateLeafImpl implements PredicateLeaf { private final Operator operator; private final Type type; private final String columnName; @@ -270,7 +266,6 @@ final class SearchArgumentImpl implement } static class ExpressionBuilder { - private ExpressionTree expression = null; private final List leaves = new ArrayList(); /** @@ -321,11 +316,11 @@ final class SearchArgumentImpl implement private static Object boxLiteral(ExprNodeConstantDesc lit) { switch (getType(lit)) { case INTEGER: - return new LongWritable(((Number) lit.getValue()).longValue()); + return ((Number) lit.getValue()).longValue(); case STRING: - return new Text(lit.getValue().toString()); + return lit.getValue().toString(); case FLOAT: - return new DoubleWritable(((Number) lit.getValue()).doubleValue()); + return ((Number) lit.getValue()).doubleValue(); default: throw new IllegalArgumentException("Unknown literal " + getType(lit)); } Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1514438&r1=1514437&r2=1514438&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Thu Aug 15 19:05:35 2013 @@ -309,6 +309,7 @@ public final class ColumnPrunerProcFacto cppCtx.getPrunedColLists().put((Operator) nd, cols); ArrayList needed_columns = new ArrayList(); + List neededColumnNames = new ArrayList(); RowResolver inputRR = cppCtx.getOpToParseCtxMap().get(scanOp).getRowResolver(); TableScanDesc desc = scanOp.getConf(); List virtualCols = desc.getVirtualCols(); @@ -339,12 +340,15 @@ public final class 
ColumnPrunerProcFacto } int position = inputRR.getPosition(cols.get(i)); if (position >=0) { + // get the needed columns by id and name needed_columns.add(position); + neededColumnNames.add(cols.get(i)); } } desc.setVirtualCols(newVirtualCols); scanOp.setNeededColumnIDs(needed_columns); + scanOp.setNeededColumns(neededColumnNames); return null; } } Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java?rev=1514438&r1=1514437&r2=1514438&view=diff ============================================================================== --- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java (original) +++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java Thu Aug 15 19:05:35 2013 @@ -47,7 +47,8 @@ public class TestBitFieldReader { ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size()); collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size()); inBuf.flip(); - BitFieldReader in = new BitFieldReader(InStream.create("test", inBuf, + BitFieldReader in = new BitFieldReader(InStream.create("test", + new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(), codec, 500), 1); for(int i=0; i < COUNT; ++i) { int x = in.next(); @@ -96,7 +97,8 @@ public class TestBitFieldReader { ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size()); collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size()); inBuf.flip(); - BitFieldReader in = new BitFieldReader(InStream.create("test", inBuf, + BitFieldReader in = new BitFieldReader(InStream.create("test", + new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(), null, 500), 3); for(int i=0; i < COUNT; ++i) { int x = in.next(); @@ -126,7 +128,8 @@ public class TestBitFieldReader { collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size()); inBuf.flip(); BitFieldReader in = new BitFieldReader(InStream.create - ("test", inBuf, null, 100), 1); + ("test", new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(), + null, 100), 1); for(int i=0; i < COUNT; i += 5) { int x = (int) in.next(); if (i < COUNT/2) { Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java?rev=1514438&r1=1514437&r2=1514438&view=diff ============================================================================== --- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java (original) +++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java Thu Aug 15 19:05:35 2013 @@ -84,7 +84,11 @@ public class TestBitPack { inBuf.flip(); long[] buff = new long[SIZE]; SerializationUtils.readInts(buff, 0, SIZE, fixedWidth, - InStream.create("test", inBuf, null, SIZE)); + InStream.create("test", + new ByteBuffer[]{inBuf}, + new long[]{0}, + inBuf.remaining(), + null, SIZE)); for(int i = 0; i < SIZE; i++) { buff[i] = SerializationUtils.zigzagDecode(buff[i]); } Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInStream.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInStream.java?rev=1514438&r1=1514437&r2=1514438&view=diff ============================================================================== --- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInStream.java (original) +++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInStream.java Thu 
Aug 15 19:05:35 2013 @@ -20,6 +20,9 @@ package org.apache.hadoop.hive.ql.io.orc import org.junit.Test; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -40,7 +43,8 @@ public class TestInStream { } } - static class PositionCollector implements PositionProvider, PositionRecorder { + static class PositionCollector + implements PositionProvider, PositionRecorder { private List positions = new ArrayList(); private int index = 0; @@ -53,6 +57,22 @@ public class TestInStream { public void addPosition(long offset) { positions.add(offset); } + + public void reset() { + index = 0; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder("position: "); + for(int i=0; i < positions.size(); ++i) { + if (i != 0) { + builder.append(", "); + } + builder.append(positions.get(i)); + } + return builder.toString(); + } } @Test @@ -73,9 +93,11 @@ public class TestInStream { ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size()); collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size()); inBuf.flip(); - InStream in = InStream.create("test", inBuf, null, 100); - assertEquals("uncompressed stream test base: 0 offset: 0 limit: 1024", - in.toString()); + InStream in = InStream.create("test", new ByteBuffer[]{inBuf}, + new long[]{0}, inBuf.remaining(), null, 100); + assertEquals("uncompressed stream test position: 0 length: 1024" + + " range: 0 offset: 0 limit: 0", + in.toString()); for(int i=0; i < 1024; ++i) { int x = in.read(); assertEquals(i & 0xff, x); @@ -103,9 +125,11 @@ public class TestInStream { ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size()); collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size()); inBuf.flip(); - InStream in = InStream.create("test", inBuf, codec, 300); - assertEquals("compressed stream test base: 0 offset: 0 limit: 961", - in.toString()); + InStream in = InStream.create("test", new ByteBuffer[]{inBuf}, + new long[]{0}, inBuf.remaining(), codec, 300); + assertEquals("compressed stream test position: 0 length: 961 range: 0" + + " offset: 0 limit: 0 range 0 = 0 to 961", + in.toString()); for(int i=0; i < 1024; ++i) { int x = in.read(); assertEquals(i & 0xff, x); @@ -134,7 +158,8 @@ public class TestInStream { ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size()); collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size()); inBuf.flip(); - InStream in = InStream.create("test", inBuf, codec, 100); + InStream in = InStream.create("test", new ByteBuffer[]{inBuf}, + new long[]{0}, inBuf.remaining(), codec, 100); byte[] contents = new byte[1024]; try { in.read(contents); @@ -148,7 +173,8 @@ public class TestInStream { inBuf.put((byte) 32); inBuf.put((byte) 0); inBuf.flip(); - in = InStream.create("test2", inBuf, codec, 300); + in = InStream.create("test2", new ByteBuffer[]{inBuf}, new long[]{0}, + inBuf.remaining(), codec, 300); try { in.read(); fail(); @@ -156,4 +182,132 @@ public class TestInStream { // EXPECTED } } + + @Test + public void testDisjointBuffers() throws Exception { + OutputCollector collect = new OutputCollector(); + CompressionCodec codec = new ZlibCodec(); + OutStream out = new OutStream("test", 400, codec, collect); + PositionCollector[] positions = new PositionCollector[1024]; + DataOutput stream = new DataOutputStream(out); + for(int i=0; i < 1024; ++i) { + positions[i] = new PositionCollector(); + out.getPosition(positions[i]); + stream.writeInt(i); + } + out.flush(); 
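A pattern worth noting in these test changes: the old single-buffer form InStream.create(name, buffer, codec, bufferSize) is replaced everywhere by the new multi-range form that takes an array of ByteBuffers, the offset of each range within the stream, and the total stream length, so a reader can present data that arrived as several separate byte ranges. For the common single-buffer case the translation is mechanical; a hypothetical helper of the sort one might write while migrating callers (singleRange and InStreamCompat are illustrative, not part of the patch, and the class is assumed to sit in org.apache.hadoop.hive.ql.io.orc alongside the tests above):

    package org.apache.hadoop.hive.ql.io.orc;

    import java.io.IOException;
    import java.nio.ByteBuffer;

    class InStreamCompat {
      /** Wrap one contiguous buffer as a single-range stream. */
      static InStream singleRange(String name, ByteBuffer buffer,
                                  CompressionCodec codec,
                                  int bufferSize) throws IOException {
        return InStream.create(name,
            new ByteBuffer[]{buffer},   // the only byte range
            new long[]{0},              // its offset within the stream
            buffer.remaining(),         // total stream length
            codec, bufferSize);
      }
    }

The disjoint-buffer tests that follow exercise the more interesting case, where the ranges are non-contiguous and the offsets array records where each ByteBuffer's data starts in the original stream.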
+ assertEquals("test", out.toString()); + assertEquals(1674, collect.buffer.size()); + ByteBuffer[] inBuf = new ByteBuffer[3]; + inBuf[0] = ByteBuffer.allocate(500); + inBuf[1] = ByteBuffer.allocate(1200); + inBuf[2] = ByteBuffer.allocate(500); + collect.buffer.setByteBuffer(inBuf[0], 0, 483); + collect.buffer.setByteBuffer(inBuf[1], 483, 1625 - 483); + collect.buffer.setByteBuffer(inBuf[2], 1625, 1674 - 1625); + + for(int i=0; i < inBuf.length; ++i) { + inBuf[i].flip(); + } + InStream in = InStream.create("test", inBuf, + new long[]{0,483, 1625}, 1674, codec, 400); + assertEquals("compressed stream test position: 0 length: 1674 range: 0" + + " offset: 0 limit: 0 range 0 = 0 to 483;" + + " range 1 = 483 to 1142; range 2 = 1625 to 49", + in.toString()); + DataInputStream inStream = new DataInputStream(in); + for(int i=0; i < 1024; ++i) { + int x = inStream.readInt(); + assertEquals(i, x); + } + assertEquals(0, in.available()); + for(int i=1023; i >= 0; --i) { + in.seek(positions[i]); + assertEquals(i, inStream.readInt()); + } + + in = InStream.create("test", new ByteBuffer[]{inBuf[1], inBuf[2]}, + new long[]{483, 1625}, 1674, codec, 400); + inStream = new DataInputStream(in); + positions[303].reset(); + in.seek(positions[303]); + for(int i=303; i < 1024; ++i) { + assertEquals(i, inStream.readInt()); + } + + in = InStream.create("test", new ByteBuffer[]{inBuf[0], inBuf[2]}, + new long[]{0, 1625}, 1674, codec, 400); + inStream = new DataInputStream(in); + positions[1001].reset(); + for(int i=0; i < 300; ++i) { + assertEquals(i, inStream.readInt()); + } + in.seek(positions[1001]); + for(int i=1001; i < 1024; ++i) { + assertEquals(i, inStream.readInt()); + } + } + + @Test + public void testUncompressedDisjointBuffers() throws Exception { + OutputCollector collect = new OutputCollector(); + OutStream out = new OutStream("test", 400, null, collect); + PositionCollector[] positions = new PositionCollector[1024]; + DataOutput stream = new DataOutputStream(out); + for(int i=0; i < 1024; ++i) { + positions[i] = new PositionCollector(); + out.getPosition(positions[i]); + stream.writeInt(i); + } + out.flush(); + assertEquals("test", out.toString()); + assertEquals(4096, collect.buffer.size()); + ByteBuffer[] inBuf = new ByteBuffer[3]; + inBuf[0] = ByteBuffer.allocate(1100); + inBuf[1] = ByteBuffer.allocate(2200); + inBuf[2] = ByteBuffer.allocate(1100); + collect.buffer.setByteBuffer(inBuf[0], 0, 1024); + collect.buffer.setByteBuffer(inBuf[1], 1024, 2048); + collect.buffer.setByteBuffer(inBuf[2], 3072, 1024); + + for(int i=0; i < inBuf.length; ++i) { + inBuf[i].flip(); + } + InStream in = InStream.create("test", inBuf, + new long[]{0, 1024, 3072}, 4096, null, 400); + assertEquals("uncompressed stream test position: 0 length: 4096" + + " range: 0 offset: 0 limit: 0", + in.toString()); + DataInputStream inStream = new DataInputStream(in); + for(int i=0; i < 1024; ++i) { + int x = inStream.readInt(); + assertEquals(i, x); + } + assertEquals(0, in.available()); + for(int i=1023; i >= 0; --i) { + in.seek(positions[i]); + assertEquals(i, inStream.readInt()); + } + + in = InStream.create("test", new ByteBuffer[]{inBuf[1], inBuf[2]}, + new long[]{1024, 3072}, 4096, null, 400); + inStream = new DataInputStream(in); + positions[256].reset(); + in.seek(positions[256]); + for(int i=256; i < 1024; ++i) { + assertEquals(i, inStream.readInt()); + } + + in = InStream.create("test", new ByteBuffer[]{inBuf[0], inBuf[2]}, + new long[]{0, 3072}, 4096, null, 400); + inStream = new DataInputStream(in); + 
positions[768].reset(); + for(int i=0; i < 256; ++i) { + assertEquals(i, inStream.readInt()); + } + in.seek(positions[768]); + for(int i=768; i < 1024; ++i) { + assertEquals(i, inStream.readInt()); + } + } } Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java?rev=1514438&r1=1514437&r2=1514438&view=diff ============================================================================== --- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java (original) +++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java Thu Aug 15 19:05:35 2013 @@ -53,8 +53,11 @@ public class TestIntegerCompressionReade ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size()); collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size()); inBuf.flip(); - RunLengthIntegerReaderV2 in = new RunLengthIntegerReaderV2(InStream.create - ("test", inBuf, codec, 1000), true); + RunLengthIntegerReaderV2 in = + new RunLengthIntegerReaderV2(InStream.create + ("test", new ByteBuffer[]{inBuf}, + new long[]{0}, inBuf.remaining(), + codec, 1000), true); for(int i=0; i < 2048; ++i) { int x = (int) in.next(); if (i < 1024) { @@ -104,8 +107,12 @@ public class TestIntegerCompressionReade ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size()); collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size()); inBuf.flip(); - RunLengthIntegerReaderV2 in = new RunLengthIntegerReaderV2(InStream.create - ("test", inBuf, null, 100), true); + RunLengthIntegerReaderV2 in = + new RunLengthIntegerReaderV2(InStream.create("test", + new ByteBuffer[]{inBuf}, + new long[]{0}, + inBuf.remaining(), + null, 100), true); for(int i=0; i < 2048; i += 10) { int x = (int) in.next(); if (i < 1024) {