Subject: svn commit: r1513155 [1/3] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/ ql/...
Date: Mon, 12 Aug 2013 15:03:31 -0000
From: omalley@apache.org
To: commits@hive.apache.org

Author: omalley
Date: Mon Aug 12 15:03:30 2013
New Revision: 1513155
URL: http://svn.apache.org/r1513155

Log:
HIVE-4123 Improved ORC integer RLE version 2.
(Prasanth Jayachandran via omalley)
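The switch between the two encodings is the new hive.exec.orc.write.format property introduced in the HiveConf.java hunk below: the value "0.11" keeps the original run-length encoding, while any other value selects the new RLE v2. A minimal sketch of setting it from client code follows; only the property name and values are taken from the hunk, while the bare Hadoop Configuration and the class name are illustrative, not part of the commit:

    import org.apache.hadoop.conf.Configuration;

    public class OrcWriteFormatSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // "0.11" pins the writer to the original (Hive 0.11) run-length
        // encoding; per the comment in the hunk below, any other value
        // makes ORC use the new RLE v2 encoding.
        conf.set("hive.exec.orc.write.format", "0.11");

        System.out.println("ORC write format: "
            + conf.get("hive.exec.orc.write.format"));
      }
    }

In the commit itself the value is carried by the HIVE_ORC_WRITE_FORMAT entry shown in the hunk.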
Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java.orig
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestNewIntegerEncoding.java

Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
    hive/trunk/ql/src/protobuf/org/apache/hadoop/hive/ql/io/orc/orc_proto.proto
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcNullOptimization.java
    hive/trunk/ql/src/test/resources/orc-file-dump-dictionary-threshold.out
    hive/trunk/ql/src/test/resources/orc-file-dump.out

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1513155&r1=1513154&r2=1513155&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Aug 12 15:03:30 2013
@@ -502,6 +502,9 @@ public class HiveConf extends Configurat
     // Maximum fraction of heap that can be used by ORC file writers
     HIVE_ORC_FILE_MEMORY_POOL("hive.exec.orc.memory.pool", 0.5f), // 50%
+    // use 0.11 version of RLE encoding.
if this conf is not defined or any + // other value specified, ORC will use the new RLE encoding + HIVE_ORC_WRITE_FORMAT("hive.exec.orc.write.format", "0.11"), HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD("hive.exec.orc.dictionary.key.size.threshold", 0.8f), Modified: hive/trunk/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java?rev=1513155&r1=1513154&r2=1513155&view=diff ============================================================================== --- hive/trunk/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java (original) +++ hive/trunk/ql/src/gen/protobuf/gen-java/org/apache/hadoop/hive/ql/io/orc/OrcProto.java Mon Aug 12 15:03:30 2013 @@ -5695,10 +5695,14 @@ public final class OrcProto { implements com.google.protobuf.ProtocolMessageEnum { DIRECT(0, 0), DICTIONARY(1, 1), + DIRECT_V2(2, 2), + DICTIONARY_V2(3, 3), ; public static final int DIRECT_VALUE = 0; public static final int DICTIONARY_VALUE = 1; + public static final int DIRECT_V2_VALUE = 2; + public static final int DICTIONARY_V2_VALUE = 3; public final int getNumber() { return value; } @@ -5707,6 +5711,8 @@ public final class OrcProto { switch (value) { case 0: return DIRECT; case 1: return DICTIONARY; + case 2: return DIRECT_V2; + case 3: return DICTIONARY_V2; default: return null; } } @@ -5737,7 +5743,7 @@ public final class OrcProto { } private static final Kind[] VALUES = { - DIRECT, DICTIONARY, + DIRECT, DICTIONARY, DIRECT_V2, DICTIONARY_V2, }; public static Kind valueOf( @@ -11117,42 +11123,42 @@ public final class OrcProto { "eam.Kind\022\016\n\006column\030\002 \001(\r\022\016\n\006length\030\003 \001(\004", "\"r\n\004Kind\022\013\n\007PRESENT\020\000\022\010\n\004DATA\020\001\022\n\n\006LENGT" + "H\020\002\022\023\n\017DICTIONARY_DATA\020\003\022\024\n\020DICTIONARY_C" + - "OUNT\020\004\022\r\n\tSECONDARY\020\005\022\r\n\tROW_INDEX\020\006\"\221\001\n" + + "OUNT\020\004\022\r\n\tSECONDARY\020\005\022\r\n\tROW_INDEX\020\006\"\263\001\n" + "\016ColumnEncoding\022C\n\004kind\030\001 \002(\01625.org.apac" + "he.hadoop.hive.ql.io.orc.ColumnEncoding." + - "Kind\022\026\n\016dictionarySize\030\002 \001(\r\"\"\n\004Kind\022\n\n\006" + - "DIRECT\020\000\022\016\n\nDICTIONARY\020\001\"\214\001\n\014StripeFoote" + - "r\0229\n\007streams\030\001 \003(\0132(.org.apache.hadoop.h" + - "ive.ql.io.orc.Stream\022A\n\007columns\030\002 \003(\01320." 
+ - "org.apache.hadoop.hive.ql.io.orc.ColumnE", - "ncoding\"\250\002\n\004Type\0229\n\004kind\030\001 \002(\0162+.org.apa" + - "che.hadoop.hive.ql.io.orc.Type.Kind\022\024\n\010s" + - "ubtypes\030\002 \003(\rB\002\020\001\022\022\n\nfieldNames\030\003 \003(\t\"\272\001" + - "\n\004Kind\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t\n\005SHORT\020\002" + - "\022\007\n\003INT\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022\n\n\006DOUBLE" + - "\020\006\022\n\n\006STRING\020\007\022\n\n\006BINARY\020\010\022\r\n\tTIMESTAMP\020" + - "\t\022\010\n\004LIST\020\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020\014\022\t\n\005UNIO" + - "N\020\r\022\013\n\007DECIMAL\020\016\022\010\n\004DATE\020\017\"x\n\021StripeInfo" + - "rmation\022\016\n\006offset\030\001 \001(\004\022\023\n\013indexLength\030\002" + - " \001(\004\022\022\n\ndataLength\030\003 \001(\004\022\024\n\014footerLength", - "\030\004 \001(\004\022\024\n\014numberOfRows\030\005 \001(\004\"/\n\020UserMeta" + - "dataItem\022\014\n\004name\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"\356\002" + - "\n\006Footer\022\024\n\014headerLength\030\001 \001(\004\022\025\n\rconten" + - "tLength\030\002 \001(\004\022D\n\007stripes\030\003 \003(\01323.org.apa" + - "che.hadoop.hive.ql.io.orc.StripeInformat" + - "ion\0225\n\005types\030\004 \003(\0132&.org.apache.hadoop.h" + - "ive.ql.io.orc.Type\022D\n\010metadata\030\005 \003(\01322.o" + - "rg.apache.hadoop.hive.ql.io.orc.UserMeta" + - "dataItem\022\024\n\014numberOfRows\030\006 \001(\004\022F\n\nstatis" + - "tics\030\007 \003(\01322.org.apache.hadoop.hive.ql.i", - "o.orc.ColumnStatistics\022\026\n\016rowIndexStride" + - "\030\010 \001(\r\"\255\001\n\nPostScript\022\024\n\014footerLength\030\001 " + - "\001(\004\022F\n\013compression\030\002 \001(\01621.org.apache.ha" + - "doop.hive.ql.io.orc.CompressionKind\022\034\n\024c" + - "ompressionBlockSize\030\003 \001(\004\022\023\n\007version\030\004 \003" + - "(\rB\002\020\001\022\016\n\005magic\030\300> \001(\t*:\n\017CompressionKin" + - "d\022\010\n\004NONE\020\000\022\010\n\004ZLIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO" + - "\020\003" + "Kind\022\026\n\016dictionarySize\030\002 \001(\r\"D\n\004Kind\022\n\n\006" + + "DIRECT\020\000\022\016\n\nDICTIONARY\020\001\022\r\n\tDIRECT_V2\020\002\022" + + "\021\n\rDICTIONARY_V2\020\003\"\214\001\n\014StripeFooter\0229\n\007s" + + "treams\030\001 \003(\0132(.org.apache.hadoop.hive.ql" + + ".io.orc.Stream\022A\n\007columns\030\002 \003(\01320.org.ap", + "ache.hadoop.hive.ql.io.orc.ColumnEncodin" + + "g\"\250\002\n\004Type\0229\n\004kind\030\001 \002(\0162+.org.apache.ha" + + "doop.hive.ql.io.orc.Type.Kind\022\024\n\010subtype" + + "s\030\002 \003(\rB\002\020\001\022\022\n\nfieldNames\030\003 \003(\t\"\272\001\n\004Kind" + + "\022\013\n\007BOOLEAN\020\000\022\010\n\004BYTE\020\001\022\t\n\005SHORT\020\002\022\007\n\003IN" + + "T\020\003\022\010\n\004LONG\020\004\022\t\n\005FLOAT\020\005\022\n\n\006DOUBLE\020\006\022\n\n\006" + + "STRING\020\007\022\n\n\006BINARY\020\010\022\r\n\tTIMESTAMP\020\t\022\010\n\004L" + + "IST\020\n\022\007\n\003MAP\020\013\022\n\n\006STRUCT\020\014\022\t\n\005UNION\020\r\022\013\n" + + "\007DECIMAL\020\016\022\010\n\004DATE\020\017\"x\n\021StripeInformatio" + + "n\022\016\n\006offset\030\001 \001(\004\022\023\n\013indexLength\030\002 \001(\004\022\022", + "\n\ndataLength\030\003 \001(\004\022\024\n\014footerLength\030\004 \001(\004" + + "\022\024\n\014numberOfRows\030\005 \001(\004\"/\n\020UserMetadataIt" + + "em\022\014\n\004name\030\001 
\002(\t\022\r\n\005value\030\002 \002(\014\"\356\002\n\006Foot" + + "er\022\024\n\014headerLength\030\001 \001(\004\022\025\n\rcontentLengt" + + "h\030\002 \001(\004\022D\n\007stripes\030\003 \003(\01323.org.apache.ha" + + "doop.hive.ql.io.orc.StripeInformation\0225\n" + + "\005types\030\004 \003(\0132&.org.apache.hadoop.hive.ql" + + ".io.orc.Type\022D\n\010metadata\030\005 \003(\01322.org.apa" + + "che.hadoop.hive.ql.io.orc.UserMetadataIt" + + "em\022\024\n\014numberOfRows\030\006 \001(\004\022F\n\nstatistics\030\007", + " \003(\01322.org.apache.hadoop.hive.ql.io.orc." + + "ColumnStatistics\022\026\n\016rowIndexStride\030\010 \001(\r" + + "\"\255\001\n\nPostScript\022\024\n\014footerLength\030\001 \001(\004\022F\n" + + "\013compression\030\002 \001(\01621.org.apache.hadoop.h" + + "ive.ql.io.orc.CompressionKind\022\034\n\024compres" + + "sionBlockSize\030\003 \001(\004\022\023\n\007version\030\004 \003(\rB\002\020\001" + + "\022\016\n\005magic\030\300> \001(\t*:\n\017CompressionKind\022\010\n\004N" + + "ONE\020\000\022\010\n\004ZLIB\020\001\022\n\n\006SNAPPY\020\002\022\007\n\003LZO\020\003" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java?rev=1513155&view=auto ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java (added) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java Mon Aug 12 15:03:30 2013 @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; + +/** + * Interface for reading integers. + */ +interface IntegerReader { + + /** + * Seek to the position provided by index. + * @param index + * @throws IOException + */ + void seek(PositionProvider index) throws IOException; + + /** + * Skip number of specified rows. + * @param numValues + * @throws IOException + */ + void skip(long numValues) throws IOException; + + /** + * Check if there are any more values left. + * @return + * @throws IOException + */ + boolean hasNext() throws IOException; + + /** + * Return the next available value. 
+ * @return + * @throws IOException + */ + long next() throws IOException; +} Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java?rev=1513155&view=auto ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java (added) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerWriter.java Mon Aug 12 15:03:30 2013 @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; + +/** + * Interface for writing integers. + */ +interface IntegerWriter { + + /** + * Get position from the stream. + * @param recorder + * @throws IOException + */ + void getPosition(PositionRecorder recorder) throws IOException; + + /** + * Write the integer value + * @param value + * @throws IOException + */ + void write(long value) throws IOException; + + /** + * Flush the buffer + * @throws IOException + */ + void flush() throws IOException; +} Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1513155&r1=1513154&r2=1513155&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Mon Aug 12 15:03:30 2013 @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FSDataInputS import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; @@ -130,6 +131,21 @@ class RecordReaderImpl implements Record } } + IntegerReader createIntegerReader(OrcProto.ColumnEncoding.Kind kind, + InStream in, + boolean signed) throws IOException { + switch (kind) { + case DIRECT_V2: + case DICTIONARY_V2: + return new RunLengthIntegerReaderV2(in, signed); + case DIRECT: + case DICTIONARY: + return new RunLengthIntegerReader(in, signed); + default: + throw new IllegalArgumentException("Unknown encoding " + kind); + } + } + void startStripe(Map streams, List encoding ) throws IOException { @@ -266,20 +282,29 @@ class RecordReaderImpl implements Record } private static class ShortTreeReader extends TreeReader{ - private 
RunLengthIntegerReader reader = null; + private IntegerReader reader = null; ShortTreeReader(Path path, int columnId) { super(path, columnId); } @Override + void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { + if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) && + (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) { + throw new IOException("Unknown encoding " + encoding + " in column " + + columnId + " of " + path); + } + } + + @Override void startStripe(Map streams, List encodings ) throws IOException { super.startStripe(streams, encodings); StreamName name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); - reader = new RunLengthIntegerReader(streams.get(name), true); + reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true); } @Override @@ -310,20 +335,29 @@ class RecordReaderImpl implements Record } private static class IntTreeReader extends TreeReader{ - private RunLengthIntegerReader reader = null; + private IntegerReader reader = null; IntTreeReader(Path path, int columnId) { super(path, columnId); } @Override + void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { + if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) && + (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) { + throw new IOException("Unknown encoding " + encoding + " in column " + + columnId + " of " + path); + } + } + + @Override void startStripe(Map streams, List encodings ) throws IOException { super.startStripe(streams, encodings); StreamName name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); - reader = new RunLengthIntegerReader(streams.get(name), true); + reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true); } @Override @@ -354,20 +388,29 @@ class RecordReaderImpl implements Record } private static class LongTreeReader extends TreeReader{ - private RunLengthIntegerReader reader = null; + private IntegerReader reader = null; LongTreeReader(Path path, int columnId) { super(path, columnId); } @Override + void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { + if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) && + (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) { + throw new IOException("Unknown encoding " + encoding + " in column " + + columnId + " of " + path); + } + } + + @Override void startStripe(Map streams, List encodings ) throws IOException { super.startStripe(streams, encodings); StreamName name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); - reader = new RunLengthIntegerReader(streams.get(name), true); + reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true); } @Override @@ -492,13 +535,22 @@ class RecordReaderImpl implements Record private static class BinaryTreeReader extends TreeReader{ private InStream stream; - private RunLengthIntegerReader lengths; + private IntegerReader lengths = null; BinaryTreeReader(Path path, int columnId) { super(path, columnId); } @Override + void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { + if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) && + (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) { + throw new IOException("Unknown encoding " + encoding + " in column " + + columnId + " of " + path); + } + } + + @Override void startStripe(Map streams, List encodings ) throws IOException { @@ -506,9 +558,8 @@ class RecordReaderImpl implements Record 
StreamName name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); stream = streams.get(name); - lengths = new RunLengthIntegerReader(streams.get(new - StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), - false); + lengths = createIntegerReader(encodings.get(columnId).getKind(), streams.get(new + StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), false); } @Override @@ -555,22 +606,33 @@ class RecordReaderImpl implements Record } private static class TimestampTreeReader extends TreeReader{ - private RunLengthIntegerReader data; - private RunLengthIntegerReader nanos; + private IntegerReader data = null; + private IntegerReader nanos = null; TimestampTreeReader(Path path, int columnId) { super(path, columnId); } @Override + void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { + if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) && + (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) { + throw new IOException("Unknown encoding " + encoding + " in column " + + columnId + " of " + path); + } + } + + @Override void startStripe(Map streams, List encodings ) throws IOException { super.startStripe(streams, encodings); - data = new RunLengthIntegerReader(streams.get(new StreamName(columnId, - OrcProto.Stream.Kind.DATA)), true); - nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId, - OrcProto.Stream.Kind.SECONDARY)), false); + data = createIntegerReader(encodings.get(columnId).getKind(), + streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.DATA)), true); + nanos = createIntegerReader(encodings.get(columnId).getKind(), + streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.SECONDARY)), false); } @Override @@ -625,20 +687,29 @@ class RecordReaderImpl implements Record } private static class DateTreeReader extends TreeReader{ - private RunLengthIntegerReader reader = null; + private IntegerReader reader = null; DateTreeReader(Path path, int columnId) { super(path, columnId); } @Override + void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { + if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) && + (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) { + throw new IOException("Unknown encoding " + encoding + " in column " + + columnId + " of " + path); + } + } + + @Override void startStripe(Map streams, List encodings ) throws IOException { super.startStripe(streams, encodings); StreamName name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); - reader = new RunLengthIntegerReader(streams.get(name), true); + reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true); } @Override @@ -670,20 +741,29 @@ class RecordReaderImpl implements Record private static class DecimalTreeReader extends TreeReader{ private InStream valueStream; - private RunLengthIntegerReader scaleStream; + private IntegerReader scaleStream = null; DecimalTreeReader(Path path, int columnId) { super(path, columnId); } @Override + void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { + if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) && + (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) { + throw new IOException("Unknown encoding " + encoding + " in column " + + columnId + " of " + path); + } + } + + @Override void startStripe(Map streams, List encodings ) throws IOException { super.startStripe(streams, encodings); valueStream = streams.get(new StreamName(columnId, OrcProto.Stream.Kind.DATA)); - scaleStream = new 
RunLengthIntegerReader(streams.get( + scaleStream = createIntegerReader(encodings.get(columnId).getKind(), streams.get( new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true); } @@ -726,12 +806,9 @@ class RecordReaderImpl implements Record super(path, columnId); } + @Override void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { - if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY && - encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) { - throw new IOException("Unknown encoding " + encoding + " in column " + - columnId + " of " + path); - } + reader.checkEncoding(encoding); } @Override @@ -742,9 +819,11 @@ class RecordReaderImpl implements Record // reader switch (encodings.get(columnId).getKind()) { case DIRECT: + case DIRECT_V2: reader = new StringDirectTreeReader(path, columnId); break; case DICTIONARY: + case DICTIONARY_V2: reader = new StringDictionaryTreeReader(path, columnId); break; default: @@ -776,7 +855,7 @@ class RecordReaderImpl implements Record */ private static class StringDirectTreeReader extends TreeReader { private InStream stream; - private RunLengthIntegerReader lengths; + private IntegerReader lengths; StringDirectTreeReader(Path path, int columnId) { super(path, columnId); @@ -784,7 +863,11 @@ class RecordReaderImpl implements Record @Override void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { - // PASS + if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT && + encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2) { + throw new IOException("Unknown encoding " + encoding + " in column " + + columnId + " of " + path); + } } @Override @@ -795,8 +878,8 @@ class RecordReaderImpl implements Record StreamName name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); stream = streams.get(name); - lengths = new RunLengthIntegerReader(streams.get(new - StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), + lengths = createIntegerReader(encodings.get(columnId).getKind(), + streams.get(new StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), false); } @@ -851,7 +934,7 @@ class RecordReaderImpl implements Record private static class StringDictionaryTreeReader extends TreeReader { private DynamicByteArray dictionaryBuffer; private int[] dictionaryOffsets; - private RunLengthIntegerReader reader; + private IntegerReader reader; StringDictionaryTreeReader(Path path, int columnId) { super(path, columnId); @@ -859,7 +942,11 @@ class RecordReaderImpl implements Record @Override void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { - // PASS + if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY && + encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) { + throw new IOException("Unknown encoding " + encoding + " in column " + + columnId + " of " + path); + } } @Override @@ -884,7 +971,8 @@ class RecordReaderImpl implements Record // read the lengths name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH); in = streams.get(name); - RunLengthIntegerReader lenReader = new RunLengthIntegerReader(in, false); + IntegerReader lenReader = createIntegerReader(encodings.get(columnId) + .getKind(), in, false); int offset = 0; if (dictionaryOffsets == null || dictionaryOffsets.length < dictionarySize + 1) { @@ -899,7 +987,8 @@ class RecordReaderImpl implements Record // set up the row reader name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); - reader = new RunLengthIntegerReader(streams.get(name), false); + reader = 
createIntegerReader(encodings.get(columnId).getKind(), + streams.get(name), false); } @Override @@ -1101,7 +1190,7 @@ class RecordReaderImpl implements Record private static class ListTreeReader extends TreeReader { private final TreeReader elementReader; - private RunLengthIntegerReader lengths; + private IntegerReader lengths = null; ListTreeReader(Path path, int columnId, List types, @@ -1150,12 +1239,22 @@ class RecordReaderImpl implements Record } @Override + void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { + if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) && + (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) { + throw new IOException("Unknown encoding " + encoding + " in column " + + columnId + " of " + path); + } + } + + @Override void startStripe(Map streams, List encodings ) throws IOException { super.startStripe(streams, encodings); - lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId, - OrcProto.Stream.Kind.LENGTH)), false); + lengths = createIntegerReader(encodings.get(columnId).getKind(), + streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.LENGTH)), false); if (elementReader != null) { elementReader.startStripe(streams, encodings); } @@ -1175,7 +1274,7 @@ class RecordReaderImpl implements Record private static class MapTreeReader extends TreeReader { private final TreeReader keyReader; private final TreeReader valueReader; - private RunLengthIntegerReader lengths; + private IntegerReader lengths = null; MapTreeReader(Path path, int columnId, @@ -1228,12 +1327,22 @@ class RecordReaderImpl implements Record } @Override + void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { + if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) && + (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) { + throw new IOException("Unknown encoding " + encoding + " in column " + + columnId + " of " + path); + } + } + + @Override void startStripe(Map streams, List encodings ) throws IOException { super.startStripe(streams, encodings); - lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId, - OrcProto.Stream.Kind.LENGTH)), false); + lengths = createIntegerReader(encodings.get(columnId).getKind(), + streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.LENGTH)), false); if (keyReader != null) { keyReader.startStripe(streams, encodings); } Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java.orig URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java.orig?rev=1513155&view=auto ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java.orig (added) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java.orig Mon Aug 12 15:03:30 2013 @@ -0,0 +1,1505 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.sql.Date; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.serde2.io.ByteWritable; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; + +class RecordReaderImpl implements RecordReader { + private final FSDataInputStream file; + private final long firstRow; + private final List stripes = + new ArrayList(); + private OrcProto.StripeFooter stripeFooter; + private final long totalRowCount; + private final CompressionCodec codec; + private final int bufferSize; + private final boolean[] included; + private final long rowIndexStride; + private long rowInStripe = 0; + private int currentStripe = 0; + private long rowBaseInStripe = 0; + private long rowCountInStripe = 0; + private final Map streams = + new HashMap(); + private final TreeReader reader; + private final OrcProto.RowIndex[] indexes; + + RecordReaderImpl(Iterable stripes, + FileSystem fileSystem, + Path path, + long offset, long length, + List types, + CompressionCodec codec, + int bufferSize, + boolean[] included, + long strideRate + ) throws IOException { + this.file = fileSystem.open(path); + this.codec = codec; + this.bufferSize = bufferSize; + this.included = included; + long rows = 0; + long skippedRows = 0; + for(StripeInformation stripe: stripes) { + long stripeStart = stripe.getOffset(); + if (offset > stripeStart) { + skippedRows += stripe.getNumberOfRows(); + } else if (stripeStart < offset + length) { + this.stripes.add(stripe); + rows += stripe.getNumberOfRows(); + } + } + firstRow = skippedRows; + totalRowCount = rows; + reader = createTreeReader(path, 0, types, included); + indexes = new OrcProto.RowIndex[types.size()]; + rowIndexStride = strideRate; + if (this.stripes.size() > 0) { + readStripe(); + } + } + + private static final class PositionProviderImpl implements PositionProvider { + private final OrcProto.RowIndexEntry entry; + private int index = 0; + + PositionProviderImpl(OrcProto.RowIndexEntry entry) { + this.entry = entry; + } + + @Override + public long getNext() { + return entry.getPositions(index++); + } + } + + private abstract static class TreeReader { + protected final Path path; + protected final int columnId; + private BitFieldReader present = null; + protected boolean valuePresent = false; + + TreeReader(Path path, int columnId) { + this.path 
= path; + this.columnId = columnId; + } + + void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { + if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) { + throw new IOException("Unknown encoding " + encoding + " in column " + + columnId + " of " + path); + } + } + + void startStripe(Map streams, + List encoding + ) throws IOException { + checkEncoding(encoding.get(columnId)); + InStream in = streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.PRESENT)); + if (in == null) { + present = null; + valuePresent = true; + } else { + present = new BitFieldReader(in, 1); + } + } + + /** + * Seek to the given position. + * @param index the indexes loaded from the file + * @throws IOException + */ + void seek(PositionProvider[] index) throws IOException { + if (present != null) { + present.seek(index[columnId]); + } + } + + protected long countNonNulls(long rows) throws IOException { + if (present != null) { + long result = 0; + for(long c=0; c < rows; ++c) { + if (present.next() == 1) { + result += 1; + } + } + return result; + } else { + return rows; + } + } + + abstract void skipRows(long rows) throws IOException; + + Object next(Object previous) throws IOException { + if (present != null) { + valuePresent = present.next() == 1; + } + return previous; + } + } + + private static class BooleanTreeReader extends TreeReader{ + private BitFieldReader reader = null; + + BooleanTreeReader(Path path, int columnId) { + super(path, columnId); + } + + @Override + void startStripe(Map streams, + List encodings + ) throws IOException { + super.startStripe(streams, encodings); + reader = new BitFieldReader(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.DATA)), 1); + } + + @Override + void seek(PositionProvider[] index) throws IOException { + super.seek(index); + reader.seek(index[columnId]); + } + + @Override + void skipRows(long items) throws IOException { + reader.skip(countNonNulls(items)); + } + + @Override + Object next(Object previous) throws IOException { + super.next(previous); + BooleanWritable result = null; + if (valuePresent) { + if (previous == null) { + result = new BooleanWritable(); + } else { + result = (BooleanWritable) previous; + } + result.set(reader.next() == 1); + } + return result; + } + } + + private static class ByteTreeReader extends TreeReader{ + private RunLengthByteReader reader = null; + + ByteTreeReader(Path path, int columnId) { + super(path, columnId); + } + + @Override + void startStripe(Map streams, + List encodings + ) throws IOException { + super.startStripe(streams, encodings); + reader = new RunLengthByteReader(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.DATA))); + } + + @Override + void seek(PositionProvider[] index) throws IOException { + super.seek(index); + reader.seek(index[columnId]); + } + + @Override + Object next(Object previous) throws IOException { + super.next(previous); + ByteWritable result = null; + if (valuePresent) { + if (previous == null) { + result = new ByteWritable(); + } else { + result = (ByteWritable) previous; + } + result.set(reader.next()); + } + return result; + } + + @Override + void skipRows(long items) throws IOException { + reader.skip(countNonNulls(items)); + } + } + + private static class ShortTreeReader extends TreeReader{ + private RunLengthIntegerReader reader = null; + + ShortTreeReader(Path path, int columnId) { + super(path, columnId); + } + + @Override + void startStripe(Map streams, + List encodings + ) throws IOException { + super.startStripe(streams, 
encodings); + StreamName name = new StreamName(columnId, + OrcProto.Stream.Kind.DATA); + reader = new RunLengthIntegerReader(streams.get(name), true); + } + + @Override + void seek(PositionProvider[] index) throws IOException { + super.seek(index); + reader.seek(index[columnId]); + } + + @Override + Object next(Object previous) throws IOException { + super.next(previous); + ShortWritable result = null; + if (valuePresent) { + if (previous == null) { + result = new ShortWritable(); + } else { + result = (ShortWritable) previous; + } + result.set((short) reader.next()); + } + return result; + } + + @Override + void skipRows(long items) throws IOException { + reader.skip(countNonNulls(items)); + } + } + + private static class IntTreeReader extends TreeReader{ + private RunLengthIntegerReader reader = null; + + IntTreeReader(Path path, int columnId) { + super(path, columnId); + } + + @Override + void startStripe(Map streams, + List encodings + ) throws IOException { + super.startStripe(streams, encodings); + StreamName name = new StreamName(columnId, + OrcProto.Stream.Kind.DATA); + reader = new RunLengthIntegerReader(streams.get(name), true); + } + + @Override + void seek(PositionProvider[] index) throws IOException { + super.seek(index); + reader.seek(index[columnId]); + } + + @Override + Object next(Object previous) throws IOException { + super.next(previous); + IntWritable result = null; + if (valuePresent) { + if (previous == null) { + result = new IntWritable(); + } else { + result = (IntWritable) previous; + } + result.set((int) reader.next()); + } + return result; + } + + @Override + void skipRows(long items) throws IOException { + reader.skip(countNonNulls(items)); + } + } + + private static class LongTreeReader extends TreeReader{ + private RunLengthIntegerReader reader = null; + + LongTreeReader(Path path, int columnId) { + super(path, columnId); + } + + @Override + void startStripe(Map streams, + List encodings + ) throws IOException { + super.startStripe(streams, encodings); + StreamName name = new StreamName(columnId, + OrcProto.Stream.Kind.DATA); + reader = new RunLengthIntegerReader(streams.get(name), true); + } + + @Override + void seek(PositionProvider[] index) throws IOException { + super.seek(index); + reader.seek(index[columnId]); + } + + @Override + Object next(Object previous) throws IOException { + super.next(previous); + LongWritable result = null; + if (valuePresent) { + if (previous == null) { + result = new LongWritable(); + } else { + result = (LongWritable) previous; + } + result.set(reader.next()); + } + return result; + } + + @Override + void skipRows(long items) throws IOException { + reader.skip(countNonNulls(items)); + } + } + + private static class FloatTreeReader extends TreeReader{ + private InStream stream; + + FloatTreeReader(Path path, int columnId) { + super(path, columnId); + } + + @Override + void startStripe(Map streams, + List encodings + ) throws IOException { + super.startStripe(streams, encodings); + StreamName name = new StreamName(columnId, + OrcProto.Stream.Kind.DATA); + stream = streams.get(name); + } + + @Override + void seek(PositionProvider[] index) throws IOException { + super.seek(index); + stream.seek(index[columnId]); + } + + @Override + Object next(Object previous) throws IOException { + super.next(previous); + FloatWritable result = null; + if (valuePresent) { + if (previous == null) { + result = new FloatWritable(); + } else { + result = (FloatWritable) previous; + } + result.set(SerializationUtils.readFloat(stream)); + } + return 
result; + } + + @Override + void skipRows(long items) throws IOException { + items = countNonNulls(items); + for(int i=0; i < items; ++i) { + SerializationUtils.readFloat(stream); + } + } + } + + private static class DoubleTreeReader extends TreeReader{ + private InStream stream; + + DoubleTreeReader(Path path, int columnId) { + super(path, columnId); + } + + @Override + void startStripe(Map streams, + List encodings + ) throws IOException { + super.startStripe(streams, encodings); + StreamName name = + new StreamName(columnId, + OrcProto.Stream.Kind.DATA); + stream = streams.get(name); + } + + @Override + void seek(PositionProvider[] index) throws IOException { + super.seek(index); + stream.seek(index[columnId]); + } + + @Override + Object next(Object previous) throws IOException { + super.next(previous); + DoubleWritable result = null; + if (valuePresent) { + if (previous == null) { + result = new DoubleWritable(); + } else { + result = (DoubleWritable) previous; + } + result.set(SerializationUtils.readDouble(stream)); + } + return result; + } + + @Override + void skipRows(long items) throws IOException { + items = countNonNulls(items); + stream.skip(items * 8); + } + } + + private static class BinaryTreeReader extends TreeReader{ + private InStream stream; + private RunLengthIntegerReader lengths; + + BinaryTreeReader(Path path, int columnId) { + super(path, columnId); + } + + @Override + void startStripe(Map streams, + List encodings + ) throws IOException { + super.startStripe(streams, encodings); + StreamName name = new StreamName(columnId, + OrcProto.Stream.Kind.DATA); + stream = streams.get(name); + lengths = new RunLengthIntegerReader(streams.get(new + StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), + false); + } + + @Override + void seek(PositionProvider[] index) throws IOException { + super.seek(index); + stream.seek(index[columnId]); + lengths.seek(index[columnId]); + } + + @Override + Object next(Object previous) throws IOException { + super.next(previous); + BytesWritable result = null; + if (valuePresent) { + if (previous == null) { + result = new BytesWritable(); + } else { + result = (BytesWritable) previous; + } + int len = (int) lengths.next(); + result.setSize(len); + int offset = 0; + while (len > 0) { + int written = stream.read(result.getBytes(), offset, len); + if (written < 0) { + throw new EOFException("Can't finish byte read from " + stream); + } + len -= written; + offset += written; + } + } + return result; + } + + @Override + void skipRows(long items) throws IOException { + items = countNonNulls(items); + long lengthToSkip = 0; + for(int i=0; i < items; ++i) { + lengthToSkip += lengths.next(); + } + stream.skip(lengthToSkip); + } + } + + private static class TimestampTreeReader extends TreeReader{ + private RunLengthIntegerReader data; + private RunLengthIntegerReader nanos; + + TimestampTreeReader(Path path, int columnId) { + super(path, columnId); + } + + @Override + void startStripe(Map streams, + List encodings + ) throws IOException { + super.startStripe(streams, encodings); + data = new RunLengthIntegerReader(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.DATA)), true); + nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.SECONDARY)), false); + } + + @Override + void seek(PositionProvider[] index) throws IOException { + super.seek(index); + data.seek(index[columnId]); + nanos.seek(index[columnId]); + } + + @Override + Object next(Object previous) throws IOException { + super.next(previous); + 
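+      // Rebuild the value from two streams: DATA holds seconds relative to
+      // WriterImpl.BASE_TIMESTAMP, while SECONDARY holds the nanoseconds with
+      // trailing decimal zeros compressed away (parseNanos below restores them).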
Timestamp result = null; + if (valuePresent) { + if (previous == null) { + result = new Timestamp(0); + } else { + result = (Timestamp) previous; + } + long millis = (data.next() + WriterImpl.BASE_TIMESTAMP) * + WriterImpl.MILLIS_PER_SECOND; + int newNanos = parseNanos(nanos.next()); + // fix the rounding when we divided by 1000. + if (millis >= 0) { + millis += newNanos / 1000000; + } else { + millis -= newNanos / 1000000; + } + result.setTime(millis); + result.setNanos(newNanos); + } + return result; + } + + private static int parseNanos(long serialized) { + int zeros = 7 & (int) serialized; + int result = (int) serialized >>> 3; + if (zeros != 0) { + for(int i =0; i <= zeros; ++i) { + result *= 10; + } + } + return result; + } + + @Override + void skipRows(long items) throws IOException { + items = countNonNulls(items); + data.skip(items); + nanos.skip(items); + } + } + + private static class DateTreeReader extends TreeReader{ + private RunLengthIntegerReader reader = null; + + DateTreeReader(Path path, int columnId) { + super(path, columnId); + } + + @Override + void startStripe(Map streams, + List encodings + ) throws IOException { + super.startStripe(streams, encodings); + StreamName name = new StreamName(columnId, + OrcProto.Stream.Kind.DATA); + reader = new RunLengthIntegerReader(streams.get(name), true); + } + + @Override + void seek(PositionProvider[] index) throws IOException { + super.seek(index); + reader.seek(index[columnId]); + } + + @Override + Object next(Object previous) throws IOException { + super.next(previous); + Date result = null; + if (valuePresent) { + if (previous == null) { + result = new Date(0); + } else { + result = (Date) previous; + } + result.setTime(DateWritable.daysToMillis((int) reader.next())); + } + return result; + } + + @Override + void skipRows(long items) throws IOException { + reader.skip(countNonNulls(items)); + } + } + + private static class DecimalTreeReader extends TreeReader{ + private InStream valueStream; + private RunLengthIntegerReader scaleStream; + + DecimalTreeReader(Path path, int columnId) { + super(path, columnId); + } + + @Override + void startStripe(Map streams, + List encodings + ) throws IOException { + super.startStripe(streams, encodings); + valueStream = streams.get(new StreamName(columnId, + OrcProto.Stream.Kind.DATA)); + scaleStream = new RunLengthIntegerReader(streams.get( + new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true); + } + + @Override + void seek(PositionProvider[] index) throws IOException { + super.seek(index); + valueStream.seek(index[columnId]); + scaleStream.seek(index[columnId]); + } + + @Override + Object next(Object previous) throws IOException { + super.next(previous); + if (valuePresent) { + return new HiveDecimal(SerializationUtils.readBigInteger(valueStream), + (int) scaleStream.next()); + } + return null; + } + + @Override + void skipRows(long items) throws IOException { + items = countNonNulls(items); + for(int i=0; i < items; i++) { + SerializationUtils.readBigInteger(valueStream); + } + scaleStream.skip(items); + } + } + + /** + * A tree reader that will read string columns. At the start of the + * stripe, it creates an internal reader based on whether a direct or + * dictionary encoding was used. 
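+   * Subsequent seek, next, and skipRows calls are delegated to that
+   * internal reader.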
+ */ + private static class StringTreeReader extends TreeReader { + private TreeReader reader; + + StringTreeReader(Path path, int columnId) { + super(path, columnId); + } + + void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { + if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY && + encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) { + throw new IOException("Unknown encoding " + encoding + " in column " + + columnId + " of " + path); + } + } + + @Override + void startStripe(Map streams, + List encodings + ) throws IOException { + // For each stripe, checks the encoding and initializes the appropriate + // reader + switch (encodings.get(columnId).getKind()) { + case DIRECT: + reader = new StringDirectTreeReader(path, columnId); + break; + case DICTIONARY: + reader = new StringDictionaryTreeReader(path, columnId); + break; + default: + throw new IllegalArgumentException("Unsupported encoding " + + encodings.get(columnId).getKind()); + } + reader.startStripe(streams, encodings); + } + + @Override + void seek(PositionProvider[] index) throws IOException { + reader.seek(index); + } + + @Override + Object next(Object previous) throws IOException { + return reader.next(previous); + } + + @Override + void skipRows(long items) throws IOException { + reader.skipRows(items); + } + } + + /** + * A reader for string columns that are direct encoded in the current + * stripe. + */ + private static class StringDirectTreeReader extends TreeReader { + private InStream stream; + private RunLengthIntegerReader lengths; + + StringDirectTreeReader(Path path, int columnId) { + super(path, columnId); + } + + @Override + void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { + // PASS + } + + @Override + void startStripe(Map streams, + List encodings + ) throws IOException { + super.startStripe(streams, encodings); + StreamName name = new StreamName(columnId, + OrcProto.Stream.Kind.DATA); + stream = streams.get(name); + lengths = new RunLengthIntegerReader(streams.get(new + StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), + false); + } + + @Override + void seek(PositionProvider[] index) throws IOException { + super.seek(index); + stream.seek(index[columnId]); + lengths.seek(index[columnId]); + } + + @Override + Object next(Object previous) throws IOException { + super.next(previous); + Text result = null; + if (valuePresent) { + if (previous == null) { + result = new Text(); + } else { + result = (Text) previous; + } + int len = (int) lengths.next(); + int offset = 0; + byte[] bytes = new byte[len]; + while (len > 0) { + int written = stream.read(bytes, offset, len); + if (written < 0) { + throw new EOFException("Can't finish byte read from " + stream); + } + len -= written; + offset += written; + } + result.set(bytes); + } + return result; + } + + @Override + void skipRows(long items) throws IOException { + items = countNonNulls(items); + long lengthToSkip = 0; + for(int i=0; i < items; ++i) { + lengthToSkip += lengths.next(); + } + stream.skip(lengthToSkip); + } + } + + /** + * A reader for string columns that are dictionary encoded in the current + * stripe. 
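+   * The dictionary blob and its offset table are read once per stripe in
+   * startStripe(); next() then looks values up by dictionary entry.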
+ */ + private static class StringDictionaryTreeReader extends TreeReader { + private DynamicByteArray dictionaryBuffer; + private int[] dictionaryOffsets; + private RunLengthIntegerReader reader; + + StringDictionaryTreeReader(Path path, int columnId) { + super(path, columnId); + } + + @Override + void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { + // PASS + } + + @Override + void startStripe(Map streams, + List encodings + ) throws IOException { + super.startStripe(streams, encodings); + + // read the dictionary blob + int dictionarySize = encodings.get(columnId).getDictionarySize(); + StreamName name = new StreamName(columnId, + OrcProto.Stream.Kind.DICTIONARY_DATA); + InStream in = streams.get(name); + if (in.available() > 0) { + dictionaryBuffer = new DynamicByteArray(64, in.available()); + dictionaryBuffer.readAll(in); + } else { + dictionaryBuffer = null; + } + in.close(); + + // read the lengths + name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH); + in = streams.get(name); + RunLengthIntegerReader lenReader = new RunLengthIntegerReader(in, false); + int offset = 0; + if (dictionaryOffsets == null || + dictionaryOffsets.length < dictionarySize + 1) { + dictionaryOffsets = new int[dictionarySize + 1]; + } + for(int i=0; i < dictionarySize; ++i) { + dictionaryOffsets[i] = offset; + offset += (int) lenReader.next(); + } + dictionaryOffsets[dictionarySize] = offset; + in.close(); + + // set up the row reader + name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); + reader = new RunLengthIntegerReader(streams.get(name), false); + } + + @Override + void seek(PositionProvider[] index) throws IOException { + super.seek(index); + reader.seek(index[columnId]); + } + + @Override + Object next(Object previous) throws IOException { + super.next(previous); + Text result = null; + if (valuePresent) { + int entry = (int) reader.next(); + if (previous == null) { + result = new Text(); + } else { + result = (Text) previous; + } + int offset = dictionaryOffsets[entry]; + int length; + // if it isn't the last entry, subtract the offsets otherwise use + // the buffer length. 
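+          // e.g. with dictionaryOffsets {0, 3, 7}, entry 0 spans bytes [0, 3)
+          // and entry 1 spans [3, 7); the final slot holds the total length.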
+ if (entry < dictionaryOffsets.length - 1) { + length = dictionaryOffsets[entry + 1] - offset; + } else { + length = dictionaryBuffer.size() - offset; + } + // If the column is just empty strings, the size will be zero, + // so the buffer will be null, in that case just return result + // as it will default to empty + if (dictionaryBuffer != null) { + dictionaryBuffer.setText(result, offset, length); + } else { + result.clear(); + } + } + return result; + } + + @Override + void skipRows(long items) throws IOException { + reader.skip(countNonNulls(items)); + } + } + + private static class StructTreeReader extends TreeReader { + private final TreeReader[] fields; + private final String[] fieldNames; + + StructTreeReader(Path path, int columnId, + List types, + boolean[] included) throws IOException { + super(path, columnId); + OrcProto.Type type = types.get(columnId); + int fieldCount = type.getFieldNamesCount(); + this.fields = new TreeReader[fieldCount]; + this.fieldNames = new String[fieldCount]; + for(int i=0; i < fieldCount; ++i) { + int subtype = type.getSubtypes(i); + if (included == null || included[subtype]) { + this.fields[i] = createTreeReader(path, subtype, types, included); + } + this.fieldNames[i] = type.getFieldNames(i); + } + } + + @Override + void seek(PositionProvider[] index) throws IOException { + super.seek(index); + for(TreeReader kid: fields) { + if (kid != null) { + kid.seek(index); + } + } + } + + @Override + Object next(Object previous) throws IOException { + super.next(previous); + OrcStruct result = null; + if (valuePresent) { + if (previous == null) { + result = new OrcStruct(fields.length); + } else { + result = (OrcStruct) previous; + + // If the input format was initialized with a file with a + // different number of fields, the number of fields needs to + // be updated to the correct number + if (result.getNumFields() != fields.length) { + result.setNumFields(fields.length); + } + } + for(int i=0; i < fields.length; ++i) { + if (fields[i] != null) { + result.setFieldValue(i, fields[i].next(result.getFieldValue(i))); + } + } + } + return result; + } + + @Override + void startStripe(Map streams, + List encodings + ) throws IOException { + super.startStripe(streams, encodings); + for(TreeReader field: fields) { + if (field != null) { + field.startStripe(streams, encodings); + } + } + } + + @Override + void skipRows(long items) throws IOException { + items = countNonNulls(items); + for(TreeReader field: fields) { + if (field != null) { + field.skipRows(items); + } + } + } + } + + private static class UnionTreeReader extends TreeReader { + private final TreeReader[] fields; + private RunLengthByteReader tags; + + UnionTreeReader(Path path, int columnId, + List types, + boolean[] included) throws IOException { + super(path, columnId); + OrcProto.Type type = types.get(columnId); + int fieldCount = type.getSubtypesCount(); + this.fields = new TreeReader[fieldCount]; + for(int i=0; i < fieldCount; ++i) { + int subtype = type.getSubtypes(i); + if (included == null || included[subtype]) { + this.fields[i] = createTreeReader(path, subtype, types, included); + } + } + } + + @Override + void seek(PositionProvider[] index) throws IOException { + super.seek(index); + tags.seek(index[columnId]); + for(TreeReader kid: fields) { + kid.seek(index); + } + } + + @Override + Object next(Object previous) throws IOException { + super.next(previous); + OrcUnion result = null; + if (valuePresent) { + if (previous == null) { + result = new OrcUnion(); + } else { + result = (OrcUnion) 
+
+  private static class UnionTreeReader extends TreeReader {
+    private final TreeReader[] fields;
+    private RunLengthByteReader tags;
+
+    UnionTreeReader(Path path, int columnId,
+                    List<OrcProto.Type> types,
+                    boolean[] included) throws IOException {
+      super(path, columnId);
+      OrcProto.Type type = types.get(columnId);
+      int fieldCount = type.getSubtypesCount();
+      this.fields = new TreeReader[fieldCount];
+      for(int i=0; i < fieldCount; ++i) {
+        int subtype = type.getSubtypes(i);
+        if (included == null || included[subtype]) {
+          this.fields[i] = createTreeReader(path, subtype, types, included);
+        }
+      }
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      tags.seek(index[columnId]);
+      for(TreeReader kid: fields) {
+        kid.seek(index);
+      }
+    }
+
+    @Override
+    Object next(Object previous) throws IOException {
+      super.next(previous);
+      OrcUnion result = null;
+      if (valuePresent) {
+        if (previous == null) {
+          result = new OrcUnion();
+        } else {
+          result = (OrcUnion) previous;
+        }
+        byte tag = tags.next();
+        Object previousVal = result.getObject();
+        result.set(tag, fields[tag].next(tag == result.getTag() ?
+            previousVal : null));
+      }
+      return result;
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encodings
+                    ) throws IOException {
+      super.startStripe(streams, encodings);
+      tags = new RunLengthByteReader(streams.get(new StreamName(columnId,
+          OrcProto.Stream.Kind.DATA)));
+      for(TreeReader field: fields) {
+        if (field != null) {
+          field.startStripe(streams, encodings);
+        }
+      }
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
+      long[] counts = new long[fields.length];
+      for(int i=0; i < items; ++i) {
+        counts[tags.next()] += 1;
+      }
+      for(int i=0; i < counts.length; ++i) {
+        fields[i].skipRows(counts[i]);
+      }
+    }
+  }
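
Note: UnionTreeReader.skipRows cannot skip its children by a uniform count,
because each child stream only stores values for the rows tagged with that
branch; the tags must be read and bucketed first. In miniature (invented tag
data):

    // Count how many of the skipped rows belong to each union branch.
    public class UnionSkipDemo {
      public static void main(String[] args) {
        byte[] tags = {0, 1, 0, 0, 1};  // stand-in for RunLengthByteReader output
        long[] counts = new long[2];
        for (byte tag : tags) {
          counts[tag] += 1;
        }
        // child 0 would skip 3 values, child 1 would skip 2
        System.out.println(counts[0] + " " + counts[1]);
      }
    }
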
+
+  private static class ListTreeReader extends TreeReader {
+    private final TreeReader elementReader;
+    private RunLengthIntegerReader lengths;
+
+    ListTreeReader(Path path, int columnId,
+                   List<OrcProto.Type> types,
+                   boolean[] included) throws IOException {
+      super(path, columnId);
+      OrcProto.Type type = types.get(columnId);
+      elementReader = createTreeReader(path, type.getSubtypes(0), types,
+          included);
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      lengths.seek(index[columnId]);
+      elementReader.seek(index);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    Object next(Object previous) throws IOException {
+      super.next(previous);
+      List<Object> result = null;
+      if (valuePresent) {
+        if (previous == null) {
+          result = new ArrayList<Object>();
+        } else {
+          result = (ArrayList<Object>) previous;
+        }
+        int prevLength = result.size();
+        int length = (int) lengths.next();
+        // extend the list to the new length
+        for(int i=prevLength; i < length; ++i) {
+          result.add(null);
+        }
+        // read the new elements into the array
+        for(int i=0; i< length; i++) {
+          result.set(i, elementReader.next(i < prevLength ?
+              result.get(i) : null));
+        }
+        // remove any extra elements
+        for(int i=prevLength - 1; i >= length; --i) {
+          result.remove(i);
+        }
+      }
+      return result;
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encodings
+                    ) throws IOException {
+      super.startStripe(streams, encodings);
+      lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+          OrcProto.Stream.Kind.LENGTH)), false);
+      if (elementReader != null) {
+        elementReader.startStripe(streams, encodings);
+      }
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
+      long childSkip = 0;
+      for(long i=0; i < items; ++i) {
+        childSkip += lengths.next();
+      }
+      elementReader.skipRows(childSkip);
+    }
+  }
+
+  private static class MapTreeReader extends TreeReader {
+    private final TreeReader keyReader;
+    private final TreeReader valueReader;
+    private RunLengthIntegerReader lengths;
+
+    MapTreeReader(Path path,
+                  int columnId,
+                  List<OrcProto.Type> types,
+                  boolean[] included) throws IOException {
+      super(path, columnId);
+      OrcProto.Type type = types.get(columnId);
+      int keyColumn = type.getSubtypes(0);
+      int valueColumn = type.getSubtypes(1);
+      if (included == null || included[keyColumn]) {
+        keyReader = createTreeReader(path, keyColumn, types, included);
+      } else {
+        keyReader = null;
+      }
+      if (included == null || included[valueColumn]) {
+        valueReader = createTreeReader(path, valueColumn, types, included);
+      } else {
+        valueReader = null;
+      }
+    }
+
+    @Override
+    void seek(PositionProvider[] index) throws IOException {
+      super.seek(index);
+      lengths.seek(index[columnId]);
+      keyReader.seek(index);
+      valueReader.seek(index);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    Object next(Object previous) throws IOException {
+      super.next(previous);
+      Map<Object, Object> result = null;
+      if (valuePresent) {
+        if (previous == null) {
+          result = new HashMap<Object, Object>();
+        } else {
+          result = (HashMap<Object, Object>) previous;
+        }
+        // for now just clear and create new objects
+        result.clear();
+        int length = (int) lengths.next();
+        // read the new elements into the array
+        for(int i=0; i< length; i++) {
+          result.put(keyReader.next(null), valueReader.next(null));
+        }
+      }
+      return result;
+    }
+
+    @Override
+    void startStripe(Map<StreamName, InStream> streams,
+                     List<OrcProto.ColumnEncoding> encodings
+                    ) throws IOException {
+      super.startStripe(streams, encodings);
+      lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
+          OrcProto.Stream.Kind.LENGTH)), false);
+      if (keyReader != null) {
+        keyReader.startStripe(streams, encodings);
+      }
+      if (valueReader != null) {
+        valueReader.startStripe(streams, encodings);
+      }
+    }
+
+    @Override
+    void skipRows(long items) throws IOException {
+      items = countNonNulls(items);
+      long childSkip = 0;
+      for(long i=0; i < items; ++i) {
+        childSkip += lengths.next();
+      }
+      keyReader.skipRows(childSkip);
+      valueReader.skipRows(childSkip);
+    }
+  }
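
Note: lists and maps skip the same way: sum the LENGTH stream over the skipped
rows, then push that total into the child readers (a map pushes the same count
into both the key and value readers). A compact sketch with invented values:

    public class LengthSkipDemo {
      public static void main(String[] args) {
        long[] lengths = {2, 0, 5, 1};  // stand-in for the LENGTH stream
        int rowsToSkip = 3;             // already adjusted for null rows
        long childSkip = 0;
        for (int i = 0; i < rowsToSkip; ++i) {
          childSkip += lengths[i];
        }
        System.out.println(childSkip);  // 7 child values to skip
      }
    }
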
+
+  private static TreeReader createTreeReader(Path path,
+                                             int columnId,
+                                             List<OrcProto.Type> types,
+                                             boolean[] included
+                                            ) throws IOException {
+    OrcProto.Type type = types.get(columnId);
+    switch (type.getKind()) {
+      case BOOLEAN:
+        return new BooleanTreeReader(path, columnId);
+      case BYTE:
+        return new ByteTreeReader(path, columnId);
+      case DOUBLE:
+        return new DoubleTreeReader(path, columnId);
+      case FLOAT:
+        return new FloatTreeReader(path, columnId);
+      case SHORT:
+        return new ShortTreeReader(path, columnId);
+      case INT:
+        return new IntTreeReader(path, columnId);
+      case LONG:
+        return new LongTreeReader(path, columnId);
+      case STRING:
+        return new StringTreeReader(path, columnId);
+      case BINARY:
+        return new BinaryTreeReader(path, columnId);
+      case TIMESTAMP:
+        return new TimestampTreeReader(path, columnId);
+      case DATE:
+        return new DateTreeReader(path, columnId);
+      case DECIMAL:
+        return new DecimalTreeReader(path, columnId);
+      case STRUCT:
+        return new StructTreeReader(path, columnId, types, included);
+      case LIST:
+        return new ListTreeReader(path, columnId, types, included);
+      case MAP:
+        return new MapTreeReader(path, columnId, types, included);
+      case UNION:
+        return new UnionTreeReader(path, columnId, types, included);
+      default:
+        throw new IllegalArgumentException("Unsupported type " +
+            type.getKind());
+    }
+  }
+
+  OrcProto.StripeFooter readStripeFooter(StripeInformation stripe
+                                        ) throws IOException {
+    long offset = stripe.getOffset() + stripe.getIndexLength() +
+        stripe.getDataLength();
+    int tailLength = (int) stripe.getFooterLength();
+
+    // read the footer
+    ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
+    file.seek(offset);
+    file.readFully(tailBuf.array(), tailBuf.arrayOffset(), tailLength);
+    return OrcProto.StripeFooter.parseFrom(InStream.create("footer", tailBuf,
+        codec, bufferSize));
+  }
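
Note: readStripeFooter relies on each stripe's fixed physical order (index
streams, then data streams, then the stripe footer), so the footer's file
position is pure addition. Worked through with assumed sizes:

    // Locate a stripe footer from StripeInformation-style fields.
    // All sizes here are invented for illustration.
    public class StripeFooterOffsetDemo {
      public static void main(String[] args) {
        long stripeOffset = 3;     // where the stripe begins in the file
        long indexLength = 100;    // bytes of ROW_INDEX streams
        long dataLength = 5000;    // bytes of DATA-area streams
        System.out.println(stripeOffset + indexLength + dataLength);  // 5103
      }
    }
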
+
+  private void readStripe() throws IOException {
+    StripeInformation stripe = stripes.get(currentStripe);
+    stripeFooter = readStripeFooter(stripe);
+    long offset = stripe.getOffset();
+    streams.clear();
+
+    // if we aren't projecting columns, just read the whole stripe
+    if (included == null) {
+      byte[] buffer =
+          new byte[(int) (stripe.getDataLength())];
+      file.seek(offset + stripe.getIndexLength());
+      file.readFully(buffer, 0, buffer.length);
+      int sectionOffset = 0;
+      for(OrcProto.Stream section: stripeFooter.getStreamsList()) {
+        if (StreamName.getArea(section.getKind()) == StreamName.Area.DATA) {
+          int sectionLength = (int) section.getLength();
+          ByteBuffer sectionBuffer = ByteBuffer.wrap(buffer, sectionOffset,
+              sectionLength);
+          StreamName name = new StreamName(section.getColumn(),
+              section.getKind());
+          streams.put(name,
+              InStream.create(name.toString(), sectionBuffer, codec,
+                  bufferSize));
+          sectionOffset += sectionLength;
+        }
+      }
+    } else {
+      List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
+      // the index of the current section
+      int currentSection = 0;
+      while (currentSection < streamList.size() &&
+          StreamName.getArea(streamList.get(currentSection).getKind()) !=
+              StreamName.Area.DATA) {
+        currentSection += 1;
+      }
+      // byte position of the current section relative to the stripe start
+      long sectionOffset = stripe.getIndexLength();
+      while (currentSection < streamList.size()) {
+        int bytes = 0;
+
+        // find the first section that shouldn't be read
+        int excluded=currentSection;
+        while (excluded < streamList.size() &&
+            included[streamList.get(excluded).getColumn()]) {
+          bytes += streamList.get(excluded).getLength();
+          excluded += 1;
+        }
+
+        // actually read the bytes as a big chunk
+        if (bytes != 0) {
+          byte[] buffer = new byte[bytes];
+          file.seek(offset + sectionOffset);
+          file.readFully(buffer, 0, bytes);
+          sectionOffset += bytes;
+
+          // create the streams for the sections we just read
+          bytes = 0;
+          while (currentSection < excluded) {
+            OrcProto.Stream section = streamList.get(currentSection);
+            StreamName name =
+                new StreamName(section.getColumn(), section.getKind());
+            this.streams.put(name,
+                InStream.create(name.toString(),
+                    ByteBuffer.wrap(buffer, bytes,
+                        (int) section.getLength()), codec, bufferSize));
+            currentSection += 1;
+            bytes += section.getLength();
+          }
+        }
+
+        // skip forward until we get back to a section that we need
+        while (currentSection < streamList.size() &&
+            !included[streamList.get(currentSection).getColumn()]) {
+          sectionOffset += streamList.get(currentSection).getLength();
+          currentSection += 1;
+        }
+      }
+    }
+    reader.startStripe(streams, stripeFooter.getColumnsList());
+    rowInStripe = 0;
+    rowCountInStripe = stripe.getNumberOfRows();
+    rowBaseInStripe = 0;
+    for(int i=0; i < currentStripe; ++i) {
+      rowBaseInStripe += stripes.get(i).getNumberOfRows();
+    }
+    for(int i=0; i < indexes.length; ++i) {
+      indexes[i] = null;
+    }
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    return rowInStripe < rowCountInStripe || currentStripe < stripes.size() - 1;
+  }
+
+  @Override
+  public Object next(Object previous) throws IOException {
+    if (rowInStripe >= rowCountInStripe) {
+      currentStripe += 1;
+      readStripe();
+    }
+    rowInStripe += 1;
+    return reader.next(previous);
+  }
+
+  @Override
+  public void close() throws IOException {
+    file.close();
+  }
+
+  @Override
+  public long getRowNumber() {
+    return rowInStripe + rowBaseInStripe + firstRow;
+  }
+
+  /**
+   * Return the fraction of rows that have been read from the selected
+   * section of the file.
+   * @return fraction between 0.0 and 1.0 of rows consumed
+   */
+  @Override
+  public float getProgress() {
+    return ((float) rowBaseInStripe + rowInStripe) / totalRowCount;
+  }
+
+  private int findStripe(long rowNumber) {
+    if (rowNumber < 0) {
+      throw new IllegalArgumentException("Seek to a negative row number " +
+          rowNumber);
+    } else if (rowNumber < firstRow) {
+      throw new IllegalArgumentException("Seek before reader range " +
+          rowNumber);
+    }
+    rowNumber -= firstRow;
+    for(int i=0; i < stripes.size(); i++) {
+      StripeInformation stripe = stripes.get(i);
+      if (stripe.getNumberOfRows() > rowNumber) {
+        return i;
+      }
+      rowNumber -= stripe.getNumberOfRows();
+    }
+    throw new IllegalArgumentException("Seek after the end of reader range");
+  }
+
+  private void readRowIndex() throws IOException {
+    long offset = stripes.get(currentStripe).getOffset();
+    for(OrcProto.Stream stream: stripeFooter.getStreamsList()) {
+      if (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX) {
+        int col = stream.getColumn();
+        if ((included == null || included[col]) && indexes[col] == null) {
+          byte[] buffer = new byte[(int) stream.getLength()];
+          file.seek(offset);
+          file.readFully(buffer);
+          indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
+              ByteBuffer.wrap(buffer), codec, bufferSize));
+        }
+      }
+      offset += stream.getLength();
+    }
+  }
+
+  private void seekToRowEntry(int rowEntry) throws IOException {
+    PositionProvider[] index = new PositionProvider[indexes.length];
+    for(int i=0; i < indexes.length; ++i) {
+      if (indexes[i] != null) {
+        index[i]=
+            new PositionProviderImpl(indexes[i].getEntry(rowEntry));
+      }
+    }
+    reader.seek(index);
+  }
+
+  @Override
+  public void seekToRow(long rowNumber) throws IOException {
+    int rightStripe = findStripe(rowNumber);
+    if (rightStripe != currentStripe) {
+      currentStripe = rightStripe;
+      readStripe();
+    }
+    readRowIndex();
+    rowInStripe = rowNumber - rowBaseInStripe - firstRow;
+    if (rowIndexStride != 0) {
+      long entry = rowInStripe / rowIndexStride;
+      seekToRowEntry((int) entry);
+      reader.skipRows(rowInStripe - entry * rowIndexStride);
+    } else {
+      reader.skipRows(rowInStripe);
+    }
+  }
+}
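
Note: seekToRow above is two-phase: seekToRowEntry jumps to the nearest
preceding row-index entry (rowInStripe / rowIndexStride), then skipRows walks
off the remainder. With invented numbers:

    public class SeekArithmeticDemo {
      public static void main(String[] args) {
        long rowInStripe = 25347;     // invented seek target within a stripe
        long rowIndexStride = 10000;  // stand-in stride
        long entry = rowInStripe / rowIndexStride;
        long remainder = rowInStripe - entry * rowIndexStride;
        System.out.println(entry + " " + remainder);  // 2 5347
      }
    }
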
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java?rev=1513155&r1=1513154&r2=1513155&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java Mon Aug 12 15:03:30 2013
@@ -23,7 +23,7 @@ import java.io.IOException;
 /**
  * A reader that reads a sequence of integers.
  * */
-class RunLengthIntegerReader {
+class RunLengthIntegerReader implements IntegerReader {
   private final InStream input;
   private final boolean signed;
   private final long[] literals =
@@ -71,11 +71,13 @@ class RunLengthIntegerReader {
     }
   }
 
-  boolean hasNext() throws IOException {
+  @Override
+  public boolean hasNext() throws IOException {
     return used != numLiterals || input.available() > 0;
   }
 
-  long next() throws IOException {
+  @Override
+  public long next() throws IOException {
     long result;
     if (used == numLiterals) {
       readValues();
@@ -88,7 +90,8 @@ class RunLengthIntegerReader {
     return result;
   }
 
-  void seek(PositionProvider index) throws IOException {
+  @Override
+  public void seek(PositionProvider index) throws IOException {
     input.seek(index);
     int consumed = (int) index.getNext();
     if (consumed != 0) {
@@ -104,7 +107,8 @@ class RunLengthIntegerReader {
     }
   }
 
-  void skip(long numValues) throws IOException {
+  @Override
+  public void skip(long numValues) throws IOException {
     while (numValues > 0) {
       if (used == numLiterals) {
        readValues();
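
Note: the @Override annotations added above imply the shape of the new
IntegerReader interface. Inferred from those four signatures alone (a sketch,
not the committed file):

    import java.io.IOException;

    // Inferred: the methods RunLengthIntegerReader now overrides.
    // PositionProvider is the existing ORC type referenced in the diff.
    interface IntegerReader {
      boolean hasNext() throws IOException;
      long next() throws IOException;
      void seek(PositionProvider index) throws IOException;
      void skip(long numValues) throws IOException;
    }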