Return-Path:
X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io
Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io
Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183])
by cust-asf2.ponee.io (Postfix) with ESMTP id 10B9A200CCA
for ; Wed, 19 Jul 2017 18:58:34 +0200 (CEST)
Received: by cust-asf.ponee.io (Postfix)
id 0F509169648; Wed, 19 Jul 2017 16:58:34 +0000 (UTC)
Delivered-To: archive-asf-public@cust-asf.ponee.io
Received: from mail.apache.org (hermes.apache.org [140.211.11.3])
by cust-asf.ponee.io (Postfix) with SMTP id DD86E169639
for ; Wed, 19 Jul 2017 18:58:31 +0200 (CEST)
Received: (qmail 97487 invoked by uid 500); 19 Jul 2017 16:58:27 -0000
Mailing-List: contact commits-help@hive.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: hive-dev@hive.apache.org
Delivered-To: mailing list commits@hive.apache.org
Received: (qmail 96745 invoked by uid 99); 19 Jul 2017 16:58:26 -0000
Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23)
by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Jul 2017 16:58:26 +0000
Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33)
id E4FCBF16C3; Wed, 19 Jul 2017 16:58:25 +0000 (UTC)
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
From: omalley@apache.org
To: commits@hive.apache.org
Date: Wed, 19 Jul 2017 16:58:51 -0000
Message-Id:
In-Reply-To: <3fc3df21261a48f086876006a87e459b@git.apache.org>
References: <3fc3df21261a48f086876006a87e459b@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [28/37] hive git commit: HIVE-17118. Move the hive-orc source files
to make the package names unique.
archived-at: Wed, 19 Jul 2017 16:58:34 -0000
http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/RunLengthByteWriter.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/RunLengthByteWriter.java b/orc/src/java/org/apache/hive/orc/impl/RunLengthByteWriter.java
new file mode 100644
index 0000000..b94b6a9
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/RunLengthByteWriter.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.orc.impl;
+
+import java.io.IOException;
+
+/**
+ * A stream that writes a sequence of bytes using run-length encoding. A
+ * control byte is written before each run:
+ * <ul>
+ *   <li>0 to 127: a repeated run of {@code control + MIN_REPEAT_SIZE} (i.e. 3
+ *       to 130) copies of the single byte that follows.</li>
+ *   <li>-1 to -128: {@code -control} (i.e. 1 to 128) literal byte values
+ *       follow.</li>
+ * </ul>
+ */
+public class RunLengthByteWriter {
+  /** Minimum number of equal consecutive bytes that is encoded as a repeated run. */
+  static final int MIN_REPEAT_SIZE = 3;
+  /** Maximum number of bytes in one literal run (control byte -128). */
+  static final int MAX_LITERAL_SIZE = 128;
+  /** Maximum number of bytes in one repeated run (control byte 127). */
+  static final int MAX_REPEAT_SIZE = 127 + MIN_REPEAT_SIZE;
+
+  private final PositionedOutputStream output;
+  // Bytes of the pending run; in repeat mode only literals[0] is meaningful.
+  private final byte[] literals = new byte[MAX_LITERAL_SIZE];
+  // Number of bytes in the pending (not yet written) run.
+  private int numLiterals = 0;
+  // True when the pending run is a repetition of literals[0].
+  private boolean repeat = false;
+  // Length of the trailing sequence of equal bytes in the pending literal run.
+  private int tailRunLength = 0;
+
+  public RunLengthByteWriter(PositionedOutputStream output) {
+    this.output = output;
+  }
+
+  /**
+   * Writes the pending run (if any) to the output, preceded by its control
+   * byte, and resets the buffer to empty.
+   */
+  private void writeValues() throws IOException {
+    if (numLiterals != 0) {
+      if (repeat) {
+        output.write(numLiterals - MIN_REPEAT_SIZE);
+        output.write(literals, 0, 1);
+      } else {
+        output.write(-numLiterals);
+        output.write(literals, 0, numLiterals);
+      }
+      repeat = false;
+      tailRunLength = 0;
+      numLiterals = 0;
+    }
+  }
+
+  /**
+   * Writes any pending run and then flushes the underlying stream.
+   */
+  public void flush() throws IOException {
+    writeValues();
+    output.flush();
+  }
+
+  /**
+   * Appends one byte. Runs are emitted when they reach their maximum size or
+   * when the pattern changes; call {@link #flush()} to emit the last run.
+   */
+  public void write(byte value) throws IOException {
+    if (numLiterals == 0) {
+      literals[numLiterals++] = value;
+      tailRunLength = 1;
+    } else if (repeat) {
+      if (value == literals[0]) {
+        numLiterals += 1;
+        if (numLiterals == MAX_REPEAT_SIZE) {
+          writeValues();
+        }
+      } else {
+        writeValues();
+        literals[numLiterals++] = value;
+        tailRunLength = 1;
+      }
+    } else {
+      if (value == literals[numLiterals - 1]) {
+        tailRunLength += 1;
+      } else {
+        tailRunLength = 1;
+      }
+      if (tailRunLength == MIN_REPEAT_SIZE) {
+        if (numLiterals + 1 == MIN_REPEAT_SIZE) {
+          // every pending byte equals value: turn the pending run into a repeat
+          repeat = true;
+          numLiterals += 1;
+        } else {
+          // drop the trailing equal bytes from the literal run, emit the
+          // literals before them, and start a repeated run of value
+          numLiterals -= MIN_REPEAT_SIZE - 1;
+          writeValues();
+          literals[0] = value;
+          repeat = true;
+          numLiterals = MIN_REPEAT_SIZE;
+        }
+      } else {
+        literals[numLiterals++] = value;
+        if (numLiterals == MAX_LITERAL_SIZE) {
+          writeValues();
+        }
+      }
+    }
+  }
+
+  /**
+   * Records the position of the underlying stream followed by the number of
+   * bytes still buffered in the pending run.
+   */
+  public void getPosition(PositionRecorder recorder) throws IOException {
+    output.getPosition(recorder);
+    recorder.addPosition(numLiterals);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerReader.java b/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerReader.java
new file mode 100644
index 0000000..5b613f6
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerReader.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.orc.impl;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+
+/**
+ * A reader for integers written with run-length encoding version 1 (see
+ * {@link RunLengthIntegerWriter}). Each run starts with a control byte:
+ * 0 to 127 introduces a repeated run of {@code control + MIN_REPEAT_SIZE}
+ * values that differ by a one-byte signed delta; 128 to 255 introduces
+ * {@code 256 - control} literal values.
+ */
+public class RunLengthIntegerReader implements IntegerReader {
+  private final InStream input;
+  private final boolean signed;
+  // decoded values of the current run (only literals[0] is used for repeats)
+  private final long[] literals =
+      new long[RunLengthIntegerWriter.MAX_LITERAL_SIZE];
+  // number of values in the current run
+  private int numLiterals = 0;
+  // per-value increment of the current repeated run
+  private int delta = 0;
+  // number of values of the current run that have already been consumed
+  private int used = 0;
+  // true if the current run is base + i * delta rather than literal values
+  private boolean repeat = false;
+  private final SerializationUtils utils;
+
+  public RunLengthIntegerReader(InStream input, boolean signed) throws IOException {
+    this.input = input;
+    this.signed = signed;
+    this.utils = new SerializationUtils();
+  }
+
+  /**
+   * Reads one variable-length value using the signed or unsigned decoding
+   * selected at construction time.
+   */
+  private long readVlong() throws IOException {
+    return signed ? utils.readVslong(input) : utils.readVulong(input);
+  }
+
+  /**
+   * Decodes the next run into {@code literals} and resets {@code used}.
+   *
+   * @param ignoreEof if true, end of stream produces an empty run instead of
+   *     an exception
+   * @throws EOFException if the stream ends where a control byte (and
+   *     {@code ignoreEof} is false) or a delta byte is expected
+   */
+  private void readValues(boolean ignoreEof) throws IOException {
+    int control = input.read();
+    if (control == -1) {
+      if (!ignoreEof) {
+        throw new EOFException("Read past end of RLE integer from " + input);
+      }
+      used = numLiterals = 0;
+      return;
+    } else if (control < 0x80) {
+      numLiterals = control + RunLengthIntegerWriter.MIN_REPEAT_SIZE;
+      used = 0;
+      repeat = true;
+      delta = input.read();
+      if (delta == -1) {
+        throw new EOFException("End of stream in RLE Integer from " + input);
+      }
+      // convert from 0 to 255 to -128 to 127 by converting to a signed byte
+      delta = (byte) delta;
+      literals[0] = readVlong();
+    } else {
+      repeat = false;
+      numLiterals = 0x100 - control;
+      used = 0;
+      for (int i = 0; i < numLiterals; ++i) {
+        literals[i] = readVlong();
+      }
+    }
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    return used != numLiterals || input.available() > 0;
+  }
+
+  @Override
+  public long next() throws IOException {
+    long result;
+    if (used == numLiterals) {
+      readValues(false);
+    }
+    if (repeat) {
+      result = literals[0] + (used++) * delta;
+    } else {
+      result = literals[used++];
+    }
+    return result;
+  }
+
+  @Override
+  public void nextVector(ColumnVector previous,
+                         long[] data,
+                         int previousLen) throws IOException {
+    previous.isRepeating = true;
+    for (int i = 0; i < previousLen; i++) {
+      if (!previous.isNull[i]) {
+        data[i] = next();
+      } else {
+        // The default value of null for int type in vectorized
+        // processing is 1, so set that if the value is null
+        data[i] = 1;
+      }
+
+      // The default value for nulls in Vectorization for int types is 1
+      // and given that non null value can also be 1, we need to check for isNull also
+      // when determining the isRepeating flag.
+      if (previous.isRepeating
+          && i > 0
+          && (data[0] != data[i] || previous.isNull[0] != previous.isNull[i])) {
+        previous.isRepeating = false;
+      }
+    }
+  }
+
+  @Override
+  public void nextVector(ColumnVector vector,
+                         int[] data,
+                         int size) throws IOException {
+    if (vector.noNulls) {
+      for (int r = 0; r < data.length && r < size; ++r) {
+        data[r] = (int) next();
+      }
+    } else if (!(vector.isRepeating && vector.isNull[0])) {
+      for (int r = 0; r < data.length && r < size; ++r) {
+        if (!vector.isNull[r]) {
+          data[r] = (int) next();
+        } else {
+          data[r] = 1;
+        }
+      }
+    }
+  }
+
+  @Override
+  public void seek(PositionProvider index) throws IOException {
+    input.seek(index);
+    int consumed = (int) index.getNext();
+    if (consumed != 0) {
+      // a loop is required for cases where we break the run into two parts
+      while (consumed > 0) {
+        readValues(false);
+        used = consumed;
+        consumed -= numLiterals;
+      }
+    } else {
+      used = 0;
+      numLiterals = 0;
+    }
+  }
+
+  @Override
+  public void skip(long numValues) throws IOException {
+    while (numValues > 0) {
+      if (used == numLiterals) {
+        readValues(false);
+      }
+      long consume = Math.min(numValues, numLiterals - used);
+      used += consume;
+      numValues -= consume;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerReaderV2.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerReaderV2.java b/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerReaderV2.java
new file mode 100644
index 0000000..d0c2b54
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerReaderV2.java
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.orc.impl;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A reader that reads a sequence of light weight compressed integers. Refer
+ * {@link RunLengthIntegerWriterV2} for description of various lightweight
+ * compression techniques.
+ */
+public class RunLengthIntegerReaderV2 implements IntegerReader {
+ public static final Logger LOG = LoggerFactory.getLogger(RunLengthIntegerReaderV2.class);
+
+ private InStream input;
+ private final boolean signed;
+ private final long[] literals = new long[RunLengthIntegerWriterV2.MAX_SCOPE];
+ private boolean isRepeating = false;
+ private int numLiterals = 0;
+ private int used = 0;
+ private final boolean skipCorrupt;
+ private final SerializationUtils utils;
+ private RunLengthIntegerWriterV2.EncodingType currentEncoding;
+
+ public RunLengthIntegerReaderV2(InStream input, boolean signed,
+ boolean skipCorrupt) throws IOException {
+ this.input = input;
+ this.signed = signed;
+ this.skipCorrupt = skipCorrupt;
+ this.utils = new SerializationUtils();
+ }
+
+ private final static RunLengthIntegerWriterV2.EncodingType[] encodings = RunLengthIntegerWriterV2.EncodingType.values();
+ private void readValues(boolean ignoreEof) throws IOException {
+ // read the first 2 bits and determine the encoding type
+ isRepeating = false;
+ int firstByte = input.read();
+ if (firstByte < 0) {
+ if (!ignoreEof) {
+ throw new EOFException("Read past end of RLE integer from " + input);
+ }
+ used = numLiterals = 0;
+ return;
+ }
+ currentEncoding = encodings[(firstByte >>> 6) & 0x03];
+ switch (currentEncoding) {
+ case SHORT_REPEAT: readShortRepeatValues(firstByte); break;
+ case DIRECT: readDirectValues(firstByte); break;
+ case PATCHED_BASE: readPatchedBaseValues(firstByte); break;
+ case DELTA: readDeltaValues(firstByte); break;
+ default: throw new IOException("Unknown encoding " + currentEncoding);
+ }
+ }
+
+ private void readDeltaValues(int firstByte) throws IOException {
+
+ // extract the number of fixed bits
+ int fb = (firstByte >>> 1) & 0x1f;
+ if (fb != 0) {
+ fb = utils.decodeBitWidth(fb);
+ }
+
+ // extract the blob run length
+ int len = (firstByte & 0x01) << 8;
+ len |= input.read();
+
+ // read the first value stored as vint
+ long firstVal = 0;
+ if (signed) {
+ firstVal = utils.readVslong(input);
+ } else {
+ firstVal = utils.readVulong(input);
+ }
+
+ // store first value to result buffer
+ long prevVal = firstVal;
+ literals[numLiterals++] = firstVal;
+
+ // if fixed bits is 0 then all values have fixed delta
+ if (fb == 0) {
+ // read the fixed delta value stored as vint (deltas can be negative even
+ // if all number are positive)
+ long fd = utils.readVslong(input);
+ if (fd == 0) {
+ isRepeating = true;
+ assert numLiterals == 1;
+ Arrays.fill(literals, numLiterals, numLiterals + len, literals[0]);
+ numLiterals += len;
+ } else {
+ // add fixed deltas to adjacent values
+ for(int i = 0; i < len; i++) {
+ literals[numLiterals++] = literals[numLiterals - 2] + fd;
+ }
+ }
+ } else {
+ long deltaBase = utils.readVslong(input);
+ // add delta base and first value
+ literals[numLiterals++] = firstVal + deltaBase;
+ prevVal = literals[numLiterals - 1];
+ len -= 1;
+
+ // write the unpacked values, add it to previous value and store final
+ // value to result buffer. if the delta base value is negative then it
+ // is a decreasing sequence else an increasing sequence
+ utils.readInts(literals, numLiterals, len, fb, input);
+ while (len > 0) {
+ if (deltaBase < 0) {
+ literals[numLiterals] = prevVal - literals[numLiterals];
+ } else {
+ literals[numLiterals] = prevVal + literals[numLiterals];
+ }
+ prevVal = literals[numLiterals];
+ len--;
+ numLiterals++;
+ }
+ }
+ }
+
+ private void readPatchedBaseValues(int firstByte) throws IOException {
+
+ // extract the number of fixed bits
+ int fbo = (firstByte >>> 1) & 0x1f;
+ int fb = utils.decodeBitWidth(fbo);
+
+ // extract the run length of data blob
+ int len = (firstByte & 0x01) << 8;
+ len |= input.read();
+ // runs are always one off
+ len += 1;
+
+ // extract the number of bytes occupied by base
+ int thirdByte = input.read();
+ int bw = (thirdByte >>> 5) & 0x07;
+ // base width is one off
+ bw += 1;
+
+ // extract patch width
+ int pwo = thirdByte & 0x1f;
+ int pw = utils.decodeBitWidth(pwo);
+
+ // read fourth byte and extract patch gap width
+ int fourthByte = input.read();
+ int pgw = (fourthByte >>> 5) & 0x07;
+ // patch gap width is one off
+ pgw += 1;
+
+ // extract the length of the patch list
+ int pl = fourthByte & 0x1f;
+
+ // read the next base width number of bytes to extract base value
+ long base = utils.bytesToLongBE(input, bw);
+ long mask = (1L << ((bw * 8) - 1));
+ // if MSB of base value is 1 then base is negative value else positive
+ if ((base & mask) != 0) {
+ base = base & ~mask;
+ base = -base;
+ }
+
+ // unpack the data blob
+ long[] unpacked = new long[len];
+ utils.readInts(unpacked, 0, len, fb, input);
+
+ // unpack the patch blob
+ long[] unpackedPatch = new long[pl];
+
+ if ((pw + pgw) > 64 && !skipCorrupt) {
+ throw new IOException("Corruption in ORC data encountered. To skip" +
+ " reading corrupted data, set hive.exec.orc.skip.corrupt.data to" +
+ " true");
+ }
+ int bitSize = utils.getClosestFixedBits(pw + pgw);
+ utils.readInts(unpackedPatch, 0, pl, bitSize, input);
+
+ // apply the patch directly when decoding the packed data
+ int patchIdx = 0;
+ long currGap = 0;
+ long currPatch = 0;
+ long patchMask = ((1L << pw) - 1);
+ currGap = unpackedPatch[patchIdx] >>> pw;
+ currPatch = unpackedPatch[patchIdx] & patchMask;
+ long actualGap = 0;
+
+ // special case: gap is >255 then patch value will be 0.
+ // if gap is <=255 then patch value cannot be 0
+ while (currGap == 255 && currPatch == 0) {
+ actualGap += 255;
+ patchIdx++;
+ currGap = unpackedPatch[patchIdx] >>> pw;
+ currPatch = unpackedPatch[patchIdx] & patchMask;
+ }
+ // add the left over gap
+ actualGap += currGap;
+
+ // unpack data blob, patch it (if required), add base to get final result
+ for(int i = 0; i < unpacked.length; i++) {
+ if (i == actualGap) {
+ // extract the patch value
+ long patchedVal = unpacked[i] | (currPatch << fb);
+
+ // add base to patched value
+ literals[numLiterals++] = base + patchedVal;
+
+ // increment the patch to point to next entry in patch list
+ patchIdx++;
+
+ if (patchIdx < pl) {
+ // read the next gap and patch
+ currGap = unpackedPatch[patchIdx] >>> pw;
+ currPatch = unpackedPatch[patchIdx] & patchMask;
+ actualGap = 0;
+
+ // special case: gap is >255 then patch will be 0. if gap is
+ // <=255 then patch cannot be 0
+ while (currGap == 255 && currPatch == 0) {
+ actualGap += 255;
+ patchIdx++;
+ currGap = unpackedPatch[patchIdx] >>> pw;
+ currPatch = unpackedPatch[patchIdx] & patchMask;
+ }
+ // add the left over gap
+ actualGap += currGap;
+
+ // next gap is relative to the current gap
+ actualGap += i;
+ }
+ } else {
+ // no patching required. add base to unpacked value to get final value
+ literals[numLiterals++] = base + unpacked[i];
+ }
+ }
+
+ }
+
+ private void readDirectValues(int firstByte) throws IOException {
+
+ // extract the number of fixed bits
+ int fbo = (firstByte >>> 1) & 0x1f;
+ int fb = utils.decodeBitWidth(fbo);
+
+ // extract the run length
+ int len = (firstByte & 0x01) << 8;
+ len |= input.read();
+ // runs are one off
+ len += 1;
+
+ // write the unpacked values and zigzag decode to result buffer
+ utils.readInts(literals, numLiterals, len, fb, input);
+ if (signed) {
+ for(int i = 0; i < len; i++) {
+ literals[numLiterals] = utils.zigzagDecode(literals[numLiterals]);
+ numLiterals++;
+ }
+ } else {
+ numLiterals += len;
+ }
+ }
+
+ private void readShortRepeatValues(int firstByte) throws IOException {
+
+ // read the number of bytes occupied by the value
+ int size = (firstByte >>> 3) & 0x07;
+ // #bytes are one off
+ size += 1;
+
+ // read the run length
+ int len = firstByte & 0x07;
+ // run lengths values are stored only after MIN_REPEAT value is met
+ len += RunLengthIntegerWriterV2.MIN_REPEAT;
+
+ // read the repeated value which is store using fixed bytes
+ long val = utils.bytesToLongBE(input, size);
+
+ if (signed) {
+ val = utils.zigzagDecode(val);
+ }
+
+ if (numLiterals != 0) {
+ // Currently this always holds, which makes peekNextAvailLength simpler.
+ // If this changes, peekNextAvailLength should be adjusted accordingly.
+ throw new AssertionError("readValues called with existing values present");
+ }
+ // repeat the value for length times
+ isRepeating = true;
+ // TODO: this is not so useful and V1 reader doesn't do that. Fix? Same if delta == 0
+ for(int i = 0; i < len; i++) {
+ literals[i] = val;
+ }
+ numLiterals = len;
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ return used != numLiterals || input.available() > 0;
+ }
+
+ @Override
+ public long next() throws IOException {
+ long result;
+ if (used == numLiterals) {
+ numLiterals = 0;
+ used = 0;
+ readValues(false);
+ }
+ result = literals[used++];
+ return result;
+ }
+
+ @Override
+ public void seek(PositionProvider index) throws IOException {
+ input.seek(index);
+ int consumed = (int) index.getNext();
+ if (consumed != 0) {
+ // a loop is required for cases where we break the run into two
+ // parts
+ while (consumed > 0) {
+ numLiterals = 0;
+ readValues(false);
+ used = consumed;
+ consumed -= numLiterals;
+ }
+ } else {
+ used = 0;
+ numLiterals = 0;
+ }
+ }
+
+ @Override
+ public void skip(long numValues) throws IOException {
+ while (numValues > 0) {
+ if (used == numLiterals) {
+ numLiterals = 0;
+ used = 0;
+ readValues(false);
+ }
+ long consume = Math.min(numValues, numLiterals - used);
+ used += consume;
+ numValues -= consume;
+ }
+ }
+
+ @Override
+ public void nextVector(ColumnVector previous,
+ long[] data,
+ int previousLen) throws IOException {
+ previous.isRepeating = true;
+ for (int i = 0; i < previousLen; i++) {
+ if (!previous.isNull[i]) {
+ data[i] = next();
+ } else {
+ // The default value of null for int type in vectorized
+ // processing is 1, so set that if the value is null
+ data[i] = 1;
+ }
+
+ // The default value for nulls in Vectorization for int types is 1
+ // and given that non null value can also be 1, we need to check for isNull also
+ // when determining the isRepeating flag.
+ if (previous.isRepeating
+ && i > 0
+ && (data[0] != data[i] ||
+ previous.isNull[0] != previous.isNull[i])) {
+ previous.isRepeating = false;
+ }
+ }
+ }
+
+ @Override
+ public void nextVector(ColumnVector vector,
+ int[] data,
+ int size) throws IOException {
+ if (vector.noNulls) {
+ for(int r=0; r < data.length && r < size; ++r) {
+ data[r] = (int) next();
+ }
+ } else if (!(vector.isRepeating && vector.isNull[0])) {
+ for(int r=0; r < data.length && r < size; ++r) {
+ if (!vector.isNull[r]) {
+ data[r] = (int) next();
+ } else {
+ data[r] = 1;
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerWriter.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerWriter.java b/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerWriter.java
new file mode 100644
index 0000000..2153001
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerWriter.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.orc.impl;
+
+import java.io.IOException;
+
+/**
+ * A streamFactory that writes a sequence of integers. A control byte is written before
+ * each run with positive values 0 to 127 meaning 3 to 130 repetitions, each
+ * repetition is offset by a delta. If the control byte is -1 to -128, 1 to 128
+ * literal vint values follow.
+ */
+public class RunLengthIntegerWriter implements IntegerWriter {
+ static final int MIN_REPEAT_SIZE = 3;
+ static final int MAX_DELTA = 127;
+ static final int MIN_DELTA = -128;
+ static final int MAX_LITERAL_SIZE = 128;
+ private static final int MAX_REPEAT_SIZE = 127 + MIN_REPEAT_SIZE;
+ private final PositionedOutputStream output;
+ private final boolean signed;
+ private final long[] literals = new long[MAX_LITERAL_SIZE];
+ private int numLiterals = 0;
+ private long delta = 0;
+ private boolean repeat = false;
+ private int tailRunLength = 0;
+ private SerializationUtils utils;
+
+ public RunLengthIntegerWriter(PositionedOutputStream output,
+ boolean signed) {
+ this.output = output;
+ this.signed = signed;
+ this.utils = new SerializationUtils();
+ }
+
+ private void writeValues() throws IOException {
+ if (numLiterals != 0) {
+ if (repeat) {
+ output.write(numLiterals - MIN_REPEAT_SIZE);
+ output.write((byte) delta);
+ if (signed) {
+ utils.writeVslong(output, literals[0]);
+ } else {
+ utils.writeVulong(output, literals[0]);
+ }
+ } else {
+ output.write(-numLiterals);
+ for(int i=0; i < numLiterals; ++i) {
+ if (signed) {
+ utils.writeVslong(output, literals[i]);
+ } else {
+ utils.writeVulong(output, literals[i]);
+ }
+ }
+ }
+ repeat = false;
+ numLiterals = 0;
+ tailRunLength = 0;
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ writeValues();
+ output.flush();
+ }
+
+ @Override
+ public void write(long value) throws IOException {
+ if (numLiterals == 0) {
+ literals[numLiterals++] = value;
+ tailRunLength = 1;
+ } else if (repeat) {
+ if (value == literals[0] + delta * numLiterals) {
+ numLiterals += 1;
+ if (numLiterals == MAX_REPEAT_SIZE) {
+ writeValues();
+ }
+ } else {
+ writeValues();
+ literals[numLiterals++] = value;
+ tailRunLength = 1;
+ }
+ } else {
+ if (tailRunLength == 1) {
+ delta = value - literals[numLiterals - 1];
+ if (delta < MIN_DELTA || delta > MAX_DELTA) {
+ tailRunLength = 1;
+ } else {
+ tailRunLength = 2;
+ }
+ } else if (value == literals[numLiterals - 1] + delta) {
+ tailRunLength += 1;
+ } else {
+ delta = value - literals[numLiterals - 1];
+ if (delta < MIN_DELTA || delta > MAX_DELTA) {
+ tailRunLength = 1;
+ } else {
+ tailRunLength = 2;
+ }
+ }
+ if (tailRunLength == MIN_REPEAT_SIZE) {
+ if (numLiterals + 1 == MIN_REPEAT_SIZE) {
+ repeat = true;
+ numLiterals += 1;
+ } else {
+ numLiterals -= MIN_REPEAT_SIZE - 1;
+ long base = literals[numLiterals];
+ writeValues();
+ literals[0] = base;
+ repeat = true;
+ numLiterals = MIN_REPEAT_SIZE;
+ }
+ } else {
+ literals[numLiterals++] = value;
+ if (numLiterals == MAX_LITERAL_SIZE) {
+ writeValues();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void getPosition(PositionRecorder recorder) throws IOException {
+ output.getPosition(recorder);
+ recorder.addPosition(numLiterals);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerWriterV2.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerWriterV2.java b/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerWriterV2.java
new file mode 100644
index 0000000..1140ab4
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerWriterV2.java
@@ -0,0 +1,831 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.orc.impl;
+
+import java.io.IOException;
+
+/**
+ * A writer that performs lightweight compression over a sequence of integers.
+ *
+ * There are four types of lightweight integer compression
+ *
+ * - SHORT_REPEAT
+ * - DIRECT
+ * - PATCHED_BASE
+ * - DELTA
+ *
+ *
+ * The description and format for these types are as below:
+ *
+ * SHORT_REPEAT: Used for short repeated integer sequences.
+ *
+ * - 1 byte header
+ *
+ * - 2 bits for encoding type
+ * - 3 bits for bytes required for repeating value
+ * - 3 bits for repeat count (MIN_REPEAT + run length)
+ *
+ *
+ * - Blob - repeat value (fixed bytes)
+ *
+ *
+ *
+ * DIRECT: Used for random integer sequences whose bit-width requirement
+ * doesn't vary much.
+ *
+ * - 2 bytes header
+ *
+ * 1st byte
+ * - 2 bits for encoding type
+ * - 5 bits for fixed bit width of values in blob
+ * - 1 bit for storing MSB of run length
+ *
+ *
+ * 2nd byte
+ * - 8 bits for lower run length bits
+ *
+ *
+ * - Blob - stores the direct values using fixed bit width. The length of the
+ * data blob is (fixed width * run length) bits long
+ *
+ *
+ *
+ * PATCHED_BASE: Used for random integer sequences whose bit-width
+ * requirement varies beyond a threshold.
+ *
+ * - 4 bytes header
+ *
+ * 1st byte
+ * - 2 bits for encoding type
+ * - 5 bits for fixed bit width of values in blob
+ * - 1 bit for storing MSB of run length
+ *
+ *
+ * 2nd byte
+ * - 8 bits for lower run length bits
+ *
+ *
+ * 3rd byte
+ * - 3 bits for bytes required to encode base value
+ * - 5 bits for patch width
+ *
+ *
+ * 4th byte
+ * - 3 bits for patch gap width
+ * - 5 bits for patch length
+ *
+ *
+ * - Base value - Stored using fixed number of bytes. If MSB is set, base
+ * value is negative else positive. Length of base value is (base width * 8)
+ * bits.
+ * - Data blob - Base reduced values as stored using fixed bit width. Length
+ * of data blob is (fixed width * run length) bits.
+ * - Patch blob - Patch blob is a list of gap and patch value. Each entry in
+ * the patch list is (patch width + patch gap width) bits long. Gap between the
+ * subsequent elements to be patched are stored in upper part of entry whereas
+ * patch values are stored in lower part of entry. Length of patch blob is
+ * ((patch width + patch gap width) * patch length) bits.
+ *
+ *
+ *
+ * DELTA: Used for monotonically increasing or decreasing sequences,
+ * sequences with fixed delta values or long repeated sequences.
+ *
+ * - 2 bytes header
+ *
+ * 1st byte
+ * - 2 bits for encoding type
+ * - 5 bits for fixed bit width of values in blob
+ * - 1 bit for storing MSB of run length
+ *
+ *
+ * 2nd byte
+ * - 8 bits for lower run length bits
+ *
+ *
+ * - Base value - zigzag encoded value written as varint
+ * - Delta base - zigzag encoded value written as varint
+ * - Delta blob - only positive values. Monotonicity and ordering are decided
+ * based on the sign of the base value and delta base
+ *
+ *
+ */
+public class RunLengthIntegerWriterV2 implements IntegerWriter {
+
+ public enum EncodingType {
+ SHORT_REPEAT, DIRECT, PATCHED_BASE, DELTA
+ }
+
+ static final int MAX_SCOPE = 512;
+ static final int MIN_REPEAT = 3;
+ private static final int MAX_SHORT_REPEAT_LENGTH = 10;
+ private long prevDelta = 0;
+ private int fixedRunLength = 0;
+ private int variableRunLength = 0;
+ private final long[] literals = new long[MAX_SCOPE];
+ private final PositionedOutputStream output;
+ private final boolean signed;
+ private EncodingType encoding;
+ private int numLiterals;
+ private final long[] zigzagLiterals = new long[MAX_SCOPE];
+ private final long[] baseRedLiterals = new long[MAX_SCOPE];
+ private final long[] adjDeltas = new long[MAX_SCOPE];
+ private long fixedDelta;
+ private int zzBits90p;
+ private int zzBits100p;
+ private int brBits95p;
+ private int brBits100p;
+ private int bitsDeltaMax;
+ private int patchWidth;
+ private int patchGapWidth;
+ private int patchLength;
+ private long[] gapVsPatchList;
+ private long min;
+ private boolean isFixedDelta;
+ private SerializationUtils utils;
+ private boolean alignedBitpacking;
+
+ RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed) {
+ this(output, signed, true);
+ }
+
+ public RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed,
+ boolean alignedBitpacking) {
+ this.output = output;
+ this.signed = signed;
+ this.alignedBitpacking = alignedBitpacking;
+ this.utils = new SerializationUtils();
+ clear();
+ }
+
+ private void writeValues() throws IOException {
+ if (numLiterals != 0) {
+
+ if (encoding.equals(EncodingType.SHORT_REPEAT)) {
+ writeShortRepeatValues();
+ } else if (encoding.equals(EncodingType.DIRECT)) {
+ writeDirectValues();
+ } else if (encoding.equals(EncodingType.PATCHED_BASE)) {
+ writePatchedBaseValues();
+ } else {
+ writeDeltaValues();
+ }
+
+ // clear all the variables
+ clear();
+ }
+ }
+
+ private void writeDeltaValues() throws IOException {
+ int len = 0;
+ int fb = bitsDeltaMax;
+ int efb = 0;
+
+ if (alignedBitpacking) {
+ fb = utils.getClosestAlignedFixedBits(fb);
+ }
+
+ if (isFixedDelta) {
+ // if fixed run length is greater than threshold then it will be fixed
+ // delta sequence with delta value 0 else fixed delta sequence with
+ // non-zero delta value
+ if (fixedRunLength > MIN_REPEAT) {
+ // ex. sequence: 2 2 2 2 2 2 2 2
+ len = fixedRunLength - 1;
+ fixedRunLength = 0;
+ } else {
+ // ex. sequence: 4 6 8 10 12 14 16
+ len = variableRunLength - 1;
+ variableRunLength = 0;
+ }
+ } else {
+ // fixed width 0 is used for long repeating values.
+ // sequences that require only 1 bit to encode will have an additional bit
+ if (fb == 1) {
+ fb = 2;
+ }
+ efb = utils.encodeBitWidth(fb);
+ efb = efb << 1;
+ len = variableRunLength - 1;
+ variableRunLength = 0;
+ }
+
+ // extract the 9th bit of run length
+ final int tailBits = (len & 0x100) >>> 8;
+
+ // create first byte of the header
+ final int headerFirstByte = getOpcode() | efb | tailBits;
+
+ // second byte of the header stores the remaining 8 bits of runlength
+ final int headerSecondByte = len & 0xff;
+
+ // write header
+ output.write(headerFirstByte);
+ output.write(headerSecondByte);
+
+ // store the first value from zigzag literal array
+ if (signed) {
+ utils.writeVslong(output, literals[0]);
+ } else {
+ utils.writeVulong(output, literals[0]);
+ }
+
+ if (isFixedDelta) {
+ // if delta is fixed then we don't need to store delta blob
+ utils.writeVslong(output, fixedDelta);
+ } else {
+ // store the first value as delta value using zigzag encoding
+ utils.writeVslong(output, adjDeltas[0]);
+
+ // adjacent delta values are bit packed. The length of adjDeltas array is
+ // always one less than the number of literals (delta difference for n
+ // elements is n-1). We have already written one element, write the
+ // remaining numLiterals - 2 elements here
+ utils.writeInts(adjDeltas, 1, numLiterals - 2, fb, output);
+ }
+ }
+
+ private void writePatchedBaseValues() throws IOException {
+
+ // NOTE: Aligned bit packing cannot be applied for PATCHED_BASE encoding
+ // because patch is applied to MSB bits. For example: If fixed bit width of
+ // base value is 7 bits and if patch is 3 bits, the actual value is
+ // constructed by shifting the patch to left by 7 positions.
+ // actual_value = patch << 7 | base_value
+ // So, if we align base_value then actual_value can not be reconstructed.
+
+ // write the number of fixed bits required in next 5 bits
+ final int fb = brBits95p;
+ final int efb = utils.encodeBitWidth(fb) << 1;
+
+ // adjust variable run length, they are one off
+ variableRunLength -= 1;
+
+ // extract the 9th bit of run length
+ final int tailBits = (variableRunLength & 0x100) >>> 8;
+
+ // create first byte of the header
+ final int headerFirstByte = getOpcode() | efb | tailBits;
+
+ // second byte of the header stores the remaining 8 bits of runlength
+ final int headerSecondByte = variableRunLength & 0xff;
+
+ // if the min value is negative toggle the sign
+ final boolean isNegative = min < 0 ? true : false;
+ if (isNegative) {
+ min = -min;
+ }
+
+ // find the number of bytes required for base and shift it by 5 bits
+ // to accommodate patch width. The additional bit is used to store the sign
+ // of the base value.
+ final int baseWidth = utils.findClosestNumBits(min) + 1;
+ final int baseBytes = baseWidth % 8 == 0 ? baseWidth / 8 : (baseWidth / 8) + 1;
+ final int bb = (baseBytes - 1) << 5;
+
+ // if the base value is negative then set MSB to 1
+ if (isNegative) {
+ min |= (1L << ((baseBytes * 8) - 1));
+ }
+
+ // third byte contains 3 bits for number of bytes occupied by base
+ // and 5 bits for patchWidth
+ final int headerThirdByte = bb | utils.encodeBitWidth(patchWidth);
+
+ // fourth byte contains 3 bits for page gap width and 5 bits for
+ // patch length
+ final int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength;
+
+ // write header
+ output.write(headerFirstByte);
+ output.write(headerSecondByte);
+ output.write(headerThirdByte);
+ output.write(headerFourthByte);
+
+ // write the base value using fixed bytes in big endian order
+ for(int i = baseBytes - 1; i >= 0; i--) {
+ byte b = (byte) ((min >>> (i * 8)) & 0xff);
+ output.write(b);
+ }
+
+ // base reduced literals are bit packed
+ int closestFixedBits = utils.getClosestFixedBits(fb);
+
+ utils.writeInts(baseRedLiterals, 0, numLiterals, closestFixedBits,
+ output);
+
+ // write patch list
+ closestFixedBits = utils.getClosestFixedBits(patchGapWidth + patchWidth);
+
+ utils.writeInts(gapVsPatchList, 0, gapVsPatchList.length, closestFixedBits,
+ output);
+
+ // reset run length
+ variableRunLength = 0;
+ }
+
+ /**
+ * Store the opcode in 2 MSB bits
+ * @return opcode
+ */
+ private int getOpcode() {
+ return encoding.ordinal() << 6;
+ }
+
+ private void writeDirectValues() throws IOException {
+
+ // write the number of fixed bits required in next 5 bits
+ int fb = zzBits100p;
+
+ if (alignedBitpacking) {
+ fb = utils.getClosestAlignedFixedBits(fb);
+ }
+
+ final int efb = utils.encodeBitWidth(fb) << 1;
+
+ // adjust variable run length
+ variableRunLength -= 1;
+
+ // extract the 9th bit of run length
+ final int tailBits = (variableRunLength & 0x100) >>> 8;
+
+ // create first byte of the header
+ final int headerFirstByte = getOpcode() | efb | tailBits;
+
+ // second byte of the header stores the remaining 8 bits of runlength
+ final int headerSecondByte = variableRunLength & 0xff;
+
+ // write header
+ output.write(headerFirstByte);
+ output.write(headerSecondByte);
+
+ // bit packing the zigzag encoded literals
+ utils.writeInts(zigzagLiterals, 0, numLiterals, fb, output);
+
+ // reset run length
+ variableRunLength = 0;
+ }
+
+ private void writeShortRepeatValues() throws IOException {
+ // get the value that is repeating, compute the bits and bytes required
+ long repeatVal = 0;
+ if (signed) {
+ repeatVal = utils.zigzagEncode(literals[0]);
+ } else {
+ repeatVal = literals[0];
+ }
+
+ final int numBitsRepeatVal = utils.findClosestNumBits(repeatVal);
+ final int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? numBitsRepeatVal >>> 3
+ : (numBitsRepeatVal >>> 3) + 1;
+
+ // write encoding type in top 2 bits
+ int header = getOpcode();
+
+ // write the number of bytes required for the value
+ header |= ((numBytesRepeatVal - 1) << 3);
+
+ // write the run length
+ fixedRunLength -= MIN_REPEAT;
+ header |= fixedRunLength;
+
+ // write the header
+ output.write(header);
+
+ // write the repeating value in big endian byte order
+ for(int i = numBytesRepeatVal - 1; i >= 0; i--) {
+ int b = (int) ((repeatVal >>> (i * 8)) & 0xff);
+ output.write(b);
+ }
+
+ fixedRunLength = 0;
+ }
+
+ private void determineEncoding() {
+
+ // we need to compute zigzag values for DIRECT encoding if we decide to
+ // break early for delta overflows or for shorter runs
+ computeZigZagLiterals();
+
+ zzBits100p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 1.0);
+
+ // not a big win for shorter runs to determine encoding
+ if (numLiterals <= MIN_REPEAT) {
+ encoding = EncodingType.DIRECT;
+ return;
+ }
+
+ // DELTA encoding check
+
+ // for identifying monotonic sequences
+ boolean isIncreasing = true;
+ boolean isDecreasing = true;
+ this.isFixedDelta = true;
+
+ this.min = literals[0];
+ long max = literals[0];
+ final long initialDelta = literals[1] - literals[0];
+ long currDelta = initialDelta;
+ long deltaMax = initialDelta;
+ this.adjDeltas[0] = initialDelta;
+
+ for (int i = 1; i < numLiterals; i++) {
+ final long l1 = literals[i];
+ final long l0 = literals[i - 1];
+ currDelta = l1 - l0;
+ min = Math.min(min, l1);
+ max = Math.max(max, l1);
+
+ isIncreasing &= (l0 <= l1);
+ isDecreasing &= (l0 >= l1);
+
+ isFixedDelta &= (currDelta == initialDelta);
+ if (i > 1) {
+ adjDeltas[i - 1] = Math.abs(currDelta);
+ deltaMax = Math.max(deltaMax, adjDeltas[i - 1]);
+ }
+ }
+
+ // its faster to exit under delta overflow condition without checking for
+ // PATCHED_BASE condition as encoding using DIRECT is faster and has less
+ // overhead than PATCHED_BASE
+ if (!utils.isSafeSubtract(max, min)) {
+ encoding = EncodingType.DIRECT;
+ return;
+ }
+
+ // invariant - subtracting any number from any other in the literals after
+ // this point won't overflow
+
+ // if min is equal to max then the delta is 0, this condition happens for
+ // fixed values run >10 which cannot be encoded with SHORT_REPEAT
+ if (min == max) {
+ assert isFixedDelta : min + "==" + max +
+ ", isFixedDelta cannot be false";
+ assert currDelta == 0 : min + "==" + max + ", currDelta should be zero";
+ fixedDelta = 0;
+ encoding = EncodingType.DELTA;
+ return;
+ }
+
+ if (isFixedDelta) {
+ assert currDelta == initialDelta
+ : "currDelta should be equal to initialDelta for fixed delta encoding";
+ encoding = EncodingType.DELTA;
+ fixedDelta = currDelta;
+ return;
+ }
+
+ // if initialDelta is 0 then we cannot delta encode as we cannot identify
+ // the sign of deltas (increasing or decreasing)
+ if (initialDelta != 0) {
+ // stores the number of bits required for packing delta blob in
+ // delta encoding
+ bitsDeltaMax = utils.findClosestNumBits(deltaMax);
+
+ // monotonic condition
+ if (isIncreasing || isDecreasing) {
+ encoding = EncodingType.DELTA;
+ return;
+ }
+ }
+
+ // PATCHED_BASE encoding check
+
+ // percentile values are computed for the zigzag encoded values. if the
+ // number of bit requirement between 90th and 100th percentile varies
+ // beyond a threshold then we need to patch the values. if the variation
+ // is not significant then we can use direct encoding
+
+ zzBits90p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 0.9);
+ int diffBitsLH = zzBits100p - zzBits90p;
+
+ // if the difference between 90th percentile and 100th percentile fixed
+ // bits is > 1 then we need patch the values
+ if (diffBitsLH > 1) {
+
+ // patching is done only on base reduced values.
+ // remove base from literals
+ for (int i = 0; i < numLiterals; i++) {
+ baseRedLiterals[i] = literals[i] - min;
+ }
+
+ // 95th percentile width is used to determine max allowed value
+ // after which patching will be done
+ brBits95p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 0.95);
+
+ // 100th percentile is used to compute the max patch width
+ brBits100p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 1.0);
+
+ // after base reducing the values, if the difference in bits between
+ // 95th percentile and 100th percentile value is zero then there
+ // is no point in patching the values, in which case we will
+ // fallback to DIRECT encoding.
+ // The decision to use patched base was based on zigzag values, but the
+ // actual patching is done on base reduced literals.
+ if ((brBits100p - brBits95p) != 0) {
+ encoding = EncodingType.PATCHED_BASE;
+ preparePatchedBlob();
+ return;
+ } else {
+ encoding = EncodingType.DIRECT;
+ return;
+ }
+ } else {
+ // if difference in bits between 95th percentile and 100th percentile is
+ // 0, then patch length will become 0. Hence we will fallback to direct
+ encoding = EncodingType.DIRECT;
+ return;
+ }
+ }
+
+ private void computeZigZagLiterals() {
+ // populate zigzag encoded literals
+ long zzEncVal = 0;
+ for (int i = 0; i < numLiterals; i++) {
+ if (signed) {
+ zzEncVal = utils.zigzagEncode(literals[i]);
+ } else {
+ zzEncVal = literals[i];
+ }
+ zigzagLiterals[i] = zzEncVal;
+ }
+ }
+
+ private void preparePatchedBlob() {
+ // mask will be max value beyond which patch will be generated
+ long mask = (1L << brBits95p) - 1;
+
+ // since we are considering only 95 percentile, the size of gap and
+ // patch array can contain only be 5% values
+ patchLength = (int) Math.ceil((numLiterals * 0.05));
+
+ int[] gapList = new int[patchLength];
+ long[] patchList = new long[patchLength];
+
+ // #bit for patch
+ patchWidth = brBits100p - brBits95p;
+ patchWidth = utils.getClosestFixedBits(patchWidth);
+
+ // if patch bit requirement is 64 then it will not possible to pack
+ // gap and patch together in a long. To make sure gap and patch can be
+ // packed together adjust the patch width
+ if (patchWidth == 64) {
+ patchWidth = 56;
+ brBits95p = 8;
+ mask = (1L << brBits95p) - 1;
+ }
+
+ int gapIdx = 0;
+ int patchIdx = 0;
+ int prev = 0;
+ int gap = 0;
+ int maxGap = 0;
+
+ for(int i = 0; i < numLiterals; i++) {
+ // if value is above mask then create the patch and record the gap
+ if (baseRedLiterals[i] > mask) {
+ gap = i - prev;
+ if (gap > maxGap) {
+ maxGap = gap;
+ }
+
+ // gaps are relative, so store the previous patched value index
+ prev = i;
+ gapList[gapIdx++] = gap;
+
+ // extract the most significant bits that are over mask bits
+ long patch = baseRedLiterals[i] >>> brBits95p;
+ patchList[patchIdx++] = patch;
+
+ // strip off the MSB to enable safe bit packing
+ baseRedLiterals[i] &= mask;
+ }
+ }
+
+ // adjust the patch length to number of entries in gap list
+ patchLength = gapIdx;
+
+ // if the element to be patched is the first and only element then
+ // max gap will be 0, but to store the gap as 0 we need atleast 1 bit
+ if (maxGap == 0 && patchLength != 0) {
+ patchGapWidth = 1;
+ } else {
+ patchGapWidth = utils.findClosestNumBits(maxGap);
+ }
+
+ // special case: if the patch gap width is greater than 256, then
+ // we need 9 bits to encode the gap width. But we only have 3 bits in
+ // header to record the gap width. To deal with this case, we will save
+ // two entries in patch list in the following way
+ // 256 gap width => 0 for patch value
+ // actual gap - 256 => actual patch value
+ // We will do the same for gap width = 511. If the element to be patched is
+ // the last element in the scope then gap width will be 511. In this case we
+ // will have 3 entries in the patch list in the following way
+ // 255 gap width => 0 for patch value
+ // 255 gap width => 0 for patch value
+ // 1 gap width => actual patch value
+ if (patchGapWidth > 8) {
+ patchGapWidth = 8;
+ // for gap = 511, we need two additional entries in patch list
+ if (maxGap == 511) {
+ patchLength += 2;
+ } else {
+ patchLength += 1;
+ }
+ }
+
+ // create gap vs patch list
+ gapIdx = 0;
+ patchIdx = 0;
+ gapVsPatchList = new long[patchLength];
+ for(int i = 0; i < patchLength; i++) {
+ long g = gapList[gapIdx++];
+ long p = patchList[patchIdx++];
+ while (g > 255) {
+ gapVsPatchList[i++] = (255L << patchWidth);
+ g -= 255;
+ }
+
+ // store patch value in LSBs and gap in MSBs
+ gapVsPatchList[i] = (g << patchWidth) | p;
+ }
+ }
+
+ /**
+ * clears all the variables
+ */
+ private void clear() {
+ numLiterals = 0;
+ encoding = null;
+ prevDelta = 0;
+ fixedDelta = 0;
+ zzBits90p = 0;
+ zzBits100p = 0;
+ brBits95p = 0;
+ brBits100p = 0;
+ bitsDeltaMax = 0;
+ patchGapWidth = 0;
+ patchLength = 0;
+ patchWidth = 0;
+ gapVsPatchList = null;
+ min = 0;
+ isFixedDelta = true;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (numLiterals != 0) {
+ if (variableRunLength != 0) {
+ determineEncoding();
+ writeValues();
+ } else if (fixedRunLength != 0) {
+ if (fixedRunLength < MIN_REPEAT) {
+ variableRunLength = fixedRunLength;
+ fixedRunLength = 0;
+ determineEncoding();
+ writeValues();
+ } else if (fixedRunLength >= MIN_REPEAT
+ && fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) {
+ encoding = EncodingType.SHORT_REPEAT;
+ writeValues();
+ } else {
+ encoding = EncodingType.DELTA;
+ isFixedDelta = true;
+ writeValues();
+ }
+ }
+ }
+ output.flush();
+ }
+
+ @Override
+ public void write(long val) throws IOException {
+ if (numLiterals == 0) {
+ initializeLiterals(val);
+ } else {
+ if (numLiterals == 1) {
+ prevDelta = val - literals[0];
+ literals[numLiterals++] = val;
+ // if both values are same count as fixed run else variable run
+ if (val == literals[0]) {
+ fixedRunLength = 2;
+ variableRunLength = 0;
+ } else {
+ fixedRunLength = 0;
+ variableRunLength = 2;
+ }
+ } else {
+ long currentDelta = val - literals[numLiterals - 1];
+ if (prevDelta == 0 && currentDelta == 0) {
+ // fixed delta run
+
+ literals[numLiterals++] = val;
+
+ // if variable run is non-zero then we are seeing repeating
+ // values at the end of variable run in which case keep
+ // updating variable and fixed runs
+ if (variableRunLength > 0) {
+ fixedRunLength = 2;
+ }
+ fixedRunLength += 1;
+
+ // if fixed run met the minimum condition and if variable
+ // run is non-zero then flush the variable run and shift the
+ // tail fixed runs to start of the buffer
+ if (fixedRunLength >= MIN_REPEAT && variableRunLength > 0) {
+ numLiterals -= MIN_REPEAT;
+ variableRunLength -= MIN_REPEAT - 1;
+ // copy the tail fixed runs
+ long[] tailVals = new long[MIN_REPEAT];
+ System.arraycopy(literals, numLiterals, tailVals, 0, MIN_REPEAT);
+
+ // determine variable encoding and flush values
+ determineEncoding();
+ writeValues();
+
+ // shift tail fixed runs to beginning of the buffer
+ for(long l : tailVals) {
+ literals[numLiterals++] = l;
+ }
+ }
+
+ // if fixed runs reached max repeat length then write values
+ if (fixedRunLength == MAX_SCOPE) {
+ determineEncoding();
+ writeValues();
+ }
+ } else {
+ // variable delta run
+
+ // if fixed run length is non-zero and if it satisfies the
+ // short repeat conditions then write the values as short repeats
+ // else use delta encoding
+ if (fixedRunLength >= MIN_REPEAT) {
+ if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) {
+ encoding = EncodingType.SHORT_REPEAT;
+ writeValues();
+ } else {
+ encoding = EncodingType.DELTA;
+ isFixedDelta = true;
+ writeValues();
+ }
+ }
+
+ // if fixed run length is 0 && fixedRunLength < MIN_REPEAT) {
+ if (val != literals[numLiterals - 1]) {
+ variableRunLength = fixedRunLength;
+ fixedRunLength = 0;
+ }
+ }
+
+ // after writing values re-initialize the variables
+ if (numLiterals == 0) {
+ initializeLiterals(val);
+ } else {
+ // keep updating variable run lengths
+ prevDelta = val - literals[numLiterals - 1];
+ literals[numLiterals++] = val;
+ variableRunLength += 1;
+
+ // if variable run length reach the max scope, write it
+ if (variableRunLength == MAX_SCOPE) {
+ determineEncoding();
+ writeValues();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private void initializeLiterals(long val) {
+ literals[numLiterals++] = val;
+ fixedRunLength = 1;
+ variableRunLength = 1;
+ }
+
+ @Override
+ public void getPosition(PositionRecorder recorder) throws IOException {
+ output.getPosition(recorder);
+ recorder.addPosition(numLiterals);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/SchemaEvolution.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/SchemaEvolution.java b/orc/src/java/org/apache/hive/orc/impl/SchemaEvolution.java
new file mode 100644
index 0000000..3c7124b
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/SchemaEvolution.java
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.orc.impl;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hive.orc.TypeDescription;
+
+/**
+ * Take the file types and the (optional) configuration column names/types and see if there
+ * has been schema evolution.
+ */
+public class SchemaEvolution {
+ // indexed by reader column id
+ private final TypeDescription[] readerFileTypes;
+ // indexed by reader column id
+ private final boolean[] readerIncluded;
+ // the offset to the first column id ignoring any ACID columns
+ private final int readerColumnOffset;
+ // indexed by file column id
+ private final boolean[] fileIncluded;
+ private final TypeDescription fileSchema;
+ private final TypeDescription readerSchema;
+ private boolean hasConversion;
+ // indexed by reader column id
+ private final boolean[] ppdSafeConversion;
+
+ public SchemaEvolution(TypeDescription fileSchema, boolean[] includedCols) {
+ this(fileSchema, null, includedCols);
+ }
+
+ public SchemaEvolution(TypeDescription fileSchema,
+ TypeDescription readerSchema,
+ boolean[] includeCols) {
+ this.readerIncluded = includeCols == null ? null : Arrays.copyOf(includeCols, includeCols.length);
+ this.fileIncluded = new boolean[fileSchema.getMaximumId() + 1];
+ this.hasConversion = false;
+ this.fileSchema = fileSchema;
+ boolean isAcid = checkAcidSchema(fileSchema);
+ this.readerColumnOffset = isAcid ? acidEventFieldNames.size() : 0;
+ if (readerSchema != null) {
+ if (isAcid) {
+ this.readerSchema = createEventSchema(readerSchema);
+ } else {
+ this.readerSchema = readerSchema;
+ }
+ if (readerIncluded != null &&
+ readerIncluded.length + readerColumnOffset != this.readerSchema.getMaximumId() + 1) {
+ throw new IllegalArgumentException("Include vector the wrong length: " +
+ this.readerSchema.toJson() + " with include length " +
+ readerIncluded.length);
+ }
+ this.readerFileTypes = new TypeDescription[this.readerSchema.getMaximumId() + 1];
+ buildConversionFileTypesArray(fileSchema, this.readerSchema);
+ } else {
+ this.readerSchema = fileSchema;
+ this.readerFileTypes = new TypeDescription[this.readerSchema.getMaximumId() + 1];
+ if (readerIncluded != null &&
+ readerIncluded.length + readerColumnOffset != this.readerSchema.getMaximumId() + 1) {
+ throw new IllegalArgumentException("Include vector the wrong length: " +
+ this.readerSchema.toJson() + " with include length " +
+ readerIncluded.length);
+ }
+ buildSameSchemaFileTypesArray();
+ }
+ this.ppdSafeConversion = populatePpdSafeConversion();
+ }
+
+ public TypeDescription getReaderSchema() {
+ return readerSchema;
+ }
+
+ /**
+ * Returns the non-ACID (aka base) reader type description.
+ *
+ * @return the reader type ignoring the ACID rowid columns, if any
+ */
+ public TypeDescription getReaderBaseSchema() {
+ return readerSchema.findSubtype(readerColumnOffset);
+ }
+
+ /**
+ * Is there Schema Evolution data type conversion?
+ * @return
+ */
+ public boolean hasConversion() {
+ return hasConversion;
+ }
+
+ public TypeDescription getFileType(TypeDescription readerType) {
+ return getFileType(readerType.getId());
+ }
+
+ /**
+ * Get whether each column is included from the reader's point of view.
+ * @return a boolean array indexed by reader column id
+ */
+ public boolean[] getReaderIncluded() {
+ return readerIncluded;
+ }
+
+ /**
+ * Get whether each column is included from the file's point of view.
+ * @return a boolean array indexed by file column id
+ */
+ public boolean[] getFileIncluded() {
+ return fileIncluded;
+ }
+
+ /**
+ * Get the file type by reader type id.
+ * @param id reader column id
+ * @return
+ */
+ public TypeDescription getFileType(int id) {
+ return readerFileTypes[id];
+ }
+
+ /**
+ * Check if column is safe for ppd evaluation
+ * @param colId reader column id
+ * @return true if the specified column is safe for ppd evaluation else false
+ */
+ public boolean isPPDSafeConversion(final int colId) {
+ if (hasConversion()) {
+ if (colId < 0 || colId >= ppdSafeConversion.length) {
+ return false;
+ }
+ return ppdSafeConversion[colId];
+ }
+
+ // when there is no schema evolution PPD is safe
+ return true;
+ }
+
+ private boolean[] populatePpdSafeConversion() {
+ if (fileSchema == null || readerSchema == null || readerFileTypes == null) {
+ return null;
+ }
+
+ boolean[] result = new boolean[readerSchema.getMaximumId() + 1];
+ boolean safePpd = validatePPDConversion(fileSchema, readerSchema);
+ result[readerSchema.getId()] = safePpd;
+ List children = readerSchema.getChildren();
+ if (children != null) {
+ for (TypeDescription child : children) {
+ TypeDescription fileType = getFileType(child.getId());
+ safePpd = validatePPDConversion(fileType, child);
+ result[child.getId()] = safePpd;
+ }
+ }
+ return result;
+ }
+
+ private boolean validatePPDConversion(final TypeDescription fileType,
+ final TypeDescription readerType) {
+ if (fileType == null) {
+ return false;
+ }
+ if (fileType.getCategory().isPrimitive()) {
+ if (fileType.getCategory().equals(readerType.getCategory())) {
+ // for decimals alone do equality check to not mess up with precision change
+ if (fileType.getCategory().equals(TypeDescription.Category.DECIMAL) &&
+ !fileType.equals(readerType)) {
+ return false;
+ }
+ return true;
+ }
+
+ // only integer and string evolutions are safe
+ // byte -> short -> int -> long
+ // string <-> char <-> varchar
+ // NOTE: Float to double evolution is not safe as floats are stored as doubles in ORC's
+ // internal index, but when doing predicate evaluation for queries like "select * from
+ // orc_float where f = 74.72" the constant on the filter is converted from string -> double
+ // so the precisions will be different and the comparison will fail.
+ // Soon, we should convert all sargs that compare equality between floats or
+ // doubles to range predicates.
+
+ // Similarly string -> char and varchar -> char and vice versa is not possible, as ORC stores
+ // char with padded spaces in its internal index.
+ switch (fileType.getCategory()) {
+ case BYTE:
+ if (readerType.getCategory().equals(TypeDescription.Category.SHORT) ||
+ readerType.getCategory().equals(TypeDescription.Category.INT) ||
+ readerType.getCategory().equals(TypeDescription.Category.LONG)) {
+ return true;
+ }
+ break;
+ case SHORT:
+ if (readerType.getCategory().equals(TypeDescription.Category.INT) ||
+ readerType.getCategory().equals(TypeDescription.Category.LONG)) {
+ return true;
+ }
+ break;
+ case INT:
+ if (readerType.getCategory().equals(TypeDescription.Category.LONG)) {
+ return true;
+ }
+ break;
+ case STRING:
+ if (readerType.getCategory().equals(TypeDescription.Category.VARCHAR)) {
+ return true;
+ }
+ break;
+ case VARCHAR:
+ if (readerType.getCategory().equals(TypeDescription.Category.STRING)) {
+ return true;
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ return false;
+ }
+
+  /**
+   * Reports whether a column of the extended reader schema is to be read.
+   *
+   * <p>Every column is read when no inclusion array was supplied. Ids up to and including
+   * {@code readerColumnOffset} are always read. Higher ids are looked up in
+   * {@code readerIncluded} after subtracting that offset.
+   *
+   * @param readerId id of the column in the extended reader schema
+   * @return true if the column should be read
+   */
+  public boolean includeReaderColumn(int readerId) {
+    final boolean everythingIncluded = readerIncluded == null;
+    if (everythingIncluded || readerId <= readerColumnOffset) {
+      return true;
+    }
+    final int includedIndex = readerId - readerColumnOffset;
+    return readerIncluded[includedIndex];
+  }
+
+  /**
+   * Recursively pairs each included reader type with the file type it will be read from.
+   *
+   * <p>For every included reader column this records the file type in
+   * {@code readerFileTypes[readerId]} and marks the file column in {@code fileIncluded}.
+   * Children are mapped before their parent. {@code hasConversion} is set in any of these
+   * cases:
+   * <ul>
+   *   <li>the categories differ;</li>
+   *   <li>CHAR/VARCHAR max lengths differ;</li>
+   *   <li>DECIMAL precision or scale differ;</li>
+   *   <li>STRUCTs have different field counts.</li>
+   * </ul>
+   * A column excluded by {@link #includeReaderColumn(int)} is skipped together with its
+   * children.
+   *
+   * @param fileType the type as written in the file
+   * @param readerType the type the reader asked for
+   * @throws IllegalArgumentException if the categories differ and the conversion is not
+   *     supported, if a UNION/MAP/LIST has a different number of children on each side,
+   *     or if the reader category is unknown
+   * @throws RuntimeException if the reader column has already been mapped
+   */
+  void buildConversionFileTypesArray(TypeDescription fileType,
+                                     TypeDescription readerType) {
+    // if the column isn't included, don't map it (nor any of its children)
+    int readerId = readerType.getId();
+    if (!includeReaderColumn(readerId)) {
+      return;
+    }
+    boolean isOk = true;
+    // check the easy case first: the same category on both sides
+    if (fileType.getCategory() == readerType.getCategory()) {
+      switch (readerType.getCategory()) {
+        case BOOLEAN:
+        case BYTE:
+        case SHORT:
+        case INT:
+        case LONG:
+        case DOUBLE:
+        case FLOAT:
+        case STRING:
+        case TIMESTAMP:
+        case BINARY:
+        case DATE:
+          // these are always a match
+          break;
+        case CHAR:
+        case VARCHAR:
+          // the same CHAR/VARCHAR type with a different maxLength still needs a conversion
+          if (fileType.getMaxLength() != readerType.getMaxLength()) {
+            hasConversion = true;
+          }
+          break;
+        case DECIMAL:
+          // the same DECIMAL type with a different precision/scale still needs a conversion
+          if (fileType.getPrecision() != readerType.getPrecision() ||
+              fileType.getScale() != readerType.getScale()) {
+            hasConversion = true;
+          }
+          break;
+        case UNION:
+        case MAP:
+        case LIST: {
+          // the number of children must match; each child pair is then mapped recursively
+          List<TypeDescription> fileChildren = fileType.getChildren();
+          List<TypeDescription> readerChildren = readerType.getChildren();
+          if (fileChildren.size() == readerChildren.size()) {
+            for (int i = 0; i < fileChildren.size(); ++i) {
+              buildConversionFileTypesArray(fileChildren.get(i), readerChildren.get(i));
+            }
+          } else {
+            isOk = false;
+          }
+          break;
+        }
+        case STRUCT: {
+          // allow either side to have fewer fields than the other; fields are paired by
+          // position and any extra fields on the longer side are left unmapped
+          List<TypeDescription> fileChildren = fileType.getChildren();
+          List<TypeDescription> readerChildren = readerType.getChildren();
+          if (fileChildren.size() != readerChildren.size()) {
+            hasConversion = true;
+          }
+          int jointSize = Math.min(fileChildren.size(), readerChildren.size());
+          for (int i = 0; i < jointSize; ++i) {
+            buildConversionFileTypesArray(fileChildren.get(i), readerChildren.get(i));
+          }
+          break;
+        }
+        default:
+          throw new IllegalArgumentException("Unknown type " + readerType);
+      }
+    } else {
+      // different categories: accept only the conversions the tree-reader factory supports
+      isOk = ConvertTreeReaderFactory.canConvert(fileType, readerType);
+      hasConversion = true;
+    }
+    if (isOk) {
+      if (readerFileTypes[readerId] != null) {
+        throw new RuntimeException("reader to file type entry already assigned");
+      }
+      readerFileTypes[readerId] = fileType;
+      fileIncluded[fileType.getId()] = true;
+    } else {
+      throw new IllegalArgumentException(
+          String.format(
+              "ORC does not support type conversion from file type %s (%d) to reader type %s (%d)",
+              fileType.toString(), fileType.getId(),
+              readerType.toString(), readerId));
+    }
+  }
+
+  /**
+   * Builds the reader-to-file type mapping for the case where the file schema and the reader
+   * schema are the same. It walks {@code readerSchema} and maps every included type to
+   * itself.
+   */
+  private void buildSameSchemaFileTypesArray() {
+    buildSameSchemaFileTypesArrayRecurse(readerSchema);
+  }
+
+  /**
+   * Maps {@code readerType} and all of its descendants to themselves, for use when the file
+   * and reader schemas are identical.
+   *
+   * <p>Included columns are recorded in {@code readerFileTypes} and {@code fileIncluded}
+   * under the same id. A column excluded by {@link #includeReaderColumn(int)} is skipped
+   * along with its whole subtree.
+   *
+   * @param readerType the reader type to map
+   * @throws RuntimeException if a reader column has already been mapped
+   */
+  void buildSameSchemaFileTypesArrayRecurse(TypeDescription readerType) {
+    int id = readerType.getId();
+    if (!includeReaderColumn(id)) {
+      return;
+    }
+    if (readerFileTypes[id] != null) {
+      throw new RuntimeException("reader to file type entry already assigned");
+    }
+    readerFileTypes[id] = readerType;
+    // the schemas are identical, so the reader id doubles as the file id
+    fileIncluded[id] = true;
+    List<TypeDescription> children = readerType.getChildren();
+    if (children != null) {
+      for (TypeDescription child : children) {
+        buildSameSchemaFileTypesArrayRecurse(child);
+      }
+    }
+  }
+
+  /**
+   * Tests whether a schema has the layout of an ACID event. That means a struct whose
+   * top-level field names equal {@code acidEventFieldNames}, in the same order.
+   *
+   * @param type the schema to test
+   * @return true if {@code type} is a struct with exactly the ACID event field names
+   */
+  private static boolean checkAcidSchema(TypeDescription type) {
+    if (type.getCategory() != TypeDescription.Category.STRUCT) {
+      return false;
+    }
+    List<String> rootFields = type.getFieldNames();
+    return acidEventFieldNames.equals(rootFields);
+  }
+
+  /**
+   * Builds the ORC schema of an ACID event that wraps rows of the given type.
+   *
+   * <p>The result is a struct with these fields, in order:
+   * <ul>
+   *   <li>operation (int)</li>
+   *   <li>originalTransaction (long)</li>
+   *   <li>bucket (int)</li>
+   *   <li>rowId (long)</li>
+   *   <li>currentTransaction (long)</li>
+   *   <li>row</li>
+   * </ul>
+   *
+   * @param typeDescr the row type; the returned schema holds {@code typeDescr.clone()} as its
+   *     "row" field rather than the argument itself
+   * @return a new struct type describing an ACID event
+   */
+  public static TypeDescription createEventSchema(TypeDescription typeDescr) {
+    return TypeDescription.createStruct()
+        .addField("operation", TypeDescription.createInt())
+        .addField("originalTransaction", TypeDescription.createLong())
+        .addField("bucket", TypeDescription.createInt())
+        .addField("rowId", TypeDescription.createLong())
+        .addField("currentTransaction", TypeDescription.createLong())
+        .addField("row", typeDescr.clone());
+  }
+
+  /**
+   * Top-level field names of an ACID event schema, in order. They match the fields added by
+   * {@link #createEventSchema(TypeDescription)}.
+   *
+   * <p>NOTE(review): this list is public and mutable; any caller that modifies it changes
+   * what {@code checkAcidSchema} accepts. Consider exposing an unmodifiable view.
+   */
+  public static final List<String> acidEventFieldNames = new ArrayList<String>();
+  static {
+    acidEventFieldNames.add("operation");
+    acidEventFieldNames.add("originalTransaction");
+    acidEventFieldNames.add("bucket");
+    acidEventFieldNames.add("rowId");
+    acidEventFieldNames.add("currentTransaction");
+    acidEventFieldNames.add("row");
+  }
+}