hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject [28/37] hive git commit: HIVE-17118. Move the hive-orc source files to make the package names unique.
Date Wed, 19 Jul 2017 16:58:51 GMT
http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/RunLengthByteWriter.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/RunLengthByteWriter.java b/orc/src/java/org/apache/hive/orc/impl/RunLengthByteWriter.java
new file mode 100644
index 0000000..b94b6a9
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/RunLengthByteWriter.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.orc.impl;
+
+import java.io.IOException;
+
+/**
+ * A streamFactory that writes a sequence of bytes. A control byte is written before
+ * each run with positive values 0 to 127 meaning 2 to 129 repetitions. If the
+ * bytes is -1 to -128, 1 to 128 literal byte values follow.
+ */
+public class RunLengthByteWriter {
+  static final int MIN_REPEAT_SIZE = 3;
+  static final int MAX_LITERAL_SIZE = 128;
+  static final int MAX_REPEAT_SIZE= 127 + MIN_REPEAT_SIZE;
+  private final PositionedOutputStream output;
+  private final byte[] literals = new byte[MAX_LITERAL_SIZE];
+  private int numLiterals = 0;
+  private boolean repeat = false;
+  private int tailRunLength = 0;
+
+  public RunLengthByteWriter(PositionedOutputStream output) {
+    this.output = output;
+  }
+
+  private void writeValues() throws IOException {
+    if (numLiterals != 0) {
+      if (repeat) {
+        output.write(numLiterals - MIN_REPEAT_SIZE);
+        output.write(literals, 0, 1);
+     } else {
+        output.write(-numLiterals);
+        output.write(literals, 0, numLiterals);
+      }
+      repeat = false;
+      tailRunLength = 0;
+      numLiterals = 0;
+    }
+  }
+
+  public void flush() throws IOException {
+    writeValues();
+    output.flush();
+  }
+
+  public void write(byte value) throws IOException {
+    if (numLiterals == 0) {
+      literals[numLiterals++] = value;
+      tailRunLength = 1;
+    } else if (repeat) {
+      if (value == literals[0]) {
+        numLiterals += 1;
+        if (numLiterals == MAX_REPEAT_SIZE) {
+          writeValues();
+        }
+      } else {
+        writeValues();
+        literals[numLiterals++] = value;
+        tailRunLength = 1;
+      }
+    } else {
+      if (value == literals[numLiterals - 1]) {
+        tailRunLength += 1;
+      } else {
+        tailRunLength = 1;
+      }
+      if (tailRunLength == MIN_REPEAT_SIZE) {
+        if (numLiterals + 1 == MIN_REPEAT_SIZE) {
+          repeat = true;
+          numLiterals += 1;
+        } else {
+          numLiterals -= MIN_REPEAT_SIZE - 1;
+          writeValues();
+          literals[0] = value;
+          repeat = true;
+          numLiterals = MIN_REPEAT_SIZE;
+        }
+      } else {
+        literals[numLiterals++] = value;
+        if (numLiterals == MAX_LITERAL_SIZE) {
+          writeValues();
+        }
+      }
+    }
+  }
+
+  public void getPosition(PositionRecorder recorder) throws IOException {
+    output.getPosition(recorder);
+    recorder.addPosition(numLiterals);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerReader.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerReader.java b/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerReader.java
new file mode 100644
index 0000000..5b613f6
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerReader.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.orc.impl;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+
+/**
+ * A reader that reads a sequence of integers.
+ * */
+public class RunLengthIntegerReader implements IntegerReader {
+  private InStream input;
+  private final boolean signed;
+  private final long[] literals =
+    new long[RunLengthIntegerWriter.MAX_LITERAL_SIZE];
+  private int numLiterals = 0;
+  private int delta = 0;
+  private int used = 0;
+  private boolean repeat = false;
+  private SerializationUtils utils;
+
+  public RunLengthIntegerReader(InStream input, boolean signed) throws IOException {
+    this.input = input;
+    this.signed = signed;
+    this.utils = new SerializationUtils();
+  }
+
+  private void readValues(boolean ignoreEof) throws IOException {
+    int control = input.read();
+    if (control == -1) {
+      if (!ignoreEof) {
+        throw new EOFException("Read past end of RLE integer from " + input);
+      }
+      used = numLiterals = 0;
+      return;
+    } else if (control < 0x80) {
+      numLiterals = control + RunLengthIntegerWriter.MIN_REPEAT_SIZE;
+      used = 0;
+      repeat = true;
+      delta = input.read();
+      if (delta == -1) {
+        throw new EOFException("End of stream in RLE Integer from " + input);
+      }
+      // convert from 0 to 255 to -128 to 127 by converting to a signed byte
+      delta = (byte) (0 + delta);
+      if (signed) {
+        literals[0] = utils.readVslong(input);
+      } else {
+        literals[0] = utils.readVulong(input);
+      }
+    } else {
+      repeat = false;
+      numLiterals = 0x100 - control;
+      used = 0;
+      for(int i=0; i < numLiterals; ++i) {
+        if (signed) {
+          literals[i] = utils.readVslong(input);
+        } else {
+          literals[i] = utils.readVulong(input);
+        }
+      }
+    }
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    return used != numLiterals || input.available() > 0;
+  }
+
+  @Override
+  public long next() throws IOException {
+    long result;
+    if (used == numLiterals) {
+      readValues(false);
+    }
+    if (repeat) {
+      result = literals[0] + (used++) * delta;
+    } else {
+      result = literals[used++];
+    }
+    return result;
+  }
+
+  @Override
+  public void nextVector(ColumnVector previous,
+                         long[] data,
+                         int previousLen) throws IOException {
+    previous.isRepeating = true;
+    for (int i = 0; i < previousLen; i++) {
+      if (!previous.isNull[i]) {
+        data[i] = next();
+      } else {
+        // The default value of null for int type in vectorized
+        // processing is 1, so set that if the value is null
+        data[i] = 1;
+      }
+
+      // The default value for nulls in Vectorization for int types is 1
+      // and given that non null value can also be 1, we need to check for isNull also
+      // when determining the isRepeating flag.
+      if (previous.isRepeating
+          && i > 0
+          && (data[0] != data[i] || previous.isNull[0] != previous.isNull[i])) {
+        previous.isRepeating = false;
+      }
+    }
+  }
+
+  @Override
+  public void nextVector(ColumnVector vector,
+                         int[] data,
+                         int size) throws IOException {
+    if (vector.noNulls) {
+      for(int r=0; r < data.length && r < size; ++r) {
+        data[r] = (int) next();
+      }
+    } else if (!(vector.isRepeating && vector.isNull[0])) {
+      for(int r=0; r < data.length && r < size; ++r) {
+        if (!vector.isNull[r]) {
+          data[r] = (int) next();
+        } else {
+          data[r] = 1;
+        }
+      }
+    }
+  }
+
+  @Override
+  public void seek(PositionProvider index) throws IOException {
+    input.seek(index);
+    int consumed = (int) index.getNext();
+    if (consumed != 0) {
+      // a loop is required for cases where we break the run into two parts
+      while (consumed > 0) {
+        readValues(false);
+        used = consumed;
+        consumed -= numLiterals;
+      }
+    } else {
+      used = 0;
+      numLiterals = 0;
+    }
+  }
+
+  @Override
+  public void skip(long numValues) throws IOException {
+    while (numValues > 0) {
+      if (used == numLiterals) {
+        readValues(false);
+      }
+      long consume = Math.min(numValues, numLiterals - used);
+      used += consume;
+      numValues -= consume;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerReaderV2.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerReaderV2.java b/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerReaderV2.java
new file mode 100644
index 0000000..d0c2b54
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerReaderV2.java
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.orc.impl;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A reader that reads a sequence of light weight compressed integers. Refer
+ * {@link RunLengthIntegerWriterV2} for description of various lightweight
+ * compression techniques.
+ */
+public class RunLengthIntegerReaderV2 implements IntegerReader {
+  public static final Logger LOG = LoggerFactory.getLogger(RunLengthIntegerReaderV2.class);
+
+  private InStream input;
+  private final boolean signed;
+  private final long[] literals = new long[RunLengthIntegerWriterV2.MAX_SCOPE];
+  private boolean isRepeating = false;
+  private int numLiterals = 0;
+  private int used = 0;
+  private final boolean skipCorrupt;
+  private final SerializationUtils utils;
+  private RunLengthIntegerWriterV2.EncodingType currentEncoding;
+
+  public RunLengthIntegerReaderV2(InStream input, boolean signed,
+      boolean skipCorrupt) throws IOException {
+    this.input = input;
+    this.signed = signed;
+    this.skipCorrupt = skipCorrupt;
+    this.utils = new SerializationUtils();
+  }
+
+  private final static RunLengthIntegerWriterV2.EncodingType[] encodings = RunLengthIntegerWriterV2.EncodingType.values();
+  private void readValues(boolean ignoreEof) throws IOException {
+    // read the first 2 bits and determine the encoding type
+    isRepeating = false;
+    int firstByte = input.read();
+    if (firstByte < 0) {
+      if (!ignoreEof) {
+        throw new EOFException("Read past end of RLE integer from " + input);
+      }
+      used = numLiterals = 0;
+      return;
+    }
+    currentEncoding = encodings[(firstByte >>> 6) & 0x03];
+    switch (currentEncoding) {
+    case SHORT_REPEAT: readShortRepeatValues(firstByte); break;
+    case DIRECT: readDirectValues(firstByte); break;
+    case PATCHED_BASE: readPatchedBaseValues(firstByte); break;
+    case DELTA: readDeltaValues(firstByte); break;
+    default: throw new IOException("Unknown encoding " + currentEncoding);
+    }
+  }
+
+  private void readDeltaValues(int firstByte) throws IOException {
+
+    // extract the number of fixed bits
+    int fb = (firstByte >>> 1) & 0x1f;
+    if (fb != 0) {
+      fb = utils.decodeBitWidth(fb);
+    }
+
+    // extract the blob run length
+    int len = (firstByte & 0x01) << 8;
+    len |= input.read();
+
+    // read the first value stored as vint
+    long firstVal = 0;
+    if (signed) {
+      firstVal = utils.readVslong(input);
+    } else {
+      firstVal = utils.readVulong(input);
+    }
+
+    // store first value to result buffer
+    long prevVal = firstVal;
+    literals[numLiterals++] = firstVal;
+
+    // if fixed bits is 0 then all values have fixed delta
+    if (fb == 0) {
+      // read the fixed delta value stored as vint (deltas can be negative even
+      // if all number are positive)
+      long fd = utils.readVslong(input);
+      if (fd == 0) {
+        isRepeating = true;
+        assert numLiterals == 1;
+        Arrays.fill(literals, numLiterals, numLiterals + len, literals[0]);
+        numLiterals += len;
+      } else {
+        // add fixed deltas to adjacent values
+        for(int i = 0; i < len; i++) {
+          literals[numLiterals++] = literals[numLiterals - 2] + fd;
+        }
+      }
+    } else {
+      long deltaBase = utils.readVslong(input);
+      // add delta base and first value
+      literals[numLiterals++] = firstVal + deltaBase;
+      prevVal = literals[numLiterals - 1];
+      len -= 1;
+
+      // write the unpacked values, add it to previous value and store final
+      // value to result buffer. if the delta base value is negative then it
+      // is a decreasing sequence else an increasing sequence
+      utils.readInts(literals, numLiterals, len, fb, input);
+      while (len > 0) {
+        if (deltaBase < 0) {
+          literals[numLiterals] = prevVal - literals[numLiterals];
+        } else {
+          literals[numLiterals] = prevVal + literals[numLiterals];
+        }
+        prevVal = literals[numLiterals];
+        len--;
+        numLiterals++;
+      }
+    }
+  }
+
+  private void readPatchedBaseValues(int firstByte) throws IOException {
+
+    // extract the number of fixed bits
+    int fbo = (firstByte >>> 1) & 0x1f;
+    int fb = utils.decodeBitWidth(fbo);
+
+    // extract the run length of data blob
+    int len = (firstByte & 0x01) << 8;
+    len |= input.read();
+    // runs are always one off
+    len += 1;
+
+    // extract the number of bytes occupied by base
+    int thirdByte = input.read();
+    int bw = (thirdByte >>> 5) & 0x07;
+    // base width is one off
+    bw += 1;
+
+    // extract patch width
+    int pwo = thirdByte & 0x1f;
+    int pw = utils.decodeBitWidth(pwo);
+
+    // read fourth byte and extract patch gap width
+    int fourthByte = input.read();
+    int pgw = (fourthByte >>> 5) & 0x07;
+    // patch gap width is one off
+    pgw += 1;
+
+    // extract the length of the patch list
+    int pl = fourthByte & 0x1f;
+
+    // read the next base width number of bytes to extract base value
+    long base = utils.bytesToLongBE(input, bw);
+    long mask = (1L << ((bw * 8) - 1));
+    // if MSB of base value is 1 then base is negative value else positive
+    if ((base & mask) != 0) {
+      base = base & ~mask;
+      base = -base;
+    }
+
+    // unpack the data blob
+    long[] unpacked = new long[len];
+    utils.readInts(unpacked, 0, len, fb, input);
+
+    // unpack the patch blob
+    long[] unpackedPatch = new long[pl];
+
+    if ((pw + pgw) > 64 && !skipCorrupt) {
+      throw new IOException("Corruption in ORC data encountered. To skip" +
+          " reading corrupted data, set hive.exec.orc.skip.corrupt.data to" +
+          " true");
+    }
+    int bitSize = utils.getClosestFixedBits(pw + pgw);
+    utils.readInts(unpackedPatch, 0, pl, bitSize, input);
+
+    // apply the patch directly when decoding the packed data
+    int patchIdx = 0;
+    long currGap = 0;
+    long currPatch = 0;
+    long patchMask = ((1L << pw) - 1);
+    currGap = unpackedPatch[patchIdx] >>> pw;
+    currPatch = unpackedPatch[patchIdx] & patchMask;
+    long actualGap = 0;
+
+    // special case: gap is >255 then patch value will be 0.
+    // if gap is <=255 then patch value cannot be 0
+    while (currGap == 255 && currPatch == 0) {
+      actualGap += 255;
+      patchIdx++;
+      currGap = unpackedPatch[patchIdx] >>> pw;
+      currPatch = unpackedPatch[patchIdx] & patchMask;
+    }
+    // add the left over gap
+    actualGap += currGap;
+
+    // unpack data blob, patch it (if required), add base to get final result
+    for(int i = 0; i < unpacked.length; i++) {
+      if (i == actualGap) {
+        // extract the patch value
+        long patchedVal = unpacked[i] | (currPatch << fb);
+
+        // add base to patched value
+        literals[numLiterals++] = base + patchedVal;
+
+        // increment the patch to point to next entry in patch list
+        patchIdx++;
+
+        if (patchIdx < pl) {
+          // read the next gap and patch
+          currGap = unpackedPatch[patchIdx] >>> pw;
+          currPatch = unpackedPatch[patchIdx] & patchMask;
+          actualGap = 0;
+
+          // special case: gap is >255 then patch will be 0. if gap is
+          // <=255 then patch cannot be 0
+          while (currGap == 255 && currPatch == 0) {
+            actualGap += 255;
+            patchIdx++;
+            currGap = unpackedPatch[patchIdx] >>> pw;
+            currPatch = unpackedPatch[patchIdx] & patchMask;
+          }
+          // add the left over gap
+          actualGap += currGap;
+
+          // next gap is relative to the current gap
+          actualGap += i;
+        }
+      } else {
+        // no patching required. add base to unpacked value to get final value
+        literals[numLiterals++] = base + unpacked[i];
+      }
+    }
+
+  }
+
+  private void readDirectValues(int firstByte) throws IOException {
+
+    // extract the number of fixed bits
+    int fbo = (firstByte >>> 1) & 0x1f;
+    int fb = utils.decodeBitWidth(fbo);
+
+    // extract the run length
+    int len = (firstByte & 0x01) << 8;
+    len |= input.read();
+    // runs are one off
+    len += 1;
+
+    // write the unpacked values and zigzag decode to result buffer
+    utils.readInts(literals, numLiterals, len, fb, input);
+    if (signed) {
+      for(int i = 0; i < len; i++) {
+        literals[numLiterals] = utils.zigzagDecode(literals[numLiterals]);
+        numLiterals++;
+      }
+    } else {
+      numLiterals += len;
+    }
+  }
+
+  private void readShortRepeatValues(int firstByte) throws IOException {
+
+    // read the number of bytes occupied by the value
+    int size = (firstByte >>> 3) & 0x07;
+    // #bytes are one off
+    size += 1;
+
+    // read the run length
+    int len = firstByte & 0x07;
+    // run lengths values are stored only after MIN_REPEAT value is met
+    len += RunLengthIntegerWriterV2.MIN_REPEAT;
+
+    // read the repeated value which is store using fixed bytes
+    long val = utils.bytesToLongBE(input, size);
+
+    if (signed) {
+      val = utils.zigzagDecode(val);
+    }
+
+    if (numLiterals != 0) {
+      // Currently this always holds, which makes peekNextAvailLength simpler.
+      // If this changes, peekNextAvailLength should be adjusted accordingly.
+      throw new AssertionError("readValues called with existing values present");
+    }
+    // repeat the value for length times
+    isRepeating = true;
+    // TODO: this is not so useful and V1 reader doesn't do that. Fix? Same if delta == 0
+    for(int i = 0; i < len; i++) {
+      literals[i] = val;
+    }
+    numLiterals = len;
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    return used != numLiterals || input.available() > 0;
+  }
+
+  @Override
+  public long next() throws IOException {
+    long result;
+    if (used == numLiterals) {
+      numLiterals = 0;
+      used = 0;
+      readValues(false);
+    }
+    result = literals[used++];
+    return result;
+  }
+
+  @Override
+  public void seek(PositionProvider index) throws IOException {
+    input.seek(index);
+    int consumed = (int) index.getNext();
+    if (consumed != 0) {
+      // a loop is required for cases where we break the run into two
+      // parts
+      while (consumed > 0) {
+        numLiterals = 0;
+        readValues(false);
+        used = consumed;
+        consumed -= numLiterals;
+      }
+    } else {
+      used = 0;
+      numLiterals = 0;
+    }
+  }
+
+  @Override
+  public void skip(long numValues) throws IOException {
+    while (numValues > 0) {
+      if (used == numLiterals) {
+        numLiterals = 0;
+        used = 0;
+        readValues(false);
+      }
+      long consume = Math.min(numValues, numLiterals - used);
+      used += consume;
+      numValues -= consume;
+    }
+  }
+
+  @Override
+  public void nextVector(ColumnVector previous,
+                         long[] data,
+                         int previousLen) throws IOException {
+    previous.isRepeating = true;
+    for (int i = 0; i < previousLen; i++) {
+      if (!previous.isNull[i]) {
+        data[i] = next();
+      } else {
+        // The default value of null for int type in vectorized
+        // processing is 1, so set that if the value is null
+        data[i] = 1;
+      }
+
+      // The default value for nulls in Vectorization for int types is 1
+      // and given that non null value can also be 1, we need to check for isNull also
+      // when determining the isRepeating flag.
+      if (previous.isRepeating
+          && i > 0
+          && (data[0] != data[i] ||
+          previous.isNull[0] != previous.isNull[i])) {
+        previous.isRepeating = false;
+      }
+    }
+  }
+
+  @Override
+  public void nextVector(ColumnVector vector,
+                         int[] data,
+                         int size) throws IOException {
+    if (vector.noNulls) {
+      for(int r=0; r < data.length && r < size; ++r) {
+        data[r] = (int) next();
+      }
+    } else if (!(vector.isRepeating && vector.isNull[0])) {
+      for(int r=0; r < data.length && r < size; ++r) {
+        if (!vector.isNull[r]) {
+          data[r] = (int) next();
+        } else {
+          data[r] = 1;
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerWriter.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerWriter.java b/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerWriter.java
new file mode 100644
index 0000000..2153001
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerWriter.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.orc.impl;
+
+import java.io.IOException;
+
+/**
+ * A streamFactory that writes a sequence of integers. A control byte is written before
+ * each run with positive values 0 to 127 meaning 3 to 130 repetitions, each
+ * repetition is offset by a delta. If the control byte is -1 to -128, 1 to 128
+ * literal vint values follow.
+ */
+public class RunLengthIntegerWriter implements IntegerWriter {
+  static final int MIN_REPEAT_SIZE = 3;
+  static final int MAX_DELTA = 127;
+  static final int MIN_DELTA = -128;
+  static final int MAX_LITERAL_SIZE = 128;
+  private static final int MAX_REPEAT_SIZE = 127 + MIN_REPEAT_SIZE;
+  private final PositionedOutputStream output;
+  private final boolean signed;
+  private final long[] literals = new long[MAX_LITERAL_SIZE];
+  private int numLiterals = 0;
+  private long delta = 0;
+  private boolean repeat = false;
+  private int tailRunLength = 0;
+  private SerializationUtils utils;
+
+  public RunLengthIntegerWriter(PositionedOutputStream output,
+                         boolean signed) {
+    this.output = output;
+    this.signed = signed;
+    this.utils = new SerializationUtils();
+  }
+
+  private void writeValues() throws IOException {
+    if (numLiterals != 0) {
+      if (repeat) {
+        output.write(numLiterals - MIN_REPEAT_SIZE);
+        output.write((byte) delta);
+        if (signed) {
+          utils.writeVslong(output, literals[0]);
+        } else {
+          utils.writeVulong(output, literals[0]);
+        }
+      } else {
+        output.write(-numLiterals);
+        for(int i=0; i < numLiterals; ++i) {
+          if (signed) {
+            utils.writeVslong(output, literals[i]);
+          } else {
+            utils.writeVulong(output, literals[i]);
+          }
+        }
+      }
+      repeat = false;
+      numLiterals = 0;
+      tailRunLength = 0;
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    writeValues();
+    output.flush();
+  }
+
+  @Override
+  public void write(long value) throws IOException {
+    if (numLiterals == 0) {
+      literals[numLiterals++] = value;
+      tailRunLength = 1;
+    } else if (repeat) {
+      if (value == literals[0] + delta * numLiterals) {
+        numLiterals += 1;
+        if (numLiterals == MAX_REPEAT_SIZE) {
+          writeValues();
+        }
+      } else {
+        writeValues();
+        literals[numLiterals++] = value;
+        tailRunLength = 1;
+      }
+    } else {
+      if (tailRunLength == 1) {
+        delta = value - literals[numLiterals - 1];
+        if (delta < MIN_DELTA || delta > MAX_DELTA) {
+          tailRunLength = 1;
+        } else {
+          tailRunLength = 2;
+        }
+      } else if (value == literals[numLiterals - 1] + delta) {
+        tailRunLength += 1;
+      } else {
+        delta = value - literals[numLiterals - 1];
+        if (delta < MIN_DELTA || delta > MAX_DELTA) {
+          tailRunLength = 1;
+        } else {
+          tailRunLength = 2;
+        }
+      }
+      if (tailRunLength == MIN_REPEAT_SIZE) {
+        if (numLiterals + 1 == MIN_REPEAT_SIZE) {
+          repeat = true;
+          numLiterals += 1;
+        } else {
+          numLiterals -= MIN_REPEAT_SIZE - 1;
+          long base = literals[numLiterals];
+          writeValues();
+          literals[0] = base;
+          repeat = true;
+          numLiterals = MIN_REPEAT_SIZE;
+        }
+      } else {
+        literals[numLiterals++] = value;
+        if (numLiterals == MAX_LITERAL_SIZE) {
+          writeValues();
+        }
+      }
+    }
+  }
+
+  @Override
+  public void getPosition(PositionRecorder recorder) throws IOException {
+    output.getPosition(recorder);
+    recorder.addPosition(numLiterals);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerWriterV2.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerWriterV2.java b/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerWriterV2.java
new file mode 100644
index 0000000..1140ab4
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/RunLengthIntegerWriterV2.java
@@ -0,0 +1,831 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.orc.impl;
+
+import java.io.IOException;
+
+/**
+ * A writer that performs light weight compression over sequence of integers.
+ * <p>
+ * There are four types of lightweight integer compression
+ * <ul>
+ * <li>SHORT_REPEAT</li>
+ * <li>DIRECT</li>
+ * <li>PATCHED_BASE</li>
+ * <li>DELTA</li>
+ * </ul>
+ * </p>
+ * The description and format for these types are as below:
+ * <p>
+ * <b>SHORT_REPEAT:</b> Used for short repeated integer sequences.
+ * <ul>
+ * <li>1 byte header
+ * <ul>
+ * <li>2 bits for encoding type</li>
+ * <li>3 bits for bytes required for repeating value</li>
+ * <li>3 bits for repeat count (MIN_REPEAT + run length)</li>
+ * </ul>
+ * </li>
+ * <li>Blob - repeat value (fixed bytes)</li>
+ * </ul>
+ * </p>
+ * <p>
+ * <b>DIRECT:</b> Used for random integer sequences whose number of bit
+ * requirement doesn't vary a lot.
+ * <ul>
+ * <li>2 bytes header
+ * <ul>
+ * 1st byte
+ * <li>2 bits for encoding type</li>
+ * <li>5 bits for fixed bit width of values in blob</li>
+ * <li>1 bit for storing MSB of run length</li>
+ * </ul>
+ * <ul>
+ * 2nd byte
+ * <li>8 bits for lower run length bits</li>
+ * </ul>
+ * </li>
+ * <li>Blob - stores the direct values using fixed bit width. The length of the
+ * data blob is (fixed width * run length) bits long</li>
+ * </ul>
+ * </p>
+ * <p>
+ * <b>PATCHED_BASE:</b> Used for random integer sequences whose number of bit
+ * requirement varies beyond a threshold.
+ * <ul>
+ * <li>4 bytes header
+ * <ul>
+ * 1st byte
+ * <li>2 bits for encoding type</li>
+ * <li>5 bits for fixed bit width of values in blob</li>
+ * <li>1 bit for storing MSB of run length</li>
+ * </ul>
+ * <ul>
+ * 2nd byte
+ * <li>8 bits for lower run length bits</li>
+ * </ul>
+ * <ul>
+ * 3rd byte
+ * <li>3 bits for bytes required to encode base value</li>
+ * <li>5 bits for patch width</li>
+ * </ul>
+ * <ul>
+ * 4th byte
+ * <li>3 bits for patch gap width</li>
+ * <li>5 bits for patch length</li>
+ * </ul>
+ * </li>
+ * <li>Base value - Stored using fixed number of bytes. If MSB is set, base
+ * value is negative else positive. Length of base value is (base width * 8)
+ * bits.</li>
+ * <li>Data blob - Base reduced values as stored using fixed bit width. Length
+ * of data blob is (fixed width * run length) bits.</li>
+ * <li>Patch blob - Patch blob is a list of gap and patch value. Each entry in
+ * the patch list is (patch width + patch gap width) bits long. Gap between the
+ * subsequent elements to be patched are stored in upper part of entry whereas
+ * patch values are stored in lower part of entry. Length of patch blob is
+ * ((patch width + patch gap width) * patch length) bits.</li>
+ * </ul>
+ * </p>
+ * <p>
+ * <b>DELTA</b> Used for monotonically increasing or decreasing sequences,
+ * sequences with fixed delta values or long repeated sequences.
+ * <ul>
+ * <li>2 bytes header
+ * <ul>
+ * 1st byte
+ * <li>2 bits for encoding type</li>
+ * <li>5 bits for fixed bit width of values in blob</li>
+ * <li>1 bit for storing MSB of run length</li>
+ * </ul>
+ * <ul>
+ * 2nd byte
+ * <li>8 bits for lower run length bits</li>
+ * </ul>
+ * </li>
+ * <li>Base value - zigzag encoded value written as varint</li>
+ * <li>Delta base - zigzag encoded value written as varint</li>
+ * <li>Delta blob - only positive values. monotonicity and orderness are decided
+ * based on the sign of the base value and delta base</li>
+ * </ul>
+ * </p>
+ */
+public class RunLengthIntegerWriterV2 implements IntegerWriter {
+
+  public enum EncodingType {
+    SHORT_REPEAT, DIRECT, PATCHED_BASE, DELTA
+  }
+
+  static final int MAX_SCOPE = 512;
+  static final int MIN_REPEAT = 3;
+  private static final int MAX_SHORT_REPEAT_LENGTH = 10;
+  private long prevDelta = 0;
+  private int fixedRunLength = 0;
+  private int variableRunLength = 0;
+  private final long[] literals = new long[MAX_SCOPE];
+  private final PositionedOutputStream output;
+  private final boolean signed;
+  private EncodingType encoding;
+  private int numLiterals;
+  private final long[] zigzagLiterals = new long[MAX_SCOPE];
+  private final long[] baseRedLiterals = new long[MAX_SCOPE];
+  private final long[] adjDeltas = new long[MAX_SCOPE];
+  private long fixedDelta;
+  private int zzBits90p;
+  private int zzBits100p;
+  private int brBits95p;
+  private int brBits100p;
+  private int bitsDeltaMax;
+  private int patchWidth;
+  private int patchGapWidth;
+  private int patchLength;
+  private long[] gapVsPatchList;
+  private long min;
+  private boolean isFixedDelta;
+  private SerializationUtils utils;
+  private boolean alignedBitpacking;
+
+  RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed) {
+    this(output, signed, true);
+  }
+
+  public RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed,
+      boolean alignedBitpacking) {
+    this.output = output;
+    this.signed = signed;
+    this.alignedBitpacking = alignedBitpacking;
+    this.utils = new SerializationUtils();
+    clear();
+  }
+
+  private void writeValues() throws IOException {
+    if (numLiterals != 0) {
+
+      if (encoding.equals(EncodingType.SHORT_REPEAT)) {
+        writeShortRepeatValues();
+      } else if (encoding.equals(EncodingType.DIRECT)) {
+        writeDirectValues();
+      } else if (encoding.equals(EncodingType.PATCHED_BASE)) {
+        writePatchedBaseValues();
+      } else {
+        writeDeltaValues();
+      }
+
+      // clear all the variables
+      clear();
+    }
+  }
+
+  private void writeDeltaValues() throws IOException {
+    int len = 0;
+    int fb = bitsDeltaMax;
+    int efb = 0;
+
+    if (alignedBitpacking) {
+      fb = utils.getClosestAlignedFixedBits(fb);
+    }
+
+    if (isFixedDelta) {
+      // if fixed run length is greater than threshold then it will be fixed
+      // delta sequence with delta value 0 else fixed delta sequence with
+      // non-zero delta value
+      if (fixedRunLength > MIN_REPEAT) {
+        // ex. sequence: 2 2 2 2 2 2 2 2
+        len = fixedRunLength - 1;
+        fixedRunLength = 0;
+      } else {
+        // ex. sequence: 4 6 8 10 12 14 16
+        len = variableRunLength - 1;
+        variableRunLength = 0;
+      }
+    } else {
+      // fixed width 0 is used for long repeating values.
+      // sequences that require only 1 bit to encode will have an additional bit
+      if (fb == 1) {
+        fb = 2;
+      }
+      efb = utils.encodeBitWidth(fb);
+      efb = efb << 1;
+      len = variableRunLength - 1;
+      variableRunLength = 0;
+    }
+
+    // extract the 9th bit of run length
+    final int tailBits = (len & 0x100) >>> 8;
+
+    // create first byte of the header
+    final int headerFirstByte = getOpcode() | efb | tailBits;
+
+    // second byte of the header stores the remaining 8 bits of runlength
+    final int headerSecondByte = len & 0xff;
+
+    // write header
+    output.write(headerFirstByte);
+    output.write(headerSecondByte);
+
+    // store the first value from zigzag literal array
+    if (signed) {
+      utils.writeVslong(output, literals[0]);
+    } else {
+      utils.writeVulong(output, literals[0]);
+    }
+
+    if (isFixedDelta) {
+      // if delta is fixed then we don't need to store delta blob
+      utils.writeVslong(output, fixedDelta);
+    } else {
+      // store the first value as delta value using zigzag encoding
+      utils.writeVslong(output, adjDeltas[0]);
+
+      // adjacent delta values are bit packed. The length of adjDeltas array is
+      // always one less than the number of literals (delta difference for n
+      // elements is n-1). We have already written one element, write the
+      // remaining numLiterals - 2 elements here
+      utils.writeInts(adjDeltas, 1, numLiterals - 2, fb, output);
+    }
+  }
+
+  private void writePatchedBaseValues() throws IOException {
+
+    // NOTE: Aligned bit packing cannot be applied for PATCHED_BASE encoding
+    // because patch is applied to MSB bits. For example: If fixed bit width of
+    // base value is 7 bits and if patch is 3 bits, the actual value is
+    // constructed by shifting the patch to left by 7 positions.
+    // actual_value = patch << 7 | base_value
+    // So, if we align base_value then actual_value can not be reconstructed.
+
+    // write the number of fixed bits required in next 5 bits
+    final int fb = brBits95p;
+    final int efb = utils.encodeBitWidth(fb) << 1;
+
+    // adjust variable run length, they are one off
+    variableRunLength -= 1;
+
+    // extract the 9th bit of run length
+    final int tailBits = (variableRunLength & 0x100) >>> 8;
+
+    // create first byte of the header
+    final int headerFirstByte = getOpcode() | efb | tailBits;
+
+    // second byte of the header stores the remaining 8 bits of runlength
+    final int headerSecondByte = variableRunLength & 0xff;
+
+    // if the min value is negative toggle the sign
+    final boolean isNegative = min < 0 ? true : false;
+    if (isNegative) {
+      min = -min;
+    }
+
+    // find the number of bytes required for base and shift it by 5 bits
+    // to accommodate patch width. The additional bit is used to store the sign
+    // of the base value.
+    final int baseWidth = utils.findClosestNumBits(min) + 1;
+    final int baseBytes = baseWidth % 8 == 0 ? baseWidth / 8 : (baseWidth / 8) + 1;
+    final int bb = (baseBytes - 1) << 5;
+
+    // if the base value is negative then set MSB to 1
+    if (isNegative) {
+      min |= (1L << ((baseBytes * 8) - 1));
+    }
+
+    // third byte contains 3 bits for number of bytes occupied by base
+    // and 5 bits for patchWidth
+    final int headerThirdByte = bb | utils.encodeBitWidth(patchWidth);
+
+    // fourth byte contains 3 bits for page gap width and 5 bits for
+    // patch length
+    final int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength;
+
+    // write header
+    output.write(headerFirstByte);
+    output.write(headerSecondByte);
+    output.write(headerThirdByte);
+    output.write(headerFourthByte);
+
+    // write the base value using fixed bytes in big endian order
+    for(int i = baseBytes - 1; i >= 0; i--) {
+      byte b = (byte) ((min >>> (i * 8)) & 0xff);
+      output.write(b);
+    }
+
+    // base reduced literals are bit packed
+    int closestFixedBits = utils.getClosestFixedBits(fb);
+
+    utils.writeInts(baseRedLiterals, 0, numLiterals, closestFixedBits,
+        output);
+
+    // write patch list
+    closestFixedBits = utils.getClosestFixedBits(patchGapWidth + patchWidth);
+
+    utils.writeInts(gapVsPatchList, 0, gapVsPatchList.length, closestFixedBits,
+        output);
+
+    // reset run length
+    variableRunLength = 0;
+  }
+
+  /**
+   * Store the opcode in 2 MSB bits
+   * @return opcode
+   */
+  private int getOpcode() {
+    return encoding.ordinal() << 6;
+  }
+
+  private void writeDirectValues() throws IOException {
+
+    // write the number of fixed bits required in next 5 bits
+    int fb = zzBits100p;
+
+    if (alignedBitpacking) {
+      fb = utils.getClosestAlignedFixedBits(fb);
+    }
+
+    final int efb = utils.encodeBitWidth(fb) << 1;
+
+    // adjust variable run length
+    variableRunLength -= 1;
+
+    // extract the 9th bit of run length
+    final int tailBits = (variableRunLength & 0x100) >>> 8;
+
+    // create first byte of the header
+    final int headerFirstByte = getOpcode() | efb | tailBits;
+
+    // second byte of the header stores the remaining 8 bits of runlength
+    final int headerSecondByte = variableRunLength & 0xff;
+
+    // write header
+    output.write(headerFirstByte);
+    output.write(headerSecondByte);
+
+    // bit packing the zigzag encoded literals
+    utils.writeInts(zigzagLiterals, 0, numLiterals, fb, output);
+
+    // reset run length
+    variableRunLength = 0;
+  }
+
+  private void writeShortRepeatValues() throws IOException {
+    // get the value that is repeating, compute the bits and bytes required
+    long repeatVal = 0;
+    if (signed) {
+      repeatVal = utils.zigzagEncode(literals[0]);
+    } else {
+      repeatVal = literals[0];
+    }
+
+    final int numBitsRepeatVal = utils.findClosestNumBits(repeatVal);
+    final int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? numBitsRepeatVal >>> 3
+        : (numBitsRepeatVal >>> 3) + 1;
+
+    // write encoding type in top 2 bits
+    int header = getOpcode();
+
+    // write the number of bytes required for the value
+    header |= ((numBytesRepeatVal - 1) << 3);
+
+    // write the run length
+    fixedRunLength -= MIN_REPEAT;
+    header |= fixedRunLength;
+
+    // write the header
+    output.write(header);
+
+    // write the repeating value in big endian byte order
+    for(int i = numBytesRepeatVal - 1; i >= 0; i--) {
+      int b = (int) ((repeatVal >>> (i * 8)) & 0xff);
+      output.write(b);
+    }
+
+    fixedRunLength = 0;
+  }
+
+  private void determineEncoding() {
+
+    // we need to compute zigzag values for DIRECT encoding if we decide to
+    // break early for delta overflows or for shorter runs
+    computeZigZagLiterals();
+
+    zzBits100p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 1.0);
+
+    // not a big win for shorter runs to determine encoding
+    if (numLiterals <= MIN_REPEAT) {
+      encoding = EncodingType.DIRECT;
+      return;
+    }
+
+    // DELTA encoding check
+
+    // for identifying monotonic sequences
+    boolean isIncreasing = true;
+    boolean isDecreasing = true;
+    this.isFixedDelta = true;
+
+    this.min = literals[0];
+    long max = literals[0];
+    final long initialDelta = literals[1] - literals[0];
+    long currDelta = initialDelta;
+    long deltaMax = initialDelta;
+    this.adjDeltas[0] = initialDelta;
+
+    for (int i = 1; i < numLiterals; i++) {
+      final long l1 = literals[i];
+      final long l0 = literals[i - 1];
+      currDelta = l1 - l0;
+      min = Math.min(min, l1);
+      max = Math.max(max, l1);
+
+      isIncreasing &= (l0 <= l1);
+      isDecreasing &= (l0 >= l1);
+
+      isFixedDelta &= (currDelta == initialDelta);
+      if (i > 1) {
+        adjDeltas[i - 1] = Math.abs(currDelta);
+        deltaMax = Math.max(deltaMax, adjDeltas[i - 1]);
+      }
+    }
+
+    // its faster to exit under delta overflow condition without checking for
+    // PATCHED_BASE condition as encoding using DIRECT is faster and has less
+    // overhead than PATCHED_BASE
+    if (!utils.isSafeSubtract(max, min)) {
+      encoding = EncodingType.DIRECT;
+      return;
+    }
+
+    // invariant - subtracting any number from any other in the literals after
+    // this point won't overflow
+
+    // if min is equal to max then the delta is 0, this condition happens for
+    // fixed values run >10 which cannot be encoded with SHORT_REPEAT
+    if (min == max) {
+      assert isFixedDelta : min + "==" + max +
+          ", isFixedDelta cannot be false";
+      assert currDelta == 0 : min + "==" + max + ", currDelta should be zero";
+      fixedDelta = 0;
+      encoding = EncodingType.DELTA;
+      return;
+    }
+
+    if (isFixedDelta) {
+      assert currDelta == initialDelta
+          : "currDelta should be equal to initialDelta for fixed delta encoding";
+      encoding = EncodingType.DELTA;
+      fixedDelta = currDelta;
+      return;
+    }
+
+    // if initialDelta is 0 then we cannot delta encode as we cannot identify
+    // the sign of deltas (increasing or decreasing)
+    if (initialDelta != 0) {
+      // stores the number of bits required for packing delta blob in
+      // delta encoding
+      bitsDeltaMax = utils.findClosestNumBits(deltaMax);
+
+      // monotonic condition
+      if (isIncreasing || isDecreasing) {
+        encoding = EncodingType.DELTA;
+        return;
+      }
+    }
+
+    // PATCHED_BASE encoding check
+
+    // percentile values are computed for the zigzag encoded values. if the
+    // number of bit requirement between 90th and 100th percentile varies
+    // beyond a threshold then we need to patch the values. if the variation
+    // is not significant then we can use direct encoding
+
+    zzBits90p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 0.9);
+    int diffBitsLH = zzBits100p - zzBits90p;
+
+    // if the difference between 90th percentile and 100th percentile fixed
+    // bits is > 1 then we need patch the values
+    if (diffBitsLH > 1) {
+
+      // patching is done only on base reduced values.
+      // remove base from literals
+      for (int i = 0; i < numLiterals; i++) {
+        baseRedLiterals[i] = literals[i] - min;
+      }
+
+      // 95th percentile width is used to determine max allowed value
+      // after which patching will be done
+      brBits95p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 0.95);
+
+      // 100th percentile is used to compute the max patch width
+      brBits100p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 1.0);
+
+      // after base reducing the values, if the difference in bits between
+      // 95th percentile and 100th percentile value is zero then there
+      // is no point in patching the values, in which case we will
+      // fallback to DIRECT encoding.
+      // The decision to use patched base was based on zigzag values, but the
+      // actual patching is done on base reduced literals.
+      if ((brBits100p - brBits95p) != 0) {
+        encoding = EncodingType.PATCHED_BASE;
+        preparePatchedBlob();
+        return;
+      } else {
+        encoding = EncodingType.DIRECT;
+        return;
+      }
+    } else {
+      // if difference in bits between 95th percentile and 100th percentile is
+      // 0, then patch length will become 0. Hence we will fallback to direct
+      encoding = EncodingType.DIRECT;
+      return;
+    }
+  }
+
+  private void computeZigZagLiterals() {
+    // populate zigzag encoded literals
+    long zzEncVal = 0;
+    for (int i = 0; i < numLiterals; i++) {
+      if (signed) {
+        zzEncVal = utils.zigzagEncode(literals[i]);
+      } else {
+        zzEncVal = literals[i];
+      }
+      zigzagLiterals[i] = zzEncVal;
+    }
+  }
+
+  private void preparePatchedBlob() {
+    // mask will be max value beyond which patch will be generated
+    long mask = (1L << brBits95p) - 1;
+
+    // since we are considering only 95 percentile, the size of gap and
+    // patch array can contain only be 5% values
+    patchLength = (int) Math.ceil((numLiterals * 0.05));
+
+    int[] gapList = new int[patchLength];
+    long[] patchList = new long[patchLength];
+
+    // #bit for patch
+    patchWidth = brBits100p - brBits95p;
+    patchWidth = utils.getClosestFixedBits(patchWidth);
+
+    // if patch bit requirement is 64 then it will not possible to pack
+    // gap and patch together in a long. To make sure gap and patch can be
+    // packed together adjust the patch width
+    if (patchWidth == 64) {
+      patchWidth = 56;
+      brBits95p = 8;
+      mask = (1L << brBits95p) - 1;
+    }
+
+    int gapIdx = 0;
+    int patchIdx = 0;
+    int prev = 0;
+    int gap = 0;
+    int maxGap = 0;
+
+    for(int i = 0; i < numLiterals; i++) {
+      // if value is above mask then create the patch and record the gap
+      if (baseRedLiterals[i] > mask) {
+        gap = i - prev;
+        if (gap > maxGap) {
+          maxGap = gap;
+        }
+
+        // gaps are relative, so store the previous patched value index
+        prev = i;
+        gapList[gapIdx++] = gap;
+
+        // extract the most significant bits that are over mask bits
+        long patch = baseRedLiterals[i] >>> brBits95p;
+        patchList[patchIdx++] = patch;
+
+        // strip off the MSB to enable safe bit packing
+        baseRedLiterals[i] &= mask;
+      }
+    }
+
+    // adjust the patch length to number of entries in gap list
+    patchLength = gapIdx;
+
+    // if the element to be patched is the first and only element then
+    // max gap will be 0, but to store the gap as 0 we need atleast 1 bit
+    if (maxGap == 0 && patchLength != 0) {
+      patchGapWidth = 1;
+    } else {
+      patchGapWidth = utils.findClosestNumBits(maxGap);
+    }
+
+    // special case: if the patch gap width is greater than 256, then
+    // we need 9 bits to encode the gap width. But we only have 3 bits in
+    // header to record the gap width. To deal with this case, we will save
+    // two entries in patch list in the following way
+    // 256 gap width => 0 for patch value
+    // actual gap - 256 => actual patch value
+    // We will do the same for gap width = 511. If the element to be patched is
+    // the last element in the scope then gap width will be 511. In this case we
+    // will have 3 entries in the patch list in the following way
+    // 255 gap width => 0 for patch value
+    // 255 gap width => 0 for patch value
+    // 1 gap width => actual patch value
+    if (patchGapWidth > 8) {
+      patchGapWidth = 8;
+      // for gap = 511, we need two additional entries in patch list
+      if (maxGap == 511) {
+        patchLength += 2;
+      } else {
+        patchLength += 1;
+      }
+    }
+
+    // create gap vs patch list
+    gapIdx = 0;
+    patchIdx = 0;
+    gapVsPatchList = new long[patchLength];
+    for(int i = 0; i < patchLength; i++) {
+      long g = gapList[gapIdx++];
+      long p = patchList[patchIdx++];
+      while (g > 255) {
+        gapVsPatchList[i++] = (255L << patchWidth);
+        g -= 255;
+      }
+
+      // store patch value in LSBs and gap in MSBs
+      gapVsPatchList[i] = (g << patchWidth) | p;
+    }
+  }
+
+  /**
+   * clears all the variables
+   */
+  private void clear() {
+    numLiterals = 0;
+    encoding = null;
+    prevDelta = 0;
+    fixedDelta = 0;
+    zzBits90p = 0;
+    zzBits100p = 0;
+    brBits95p = 0;
+    brBits100p = 0;
+    bitsDeltaMax = 0;
+    patchGapWidth = 0;
+    patchLength = 0;
+    patchWidth = 0;
+    gapVsPatchList = null;
+    min = 0;
+    isFixedDelta = true;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (numLiterals != 0) {
+      if (variableRunLength != 0) {
+        determineEncoding();
+        writeValues();
+      } else if (fixedRunLength != 0) {
+        if (fixedRunLength < MIN_REPEAT) {
+          variableRunLength = fixedRunLength;
+          fixedRunLength = 0;
+          determineEncoding();
+          writeValues();
+        } else if (fixedRunLength >= MIN_REPEAT
+            && fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) {
+          encoding = EncodingType.SHORT_REPEAT;
+          writeValues();
+        } else {
+          encoding = EncodingType.DELTA;
+          isFixedDelta = true;
+          writeValues();
+        }
+      }
+    }
+    output.flush();
+  }
+
+  @Override
+  public void write(long val) throws IOException {
+    if (numLiterals == 0) {
+      initializeLiterals(val);
+    } else {
+      if (numLiterals == 1) {
+        prevDelta = val - literals[0];
+        literals[numLiterals++] = val;
+        // if both values are same count as fixed run else variable run
+        if (val == literals[0]) {
+          fixedRunLength = 2;
+          variableRunLength = 0;
+        } else {
+          fixedRunLength = 0;
+          variableRunLength = 2;
+        }
+      } else {
+        long currentDelta = val - literals[numLiterals - 1];
+        if (prevDelta == 0 && currentDelta == 0) {
+          // fixed delta run
+
+          literals[numLiterals++] = val;
+
+          // if variable run is non-zero then we are seeing repeating
+          // values at the end of variable run in which case keep
+          // updating variable and fixed runs
+          if (variableRunLength > 0) {
+            fixedRunLength = 2;
+          }
+          fixedRunLength += 1;
+
+          // if fixed run met the minimum condition and if variable
+          // run is non-zero then flush the variable run and shift the
+          // tail fixed runs to start of the buffer
+          if (fixedRunLength >= MIN_REPEAT && variableRunLength > 0) {
+            numLiterals -= MIN_REPEAT;
+            variableRunLength -= MIN_REPEAT - 1;
+            // copy the tail fixed runs
+            long[] tailVals = new long[MIN_REPEAT];
+            System.arraycopy(literals, numLiterals, tailVals, 0, MIN_REPEAT);
+
+            // determine variable encoding and flush values
+            determineEncoding();
+            writeValues();
+
+            // shift tail fixed runs to beginning of the buffer
+            for(long l : tailVals) {
+              literals[numLiterals++] = l;
+            }
+          }
+
+          // if fixed runs reached max repeat length then write values
+          if (fixedRunLength == MAX_SCOPE) {
+            determineEncoding();
+            writeValues();
+          }
+        } else {
+          // variable delta run
+
+          // if fixed run length is non-zero and if it satisfies the
+          // short repeat conditions then write the values as short repeats
+          // else use delta encoding
+          if (fixedRunLength >= MIN_REPEAT) {
+            if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) {
+              encoding = EncodingType.SHORT_REPEAT;
+              writeValues();
+            } else {
+              encoding = EncodingType.DELTA;
+              isFixedDelta = true;
+              writeValues();
+            }
+          }
+
+          // if fixed run length is <MIN_REPEAT and current value is
+          // different from previous then treat it as variable run
+          if (fixedRunLength > 0 && fixedRunLength < MIN_REPEAT) {
+            if (val != literals[numLiterals - 1]) {
+              variableRunLength = fixedRunLength;
+              fixedRunLength = 0;
+            }
+          }
+
+          // after writing values re-initialize the variables
+          if (numLiterals == 0) {
+            initializeLiterals(val);
+          } else {
+            // keep updating variable run lengths
+            prevDelta = val - literals[numLiterals - 1];
+            literals[numLiterals++] = val;
+            variableRunLength += 1;
+
+            // if variable run length reach the max scope, write it
+            if (variableRunLength == MAX_SCOPE) {
+              determineEncoding();
+              writeValues();
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private void initializeLiterals(long val) {
+    literals[numLiterals++] = val;
+    fixedRunLength = 1;
+    variableRunLength = 1;
+  }
+
+  @Override
+  public void getPosition(PositionRecorder recorder) throws IOException {
+    output.getPosition(recorder);
+    recorder.addPosition(numLiterals);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/df8921d8/orc/src/java/org/apache/hive/orc/impl/SchemaEvolution.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/hive/orc/impl/SchemaEvolution.java b/orc/src/java/org/apache/hive/orc/impl/SchemaEvolution.java
new file mode 100644
index 0000000..3c7124b
--- /dev/null
+++ b/orc/src/java/org/apache/hive/orc/impl/SchemaEvolution.java
@@ -0,0 +1,399 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.orc.impl;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hive.orc.TypeDescription;
+
+/**
+ * Take the file types and the (optional) configuration column names/types and see if there
+ * has been schema evolution.
+ */
+public class SchemaEvolution {
+  // indexed by reader column id
+  private final TypeDescription[] readerFileTypes;
+  // indexed by reader column id
+  private final boolean[] readerIncluded;
+  // the offset to the first column id ignoring any ACID columns
+  private final int readerColumnOffset;
+  // indexed by file column id
+  private final boolean[] fileIncluded;
+  private final TypeDescription fileSchema;
+  private final TypeDescription readerSchema;
+  private boolean hasConversion;
+  // indexed by reader column id
+  private final boolean[] ppdSafeConversion;
+
+  public SchemaEvolution(TypeDescription fileSchema, boolean[] includedCols) {
+    this(fileSchema, null, includedCols);
+  }
+
+  public SchemaEvolution(TypeDescription fileSchema,
+                         TypeDescription readerSchema,
+                         boolean[] includeCols) {
+    this.readerIncluded = includeCols == null ? null : Arrays.copyOf(includeCols, includeCols.length);
+    this.fileIncluded = new boolean[fileSchema.getMaximumId() + 1];
+    this.hasConversion = false;
+    this.fileSchema = fileSchema;
+    boolean isAcid = checkAcidSchema(fileSchema);
+    this.readerColumnOffset = isAcid ? acidEventFieldNames.size() : 0;
+    if (readerSchema != null) {
+      if (isAcid) {
+        this.readerSchema = createEventSchema(readerSchema);
+      } else {
+        this.readerSchema = readerSchema;
+      }
+      if (readerIncluded != null &&
+          readerIncluded.length + readerColumnOffset != this.readerSchema.getMaximumId() + 1) {
+        throw new IllegalArgumentException("Include vector the wrong length: " +
+            this.readerSchema.toJson() + " with include length " +
+            readerIncluded.length);
+      }
+      this.readerFileTypes = new TypeDescription[this.readerSchema.getMaximumId() + 1];
+      buildConversionFileTypesArray(fileSchema, this.readerSchema);
+    } else {
+      this.readerSchema = fileSchema;
+      this.readerFileTypes = new TypeDescription[this.readerSchema.getMaximumId() + 1];
+      if (readerIncluded != null &&
+          readerIncluded.length + readerColumnOffset != this.readerSchema.getMaximumId() + 1) {
+        throw new IllegalArgumentException("Include vector the wrong length: " +
+            this.readerSchema.toJson() + " with include length " +
+            readerIncluded.length);
+      }
+      buildSameSchemaFileTypesArray();
+    }
+    this.ppdSafeConversion = populatePpdSafeConversion();
+  }
+
+  public TypeDescription getReaderSchema() {
+    return readerSchema;
+  }
+
+  /**
+   * Returns the non-ACID (aka base) reader type description.
+   *
+   * @return the reader type ignoring the ACID rowid columns, if any
+   */
+  public TypeDescription getReaderBaseSchema() {
+    return readerSchema.findSubtype(readerColumnOffset);
+  }
+
+  /**
+   * Is there Schema Evolution data type conversion?
+   * @return
+   */
+  public boolean hasConversion() {
+    return hasConversion;
+  }
+
+  public TypeDescription getFileType(TypeDescription readerType) {
+    return getFileType(readerType.getId());
+  }
+
+  /**
+   * Get whether each column is included from the reader's point of view.
+   * @return a boolean array indexed by reader column id
+   */
+  public boolean[] getReaderIncluded() {
+    return readerIncluded;
+  }
+
+  /**
+   * Get whether each column is included from the file's point of view.
+   * @return a boolean array indexed by file column id
+   */
+  public boolean[] getFileIncluded() {
+    return fileIncluded;
+  }
+
+  /**
+   * Get the file type by reader type id.
+   * @param id reader column id
+   * @return
+   */
+  public TypeDescription getFileType(int id) {
+    return readerFileTypes[id];
+  }
+
+  /**
+   * Check if column is safe for ppd evaluation
+   * @param colId reader column id
+   * @return true if the specified column is safe for ppd evaluation else false
+   */
+  public boolean isPPDSafeConversion(final int colId) {
+    if (hasConversion()) {
+      if (colId < 0 || colId >= ppdSafeConversion.length) {
+        return false;
+      }
+      return ppdSafeConversion[colId];
+    }
+
+    // when there is no schema evolution PPD is safe
+    return true;
+  }
+
+  private boolean[] populatePpdSafeConversion() {
+    if (fileSchema == null || readerSchema == null || readerFileTypes == null) {
+      return null;
+    }
+
+    boolean[] result = new boolean[readerSchema.getMaximumId() + 1];
+    boolean safePpd = validatePPDConversion(fileSchema, readerSchema);
+    result[readerSchema.getId()] = safePpd;
+    List<TypeDescription> children = readerSchema.getChildren();
+    if (children != null) {
+      for (TypeDescription child : children) {
+        TypeDescription fileType = getFileType(child.getId());
+        safePpd = validatePPDConversion(fileType, child);
+        result[child.getId()] = safePpd;
+      }
+    }
+    return result;
+  }
+
+  private boolean validatePPDConversion(final TypeDescription fileType,
+      final TypeDescription readerType) {
+    if (fileType == null) {
+      return false;
+    }
+    if (fileType.getCategory().isPrimitive()) {
+      if (fileType.getCategory().equals(readerType.getCategory())) {
+        // for decimals alone do equality check to not mess up with precision change
+        if (fileType.getCategory().equals(TypeDescription.Category.DECIMAL) &&
+            !fileType.equals(readerType)) {
+          return false;
+        }
+        return true;
+      }
+
+      // only integer and string evolutions are safe
+      // byte -> short -> int -> long
+      // string <-> char <-> varchar
+      // NOTE: Float to double evolution is not safe as floats are stored as doubles in ORC's
+      // internal index, but when doing predicate evaluation for queries like "select * from
+      // orc_float where f = 74.72" the constant on the filter is converted from string -> double
+      // so the precisions will be different and the comparison will fail.
+      // Soon, we should convert all sargs that compare equality between floats or
+      // doubles to range predicates.
+
+      // Similarly string -> char and varchar -> char and vice versa is not possible, as ORC stores
+      // char with padded spaces in its internal index.
+      switch (fileType.getCategory()) {
+        case BYTE:
+          if (readerType.getCategory().equals(TypeDescription.Category.SHORT) ||
+              readerType.getCategory().equals(TypeDescription.Category.INT) ||
+              readerType.getCategory().equals(TypeDescription.Category.LONG)) {
+            return true;
+          }
+          break;
+        case SHORT:
+          if (readerType.getCategory().equals(TypeDescription.Category.INT) ||
+              readerType.getCategory().equals(TypeDescription.Category.LONG)) {
+            return true;
+          }
+          break;
+        case INT:
+          if (readerType.getCategory().equals(TypeDescription.Category.LONG)) {
+            return true;
+          }
+          break;
+        case STRING:
+          if (readerType.getCategory().equals(TypeDescription.Category.VARCHAR)) {
+            return true;
+          }
+          break;
+        case VARCHAR:
+          if (readerType.getCategory().equals(TypeDescription.Category.STRING)) {
+            return true;
+          }
+          break;
+        default:
+          break;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Should we read the given reader column?
+   * @param readerId the id of column in the extended reader schema
+   * @return true if the column should be read
+   */
+  public boolean includeReaderColumn(int readerId) {
+    return readerIncluded == null ||
+        readerId <= readerColumnOffset ||
+        readerIncluded[readerId - readerColumnOffset];
+  }
+
+  void buildConversionFileTypesArray(TypeDescription fileType,
+                                     TypeDescription readerType) {
+    // if the column isn't included, don't map it
+    int readerId = readerType.getId();
+    if (!includeReaderColumn(readerId)) {
+      return;
+    }
+    boolean isOk = true;
+    // check the easy case first
+    if (fileType.getCategory() == readerType.getCategory()) {
+      switch (readerType.getCategory()) {
+        case BOOLEAN:
+        case BYTE:
+        case SHORT:
+        case INT:
+        case LONG:
+        case DOUBLE:
+        case FLOAT:
+        case STRING:
+        case TIMESTAMP:
+        case BINARY:
+        case DATE:
+          // these are always a match
+          break;
+        case CHAR:
+        case VARCHAR:
+          // We do conversion when same CHAR/VARCHAR type but different maxLength.
+          if (fileType.getMaxLength() != readerType.getMaxLength()) {
+            hasConversion = true;
+          }
+          break;
+        case DECIMAL:
+          // We do conversion when same DECIMAL type but different precision/scale.
+          if (fileType.getPrecision() != readerType.getPrecision() ||
+              fileType.getScale() != readerType.getScale()) {
+            hasConversion = true;
+          }
+          break;
+        case UNION:
+        case MAP:
+        case LIST: {
+          // these must be an exact match
+          List<TypeDescription> fileChildren = fileType.getChildren();
+          List<TypeDescription> readerChildren = readerType.getChildren();
+          if (fileChildren.size() == readerChildren.size()) {
+            for(int i=0; i < fileChildren.size(); ++i) {
+              buildConversionFileTypesArray(fileChildren.get(i), readerChildren.get(i));
+            }
+          } else {
+            isOk = false;
+          }
+          break;
+        }
+        case STRUCT: {
+          // allow either side to have fewer fields than the other
+          List<TypeDescription> fileChildren = fileType.getChildren();
+          List<TypeDescription> readerChildren = readerType.getChildren();
+          if (fileChildren.size() != readerChildren.size()) {
+            hasConversion = true;
+          }
+          int jointSize = Math.min(fileChildren.size(), readerChildren.size());
+          for(int i=0; i < jointSize; ++i) {
+            buildConversionFileTypesArray(fileChildren.get(i), readerChildren.get(i));
+          }
+          break;
+        }
+        default:
+          throw new IllegalArgumentException("Unknown type " + readerType);
+      }
+    } else {
+      /*
+       * Check for the few cases where will not convert....
+       */
+
+      isOk = ConvertTreeReaderFactory.canConvert(fileType, readerType);
+      hasConversion = true;
+    }
+    if (isOk) {
+      if (readerFileTypes[readerId] != null) {
+        throw new RuntimeException("reader to file type entry already assigned");
+      }
+      readerFileTypes[readerId] = fileType;
+      fileIncluded[fileType.getId()] = true;
+    } else {
+      throw new IllegalArgumentException(
+          String.format(
+              "ORC does not support type conversion from file type %s (%d) to reader type %s (%d)",
+              fileType.toString(), fileType.getId(),
+              readerType.toString(), readerId));
+    }
+  }
+
+  /**
+   * Use to make a reader to file type array when the schema is the same.
+   * @return
+   */
+  private void buildSameSchemaFileTypesArray() {
+    buildSameSchemaFileTypesArrayRecurse(readerSchema);
+  }
+
+  void buildSameSchemaFileTypesArrayRecurse(TypeDescription readerType) {
+    int id = readerType.getId();
+    if (!includeReaderColumn(id)) {
+      return;
+    }
+    if (readerFileTypes[id] != null) {
+      throw new RuntimeException("reader to file type entry already assigned");
+    }
+    readerFileTypes[id] = readerType;
+    fileIncluded[id] = true;
+    List<TypeDescription> children = readerType.getChildren();
+    if (children != null) {
+      for (TypeDescription child : children) {
+        buildSameSchemaFileTypesArrayRecurse(child);
+      }
+    }
+  }
+
+  private static boolean checkAcidSchema(TypeDescription type) {
+    if (type.getCategory().equals(TypeDescription.Category.STRUCT)) {
+      List<String> rootFields = type.getFieldNames();
+      if (acidEventFieldNames.equals(rootFields)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * @param typeDescr
+   * @return ORC types for the ACID event based on the row's type description
+   */
+  public static TypeDescription createEventSchema(TypeDescription typeDescr) {
+    TypeDescription result = TypeDescription.createStruct()
+        .addField("operation", TypeDescription.createInt())
+        .addField("originalTransaction", TypeDescription.createLong())
+        .addField("bucket", TypeDescription.createInt())
+        .addField("rowId", TypeDescription.createLong())
+        .addField("currentTransaction", TypeDescription.createLong())
+        .addField("row", typeDescr.clone());
+    return result;
+  }
+
+  public static final List<String> acidEventFieldNames= new ArrayList<String>();
+  static {
+    acidEventFieldNames.add("operation");
+    acidEventFieldNames.add("originalTransaction");
+    acidEventFieldNames.add("bucket");
+    acidEventFieldNames.add("rowId");
+    acidEventFieldNames.add("currentTransaction");
+    acidEventFieldNames.add("row");
+  }
+}


Mime
View raw message