Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 10B9A200CCA for ; Wed, 19 Jul 2017 18:58:34 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 0F509169648; Wed, 19 Jul 2017 16:58:34 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id DD86E169639 for ; Wed, 19 Jul 2017 18:58:31 +0200 (CEST) Received: (qmail 97487 invoked by uid 500); 19 Jul 2017 16:58:27 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 96745 invoked by uid 99); 19 Jul 2017 16:58:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Jul 2017 16:58:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E4FCBF16C3; Wed, 19 Jul 2017 16:58:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: omalley@apache.org To: commits@hive.apache.org Date: Wed, 19 Jul 2017 16:58:51 -0000 Message-Id: In-Reply-To: <3fc3df21261a48f086876006a87e459b@git.apache.org> References: <3fc3df21261a48f086876006a87e459b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [28/37] hive git commit: HIVE-17118. Move the hive-orc source files to make the package names unique. 
archived-at: Wed, 19 Jul 2017 16:58:34 -0000 http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/RunLengthByteWriter.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/hive/orc/impl/RunLengthByteWriter.java b/orc/src/java/org/apache/hive/orc/impl/RunLengthByteWriter.java new file mode 100644 index 0000000..b94b6a9 --- /dev/null +++ b/orc/src/java/org/apache/hive/orc/impl/RunLengthByteWriter.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.orc.impl; + +import java.io.IOException; + +/** + * A writer that emits a run-length encoded sequence of bytes. A control byte is written before + * each run with positive values 0 to 127 meaning 3 to 130 repetitions. If the + * control byte is -1 to -128, 1 to 128 literal byte values follow. 
+ */ +public class RunLengthByteWriter { + static final int MIN_REPEAT_SIZE = 3; + static final int MAX_LITERAL_SIZE = 128; + static final int MAX_REPEAT_SIZE= 127 + MIN_REPEAT_SIZE; + private final PositionedOutputStream output; + private final byte[] literals = new byte[MAX_LITERAL_SIZE]; + private int numLiterals = 0; + private boolean repeat = false; + private int tailRunLength = 0; + + public RunLengthByteWriter(PositionedOutputStream output) { + this.output = output; + } + + private void writeValues() throws IOException { + if (numLiterals != 0) { + if (repeat) { + output.write(numLiterals - MIN_REPEAT_SIZE); + output.write(literals, 0, 1); + } else { + output.write(-numLiterals); + output.write(literals, 0, numLiterals); + } + repeat = false; + tailRunLength = 0; + numLiterals = 0; + } + } + + public void flush() throws IOException { + writeValues(); + output.flush(); + } + + public void write(byte value) throws IOException { + if (numLiterals == 0) { + literals[numLiterals++] = value; + tailRunLength = 1; + } else if (repeat) { + if (value == literals[0]) { + numLiterals += 1; + if (numLiterals == MAX_REPEAT_SIZE) { + writeValues(); + } + } else { + writeValues(); + literals[numLiterals++] = value; + tailRunLength = 1; + } + } else { + if (value == literals[numLiterals - 1]) { + tailRunLength += 1; + } else { + tailRunLength = 1; + } + if (tailRunLength == MIN_REPEAT_SIZE) { + if (numLiterals + 1 == MIN_REPEAT_SIZE) { + repeat = true; + numLiterals += 1; + } else { + numLiterals -= MIN_REPEAT_SIZE - 1; + writeValues(); + literals[0] = value; + repeat = true; + numLiterals = MIN_REPEAT_SIZE; + } + } else { + literals[numLiterals++] = value; + if (numLiterals == MAX_LITERAL_SIZE) { + writeValues(); + } + } + } + } + + public void getPosition(PositionRecorder recorder) throws IOException { + output.getPosition(recorder); + recorder.addPosition(numLiterals); + } +} 
http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerReader.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerReader.java b/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerReader.java new file mode 100644 index 0000000..5b613f6 --- /dev/null +++ b/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerReader.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.orc.impl; + +import java.io.EOFException; +import java.io.IOException; + +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; + +/** + * A reader that reads a sequence of integers. 
+ * */ +public class RunLengthIntegerReader implements IntegerReader { + private InStream input; + private final boolean signed; + private final long[] literals = + new long[RunLengthIntegerWriter.MAX_LITERAL_SIZE]; + private int numLiterals = 0; + private int delta = 0; + private int used = 0; + private boolean repeat = false; + private SerializationUtils utils; + + public RunLengthIntegerReader(InStream input, boolean signed) throws IOException { + this.input = input; + this.signed = signed; + this.utils = new SerializationUtils(); + } + + private void readValues(boolean ignoreEof) throws IOException { + int control = input.read(); + if (control == -1) { + if (!ignoreEof) { + throw new EOFException("Read past end of RLE integer from " + input); + } + used = numLiterals = 0; + return; + } else if (control < 0x80) { + numLiterals = control + RunLengthIntegerWriter.MIN_REPEAT_SIZE; + used = 0; + repeat = true; + delta = input.read(); + if (delta == -1) { + throw new EOFException("End of stream in RLE Integer from " + input); + } + // convert from 0 to 255 to -128 to 127 by converting to a signed byte + delta = (byte) (0 + delta); + if (signed) { + literals[0] = utils.readVslong(input); + } else { + literals[0] = utils.readVulong(input); + } + } else { + repeat = false; + numLiterals = 0x100 - control; + used = 0; + for(int i=0; i < numLiterals; ++i) { + if (signed) { + literals[i] = utils.readVslong(input); + } else { + literals[i] = utils.readVulong(input); + } + } + } + } + + @Override + public boolean hasNext() throws IOException { + return used != numLiterals || input.available() > 0; + } + + @Override + public long next() throws IOException { + long result; + if (used == numLiterals) { + readValues(false); + } + if (repeat) { + result = literals[0] + (used++) * delta; + } else { + result = literals[used++]; + } + return result; + } + + @Override + public void nextVector(ColumnVector previous, + long[] data, + int previousLen) throws IOException { + 
previous.isRepeating = true; + for (int i = 0; i < previousLen; i++) { + if (!previous.isNull[i]) { + data[i] = next(); + } else { + // The default value of null for int type in vectorized + // processing is 1, so set that if the value is null + data[i] = 1; + } + + // The default value for nulls in Vectorization for int types is 1 + // and given that non null value can also be 1, we need to check for isNull also + // when determining the isRepeating flag. + if (previous.isRepeating + && i > 0 + && (data[0] != data[i] || previous.isNull[0] != previous.isNull[i])) { + previous.isRepeating = false; + } + } + } + + @Override + public void nextVector(ColumnVector vector, + int[] data, + int size) throws IOException { + if (vector.noNulls) { + for(int r=0; r < data.length && r < size; ++r) { + data[r] = (int) next(); + } + } else if (!(vector.isRepeating && vector.isNull[0])) { + for(int r=0; r < data.length && r < size; ++r) { + if (!vector.isNull[r]) { + data[r] = (int) next(); + } else { + data[r] = 1; + } + } + } + } + + @Override + public void seek(PositionProvider index) throws IOException { + input.seek(index); + int consumed = (int) index.getNext(); + if (consumed != 0) { + // a loop is required for cases where we break the run into two parts + while (consumed > 0) { + readValues(false); + used = consumed; + consumed -= numLiterals; + } + } else { + used = 0; + numLiterals = 0; + } + } + + @Override + public void skip(long numValues) throws IOException { + while (numValues > 0) { + if (used == numLiterals) { + readValues(false); + } + long consume = Math.min(numValues, numLiterals - used); + used += consume; + numValues -= consume; + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerReaderV2.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerReaderV2.java 
b/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerReaderV2.java new file mode 100644 index 0000000..d0c2b54 --- /dev/null +++ b/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerReaderV2.java @@ -0,0 +1,406 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.orc.impl; + +import java.io.EOFException; +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A reader that reads a sequence of light weight compressed integers. Refer + * {@link RunLengthIntegerWriterV2} for description of various lightweight + * compression techniques. 
+ */ +public class RunLengthIntegerReaderV2 implements IntegerReader { + public static final Logger LOG = LoggerFactory.getLogger(RunLengthIntegerReaderV2.class); + + private InStream input; + private final boolean signed; + private final long[] literals = new long[RunLengthIntegerWriterV2.MAX_SCOPE]; + private boolean isRepeating = false; + private int numLiterals = 0; + private int used = 0; + private final boolean skipCorrupt; + private final SerializationUtils utils; + private RunLengthIntegerWriterV2.EncodingType currentEncoding; + + public RunLengthIntegerReaderV2(InStream input, boolean signed, + boolean skipCorrupt) throws IOException { + this.input = input; + this.signed = signed; + this.skipCorrupt = skipCorrupt; + this.utils = new SerializationUtils(); + } + + private final static RunLengthIntegerWriterV2.EncodingType[] encodings = RunLengthIntegerWriterV2.EncodingType.values(); + private void readValues(boolean ignoreEof) throws IOException { + // read the first 2 bits and determine the encoding type + isRepeating = false; + int firstByte = input.read(); + if (firstByte < 0) { + if (!ignoreEof) { + throw new EOFException("Read past end of RLE integer from " + input); + } + used = numLiterals = 0; + return; + } + currentEncoding = encodings[(firstByte >>> 6) & 0x03]; + switch (currentEncoding) { + case SHORT_REPEAT: readShortRepeatValues(firstByte); break; + case DIRECT: readDirectValues(firstByte); break; + case PATCHED_BASE: readPatchedBaseValues(firstByte); break; + case DELTA: readDeltaValues(firstByte); break; + default: throw new IOException("Unknown encoding " + currentEncoding); + } + } + + private void readDeltaValues(int firstByte) throws IOException { + + // extract the number of fixed bits + int fb = (firstByte >>> 1) & 0x1f; + if (fb != 0) { + fb = utils.decodeBitWidth(fb); + } + + // extract the blob run length + int len = (firstByte & 0x01) << 8; + len |= input.read(); + + // read the first value stored as vint + long firstVal = 0; + if 
(signed) { + firstVal = utils.readVslong(input); + } else { + firstVal = utils.readVulong(input); + } + + // store first value to result buffer + long prevVal = firstVal; + literals[numLiterals++] = firstVal; + + // if fixed bits is 0 then all values have fixed delta + if (fb == 0) { + // read the fixed delta value stored as vint (deltas can be negative even + // if all number are positive) + long fd = utils.readVslong(input); + if (fd == 0) { + isRepeating = true; + assert numLiterals == 1; + Arrays.fill(literals, numLiterals, numLiterals + len, literals[0]); + numLiterals += len; + } else { + // add fixed deltas to adjacent values + for(int i = 0; i < len; i++) { + literals[numLiterals++] = literals[numLiterals - 2] + fd; + } + } + } else { + long deltaBase = utils.readVslong(input); + // add delta base and first value + literals[numLiterals++] = firstVal + deltaBase; + prevVal = literals[numLiterals - 1]; + len -= 1; + + // write the unpacked values, add it to previous value and store final + // value to result buffer. 
if the delta base value is negative then it + // is a decreasing sequence else an increasing sequence + utils.readInts(literals, numLiterals, len, fb, input); + while (len > 0) { + if (deltaBase < 0) { + literals[numLiterals] = prevVal - literals[numLiterals]; + } else { + literals[numLiterals] = prevVal + literals[numLiterals]; + } + prevVal = literals[numLiterals]; + len--; + numLiterals++; + } + } + } + + private void readPatchedBaseValues(int firstByte) throws IOException { + + // extract the number of fixed bits + int fbo = (firstByte >>> 1) & 0x1f; + int fb = utils.decodeBitWidth(fbo); + + // extract the run length of data blob + int len = (firstByte & 0x01) << 8; + len |= input.read(); + // runs are always one off + len += 1; + + // extract the number of bytes occupied by base + int thirdByte = input.read(); + int bw = (thirdByte >>> 5) & 0x07; + // base width is one off + bw += 1; + + // extract patch width + int pwo = thirdByte & 0x1f; + int pw = utils.decodeBitWidth(pwo); + + // read fourth byte and extract patch gap width + int fourthByte = input.read(); + int pgw = (fourthByte >>> 5) & 0x07; + // patch gap width is one off + pgw += 1; + + // extract the length of the patch list + int pl = fourthByte & 0x1f; + + // read the next base width number of bytes to extract base value + long base = utils.bytesToLongBE(input, bw); + long mask = (1L << ((bw * 8) - 1)); + // if MSB of base value is 1 then base is negative value else positive + if ((base & mask) != 0) { + base = base & ~mask; + base = -base; + } + + // unpack the data blob + long[] unpacked = new long[len]; + utils.readInts(unpacked, 0, len, fb, input); + + // unpack the patch blob + long[] unpackedPatch = new long[pl]; + + if ((pw + pgw) > 64 && !skipCorrupt) { + throw new IOException("Corruption in ORC data encountered. 
To skip" + + " reading corrupted data, set hive.exec.orc.skip.corrupt.data to" + + " true"); + } + int bitSize = utils.getClosestFixedBits(pw + pgw); + utils.readInts(unpackedPatch, 0, pl, bitSize, input); + + // apply the patch directly when decoding the packed data + int patchIdx = 0; + long currGap = 0; + long currPatch = 0; + long patchMask = ((1L << pw) - 1); + currGap = unpackedPatch[patchIdx] >>> pw; + currPatch = unpackedPatch[patchIdx] & patchMask; + long actualGap = 0; + + // special case: gap is >255 then patch value will be 0. + // if gap is <=255 then patch value cannot be 0 + while (currGap == 255 && currPatch == 0) { + actualGap += 255; + patchIdx++; + currGap = unpackedPatch[patchIdx] >>> pw; + currPatch = unpackedPatch[patchIdx] & patchMask; + } + // add the left over gap + actualGap += currGap; + + // unpack data blob, patch it (if required), add base to get final result + for(int i = 0; i < unpacked.length; i++) { + if (i == actualGap) { + // extract the patch value + long patchedVal = unpacked[i] | (currPatch << fb); + + // add base to patched value + literals[numLiterals++] = base + patchedVal; + + // increment the patch to point to next entry in patch list + patchIdx++; + + if (patchIdx < pl) { + // read the next gap and patch + currGap = unpackedPatch[patchIdx] >>> pw; + currPatch = unpackedPatch[patchIdx] & patchMask; + actualGap = 0; + + // special case: gap is >255 then patch will be 0. if gap is + // <=255 then patch cannot be 0 + while (currGap == 255 && currPatch == 0) { + actualGap += 255; + patchIdx++; + currGap = unpackedPatch[patchIdx] >>> pw; + currPatch = unpackedPatch[patchIdx] & patchMask; + } + // add the left over gap + actualGap += currGap; + + // next gap is relative to the current gap + actualGap += i; + } + } else { + // no patching required. 
add base to unpacked value to get final value + literals[numLiterals++] = base + unpacked[i]; + } + } + + } + + private void readDirectValues(int firstByte) throws IOException { + + // extract the number of fixed bits + int fbo = (firstByte >>> 1) & 0x1f; + int fb = utils.decodeBitWidth(fbo); + + // extract the run length + int len = (firstByte & 0x01) << 8; + len |= input.read(); + // runs are one off + len += 1; + + // write the unpacked values and zigzag decode to result buffer + utils.readInts(literals, numLiterals, len, fb, input); + if (signed) { + for(int i = 0; i < len; i++) { + literals[numLiterals] = utils.zigzagDecode(literals[numLiterals]); + numLiterals++; + } + } else { + numLiterals += len; + } + } + + private void readShortRepeatValues(int firstByte) throws IOException { + + // read the number of bytes occupied by the value + int size = (firstByte >>> 3) & 0x07; + // #bytes are one off + size += 1; + + // read the run length + int len = firstByte & 0x07; + // run lengths values are stored only after MIN_REPEAT value is met + len += RunLengthIntegerWriterV2.MIN_REPEAT; + + // read the repeated value which is store using fixed bytes + long val = utils.bytesToLongBE(input, size); + + if (signed) { + val = utils.zigzagDecode(val); + } + + if (numLiterals != 0) { + // Currently this always holds, which makes peekNextAvailLength simpler. + // If this changes, peekNextAvailLength should be adjusted accordingly. + throw new AssertionError("readValues called with existing values present"); + } + // repeat the value for length times + isRepeating = true; + // TODO: this is not so useful and V1 reader doesn't do that. Fix? 
Same if delta == 0 + for(int i = 0; i < len; i++) { + literals[i] = val; + } + numLiterals = len; + } + + @Override + public boolean hasNext() throws IOException { + return used != numLiterals || input.available() > 0; + } + + @Override + public long next() throws IOException { + long result; + if (used == numLiterals) { + numLiterals = 0; + used = 0; + readValues(false); + } + result = literals[used++]; + return result; + } + + @Override + public void seek(PositionProvider index) throws IOException { + input.seek(index); + int consumed = (int) index.getNext(); + if (consumed != 0) { + // a loop is required for cases where we break the run into two + // parts + while (consumed > 0) { + numLiterals = 0; + readValues(false); + used = consumed; + consumed -= numLiterals; + } + } else { + used = 0; + numLiterals = 0; + } + } + + @Override + public void skip(long numValues) throws IOException { + while (numValues > 0) { + if (used == numLiterals) { + numLiterals = 0; + used = 0; + readValues(false); + } + long consume = Math.min(numValues, numLiterals - used); + used += consume; + numValues -= consume; + } + } + + @Override + public void nextVector(ColumnVector previous, + long[] data, + int previousLen) throws IOException { + previous.isRepeating = true; + for (int i = 0; i < previousLen; i++) { + if (!previous.isNull[i]) { + data[i] = next(); + } else { + // The default value of null for int type in vectorized + // processing is 1, so set that if the value is null + data[i] = 1; + } + + // The default value for nulls in Vectorization for int types is 1 + // and given that non null value can also be 1, we need to check for isNull also + // when determining the isRepeating flag. 
+ if (previous.isRepeating + && i > 0 + && (data[0] != data[i] || + previous.isNull[0] != previous.isNull[i])) { + previous.isRepeating = false; + } + } + } + + @Override + public void nextVector(ColumnVector vector, + int[] data, + int size) throws IOException { + if (vector.noNulls) { + for(int r=0; r < data.length && r < size; ++r) { + data[r] = (int) next(); + } + } else if (!(vector.isRepeating && vector.isNull[0])) { + for(int r=0; r < data.length && r < size; ++r) { + if (!vector.isNull[r]) { + data[r] = (int) next(); + } else { + data[r] = 1; + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerWriter.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerWriter.java b/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerWriter.java new file mode 100644 index 0000000..2153001 --- /dev/null +++ b/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerWriter.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hive.orc.impl; + +import java.io.IOException; + +/** + * A streamFactory that writes a sequence of integers. A control byte is written before + * each run with positive values 0 to 127 meaning 3 to 130 repetitions, each + * repetition is offset by a delta. If the control byte is -1 to -128, 1 to 128 + * literal vint values follow. + */ +public class RunLengthIntegerWriter implements IntegerWriter { + static final int MIN_REPEAT_SIZE = 3; + static final int MAX_DELTA = 127; + static final int MIN_DELTA = -128; + static final int MAX_LITERAL_SIZE = 128; + private static final int MAX_REPEAT_SIZE = 127 + MIN_REPEAT_SIZE; + private final PositionedOutputStream output; + private final boolean signed; + private final long[] literals = new long[MAX_LITERAL_SIZE]; + private int numLiterals = 0; + private long delta = 0; + private boolean repeat = false; + private int tailRunLength = 0; + private SerializationUtils utils; + + public RunLengthIntegerWriter(PositionedOutputStream output, + boolean signed) { + this.output = output; + this.signed = signed; + this.utils = new SerializationUtils(); + } + + private void writeValues() throws IOException { + if (numLiterals != 0) { + if (repeat) { + output.write(numLiterals - MIN_REPEAT_SIZE); + output.write((byte) delta); + if (signed) { + utils.writeVslong(output, literals[0]); + } else { + utils.writeVulong(output, literals[0]); + } + } else { + output.write(-numLiterals); + for(int i=0; i < numLiterals; ++i) { + if (signed) { + utils.writeVslong(output, literals[i]); + } else { + utils.writeVulong(output, literals[i]); + } + } + } + repeat = false; + numLiterals = 0; + tailRunLength = 0; + } + } + + @Override + public void flush() throws IOException { + writeValues(); + output.flush(); + } + + @Override + public void write(long value) throws IOException { + if (numLiterals == 0) { + literals[numLiterals++] = value; + tailRunLength = 1; + } else if (repeat) { + if (value == literals[0] + delta * 
numLiterals) { + numLiterals += 1; + if (numLiterals == MAX_REPEAT_SIZE) { + writeValues(); + } + } else { + writeValues(); + literals[numLiterals++] = value; + tailRunLength = 1; + } + } else { + if (tailRunLength == 1) { + delta = value - literals[numLiterals - 1]; + if (delta < MIN_DELTA || delta > MAX_DELTA) { + tailRunLength = 1; + } else { + tailRunLength = 2; + } + } else if (value == literals[numLiterals - 1] + delta) { + tailRunLength += 1; + } else { + delta = value - literals[numLiterals - 1]; + if (delta < MIN_DELTA || delta > MAX_DELTA) { + tailRunLength = 1; + } else { + tailRunLength = 2; + } + } + if (tailRunLength == MIN_REPEAT_SIZE) { + if (numLiterals + 1 == MIN_REPEAT_SIZE) { + repeat = true; + numLiterals += 1; + } else { + numLiterals -= MIN_REPEAT_SIZE - 1; + long base = literals[numLiterals]; + writeValues(); + literals[0] = base; + repeat = true; + numLiterals = MIN_REPEAT_SIZE; + } + } else { + literals[numLiterals++] = value; + if (numLiterals == MAX_LITERAL_SIZE) { + writeValues(); + } + } + } + } + + @Override + public void getPosition(PositionRecorder recorder) throws IOException { + output.getPosition(recorder); + recorder.addPosition(numLiterals); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerWriterV2.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerWriterV2.java b/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerWriterV2.java new file mode 100644 index 0000000..1140ab4 --- /dev/null +++ b/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerWriterV2.java @@ -0,0 +1,831 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.orc.impl; + +import java.io.IOException; + +/** + * A writer that performs light weight compression over sequence of integers. + *

<p>
+ * There are four types of lightweight integer compression
+ * <ul>
+ * <li>SHORT_REPEAT</li>
+ * <li>DIRECT</li>
+ * <li>PATCHED_BASE</li>
+ * <li>DELTA</li>
+ * </ul>
+ * </p>
+ * The description and format for these types are as below:
+ * <p>
+ * <b>SHORT_REPEAT:</b> Used for short repeated integer sequences.
+ * <ul>
+ * <li>1 byte header
+ * <ul>
+ * <li>2 bits for encoding type</li>
+ * <li>3 bits for bytes required for repeating value</li>
+ * <li>3 bits for repeat count (MIN_REPEAT + run length)</li>
+ * </ul>
+ * </li>
+ * <li>Blob - repeat value (fixed bytes)</li>
+ * </ul>
+ * </p>
+ * <p>
+ * <b>DIRECT:</b> Used for random integer sequences whose number of bit
+ * requirement doesn't vary a lot.
+ * <ul>
+ * <li>2 bytes header
+ * <ul>
+ * 1st byte
+ * <li>2 bits for encoding type</li>
+ * <li>5 bits for fixed bit width of values in blob</li>
+ * <li>1 bit for storing MSB of run length</li>
+ * </ul>
+ * <ul>
+ * 2nd byte
+ * <li>8 bits for lower run length bits</li>
+ * </ul>
+ * </li>
+ * <li>Blob - stores the direct values using fixed bit width. The length of the
+ * data blob is (fixed width * run length) bits long</li>
+ * </ul>
+ * </p>
+ * <p>
+ * <b>PATCHED_BASE:</b> Used for random integer sequences whose number of bit
+ * requirement varies beyond a threshold.
+ * <ul>
+ * <li>4 bytes header
+ * <ul>
+ * 1st byte
+ * <li>2 bits for encoding type</li>
+ * <li>5 bits for fixed bit width of values in blob</li>
+ * <li>1 bit for storing MSB of run length</li>
+ * </ul>
+ * <ul>
+ * 2nd byte
+ * <li>8 bits for lower run length bits</li>
+ * </ul>
+ * <ul>
+ * 3rd byte
+ * <li>3 bits for bytes required to encode base value</li>
+ * <li>5 bits for patch width</li>
+ * </ul>
+ * <ul>
+ * 4th byte
+ * <li>3 bits for patch gap width</li>
+ * <li>5 bits for patch length</li>
+ * </ul>
+ * </li>
+ * <li>Base value - Stored using fixed number of bytes. If MSB is set, base
+ * value is negative else positive. Length of base value is (base width * 8)
+ * bits.</li>
+ * <li>Data blob - Base reduced values as stored using fixed bit width. Length
+ * of data blob is (fixed width * run length) bits.</li>
+ * <li>Patch blob - Patch blob is a list of gap and patch value. Each entry in
+ * the patch list is (patch width + patch gap width) bits long. Gap between the
+ * subsequent elements to be patched are stored in upper part of entry whereas
+ * patch values are stored in lower part of entry. Length of patch blob is
+ * ((patch width + patch gap width) * patch length) bits.</li>
+ * </ul>
+ * </p>
+ * <p>
+ * <b>DELTA:</b> Used for monotonically increasing or decreasing sequences,
+ * sequences with fixed delta values or long repeated sequences.
+ * <ul>
+ * <li>2 bytes header
+ * <ul>
+ * 1st byte
+ * <li>2 bits for encoding type</li>
+ * <li>5 bits for fixed bit width of values in blob</li>
+ * <li>1 bit for storing MSB of run length</li>
+ * </ul>
+ * <ul>
+ * 2nd byte
+ * <li>8 bits for lower run length bits</li>
+ * </ul>
+ * </li>
+ * <li>Base value - zigzag encoded value written as varint</li>
+ * <li>Delta base - zigzag encoded value written as varint</li>
+ * <li>Delta blob - only positive values. monotonicity and orderness are decided
+ * based on the sign of the base value and delta base</li>
+ * </ul>
+ * </p>
+ */ +public class RunLengthIntegerWriterV2 implements IntegerWriter { + + public enum EncodingType { + SHORT_REPEAT, DIRECT, PATCHED_BASE, DELTA + } + + static final int MAX_SCOPE = 512; + static final int MIN_REPEAT = 3; + private static final int MAX_SHORT_REPEAT_LENGTH = 10; + private long prevDelta = 0; + private int fixedRunLength = 0; + private int variableRunLength = 0; + private final long[] literals = new long[MAX_SCOPE]; + private final PositionedOutputStream output; + private final boolean signed; + private EncodingType encoding; + private int numLiterals; + private final long[] zigzagLiterals = new long[MAX_SCOPE]; + private final long[] baseRedLiterals = new long[MAX_SCOPE]; + private final long[] adjDeltas = new long[MAX_SCOPE]; + private long fixedDelta; + private int zzBits90p; + private int zzBits100p; + private int brBits95p; + private int brBits100p; + private int bitsDeltaMax; + private int patchWidth; + private int patchGapWidth; + private int patchLength; + private long[] gapVsPatchList; + private long min; + private boolean isFixedDelta; + private SerializationUtils utils; + private boolean alignedBitpacking; + + RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed) { + this(output, signed, true); + } + + public RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed, + boolean alignedBitpacking) { + this.output = output; + this.signed = signed; + this.alignedBitpacking = alignedBitpacking; + this.utils = new SerializationUtils(); + clear(); + } + + private void writeValues() throws IOException { + if (numLiterals != 0) { + + if (encoding.equals(EncodingType.SHORT_REPEAT)) { + writeShortRepeatValues(); + } else if (encoding.equals(EncodingType.DIRECT)) { + writeDirectValues(); + } else if (encoding.equals(EncodingType.PATCHED_BASE)) { + writePatchedBaseValues(); + } else { + writeDeltaValues(); + } + + // clear all the variables + clear(); + } + } + + private void writeDeltaValues() throws IOException { + int 
len = 0; + int fb = bitsDeltaMax; + int efb = 0; + + if (alignedBitpacking) { + fb = utils.getClosestAlignedFixedBits(fb); + } + + if (isFixedDelta) { + // if fixed run length is greater than threshold then it will be fixed + // delta sequence with delta value 0 else fixed delta sequence with + // non-zero delta value + if (fixedRunLength > MIN_REPEAT) { + // ex. sequence: 2 2 2 2 2 2 2 2 + len = fixedRunLength - 1; + fixedRunLength = 0; + } else { + // ex. sequence: 4 6 8 10 12 14 16 + len = variableRunLength - 1; + variableRunLength = 0; + } + } else { + // fixed width 0 is used for long repeating values. + // sequences that require only 1 bit to encode will have an additional bit + if (fb == 1) { + fb = 2; + } + efb = utils.encodeBitWidth(fb); + efb = efb << 1; + len = variableRunLength - 1; + variableRunLength = 0; + } + + // extract the 9th bit of run length + final int tailBits = (len & 0x100) >>> 8; + + // create first byte of the header + final int headerFirstByte = getOpcode() | efb | tailBits; + + // second byte of the header stores the remaining 8 bits of runlength + final int headerSecondByte = len & 0xff; + + // write header + output.write(headerFirstByte); + output.write(headerSecondByte); + + // store the first value from zigzag literal array + if (signed) { + utils.writeVslong(output, literals[0]); + } else { + utils.writeVulong(output, literals[0]); + } + + if (isFixedDelta) { + // if delta is fixed then we don't need to store delta blob + utils.writeVslong(output, fixedDelta); + } else { + // store the first value as delta value using zigzag encoding + utils.writeVslong(output, adjDeltas[0]); + + // adjacent delta values are bit packed. The length of adjDeltas array is + // always one less than the number of literals (delta difference for n + // elements is n-1). 
We have already written one element, write the + // remaining numLiterals - 2 elements here + utils.writeInts(adjDeltas, 1, numLiterals - 2, fb, output); + } + } + + private void writePatchedBaseValues() throws IOException { + + // NOTE: Aligned bit packing cannot be applied for PATCHED_BASE encoding + // because patch is applied to MSB bits. For example: If fixed bit width of + // base value is 7 bits and if patch is 3 bits, the actual value is + // constructed by shifting the patch to left by 7 positions. + // actual_value = patch << 7 | base_value + // So, if we align base_value then actual_value can not be reconstructed. + + // write the number of fixed bits required in next 5 bits + final int fb = brBits95p; + final int efb = utils.encodeBitWidth(fb) << 1; + + // adjust variable run length, they are one off + variableRunLength -= 1; + + // extract the 9th bit of run length + final int tailBits = (variableRunLength & 0x100) >>> 8; + + // create first byte of the header + final int headerFirstByte = getOpcode() | efb | tailBits; + + // second byte of the header stores the remaining 8 bits of runlength + final int headerSecondByte = variableRunLength & 0xff; + + // if the min value is negative toggle the sign + final boolean isNegative = min < 0 ? true : false; + if (isNegative) { + min = -min; + } + + // find the number of bytes required for base and shift it by 5 bits + // to accommodate patch width. The additional bit is used to store the sign + // of the base value. + final int baseWidth = utils.findClosestNumBits(min) + 1; + final int baseBytes = baseWidth % 8 == 0 ? 
baseWidth / 8 : (baseWidth / 8) + 1; + final int bb = (baseBytes - 1) << 5; + + // if the base value is negative then set MSB to 1 + if (isNegative) { + min |= (1L << ((baseBytes * 8) - 1)); + } + + // third byte contains 3 bits for number of bytes occupied by base + // and 5 bits for patchWidth + final int headerThirdByte = bb | utils.encodeBitWidth(patchWidth); + + // fourth byte contains 3 bits for page gap width and 5 bits for + // patch length + final int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength; + + // write header + output.write(headerFirstByte); + output.write(headerSecondByte); + output.write(headerThirdByte); + output.write(headerFourthByte); + + // write the base value using fixed bytes in big endian order + for(int i = baseBytes - 1; i >= 0; i--) { + byte b = (byte) ((min >>> (i * 8)) & 0xff); + output.write(b); + } + + // base reduced literals are bit packed + int closestFixedBits = utils.getClosestFixedBits(fb); + + utils.writeInts(baseRedLiterals, 0, numLiterals, closestFixedBits, + output); + + // write patch list + closestFixedBits = utils.getClosestFixedBits(patchGapWidth + patchWidth); + + utils.writeInts(gapVsPatchList, 0, gapVsPatchList.length, closestFixedBits, + output); + + // reset run length + variableRunLength = 0; + } + + /** + * Store the opcode in 2 MSB bits + * @return opcode + */ + private int getOpcode() { + return encoding.ordinal() << 6; + } + + private void writeDirectValues() throws IOException { + + // write the number of fixed bits required in next 5 bits + int fb = zzBits100p; + + if (alignedBitpacking) { + fb = utils.getClosestAlignedFixedBits(fb); + } + + final int efb = utils.encodeBitWidth(fb) << 1; + + // adjust variable run length + variableRunLength -= 1; + + // extract the 9th bit of run length + final int tailBits = (variableRunLength & 0x100) >>> 8; + + // create first byte of the header + final int headerFirstByte = getOpcode() | efb | tailBits; + + // second byte of the header stores the remaining 
8 bits of runlength + final int headerSecondByte = variableRunLength & 0xff; + + // write header + output.write(headerFirstByte); + output.write(headerSecondByte); + + // bit packing the zigzag encoded literals + utils.writeInts(zigzagLiterals, 0, numLiterals, fb, output); + + // reset run length + variableRunLength = 0; + } + + private void writeShortRepeatValues() throws IOException { + // get the value that is repeating, compute the bits and bytes required + long repeatVal = 0; + if (signed) { + repeatVal = utils.zigzagEncode(literals[0]); + } else { + repeatVal = literals[0]; + } + + final int numBitsRepeatVal = utils.findClosestNumBits(repeatVal); + final int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? numBitsRepeatVal >>> 3 + : (numBitsRepeatVal >>> 3) + 1; + + // write encoding type in top 2 bits + int header = getOpcode(); + + // write the number of bytes required for the value + header |= ((numBytesRepeatVal - 1) << 3); + + // write the run length + fixedRunLength -= MIN_REPEAT; + header |= fixedRunLength; + + // write the header + output.write(header); + + // write the repeating value in big endian byte order + for(int i = numBytesRepeatVal - 1; i >= 0; i--) { + int b = (int) ((repeatVal >>> (i * 8)) & 0xff); + output.write(b); + } + + fixedRunLength = 0; + } + + private void determineEncoding() { + + // we need to compute zigzag values for DIRECT encoding if we decide to + // break early for delta overflows or for shorter runs + computeZigZagLiterals(); + + zzBits100p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 1.0); + + // not a big win for shorter runs to determine encoding + if (numLiterals <= MIN_REPEAT) { + encoding = EncodingType.DIRECT; + return; + } + + // DELTA encoding check + + // for identifying monotonic sequences + boolean isIncreasing = true; + boolean isDecreasing = true; + this.isFixedDelta = true; + + this.min = literals[0]; + long max = literals[0]; + final long initialDelta = literals[1] - literals[0]; + long currDelta 
= initialDelta; + long deltaMax = initialDelta; + this.adjDeltas[0] = initialDelta; + + for (int i = 1; i < numLiterals; i++) { + final long l1 = literals[i]; + final long l0 = literals[i - 1]; + currDelta = l1 - l0; + min = Math.min(min, l1); + max = Math.max(max, l1); + + isIncreasing &= (l0 <= l1); + isDecreasing &= (l0 >= l1); + + isFixedDelta &= (currDelta == initialDelta); + if (i > 1) { + adjDeltas[i - 1] = Math.abs(currDelta); + deltaMax = Math.max(deltaMax, adjDeltas[i - 1]); + } + } + + // its faster to exit under delta overflow condition without checking for + // PATCHED_BASE condition as encoding using DIRECT is faster and has less + // overhead than PATCHED_BASE + if (!utils.isSafeSubtract(max, min)) { + encoding = EncodingType.DIRECT; + return; + } + + // invariant - subtracting any number from any other in the literals after + // this point won't overflow + + // if min is equal to max then the delta is 0, this condition happens for + // fixed values run >10 which cannot be encoded with SHORT_REPEAT + if (min == max) { + assert isFixedDelta : min + "==" + max + + ", isFixedDelta cannot be false"; + assert currDelta == 0 : min + "==" + max + ", currDelta should be zero"; + fixedDelta = 0; + encoding = EncodingType.DELTA; + return; + } + + if (isFixedDelta) { + assert currDelta == initialDelta + : "currDelta should be equal to initialDelta for fixed delta encoding"; + encoding = EncodingType.DELTA; + fixedDelta = currDelta; + return; + } + + // if initialDelta is 0 then we cannot delta encode as we cannot identify + // the sign of deltas (increasing or decreasing) + if (initialDelta != 0) { + // stores the number of bits required for packing delta blob in + // delta encoding + bitsDeltaMax = utils.findClosestNumBits(deltaMax); + + // monotonic condition + if (isIncreasing || isDecreasing) { + encoding = EncodingType.DELTA; + return; + } + } + + // PATCHED_BASE encoding check + + // percentile values are computed for the zigzag encoded values. 
if the + // number of bit requirement between 90th and 100th percentile varies + // beyond a threshold then we need to patch the values. if the variation + // is not significant then we can use direct encoding + + zzBits90p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 0.9); + int diffBitsLH = zzBits100p - zzBits90p; + + // if the difference between 90th percentile and 100th percentile fixed + // bits is > 1 then we need patch the values + if (diffBitsLH > 1) { + + // patching is done only on base reduced values. + // remove base from literals + for (int i = 0; i < numLiterals; i++) { + baseRedLiterals[i] = literals[i] - min; + } + + // 95th percentile width is used to determine max allowed value + // after which patching will be done + brBits95p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 0.95); + + // 100th percentile is used to compute the max patch width + brBits100p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 1.0); + + // after base reducing the values, if the difference in bits between + // 95th percentile and 100th percentile value is zero then there + // is no point in patching the values, in which case we will + // fallback to DIRECT encoding. + // The decision to use patched base was based on zigzag values, but the + // actual patching is done on base reduced literals. + if ((brBits100p - brBits95p) != 0) { + encoding = EncodingType.PATCHED_BASE; + preparePatchedBlob(); + return; + } else { + encoding = EncodingType.DIRECT; + return; + } + } else { + // if difference in bits between 95th percentile and 100th percentile is + // 0, then patch length will become 0. 
Hence we will fallback to direct + encoding = EncodingType.DIRECT; + return; + } + } + + private void computeZigZagLiterals() { + // populate zigzag encoded literals + long zzEncVal = 0; + for (int i = 0; i < numLiterals; i++) { + if (signed) { + zzEncVal = utils.zigzagEncode(literals[i]); + } else { + zzEncVal = literals[i]; + } + zigzagLiterals[i] = zzEncVal; + } + } + + private void preparePatchedBlob() { + // mask will be max value beyond which patch will be generated + long mask = (1L << brBits95p) - 1; + + // since we are considering only 95 percentile, the size of gap and + // patch array can contain only be 5% values + patchLength = (int) Math.ceil((numLiterals * 0.05)); + + int[] gapList = new int[patchLength]; + long[] patchList = new long[patchLength]; + + // #bit for patch + patchWidth = brBits100p - brBits95p; + patchWidth = utils.getClosestFixedBits(patchWidth); + + // if patch bit requirement is 64 then it will not possible to pack + // gap and patch together in a long. To make sure gap and patch can be + // packed together adjust the patch width + if (patchWidth == 64) { + patchWidth = 56; + brBits95p = 8; + mask = (1L << brBits95p) - 1; + } + + int gapIdx = 0; + int patchIdx = 0; + int prev = 0; + int gap = 0; + int maxGap = 0; + + for(int i = 0; i < numLiterals; i++) { + // if value is above mask then create the patch and record the gap + if (baseRedLiterals[i] > mask) { + gap = i - prev; + if (gap > maxGap) { + maxGap = gap; + } + + // gaps are relative, so store the previous patched value index + prev = i; + gapList[gapIdx++] = gap; + + // extract the most significant bits that are over mask bits + long patch = baseRedLiterals[i] >>> brBits95p; + patchList[patchIdx++] = patch; + + // strip off the MSB to enable safe bit packing + baseRedLiterals[i] &= mask; + } + } + + // adjust the patch length to number of entries in gap list + patchLength = gapIdx; + + // if the element to be patched is the first and only element then + // max gap will be 0, 
but to store the gap as 0 we need atleast 1 bit + if (maxGap == 0 && patchLength != 0) { + patchGapWidth = 1; + } else { + patchGapWidth = utils.findClosestNumBits(maxGap); + } + + // special case: if the patch gap width is greater than 256, then + // we need 9 bits to encode the gap width. But we only have 3 bits in + // header to record the gap width. To deal with this case, we will save + // two entries in patch list in the following way + // 256 gap width => 0 for patch value + // actual gap - 256 => actual patch value + // We will do the same for gap width = 511. If the element to be patched is + // the last element in the scope then gap width will be 511. In this case we + // will have 3 entries in the patch list in the following way + // 255 gap width => 0 for patch value + // 255 gap width => 0 for patch value + // 1 gap width => actual patch value + if (patchGapWidth > 8) { + patchGapWidth = 8; + // for gap = 511, we need two additional entries in patch list + if (maxGap == 511) { + patchLength += 2; + } else { + patchLength += 1; + } + } + + // create gap vs patch list + gapIdx = 0; + patchIdx = 0; + gapVsPatchList = new long[patchLength]; + for(int i = 0; i < patchLength; i++) { + long g = gapList[gapIdx++]; + long p = patchList[patchIdx++]; + while (g > 255) { + gapVsPatchList[i++] = (255L << patchWidth); + g -= 255; + } + + // store patch value in LSBs and gap in MSBs + gapVsPatchList[i] = (g << patchWidth) | p; + } + } + + /** + * clears all the variables + */ + private void clear() { + numLiterals = 0; + encoding = null; + prevDelta = 0; + fixedDelta = 0; + zzBits90p = 0; + zzBits100p = 0; + brBits95p = 0; + brBits100p = 0; + bitsDeltaMax = 0; + patchGapWidth = 0; + patchLength = 0; + patchWidth = 0; + gapVsPatchList = null; + min = 0; + isFixedDelta = true; + } + + @Override + public void flush() throws IOException { + if (numLiterals != 0) { + if (variableRunLength != 0) { + determineEncoding(); + writeValues(); + } else if (fixedRunLength != 0) { 
+ if (fixedRunLength < MIN_REPEAT) { + variableRunLength = fixedRunLength; + fixedRunLength = 0; + determineEncoding(); + writeValues(); + } else if (fixedRunLength >= MIN_REPEAT + && fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) { + encoding = EncodingType.SHORT_REPEAT; + writeValues(); + } else { + encoding = EncodingType.DELTA; + isFixedDelta = true; + writeValues(); + } + } + } + output.flush(); + } + + @Override + public void write(long val) throws IOException { + if (numLiterals == 0) { + initializeLiterals(val); + } else { + if (numLiterals == 1) { + prevDelta = val - literals[0]; + literals[numLiterals++] = val; + // if both values are same count as fixed run else variable run + if (val == literals[0]) { + fixedRunLength = 2; + variableRunLength = 0; + } else { + fixedRunLength = 0; + variableRunLength = 2; + } + } else { + long currentDelta = val - literals[numLiterals - 1]; + if (prevDelta == 0 && currentDelta == 0) { + // fixed delta run + + literals[numLiterals++] = val; + + // if variable run is non-zero then we are seeing repeating + // values at the end of variable run in which case keep + // updating variable and fixed runs + if (variableRunLength > 0) { + fixedRunLength = 2; + } + fixedRunLength += 1; + + // if fixed run met the minimum condition and if variable + // run is non-zero then flush the variable run and shift the + // tail fixed runs to start of the buffer + if (fixedRunLength >= MIN_REPEAT && variableRunLength > 0) { + numLiterals -= MIN_REPEAT; + variableRunLength -= MIN_REPEAT - 1; + // copy the tail fixed runs + long[] tailVals = new long[MIN_REPEAT]; + System.arraycopy(literals, numLiterals, tailVals, 0, MIN_REPEAT); + + // determine variable encoding and flush values + determineEncoding(); + writeValues(); + + // shift tail fixed runs to beginning of the buffer + for(long l : tailVals) { + literals[numLiterals++] = l; + } + } + + // if fixed runs reached max repeat length then write values + if (fixedRunLength == MAX_SCOPE) { + 
determineEncoding(); + writeValues(); + } + } else { + // variable delta run + + // if fixed run length is non-zero and if it satisfies the + // short repeat conditions then write the values as short repeats + // else use delta encoding + if (fixedRunLength >= MIN_REPEAT) { + if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) { + encoding = EncodingType.SHORT_REPEAT; + writeValues(); + } else { + encoding = EncodingType.DELTA; + isFixedDelta = true; + writeValues(); + } + } + + // if fixed run length is <MIN_REPEAT and current value is + // different from previous then treat it as variable run + if (fixedRunLength > 0 && fixedRunLength < MIN_REPEAT) { + if (val != literals[numLiterals - 1]) { + variableRunLength = fixedRunLength; + fixedRunLength = 0; + } + } + + // after writing values re-initialize the variables + if (numLiterals == 0) { + initializeLiterals(val); + } else { + // keep updating variable run lengths + prevDelta = val - literals[numLiterals - 1]; + literals[numLiterals++] = val; + variableRunLength += 1; + + // if variable run length reach the max scope, write it + if (variableRunLength == MAX_SCOPE) { + determineEncoding(); + writeValues(); + } + } + } + } + } + + private void initializeLiterals(long val) { + literals[numLiterals++] = val; + fixedRunLength = 1; + variableRunLength = 1; + } + + @Override + public void getPosition(PositionRecorder recorder) throws IOException { + output.getPosition(recorder); + recorder.addPosition(numLiterals); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/SchemaEvolution.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/hive/orc/impl/SchemaEvolution.java b/orc/src/java/org/apache/hive/orc/impl/SchemaEvolution.java new file mode 100644 index 0000000..3c7124b --- /dev/null +++ b/orc/src/java/org/apache/hive/orc/impl/SchemaEvolution.java @@ -0,0 +1,399 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.orc.impl; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hive.orc.TypeDescription; + +/** + * Take the file types and the (optional) configuration column names/types and see if there + * has been schema evolution. + */ +public class SchemaEvolution { + // indexed by reader column id + private final TypeDescription[] readerFileTypes; + // indexed by reader column id + private final boolean[] readerIncluded; + // the offset to the first column id ignoring any ACID columns + private final int readerColumnOffset; + // indexed by file column id + private final boolean[] fileIncluded; + private final TypeDescription fileSchema; + private final TypeDescription readerSchema; + private boolean hasConversion; + // indexed by reader column id + private final boolean[] ppdSafeConversion; + + public SchemaEvolution(TypeDescription fileSchema, boolean[] includedCols) { + this(fileSchema, null, includedCols); + } + + public SchemaEvolution(TypeDescription fileSchema, + TypeDescription readerSchema, + boolean[] includeCols) { + this.readerIncluded = includeCols == null ? 
null : Arrays.copyOf(includeCols, includeCols.length); + this.fileIncluded = new boolean[fileSchema.getMaximumId() + 1]; + this.hasConversion = false; + this.fileSchema = fileSchema; + boolean isAcid = checkAcidSchema(fileSchema); + this.readerColumnOffset = isAcid ? acidEventFieldNames.size() : 0; + if (readerSchema != null) { + if (isAcid) { + this.readerSchema = createEventSchema(readerSchema); + } else { + this.readerSchema = readerSchema; + } + if (readerIncluded != null && + readerIncluded.length + readerColumnOffset != this.readerSchema.getMaximumId() + 1) { + throw new IllegalArgumentException("Include vector the wrong length: " + + this.readerSchema.toJson() + " with include length " + + readerIncluded.length); + } + this.readerFileTypes = new TypeDescription[this.readerSchema.getMaximumId() + 1]; + buildConversionFileTypesArray(fileSchema, this.readerSchema); + } else { + this.readerSchema = fileSchema; + this.readerFileTypes = new TypeDescription[this.readerSchema.getMaximumId() + 1]; + if (readerIncluded != null && + readerIncluded.length + readerColumnOffset != this.readerSchema.getMaximumId() + 1) { + throw new IllegalArgumentException("Include vector the wrong length: " + + this.readerSchema.toJson() + " with include length " + + readerIncluded.length); + } + buildSameSchemaFileTypesArray(); + } + this.ppdSafeConversion = populatePpdSafeConversion(); + } + + public TypeDescription getReaderSchema() { + return readerSchema; + } + + /** + * Returns the non-ACID (aka base) reader type description. + * + * @return the reader type ignoring the ACID rowid columns, if any + */ + public TypeDescription getReaderBaseSchema() { + return readerSchema.findSubtype(readerColumnOffset); + } + + /** + * Is there Schema Evolution data type conversion? 
+ * @return + */ + public boolean hasConversion() { + return hasConversion; + } + + public TypeDescription getFileType(TypeDescription readerType) { + return getFileType(readerType.getId()); + } + + /** + * Get whether each column is included from the reader's point of view. + * @return a boolean array indexed by reader column id + */ + public boolean[] getReaderIncluded() { + return readerIncluded; + } + + /** + * Get whether each column is included from the file's point of view. + * @return a boolean array indexed by file column id + */ + public boolean[] getFileIncluded() { + return fileIncluded; + } + + /** + * Get the file type by reader type id. + * @param id reader column id + * @return + */ + public TypeDescription getFileType(int id) { + return readerFileTypes[id]; + } + + /** + * Check if column is safe for ppd evaluation + * @param colId reader column id + * @return true if the specified column is safe for ppd evaluation else false + */ + public boolean isPPDSafeConversion(final int colId) { + if (hasConversion()) { + if (colId < 0 || colId >= ppdSafeConversion.length) { + return false; + } + return ppdSafeConversion[colId]; + } + + // when there is no schema evolution PPD is safe + return true; + } + + private boolean[] populatePpdSafeConversion() { + if (fileSchema == null || readerSchema == null || readerFileTypes == null) { + return null; + } + + boolean[] result = new boolean[readerSchema.getMaximumId() + 1]; + boolean safePpd = validatePPDConversion(fileSchema, readerSchema); + result[readerSchema.getId()] = safePpd; + List children = readerSchema.getChildren(); + if (children != null) { + for (TypeDescription child : children) { + TypeDescription fileType = getFileType(child.getId()); + safePpd = validatePPDConversion(fileType, child); + result[child.getId()] = safePpd; + } + } + return result; + } + + private boolean validatePPDConversion(final TypeDescription fileType, + final TypeDescription readerType) { + if (fileType == null) { + return 
false; + } + if (fileType.getCategory().isPrimitive()) { + if (fileType.getCategory().equals(readerType.getCategory())) { + // for decimals alone do equality check to not mess up with precision change + if (fileType.getCategory().equals(TypeDescription.Category.DECIMAL) && + !fileType.equals(readerType)) { + return false; + } + return true; + } + + // only integer and string evolutions are safe + // byte -> short -> int -> long + // string <-> char <-> varchar + // NOTE: Float to double evolution is not safe as floats are stored as doubles in ORC's + // internal index, but when doing predicate evaluation for queries like "select * from + // orc_float where f = 74.72" the constant on the filter is converted from string -> double + // so the precisions will be different and the comparison will fail. + // Soon, we should convert all sargs that compare equality between floats or + // doubles to range predicates. + + // Similarly string -> char and varchar -> char and vice versa is not possible, as ORC stores + // char with padded spaces in its internal index. + switch (fileType.getCategory()) { + case BYTE: + if (readerType.getCategory().equals(TypeDescription.Category.SHORT) || + readerType.getCategory().equals(TypeDescription.Category.INT) || + readerType.getCategory().equals(TypeDescription.Category.LONG)) { + return true; + } + break; + case SHORT: + if (readerType.getCategory().equals(TypeDescription.Category.INT) || + readerType.getCategory().equals(TypeDescription.Category.LONG)) { + return true; + } + break; + case INT: + if (readerType.getCategory().equals(TypeDescription.Category.LONG)) { + return true; + } + break; + case STRING: + if (readerType.getCategory().equals(TypeDescription.Category.VARCHAR)) { + return true; + } + break; + case VARCHAR: + if (readerType.getCategory().equals(TypeDescription.Category.STRING)) { + return true; + } + break; + default: + break; + } + } + return false; + } + + /** + * Should we read the given reader column? 
+ * @param readerId the id of column in the extended reader schema + * @return true if the column should be read + */ + public boolean includeReaderColumn(int readerId) { + return readerIncluded == null || + readerId <= readerColumnOffset || + readerIncluded[readerId - readerColumnOffset]; + } + + void buildConversionFileTypesArray(TypeDescription fileType, + TypeDescription readerType) { + // if the column isn't included, don't map it + int readerId = readerType.getId(); + if (!includeReaderColumn(readerId)) { + return; + } + boolean isOk = true; + // check the easy case first + if (fileType.getCategory() == readerType.getCategory()) { + switch (readerType.getCategory()) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case LONG: + case DOUBLE: + case FLOAT: + case STRING: + case TIMESTAMP: + case BINARY: + case DATE: + // these are always a match + break; + case CHAR: + case VARCHAR: + // We do conversion when same CHAR/VARCHAR type but different maxLength. + if (fileType.getMaxLength() != readerType.getMaxLength()) { + hasConversion = true; + } + break; + case DECIMAL: + // We do conversion when same DECIMAL type but different precision/scale. 
+ if (fileType.getPrecision() != readerType.getPrecision() || + fileType.getScale() != readerType.getScale()) { + hasConversion = true; + } + break; + case UNION: + case MAP: + case LIST: { + // these must be an exact match + List fileChildren = fileType.getChildren(); + List readerChildren = readerType.getChildren(); + if (fileChildren.size() == readerChildren.size()) { + for(int i=0; i < fileChildren.size(); ++i) { + buildConversionFileTypesArray(fileChildren.get(i), readerChildren.get(i)); + } + } else { + isOk = false; + } + break; + } + case STRUCT: { + // allow either side to have fewer fields than the other + List fileChildren = fileType.getChildren(); + List readerChildren = readerType.getChildren(); + if (fileChildren.size() != readerChildren.size()) { + hasConversion = true; + } + int jointSize = Math.min(fileChildren.size(), readerChildren.size()); + for(int i=0; i < jointSize; ++i) { + buildConversionFileTypesArray(fileChildren.get(i), readerChildren.get(i)); + } + break; + } + default: + throw new IllegalArgumentException("Unknown type " + readerType); + } + } else { + /* + * Check for the few cases where will not convert.... + */ + + isOk = ConvertTreeReaderFactory.canConvert(fileType, readerType); + hasConversion = true; + } + if (isOk) { + if (readerFileTypes[readerId] != null) { + throw new RuntimeException("reader to file type entry already assigned"); + } + readerFileTypes[readerId] = fileType; + fileIncluded[fileType.getId()] = true; + } else { + throw new IllegalArgumentException( + String.format( + "ORC does not support type conversion from file type %s (%d) to reader type %s (%d)", + fileType.toString(), fileType.getId(), + readerType.toString(), readerId)); + } + } + + /** + * Use to make a reader to file type array when the schema is the same. 
+ * @return + */ + private void buildSameSchemaFileTypesArray() { + buildSameSchemaFileTypesArrayRecurse(readerSchema); + } + + void buildSameSchemaFileTypesArrayRecurse(TypeDescription readerType) { + int id = readerType.getId(); + if (!includeReaderColumn(id)) { + return; + } + if (readerFileTypes[id] != null) { + throw new RuntimeException("reader to file type entry already assigned"); + } + readerFileTypes[id] = readerType; + fileIncluded[id] = true; + List children = readerType.getChildren(); + if (children != null) { + for (TypeDescription child : children) { + buildSameSchemaFileTypesArrayRecurse(child); + } + } + } + + private static boolean checkAcidSchema(TypeDescription type) { + if (type.getCategory().equals(TypeDescription.Category.STRUCT)) { + List rootFields = type.getFieldNames(); + if (acidEventFieldNames.equals(rootFields)) { + return true; + } + } + return false; + } + + /** + * @param typeDescr + * @return ORC types for the ACID event based on the row's type description + */ + public static TypeDescription createEventSchema(TypeDescription typeDescr) { + TypeDescription result = TypeDescription.createStruct() + .addField("operation", TypeDescription.createInt()) + .addField("originalTransaction", TypeDescription.createLong()) + .addField("bucket", TypeDescription.createInt()) + .addField("rowId", TypeDescription.createLong()) + .addField("currentTransaction", TypeDescription.createLong()) + .addField("row", typeDescr.clone()); + return result; + } + + public static final List acidEventFieldNames= new ArrayList(); + static { + acidEventFieldNames.add("operation"); + acidEventFieldNames.add("originalTransaction"); + acidEventFieldNames.add("bucket"); + acidEventFieldNames.add("rowId"); + acidEventFieldNames.add("currentTransaction"); + acidEventFieldNames.add("row"); + } +}