tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [4/7] tajo git commit: TAJO-2102: Migrate to Apache Orc from Presto's one.
Date Wed, 23 Mar 2016 01:41:49 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OutStream.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OutStream.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OutStream.java
deleted file mode 100644
index f6cfd57..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OutStream.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-/**
- * Output stream that buffers written bytes and hands finished buffers to an
- * {@link OutputReceiver}. Without a codec, each full buffer is passed through
- * unchanged. With a codec, buffered data is compressed into chunks, each
- * preceded by a HEADER_SIZE-byte header (see writeHeader); when the codec
- * reports that compression did not succeed, the original bytes are emitted
- * instead with an "original" header.
- */
-class OutStream extends PositionedOutputStream {
-
-  /** Destination for every finished buffer produced by an OutStream. */
-  interface OutputReceiver {
-    /**
-     * Output the given buffer to the final destination.
-     * @param buffer the buffer to output
-     * @throws IOException
-     */
-    void output(ByteBuffer buffer) throws IOException;
-  }
-
-  // Number of header bytes written before every chunk when a codec is used
-  // (see writeHeader, which writes exactly 3 bytes).
-  static final int HEADER_SIZE = 3;
-  // Identifies the stream; returned by toString().
-  private final String name;
-  private final OutputReceiver receiver;
-  // if enabled the stream will be suppressed when writing stripe
-  private boolean suppress;
-
-  /**
-   * Stores the uncompressed bytes that have been serialized, but not
-   * compressed yet. When this fills, we compress the entire buffer.
-   */
-  private ByteBuffer current = null;
-
-  /**
-   * Stores the compressed bytes until we have a full buffer and then outputs
-   * them to the receiver. If no compression is being done, this (and overflow)
-   * will always be null and the current buffer will be sent directly to the
-   * receiver.
-   */
-  private ByteBuffer compressed = null;
-
-  /**
-   * Since the compressed buffer may start with contents from previous
-   * compression blocks, we allocate an overflow buffer so that the
-   * output of the codec can be split between the two buffers. After the
-   * compressed buffer is sent to the receiver, the overflow buffer becomes
-   * the new compressed buffer.
-   */
-  private ByteBuffer overflow = null;
-  // Size of each buffer allocation; HEADER_SIZE is added when a codec is used.
-  private final int bufferSize;
-  // Compression codec, or null when bytes are written uncompressed.
-  private final CompressionCodec codec;
-  // With a codec: bytes (including headers) of chunks finished since the last
-  // flush(). Never updated when codec is null.
-  private long compressedBytes = 0;
-  // Without a codec: total bytes written since the last flush().
-  // With a codec: bytes buffered in `current` that are not yet compressed
-  // (reset by every spill()).
-  private long uncompressedBytes = 0;
-
-  /**
-   * @param name name reported by toString()
-   * @param bufferSize size, in bytes, of each buffer allocation
-   *     (HEADER_SIZE is added to it when a codec is used)
-   * @param codec compression codec, or null to write bytes uncompressed
-   * @param receiver destination for every finished buffer
-   * @throws IOException declared but not thrown by this constructor
-   */
-  OutStream(String name,
-            int bufferSize,
-            CompressionCodec codec,
-            OutputReceiver receiver) throws IOException {
-    this.name = name;
-    this.bufferSize = bufferSize;
-    this.codec = codec;
-    this.receiver = receiver;
-    this.suppress = false;
-  }
-
-  /**
-   * Flushes all buffered data to the receiver and resets the suppress flag.
-   */
-  public void clear() throws IOException {
-    flush();
-    suppress = false;
-  }
-
-  /**
-   * Write the length of the compressed bytes. Life is much easier if the
-   * header is constant length, so just use 3 bytes. Considering most of the
-   * codecs want between 32k (snappy) and 256k (lzo, zlib), 3 bytes should
-   * be plenty. We also use the low bit for whether it is the original or
-   * compressed bytes.
-   *
-   * Layout (little-endian): byte 0 holds the "original" flag in bit 0 and
-   * bits 0-6 of val; byte 1 holds bits 7-14; byte 2 holds bits 15-22. The
-   * byte casts silently drop anything above 23 bits, so val must be smaller
-   * than 2^23.
-   * @param buffer the buffer to write the header to
-   * @param position the position in the buffer to write at
-   * @param val the size in the file
-   * @param original is it uncompressed
-   */
-  private static void writeHeader(ByteBuffer buffer,
-                                  int position,
-                                  int val,
-                                  boolean original) {
-    buffer.put(position, (byte) ((val << 1) + (original ? 1 : 0)));
-    buffer.put(position + 1, (byte) (val >> 7));
-    buffer.put(position + 2, (byte) (val >> 15));
-  }
-
-  /**
-   * Allocates a fresh `current` buffer. With a codec, the first HEADER_SIZE
-   * bytes are reserved and pre-filled with an "original" header for
-   * bufferSize, and writing starts right after them.
-   */
-  private void getNewInputBuffer() throws IOException {
-    if (codec == null) {
-      current = ByteBuffer.allocate(bufferSize);
-    } else {
-      current = ByteBuffer.allocate(bufferSize + HEADER_SIZE);
-      writeHeader(current, 0, bufferSize, true);
-      current.position(HEADER_SIZE);
-    }
-  }
-
-  /**
-   * Allocate a new buffer for compressed output. Only called from the
-   * codec branch of spill().
-   */
-  private ByteBuffer getNewOutputBuffer() throws IOException {
-    return ByteBuffer.allocate(bufferSize + HEADER_SIZE);
-  }
-
-  /**
-   * Prepares `current` for reading: the limit becomes the number of bytes
-   * written and the position moves to the start of the data (just after the
-   * header when a codec is used).
-   */
-  private void flip() throws IOException {
-    current.limit(current.position());
-    current.position(codec == null ? 0 : HEADER_SIZE);
-  }
-
-  /**
-   * Buffers a single byte, allocating the buffer on first use and spilling
-   * it first if it is full.
-   */
-  @Override
-  public void write(int i) throws IOException {
-    if (current == null) {
-      getNewInputBuffer();
-    }
-    if (current.remaining() < 1) {
-      spill();
-    }
-    uncompressedBytes += 1;
-    current.put((byte) i);
-  }
-
-  /**
-   * Buffers {@code length} bytes from {@code bytes[offset..]}: copies as much
-   * as fits into `current`, then spills and continues until every byte has
-   * been buffered.
-   */
-  @Override
-  public void write(byte[] bytes, int offset, int length) throws IOException {
-    if (current == null) {
-      getNewInputBuffer();
-    }
-    int remaining = Math.min(current.remaining(), length);
-    current.put(bytes, offset, remaining);
-    uncompressedBytes += remaining;
-    length -= remaining;
-    while (length != 0) {
-      spill();
-      offset += remaining;
-      remaining = Math.min(current.remaining(), length);
-      current.put(bytes, offset, remaining);
-      uncompressedBytes += remaining;
-      length -= remaining;
-    }
-  }
-
-  /**
-   * Emits the data buffered in `current`; does nothing if it holds no data.
-   *
-   * Without a codec, `current` is handed straight to the receiver and a new
-   * buffer is allocated. With a codec, the data is compressed into
-   * `compressed` (possibly continuing into `overflow`) after a placeholder
-   * header that is filled in once the chunk size is known. If the codec
-   * reports failure, any previously finished compressed chunks are emitted
-   * first, then `current` itself is emitted as an "original" chunk.
-   */
-  private void spill() throws IOException {
-    // if there isn't anything in the current buffer, don't spill
-    if (current == null ||
-        current.position() == (codec == null ? 0 : HEADER_SIZE)) {
-      return;
-    }
-    flip();
-    if (codec == null) {
-      receiver.output(current);
-      getNewInputBuffer();
-    } else {
-      if (compressed == null) {
-        compressed = getNewOutputBuffer();
-      } else if (overflow == null) {
-        overflow = getNewOutputBuffer();
-      }
-      // Reserve room for this chunk's header; it is written after compressing.
-      int sizePosn = compressed.position();
-      compressed.position(compressed.position() + HEADER_SIZE);
-      if (codec.compress(current, compressed, overflow)) {
-        uncompressedBytes = 0;
-        // move position back to after the header
-        current.position(HEADER_SIZE);
-        current.limit(current.capacity());
-        // find the total bytes in the chunk
-        int totalBytes = compressed.position() - sizePosn - HEADER_SIZE;
-        if (overflow != null) {
-          totalBytes += overflow.position();
-        }
-        compressedBytes += totalBytes + HEADER_SIZE;
-        writeHeader(compressed, sizePosn, totalBytes, false);
-        // if we have less than the next header left, spill it.
-        if (compressed.remaining() < HEADER_SIZE) {
-          compressed.flip();
-          receiver.output(compressed);
-          compressed = overflow;
-          overflow = null;
-        }
-      } else {
-        compressedBytes += uncompressedBytes + HEADER_SIZE;
-        uncompressedBytes = 0;
-        // we are using the original, but need to spill the current
-        // compressed buffer first. So back up to where we started,
-        // flip it and add it to done.
-        if (sizePosn != 0) {
-          compressed.position(sizePosn);
-          compressed.flip();
-          receiver.output(compressed);
-          compressed = null;
-          // if we have an overflow, clear it and make it the new compress
-          // buffer
-          if (overflow != null) {
-            overflow.clear();
-            compressed = overflow;
-            overflow = null;
-          }
-        } else {
-          compressed.clear();
-          if (overflow != null) {
-            overflow.clear();
-          }
-        }
-
-        // now add the current buffer into the done list and get a new one.
-        current.position(0);
-        // update the header with the current length
-        writeHeader(current, 0, current.limit() - HEADER_SIZE, true);
-        receiver.output(current);
-        getNewInputBuffer();
-      }
-    }
-  }
-
-  /**
-   * Records the current position of this stream. Without a codec, one value
-   * is recorded: the number of bytes written since the last flush(). With a
-   * codec, two values are recorded: the compressed offset of the chunk being
-   * filled, then the number of uncompressed bytes buffered in it so far.
-   */
-  void getPosition(PositionRecorder recorder) throws IOException {
-    if (codec == null) {
-      recorder.addPosition(uncompressedBytes);
-    } else {
-      recorder.addPosition(compressedBytes);
-      recorder.addPosition(uncompressedBytes);
-    }
-  }
-
-  /**
-   * Emits all buffered data to the receiver, then drops the buffers and
-   * resets the position counters to zero.
-   */
-  @Override
-  public void flush() throws IOException {
-    spill();
-    if (compressed != null && compressed.position() != 0) {
-      compressed.flip();
-      receiver.output(compressed);
-      compressed = null;
-    }
-    uncompressedBytes = 0;
-    compressedBytes = 0;
-    // NOTE(review): overflow is discarded without being output. This looks
-    // safe only if the codec fills `compressed` before writing into
-    // `overflow`, since spill() then swaps a used overflow into `compressed`.
-    // Confirm against the CompressionCodec implementations.
-    overflow = null;
-    current = null;
-  }
-
-  @Override
-  public String toString() {
-    return name;
-  }
-
-  /**
-   * @return the summed capacity, in bytes, of the buffers currently allocated
-   *     (current, compressed and overflow)
-   */
-  @Override
-  public long getBufferSize() {
-    long result = 0;
-    if (current != null) {
-      result += current.capacity();
-    }
-    if (compressed != null) {
-      result += compressed.capacity();
-    }
-    if (overflow != null) {
-      result += overflow.capacity();
-    }
-    return result;
-  }
-
-  /**
-   * Set suppress flag
-   */
-  public void suppress() {
-    suppress = true;
-  }
-
-  /**
-   * Returns the state of suppress flag
-   * @return value of suppress flag
-   */
-  public boolean isSuppressed() {
-    return suppress;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionRecorder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionRecorder.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionRecorder.java
deleted file mode 100644
index a39926e..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionRecorder.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-/**
- * An interface for recording positions in a stream.
- */
-interface PositionRecorder {
-  /**
-   * Appends one position value to the recorder.
-   * @param offset the position value to record
-   */
-  void addPosition(long offset);
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionedOutputStream.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionedOutputStream.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionedOutputStream.java
deleted file mode 100644
index 748c98c..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionedOutputStream.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * Base class for output streams that can report their current position to a
- * {@link PositionRecorder} and the amount of memory held in their buffers.
- */
-abstract class PositionedOutputStream extends OutputStream {
-
-  /**
-   * Record the current position to the recorder.
-   * @param recorder the object that receives the position
-   * @throws IOException
-   */
-  abstract void getPosition(PositionRecorder recorder) throws IOException;
-
-  /**
-   * Get the memory size currently allocated as buffer associated with this
-   * stream.
-   * @return the number of bytes used by buffers.
-   */
-  abstract long getBufferSize();
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RecordReaderUtils.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RecordReaderUtils.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RecordReaderUtils.java
new file mode 100644
index 0000000..bc882e0
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RecordReaderUtils.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.orc;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.DataReader;
+import org.apache.orc.OrcProto;
+import org.apache.orc.impl.BufferChunk;
+import org.apache.orc.impl.DirectDecompressionCodec;
+import org.apache.orc.impl.OutStream;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Static helpers for reading ORC stripes: a default {@link DataReader},
+ * disk-range planning for row-group filtering, and index-position arithmetic.
+ */
+public class RecordReaderUtils {
+
+  /**
+   * {@link DataReader} that reads stripe data from a file on a Hadoop
+   * {@link FileSystem}, optionally through a zero-copy adapter.
+   */
+  public static class DefaultDataReader implements DataReader {
+    private FSDataInputStream file;
+    private ByteBufferAllocatorPool pool;
+    private ZeroCopyAdapter zcr;
+    private final FileSystem fs;
+    private final Path path;
+    private final boolean useZeroCopy;
+    private final CompressionCodec codec;
+    // Running total of bytes read by readFileData(); exposed via getReadBytes().
+    private long readBytes = 0;
+
+    /**
+     * Creates a reader. No I/O happens until {@link #open()} is called.
+     *
+     * @param fs file system that holds the ORC file
+     * @param path ORC file to read
+     * @param useZeroCopy whether {@link #open()} should try to set up a zero-copy adapter
+     * @param codec the file's compression codec; may be null
+     */
+    public DefaultDataReader(
+        FileSystem fs, Path path, boolean useZeroCopy, CompressionCodec codec) {
+      this.fs = fs;
+      this.path = path;
+      this.useZeroCopy = useZeroCopy;
+      this.codec = codec;
+    }
+
+    /**
+     * Opens the file. When zero-copy was requested, this also creates a buffer pool and,
+     * if {@link RecordReaderUtils#createZeroCopyShim} accepts the codec, a zero-copy adapter.
+     */
+    @Override
+    public void open() throws IOException {
+      this.file = fs.open(path);
+      if (useZeroCopy) {
+        pool = new ByteBufferAllocatorPool();
+        zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool);
+      } else {
+        pool = null;
+        zcr = null;
+      }
+    }
+
+    @Override
+    public DiskRangeList readFileData(
+        DiskRangeList range, long baseOffset, boolean doForceDirect) throws IOException {
+      return readDiskRanges(file, zcr, baseOffset, range, doForceDirect);
+    }
+
+    /**
+     * Closes the file and clears the buffer pool. The pool is cleared even if closing
+     * the file throws.
+     */
+    @Override
+    public void close() throws IOException {
+      try {
+        if (file != null) {
+          file.close();
+        }
+      } finally {
+        if (pool != null) {
+          pool.clear();
+        }
+      }
+    }
+
+    @Override
+    public boolean isTrackingDiskRanges() {
+      return zcr != null;
+    }
+
+    /**
+     * Releases a buffer that was obtained through the zero-copy adapter.
+     *
+     * @throws IllegalStateException if no zero-copy adapter is active,
+     *     i.e. {@link #isTrackingDiskRanges()} returns false
+     */
+    @Override
+    public void releaseBuffer(ByteBuffer buffer) {
+      if (zcr == null) {
+        throw new IllegalStateException(
+            "No zero-copy reader is active for " + path + "; cannot release buffer");
+      }
+      zcr.releaseBuffer(buffer);
+    }
+
+    /**
+     * @return the total number of bytes read from the file by this reader so far
+     */
+    public long getReadBytes() {
+      return readBytes;
+    }
+
+    /**
+     * Fills every range of the list that does not yet hold data by reading it from the
+     * file. Each such range is replaced in place by one or more {@link BufferChunk}s.
+     * Ranges that already have data are left untouched.
+     *
+     * @param file the stream to read from
+     * @param zcr zero-copy adapter to read through, or null to copy into new buffers
+     * @param base offset that the range offsets are relative to (the base of the stripe)
+     * @param range first range of the list to fill; may be null
+     * @param doForceDirect when copying (zcr is null), put the bytes in a direct buffer
+     * @return the head of the updated list (possibly a replacement for {@code range}),
+     *     or null if {@code range} was null
+     * @throws IOException if reading the file fails
+     */
+    private DiskRangeList readDiskRanges(FSDataInputStream file,
+                                         ZeroCopyAdapter zcr,
+                                         long base,
+                                         DiskRangeList range,
+                                         boolean doForceDirect) throws IOException {
+      if (range == null) {
+        return null;
+      }
+      // prev.next is returned at the end. Presumably MutateHelper links itself in front
+      // of `range`, so prev.next follows a replacement of the head; verify in DiskRangeList.
+      DiskRangeList prev = range.prev;
+      if (prev == null) {
+        prev = new DiskRangeList.MutateHelper(range);
+      }
+      while (range != null) {
+        if (range.hasData()) {
+          range = range.next;
+          continue;
+        }
+        int len = (int) (range.getEnd() - range.getOffset());
+        long off = range.getOffset();
+        if (zcr != null) {
+          file.seek(base + off);
+          boolean hasReplaced = false;
+          // NOTE(review): assumes readBuffer never returns null or an empty buffer.
+          // A null would NPE below, and an empty buffer would make this loop spin forever.
+          while (len > 0) {
+            ByteBuffer partial = zcr.readBuffer(len, false);
+            readBytes += partial.remaining();
+            BufferChunk bc = new BufferChunk(partial, off);
+            // The first chunk replaces the empty range; later chunks follow it.
+            if (!hasReplaced) {
+              range.replaceSelfWith(bc);
+              hasReplaced = true;
+            } else {
+              range.insertAfter(bc);
+            }
+            range = bc;
+            int read = partial.remaining();
+            len -= read;
+            off += read;
+          }
+        } else {
+          // Don't use HDFS ByteBuffer API because it has no readFully, and is buggy and pointless.
+          byte[] buffer = new byte[len];
+          file.readFully((base + off), buffer, 0, buffer.length);
+          readBytes += buffer.length;
+          ByteBuffer bb;
+          if (doForceDirect) {
+            bb = ByteBuffer.allocateDirect(len);
+            bb.put(buffer);
+            bb.position(0);
+            bb.limit(len);
+          } else {
+            bb = ByteBuffer.wrap(buffer);
+          }
+          range = range.replaceSelfWith(new BufferChunk(bb, range.getOffset()));
+        }
+        range = range.next;
+      }
+      return prev.next;
+    }
+  }
+
+  /**
+   * Creates a {@link DefaultDataReader}; see its constructor for the parameters.
+   */
+  public static DataReader createDefaultDataReader(
+      FileSystem fs, Path path, boolean useZeroCopy, CompressionCodec codec) {
+    return new DefaultDataReader(fs, path, useZeroCopy, codec);
+  }
+
+  /**
+   * Marks the columns that have a PRESENT stream.
+   *
+   * @param streamList the streams to inspect
+   * @param types column types; their count is the length of the result
+   * @return an array indexed by column id that is true where a PRESENT stream exists
+   */
+  public static boolean[] findPresentStreamsByColumn(
+      List<OrcProto.Stream> streamList, List<OrcProto.Type> types) {
+    boolean[] hasNull = new boolean[types.size()];
+    for (OrcProto.Stream stream : streamList) {
+      if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.PRESENT)) {
+        hasNull[stream.getColumn()] = true;
+      }
+    }
+    return hasNull;
+  }
+
+  /**
+   * Does region A overlap region B? The end points are inclusive on both sides.
+   * @param leftA A's left point
+   * @param rightA A's right point
+   * @param leftB B's left point
+   * @param rightB B's right point
+   * @return Does region A overlap region B?
+   */
+  static boolean overlap(long leftA, long rightA, long leftB, long rightB) {
+    if (leftA <= leftB) {
+      return rightA >= leftB;
+    }
+    return rightB >= leftA;
+  }
+
+  /**
+   * Adds the whole range [offset, offset + length) of a stream to {@code list}.
+   */
+  public static void addEntireStreamToRanges(
+      long offset, long length, DiskRangeList.CreateHelper list, boolean doMergeBuffers) {
+    list.addOrMerge(offset, offset + length, doMergeBuffers, false);
+  }
+
+  /**
+   * Adds to {@code list} only the byte ranges of {@code stream} that are needed to read
+   * the included row groups. Start offsets come from the row index; end offsets are
+   * estimated with {@link #estimateRgEndOffset}.
+   *
+   * @param includedRowGroups which row groups to read; true means included
+   * @param offset offset of the stream, in the same coordinates as {@code list}
+   * @param length length of the stream in bytes
+   */
+  public static void addRgFilteredStreamToRanges(OrcProto.Stream stream,
+                                                 boolean[] includedRowGroups, boolean isCompressed, OrcProto.RowIndex index,
+                                                 OrcProto.ColumnEncoding encoding, OrcProto.Type type, int compressionSize, boolean hasNull,
+                                                 long offset, long length, DiskRangeList.CreateHelper list, boolean doMergeBuffers) {
+    // The index position depends only on the column and the stream, not the row group,
+    // so compute it once. Doing it lazily keeps an all-false mask from evaluating it.
+    int posn = -1;
+    final int lastGroup = includedRowGroups.length - 1;
+    for (int group = 0; group <= lastGroup; ++group) {
+      if (!includedRowGroups[group]) {
+        continue;
+      }
+      if (posn < 0) {
+        posn = getIndexPosition(
+            encoding.getKind(), type.getKind(), stream.getKind(), isCompressed, hasNull);
+      }
+      boolean isLast = group == lastGroup;
+      long start = offset + index.getEntry(group).getPositions(posn);
+      long nextGroupOffset = isLast ? length : index.getEntry(group + 1).getPositions(posn);
+      long end = offset + estimateRgEndOffset(
+          isCompressed, isLast, nextGroupOffset, length, compressionSize);
+      list.addOrMerge(start, end, doMergeBuffers, true);
+    }
+  }
+
+  /**
+   * Estimates the stream-relative end offset that must be read for a row group.
+   *
+   * @param isCompressed whether the stream is compressed
+   * @param isLast whether this is the last row group of the stream
+   * @param nextGroupOffset start of the next row group within the stream
+   * @param streamLength length of the stream in bytes
+   * @param bufferSize compression buffer size
+   * @return {@code streamLength} for the last group; otherwise {@code nextGroupOffset}
+   *     plus a safety slop, capped at {@code streamLength}
+   */
+  public static long estimateRgEndOffset(boolean isCompressed, boolean isLast,
+                                         long nextGroupOffset, long streamLength, int bufferSize) {
+    // figure out the worst case last location
+    // if adjacent groups have the same compressed block offset then stretch the slop
+    // by factor of 2 to safely accommodate the next compression block.
+    // One for the current compression block and another for the next compression block.
+    long slop = isCompressed ? 2 * (OutStream.HEADER_SIZE + bufferSize) : WORST_UNCOMPRESSED_SLOP;
+    return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop);
+  }
+
+  // Position counts used by getIndexPosition to skip over earlier streams' entries.
+  private static final int BYTE_STREAM_POSITIONS = 1;
+  private static final int RUN_LENGTH_BYTE_POSITIONS = BYTE_STREAM_POSITIONS + 1;
+  private static final int BITFIELD_POSITIONS = RUN_LENGTH_BYTE_POSITIONS + 1;
+  private static final int RUN_LENGTH_INT_POSITIONS = BYTE_STREAM_POSITIONS + 1;
+
+  /**
+   * Get the offset in the index positions for the column that the given
+   * stream starts.
+   * @param columnEncoding the encoding of the column
+   * @param columnType the type of the column
+   * @param streamType the kind of the stream
+   * @param isCompressed is the file compressed
+   * @param hasNulls does the column have a PRESENT stream?
+   * @return the number of positions that will be used for that stream
+   * @throws IllegalArgumentException if {@code columnType} is not handled
+   */
+  public static int getIndexPosition(OrcProto.ColumnEncoding.Kind columnEncoding,
+                                     OrcProto.Type.Kind columnType,
+                                     OrcProto.Stream.Kind streamType,
+                                     boolean isCompressed,
+                                     boolean hasNulls) {
+    if (streamType == OrcProto.Stream.Kind.PRESENT) {
+      return 0;
+    }
+    // A compressed stream records one extra position (the compressed offset).
+    int compressionValue = isCompressed ? 1 : 0;
+    // Positions of the PRESENT stream come first when the column has one.
+    int base = hasNulls ? (BITFIELD_POSITIONS + compressionValue) : 0;
+    switch (columnType) {
+      case BOOLEAN:
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+      case DATE:
+      case STRUCT:
+      case MAP:
+      case LIST:
+      case UNION:
+        return base;
+      case CHAR:
+      case VARCHAR:
+      case STRING:
+        if (columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
+            columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
+          return base;
+        }
+        if (streamType == OrcProto.Stream.Kind.DATA) {
+          return base;
+        }
+        return base + BYTE_STREAM_POSITIONS + compressionValue;
+      case BINARY:
+      case DECIMAL:
+        if (streamType == OrcProto.Stream.Kind.DATA) {
+          return base;
+        }
+        return base + BYTE_STREAM_POSITIONS + compressionValue;
+      case TIMESTAMP:
+        if (streamType == OrcProto.Stream.Kind.DATA) {
+          return base;
+        }
+        return base + RUN_LENGTH_INT_POSITIONS + compressionValue;
+      default:
+        throw new IllegalArgumentException("Unknown type " + columnType);
+    }
+  }
+
+  // for uncompressed streams, what is the most overlap with the following set
+  // of rows (long vint literal group).
+  static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512;
+
+  /**
+   * Is this stream part of a dictionary?
+   * @param kind the stream kind; must not be DICTIONARY_COUNT (checked by assert)
+   * @param encoding the encoding of the column the stream belongs to
+   * @return true for DICTIONARY_DATA streams, and for LENGTH streams of
+   *     DICTIONARY or DICTIONARY_V2 encoded columns
+   */
+  public static boolean isDictionary(OrcProto.Stream.Kind kind,
+                                     OrcProto.ColumnEncoding encoding) {
+    assert kind != OrcProto.Stream.Kind.DICTIONARY_COUNT;
+    OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind();
+    return kind == OrcProto.Stream.Kind.DICTIONARY_DATA ||
+        (kind == OrcProto.Stream.Kind.LENGTH &&
+            (encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
+                encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2));
+  }
+
+  /**
+   * Build a string representation of a list of disk ranges.
+   * @param range ranges to stringify
+   * @return the resulting string, e.g. "[{r1}, {r2}]"; "[]" for a null list
+   */
+  public static String stringifyDiskRanges(DiskRangeList range) {
+    StringBuilder buffer = new StringBuilder();
+    buffer.append("[");
+    boolean isFirst = true;
+    while (range != null) {
+      if (!isFirst) {
+        buffer.append(", {");
+      } else {
+        buffer.append("{");
+      }
+      isFirst = false;
+      buffer.append(range.toString());
+      buffer.append("}");
+      range = range.next;
+    }
+    buffer.append("]");
+    return buffer.toString();
+  }
+
+  /**
+   * Collects the parts of the range list that fall inside the stream
+   * [offset, offset + length). Each returned range is a slice shifted by
+   * {@code -offset}, so its offsets are relative to the start of the stream.
+   *
+   * @param range head of a list of ranges, assumed sorted by offset
+   * @param offset start of the stream, in the same coordinates as the ranges
+   * @param length stream length in bytes; 0 yields an empty list
+   * @return slices covering the stream, in list order
+   */
+  public static List<DiskRange> getStreamBuffers(DiskRangeList range, long offset, long length) {
+    // This assumes sorted ranges (as do many other parts of ORC code).
+    ArrayList<DiskRange> buffers = new ArrayList<DiskRange>();
+    if (length == 0) {
+      return buffers;
+    }
+    long streamEnd = offset + length;
+    boolean inRange = false;
+    while (range != null) {
+      if (!inRange) {
+        if (range.getEnd() <= offset) {
+          range = range.next;
+          continue; // Skip until we are in range.
+        }
+        inRange = true;
+        if (range.getOffset() < offset) {
+          // Partial first buffer, add a slice of it.
+          buffers.add(range.sliceAndShift(offset, Math.min(streamEnd, range.getEnd()), -offset));
+          if (range.getEnd() >= streamEnd) {
+            break; // Partial first buffer is also partial last buffer.
+          }
+          range = range.next;
+          continue;
+        }
+      } else if (range.getOffset() >= streamEnd) {
+        break;
+      }
+      if (range.getEnd() > streamEnd) {
+        // Partial last buffer (may also be the first buffer), add a slice of it.
+        buffers.add(range.sliceAndShift(range.getOffset(), streamEnd, -offset));
+        break;
+      }
+      // Buffer that belongs entirely to one stream.
+      // TODO: ideally we would want to reuse the object and remove it from the list, but we cannot
+      //       because bufferChunks is also used by clearStreams for zcr. Create a useless dup.
+      buffers.add(range.sliceAndShift(range.getOffset(), range.getEnd(), -offset));
+      if (range.getEnd() == streamEnd) {
+        break;
+      }
+      range = range.next;
+    }
+    return buffers;
+  }
+
+  /**
+   * Creates a zero-copy adapter for {@code file}, or returns null unless the codec is
+   * null or an available {@link DirectDecompressionCodec}.
+   */
+  static ZeroCopyAdapter createZeroCopyShim(FSDataInputStream file,
+                                            CompressionCodec codec, ByteBufferAllocatorPool pool) throws IOException {
+    boolean codecAllowsZeroCopy = codec == null
+        || (codec instanceof DirectDecompressionCodec
+            && ((DirectDecompressionCodec) codec).isAvailable());
+    return codecAllowsZeroCopy ? new ZeroCopyAdapter(file, pool) : null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RedBlackTree.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RedBlackTree.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RedBlackTree.java
deleted file mode 100644
index 2482f93..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RedBlackTree.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.orc;
-
-/**
- * A memory efficient red-black tree that does not allocate any objects per
- * element: every node lives as a pair of ints in a single DynamicIntArray and
- * is identified by its position (its insertion index). This class is abstract
- * and assumes that the child class handles the key and comparisons with the
- * key.
- */
-abstract class RedBlackTree {
-  /** Sentinel position meaning "no node" (an empty child or an empty tree). */
-  public static final int NULL = -1;
-
-  // Various values controlling the offset of the data within the array.
-  // Node n occupies data[n * ELEMENT_SIZE] (left child shifted left by one,
-  // with the red bit in the low bit) and data[n * ELEMENT_SIZE + 1] (right
-  // child).
-  private static final int LEFT_OFFSET = 0;
-  private static final int RIGHT_OFFSET = 1;
-  private static final int ELEMENT_SIZE = 2;
-
-  // Number of nodes; also the position assigned to the next inserted node.
-  protected int size = 0;
-  private final DynamicIntArray data;
-  // Position of the root node, or NULL while the tree is empty.
-  protected int root = NULL;
-  // Position of the node inserted (or matched) by the most recent add().
-  protected int lastAdd = 0;
-  // True if the most recent add() inserted a new node, false if it matched.
-  private boolean wasAdd = false;
-
-  /**
-   * Create a set with the given initial capacity.
-   * @param initialCapacity number of nodes to pre-size the node array for
-   */
-  public RedBlackTree(int initialCapacity) {
-    data = new DynamicIntArray(initialCapacity * ELEMENT_SIZE);
-  }
-
-  /**
-   * Insert a new node into the data array, growing the array as necessary.
-   * The node is appended at position {@code size}; linking it into the tree
-   * is left to the caller.
-   *
-   * @param left position of the left child (or NULL)
-   * @param right position of the right child (or NULL)
-   * @param isRed color of the new node
-   * @return Returns the position of the new node.
-   */
-  private int insert(int left, int right, boolean isRed) {
-    int position = size;
-    size += 1;
-    setLeft(position, left, isRed);
-    setRight(position, right);
-    return position;
-  }
-
-  /**
-   * Compare the value at the given position to the new value.
-   * The "new value" is the key for the current add() call.
-   * NOTE(review): how the subclass stages that key is not visible here.
-   * @return 0 if the values are the same, -1 if the new value is smaller and
-   *         1 if the new value is larger. add() only inspects the sign.
-   */
-  protected abstract int compareValue(int position);
-
-  /**
-   * Is the given node red as opposed to black? To prevent having an extra word
-   * in the data array, we use the low bit of the left child index.
-   * NULL positions count as black.
-   */
-  protected boolean isRed(int position) {
-    return position != NULL &&
-        (data.get(position * ELEMENT_SIZE + LEFT_OFFSET) & 1) == 1;
-  }
-
-  /**
-   * Set the red bit true or false, leaving the left child index untouched.
-   */
-  private void setRed(int position, boolean isRed) {
-    int offset = position * ELEMENT_SIZE + LEFT_OFFSET;
-    if (isRed) {
-      data.set(offset, data.get(offset) | 1);
-    } else {
-      data.set(offset, data.get(offset) & ~1);
-    }
-  }
-
-  /**
-   * Get the left field of the given position.
-   * The arithmetic shift drops the color bit and maps a stored NULL
-   * ((-1 << 1) == -2) back to -1.
-   */
-  protected int getLeft(int position) {
-    return data.get(position * ELEMENT_SIZE + LEFT_OFFSET) >> 1;
-  }
-
-  /**
-   * Get the right field of the given position.
-   */
-  protected int getRight(int position) {
-    return data.get(position * ELEMENT_SIZE + RIGHT_OFFSET);
-  }
-
-  /**
-   * Set the left field of the given position, keeping its current color.
-   * Note that we are storing the node color in the low bit of the left pointer.
-   */
-  private void setLeft(int position, int left) {
-    int offset = position * ELEMENT_SIZE + LEFT_OFFSET;
-    data.set(offset, (left << 1) | (data.get(offset) & 1));
-  }
-
-  /**
-   * Set both the left field and the color of the given position.
-   * Note that we are storing the node color in the low bit of the left pointer.
-   */
-  private void setLeft(int position, int left, boolean isRed) {
-    int offset = position * ELEMENT_SIZE + LEFT_OFFSET;
-    data.set(offset, (left << 1) | (isRed ? 1 : 0));
-  }
-
-  /**
-   * Set the right field of the given position.
-   */
-  private void setRight(int position, int right) {
-    data.set(position * ELEMENT_SIZE + RIGHT_OFFSET, right);
-  }
-
-  /**
-   * Insert or find a given key in the tree and rebalance the tree correctly.
-   * Rebalancing restores the red-black aspect of the tree to maintain the
-   * invariants:
-   *   1. If a node is red, both of its children are black.
-   *   2. Each child of a node has the same black height (the number of black
-   *      nodes between it and the leaves of the tree).
-   *
-   * Inserted nodes are at the leaves and are red, therefore there is at most a
-   * violation of rule 1 at the node we just put in. Instead of storing parent
-   * pointers in the nodes, this routine passes the ancestors down the
-   * recursion as arguments.
-   *
-   * The fix is broken down into 6 cases (1.{1,2,3} and 2.{1,2,3} that are
-   * left-right mirror images of each other). See "Introduction to Algorithms"
-   * by Cormen, Leiserson, and Rivest for the explanation of the subcases.
-   *
-   * Side effects: sets lastAdd and wasAdd, may append a node, and may change
-   * root.
-   *
-   * @param node The node that we are fixing right now (NULL once we have
-   *             walked off the tree and must create the new node here).
-   * @param fromLeft Did we come down from the left?
-   * @param parent Node's parent
-   * @param grandparent Parent's parent
-   * @param greatGrandparent Grandparent's parent
-   * @return Does parent also need to be checked and/or fixed?
-   */
-  private boolean add(int node, boolean fromLeft, int parent,
-                      int grandparent, int greatGrandparent) {
-    if (node == NULL) {
-      if (root == NULL) {
-        // Empty tree: the new node becomes the (black) root.
-        lastAdd = insert(NULL, NULL, false);
-        root = lastAdd;
-        wasAdd = true;
-        return false;
-      } else {
-        // New leaves start red so no black height changes.
-        lastAdd = insert(NULL, NULL, true);
-        node = lastAdd;
-        wasAdd = true;
-        // connect the new node into the tree
-        if (fromLeft) {
-          setLeft(parent, node);
-        } else {
-          setRight(parent, node);
-        }
-      }
-    } else {
-      int compare = compareValue(node);
-      boolean keepGoing;
-
-      // Recurse down to find where the node needs to be added
-      if (compare < 0) {
-        keepGoing = add(getLeft(node), true, node, parent, grandparent);
-      } else if (compare > 0) {
-        keepGoing = add(getRight(node), false, node, parent, grandparent);
-      } else {
-        // Key already present: report its position and insert nothing.
-        lastAdd = node;
-        wasAdd = false;
-        return false;
-      }
-
-      // we don't need to fix the root (because it is always set to black)
-      if (node == root || !keepGoing) {
-        return false;
-      }
-    }
-
-
-    // Do we need to fix this node? Only if there are two reds right under each
-    // other.
-    if (isRed(node) && isRed(parent)) {
-      if (parent == getLeft(grandparent)) {
-        int uncle = getRight(grandparent);
-        if (isRed(uncle)) {
-          // case 1.1
-          // Recolor only; grandparent turns red, so the violation may move
-          // up and the caller must keep checking.
-          setRed(parent, false);
-          setRed(uncle, false);
-          setRed(grandparent, true);
-          return true;
-        } else {
-          if (node == getRight(parent)) {
-            // case 1.2
-            // swap node and parent
-            int tmp = node;
-            node = parent;
-            parent = tmp;
-            // left-rotate on node
-            setLeft(grandparent, parent);
-            setRight(node, getLeft(parent));
-            setLeft(parent, node);
-          }
-
-          // case 1.2 and 1.3
-          setRed(parent, false);
-          setRed(grandparent, true);
-
-          // right-rotate on grandparent
-          if (greatGrandparent == NULL) {
-            root = parent;
-          } else if (getLeft(greatGrandparent) == grandparent) {
-            setLeft(greatGrandparent, parent);
-          } else {
-            setRight(greatGrandparent, parent);
-          }
-          setLeft(grandparent, getRight(parent));
-          setRight(parent, grandparent);
-          return false;
-        }
-      } else {
-        int uncle = getLeft(grandparent);
-        if (isRed(uncle)) {
-          // case 2.1 (mirror of 1.1)
-          setRed(parent, false);
-          setRed(uncle, false);
-          setRed(grandparent, true);
-          return true;
-        } else {
-          if (node == getLeft(parent)) {
-            // case 2.2
-            // swap node and parent
-            int tmp = node;
-            node = parent;
-            parent = tmp;
-            // right-rotate on node
-            setRight(grandparent, parent);
-            setLeft(node, getRight(parent));
-            setRight(parent, node);
-          }
-          // case 2.2 and 2.3
-          setRed(parent, false);
-          setRed(grandparent, true);
-          // left-rotate on grandparent
-          if (greatGrandparent == NULL) {
-            root = parent;
-          } else if (getRight(greatGrandparent) == grandparent) {
-            setRight(greatGrandparent, parent);
-          } else {
-            setLeft(greatGrandparent, parent);
-          }
-          setRight(grandparent, getLeft(parent));
-          setLeft(parent, grandparent);
-          return false;
-        }
-      }
-    } else {
-      // No red-red violation here; let the caller keep checking upward.
-      return true;
-    }
-  }
-
-  /**
-   * Add the new key to the tree. The key itself is only seen through
-   * compareValue() (NOTE(review): confirm how the subclass stages it).
-   * Afterwards lastAdd holds the position of the new or existing node.
-   * @return true if the element is a new one.
-   */
-  protected boolean add() {
-    add(root, false, NULL, NULL, NULL);
-    if (wasAdd) {
-      // Recoloring (case 1.1/2.1) may have turned the root red; restore it.
-      setRed(root, false);
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  /**
-   * Get the number of elements in the set.
-   */
-  public int size() {
-    return size;
-  }
-
-  /**
-   * Reset the table to empty.
-   * NOTE(review): lastAdd and wasAdd are not reset here.
-   */
-  public void clear() {
-    root = NULL;
-    size = 0;
-    data.clear();
-  }
-
-  /**
-   * Get the buffer size in bytes of the node array held by this class.
-   * Any key storage a subclass keeps is not counted by this method.
-   */
-  public long getSizeInBytes() {
-    return data.getSizeInBytes();
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthByteWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthByteWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthByteWriter.java
deleted file mode 100644
index 0953cdd..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthByteWriter.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-import java.io.IOException;
-
-/**
- * A stream writer that run-length encodes a sequence of bytes. A control byte
- * is written before each run. Values 0 to 127 mean a repeat run of 3 to 130
- * copies (control = run length - MIN_REPEAT_SIZE), followed by the single
- * repeated byte. If the control byte is -1 to -128, 1 to 128 literal byte
- * values follow.
- */
-class RunLengthByteWriter {
-  // Shortest run that is emitted as a repeat instead of as literals.
-  static final int MIN_REPEAT_SIZE = 3;
-  // Most bytes one literal group can hold (control byte -128).
-  static final int MAX_LITERAL_SIZE = 128;
-  // Longest repeat run (control byte 127).
-  static final int MAX_REPEAT_SIZE= 127 + MIN_REPEAT_SIZE;
-  private final PositionedOutputStream output;
-  // Pending values; in repeat mode only literals[0] is meaningful.
-  private final byte[] literals = new byte[MAX_LITERAL_SIZE];
-  // Number of pending values (the run length while in repeat mode).
-  private int numLiterals = 0;
-  // True while the pending values form a repeat run.
-  private boolean repeat = false;
-  // Length of the run of equal bytes at the end of the pending literals.
-  private int tailRunLength = 0;
-
-  RunLengthByteWriter(PositionedOutputStream output) {
-    this.output = output;
-  }
-
-  /**
-   * Emit the pending repeat run or literal group (if any) and reset the state.
-   */
-  private void writeValues() throws IOException {
-    if (numLiterals != 0) {
-      if (repeat) {
-        output.write(numLiterals - MIN_REPEAT_SIZE);
-        output.write(literals, 0, 1);
-     } else {
-        output.write(-numLiterals);
-        output.write(literals, 0, numLiterals);
-      }
-      repeat = false;
-      tailRunLength = 0;
-      numLiterals = 0;
-    }
-  }
-
-  /**
-   * Write out any pending values, then flush the underlying stream.
-   */
-  void flush() throws IOException {
-    writeValues();
-    output.flush();
-  }
-
-  /**
-   * Append one byte, switching between literal and repeat mode as runs
-   * start and end.
-   */
-  void write(byte value) throws IOException {
-    if (numLiterals == 0) {
-      // Nothing pending: start a new literal group.
-      literals[numLiterals++] = value;
-      tailRunLength = 1;
-    } else if (repeat) {
-      if (value == literals[0]) {
-        // Extend the current repeat run; emit it once it is full.
-        numLiterals += 1;
-        if (numLiterals == MAX_REPEAT_SIZE) {
-          writeValues();
-        }
-      } else {
-        // Run broken: emit it and start a new literal group with value.
-        writeValues();
-        literals[numLiterals++] = value;
-        tailRunLength = 1;
-      }
-    } else {
-      if (value == literals[numLiterals - 1]) {
-        tailRunLength += 1;
-      } else {
-        tailRunLength = 1;
-      }
-      if (tailRunLength == MIN_REPEAT_SIZE) {
-        if (numLiterals + 1 == MIN_REPEAT_SIZE) {
-          // Every pending value equals value: switch to a repeat run in place.
-          repeat = true;
-          numLiterals += 1;
-        } else {
-          // Emit the literals that precede the tail run, then restart as a
-          // repeat run of MIN_REPEAT_SIZE copies of value.
-          numLiterals -= MIN_REPEAT_SIZE - 1;
-          writeValues();
-          literals[0] = value;
-          repeat = true;
-          numLiterals = MIN_REPEAT_SIZE;
-        }
-      } else {
-        literals[numLiterals++] = value;
-        if (numLiterals == MAX_LITERAL_SIZE) {
-          writeValues();
-        }
-      }
-    }
-  }
-
-  /**
-   * Record the underlying stream's position, followed by the number of values
-   * still buffered in this writer.
-   */
-  void getPosition(PositionRecorder recorder) throws IOException {
-    output.getPosition(recorder);
-    recorder.addPosition(numLiterals);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriter.java
deleted file mode 100644
index 867f041..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriter.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-import java.io.IOException;
-
-/**
- * A stream writer that run-length encodes a sequence of integers. A control
- * byte is written before each run, with positive values 0 to 127 meaning 3 to
- * 130 repetitions, each repetition offset by a delta. A repeat run's control
- * byte is followed by the delta as one signed byte and then the base value.
- * If the control byte is -1 to -128, 1 to 128 literal vint values follow.
- * Values are written with SerializationUtils.writeVslong when {@code signed},
- * and writeVulong otherwise.
- */
-class RunLengthIntegerWriter implements IntegerWriter {
-  // Shortest progression emitted as a repeat run.
-  static final int MIN_REPEAT_SIZE = 3;
-  // Range of deltas that fit in the single delta byte of a repeat run.
-  static final int MAX_DELTA = 127;
-  static final int MIN_DELTA = -128;
-  // Most values one literal group can hold (control byte -128).
-  static final int MAX_LITERAL_SIZE = 128;
-  // Longest repeat run (control byte 127).
-  private static final int MAX_REPEAT_SIZE = 127 + MIN_REPEAT_SIZE;
-  private final PositionedOutputStream output;
-  private final boolean signed;
-  // Pending values; in repeat mode only literals[0] (the base) is meaningful.
-  private final long[] literals = new long[MAX_LITERAL_SIZE];
-  // Number of pending values (the run length while in repeat mode).
-  private int numLiterals = 0;
-  // Step of the current repeat run or of the candidate tail progression.
-  private long delta = 0;
-  // True while the pending values form a repeat run.
-  private boolean repeat = false;
-  // Number of trailing pending values that form a progression with step delta.
-  private int tailRunLength = 0;
-  private SerializationUtils utils;
-
-  RunLengthIntegerWriter(PositionedOutputStream output,
-                         boolean signed) {
-    this.output = output;
-    this.signed = signed;
-    this.utils = new SerializationUtils();
-  }
-
-  /**
-   * Emit the pending repeat run or literal group (if any) and reset the
-   * buffering state. delta is not reset here.
-   */
-  private void writeValues() throws IOException {
-    if (numLiterals != 0) {
-      if (repeat) {
-        output.write(numLiterals - MIN_REPEAT_SIZE);
-        output.write((byte) delta);
-        if (signed) {
-          utils.writeVslong(output, literals[0]);
-        } else {
-          utils.writeVulong(output, literals[0]);
-        }
-      } else {
-        output.write(-numLiterals);
-        for(int i=0; i < numLiterals; ++i) {
-          if (signed) {
-            utils.writeVslong(output, literals[i]);
-          } else {
-            utils.writeVulong(output, literals[i]);
-          }
-        }
-      }
-      repeat = false;
-      numLiterals = 0;
-      tailRunLength = 0;
-    }
-  }
-
-  @Override
-  public void flush() throws IOException {
-    writeValues();
-    output.flush();
-  }
-
-  @Override
-  public void write(long value) throws IOException {
-    if (numLiterals == 0) {
-      // Nothing pending: start a new literal group.
-      literals[numLiterals++] = value;
-      tailRunLength = 1;
-    } else if (repeat) {
-      if (value == literals[0] + delta * numLiterals) {
-        // value continues the progression: extend the run, emit when full.
-        numLiterals += 1;
-        if (numLiterals == MAX_REPEAT_SIZE) {
-          writeValues();
-        }
-      } else {
-        // Progression broken: emit the run and start a new literal group.
-        writeValues();
-        literals[numLiterals++] = value;
-        tailRunLength = 1;
-      }
-    } else {
-      // Literal mode: track whether the trailing values form a progression
-      // whose step fits in the one-byte delta.
-      if (tailRunLength == 1) {
-        delta = value - literals[numLiterals - 1];
-        if (delta < MIN_DELTA || delta > MAX_DELTA) {
-          tailRunLength = 1;
-        } else {
-          tailRunLength = 2;
-        }
-      } else if (value == literals[numLiterals - 1] + delta) {
-        tailRunLength += 1;
-      } else {
-        delta = value - literals[numLiterals - 1];
-        if (delta < MIN_DELTA || delta > MAX_DELTA) {
-          tailRunLength = 1;
-        } else {
-          tailRunLength = 2;
-        }
-      }
-      if (tailRunLength == MIN_REPEAT_SIZE) {
-        if (numLiterals + 1 == MIN_REPEAT_SIZE) {
-          // Every pending value is part of the progression: switch in place.
-          repeat = true;
-          numLiterals += 1;
-        } else {
-          // Emit the literals before the progression, then restart as a
-          // repeat run whose base is the progression's first value.
-          numLiterals -= MIN_REPEAT_SIZE - 1;
-          long base = literals[numLiterals];
-          writeValues();
-          literals[0] = base;
-          repeat = true;
-          numLiterals = MIN_REPEAT_SIZE;
-        }
-      } else {
-        literals[numLiterals++] = value;
-        if (numLiterals == MAX_LITERAL_SIZE) {
-          writeValues();
-        }
-      }
-    }
-  }
-
-  /**
-   * Record the underlying stream's position, followed by the number of values
-   * still buffered in this writer.
-   */
-  @Override
-  public void getPosition(PositionRecorder recorder) throws IOException {
-    output.getPosition(recorder);
-    recorder.addPosition(numLiterals);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriterV2.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriterV2.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriterV2.java
deleted file mode 100644
index 7237b2e..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriterV2.java
+++ /dev/null
@@ -1,832 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-import java.io.IOException;
-
-/**
- * A writer that performs light weight compression over sequence of integers.
- * <p>
- * There are four types of lightweight integer compression
- * <ul>
- * <li>SHORT_REPEAT</li>
- * <li>DIRECT</li>
- * <li>PATCHED_BASE</li>
- * <li>DELTA</li>
- * </ul>
- * </p>
- * The description and format for these types are as below:
- * <p>
- * <b>SHORT_REPEAT:</b> Used for short repeated integer sequences.
- * <ul>
- * <li>1 byte header
- * <ul>
- * <li>2 bits for encoding type</li>
- * <li>3 bits for bytes required for repeating value</li>
- * <li>3 bits for repeat count (MIN_REPEAT + run length)</li>
- * </ul>
- * </li>
- * <li>Blob - repeat value (fixed bytes)</li>
- * </ul>
- * </p>
- * <p>
- * <b>DIRECT:</b> Used for random integer sequences whose number of bit
- * requirement doesn't vary a lot.
- * <ul>
- * <li>2 bytes header
- * <ul>
- * 1st byte
- * <li>2 bits for encoding type</li>
- * <li>5 bits for fixed bit width of values in blob</li>
- * <li>1 bit for storing MSB of run length</li>
- * </ul>
- * <ul>
- * 2nd byte
- * <li>8 bits for lower run length bits</li>
- * </ul>
- * </li>
- * <li>Blob - stores the direct values using fixed bit width. The length of the
- * data blob is (fixed width * run length) bits long</li>
- * </ul>
- * </p>
- * <p>
- * <b>PATCHED_BASE:</b> Used for random integer sequences whose number of bit
- * requirement varies beyond a threshold.
- * <ul>
- * <li>4 bytes header
- * <ul>
- * 1st byte
- * <li>2 bits for encoding type</li>
- * <li>5 bits for fixed bit width of values in blob</li>
- * <li>1 bit for storing MSB of run length</li>
- * </ul>
- * <ul>
- * 2nd byte
- * <li>8 bits for lower run length bits</li>
- * </ul>
- * <ul>
- * 3rd byte
- * <li>3 bits for bytes required to encode base value</li>
- * <li>5 bits for patch width</li>
- * </ul>
- * <ul>
- * 4th byte
- * <li>3 bits for patch gap width</li>
- * <li>5 bits for patch length</li>
- * </ul>
- * </li>
- * <li>Base value - Stored using fixed number of bytes. If MSB is set, base
- * value is negative else positive. Length of base value is (base width * 8)
- * bits.</li>
- * <li>Data blob - Base reduced values as stored using fixed bit width. Length
- * of data blob is (fixed width * run length) bits.</li>
- * <li>Patch blob - Patch blob is a list of gap and patch value. Each entry in
- * the patch list is (patch width + patch gap width) bits long. Gap between the
- * subsequent elements to be patched are stored in upper part of entry whereas
- * patch values are stored in lower part of entry. Length of patch blob is
- * ((patch width + patch gap width) * patch length) bits.</li>
- * </ul>
- * </p>
- * <p>
- * <b>DELTA</b> Used for monotonically increasing or decreasing sequences,
- * sequences with fixed delta values or long repeated sequences.
- * <ul>
- * <li>2 bytes header
- * <ul>
- * 1st byte
- * <li>2 bits for encoding type</li>
- * <li>5 bits for fixed bit width of values in blob</li>
- * <li>1 bit for storing MSB of run length</li>
- * </ul>
- * <ul>
- * 2nd byte
- * <li>8 bits for lower run length bits</li>
- * </ul>
- * </li>
- * <li>Base value - encoded as varint</li>
- * <li>Delta base - encoded as varint</li>
- * <li>Delta blob - only positive values. monotonicity and orderness are decided
- * based on the sign of the base value and delta base</li>
- * </ul>
- * </p>
- */
-class RunLengthIntegerWriterV2 implements IntegerWriter {
-
-  /**
-   * Run encodings. getOpcode() writes the ordinal as the 2-bit opcode at the
-   * top of each run header, so this declaration order is part of the encoded
-   * format and must not change.
-   */
-  public enum EncodingType {
-    SHORT_REPEAT, DIRECT, PATCHED_BASE, DELTA
-  }
-
-  // Capacity of the per-run buffers (the header's 9-bit length holds 512).
-  static final int MAX_SCOPE = 512;
-  // Minimum SHORT_REPEAT run; the header stores run length - MIN_REPEAT.
-  static final int MIN_REPEAT = 3;
-  // Presumably the longest run encoded as SHORT_REPEAT (its 3 header bits hold
-  // at most 7 = 10 - MIN_REPEAT); its use is outside this view.
-  private static final int MAX_SHORT_REPEAT_LENGTH = 10;
-  private long prevDelta = 0;
-  // Length of the current run of identical values.
-  private int fixedRunLength = 0;
-  // Length of the current run of non-repeating values.
-  private int variableRunLength = 0;
-  private final long[] literals = new long[MAX_SCOPE];
-  private final PositionedOutputStream output;
-  // Whether input values are signed (selects zigzag / signed varints).
-  private final boolean signed;
-  // Encoding chosen for the buffered literals.
-  private EncodingType encoding;
-  // Number of values currently buffered in literals.
-  private int numLiterals;
-  // Zigzag-encoded copies of literals, bit-packed by DIRECT.
-  private final long[] zigzagLiterals = new long[MAX_SCOPE];
-  // Base-reduced literals, bit-packed by PATCHED_BASE.
-  private final long[] baseRedLiterals = new long[MAX_SCOPE];
-  // Differences between adjacent literals, used by DELTA.
-  private final long[] adjDeltas = new long[MAX_SCOPE];
-  // Constant delta written when isFixedDelta is set.
-  private long fixedDelta;
-  // Percentile bit widths: zz* over zigzagLiterals, br* over baseRedLiterals.
-  private int zzBits90p;
-  private int zzBits100p;
-  private int brBits95p;
-  private int brBits100p;
-  // Bit width used to pack adjDeltas in DELTA runs.
-  private int bitsDeltaMax;
-  // PATCHED_BASE patch parameters and the packed (gap, patch) entries.
-  private int patchWidth;
-  private int patchGapWidth;
-  private int patchLength;
-  private long[] gapVsPatchList;
-  // Base value for PATCHED_BASE (presumably the smallest buffered literal).
-  private long min;
-  // Whether the run has a constant delta, in which case no delta blob is written.
-  private boolean isFixedDelta;
-  private SerializationUtils utils;
-  // Round DIRECT/DELTA bit widths to aligned sizes before packing.
-  private boolean alignedBitpacking;
-
-  /**
-   * Creates a writer with aligned bit packing enabled.
-   * @param output stream receiving the encoded runs
-   * @param signed whether the values are signed
-   */
-  RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed) {
-    this(output, signed, true);
-  }
-
-  /**
-   * Creates a writer.
-   * @param output stream receiving the encoded runs
-   * @param signed whether the values are signed
-   * @param alignedBitpacking round DIRECT/DELTA bit widths to aligned sizes
-   */
-  RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed,
-      boolean alignedBitpacking) {
-    this.utils = new SerializationUtils();
-    this.alignedBitpacking = alignedBitpacking;
-    this.signed = signed;
-    this.output = output;
-    // start from a clean per-run state
-    clear();
-  }
-
-  /**
-   * Emits the buffered literals with the encoding chosen for them, then
-   * clears the per-run state. Does nothing when no values are buffered.
-   */
-  private void writeValues() throws IOException {
-    if (numLiterals == 0) {
-      return;
-    }
-
-    switch (encoding) {
-      case SHORT_REPEAT:
-        writeShortRepeatValues();
-        break;
-      case DIRECT:
-        writeDirectValues();
-        break;
-      case PATCHED_BASE:
-        writePatchedBaseValues();
-        break;
-      default:
-        writeDeltaValues();
-        break;
-    }
-
-    // reset every per-run variable for the next batch
-    clear();
-  }
-
-  /**
-   * Writes the buffered literals as a DELTA run. The layout is:
-   * <ul>
-   * <li>a 2-byte header: opcode, encoded delta bit width, and run length - 1
-   * as 9 bits;</li>
-   * <li>the first literal as a varint;</li>
-   * <li>either the fixed delta, or the first adjacent delta, as a signed
-   * varint;</li>
-   * <li>when the delta is not fixed, the remaining adjacent deltas
-   * bit-packed.</li>
-   * </ul>
-   * Resets whichever of fixedRunLength / variableRunLength it consumed.
-   */
-  private void writeDeltaValues() throws IOException {
-    int len = 0;
-    int fb = bitsDeltaMax;
-    // Encoded bit width; stays 0 for fixed-delta runs, which have no blob.
-    int efb = 0;
-
-    if (alignedBitpacking) {
-      fb = utils.getClosestAlignedFixedBits(fb);
-    }
-
-    if (isFixedDelta) {
-      // if fixed run length is greater than threshold then it will be fixed
-      // delta sequence with delta value 0 else fixed delta sequence with
-      // non-zero delta value
-      if (fixedRunLength > MIN_REPEAT) {
-        // ex. sequence: 2 2 2 2 2 2 2 2
-        len = fixedRunLength - 1;
-        fixedRunLength = 0;
-      } else {
-        // ex. sequence: 4 6 8 10 12 14 16
-        len = variableRunLength - 1;
-        variableRunLength = 0;
-      }
-    } else {
-      // fixed width 0 is used for long repeating values.
-      // sequences that require only 1 bit to encode will have an additional bit
-      if (fb == 1) {
-        fb = 2;
-      }
-      efb = utils.encodeBitWidth(fb);
-      efb = efb << 1;
-      len = variableRunLength - 1;
-      variableRunLength = 0;
-    }
-
-    // extract the 9th bit of run length
-    final int tailBits = (len & 0x100) >>> 8;
-
-    // create first byte of the header
-    final int headerFirstByte = getOpcode() | efb | tailBits;
-
-    // second byte of the header stores the remaining 8 bits of runlength
-    final int headerSecondByte = len & 0xff;
-
-    // write header
-    output.write(headerFirstByte);
-    output.write(headerSecondByte);
-
-    // store the first literal (not the zigzag array) as a varint: signed
-    // values via writeVslong, unsigned via writeVulong
-    if (signed) {
-      utils.writeVslong(output, literals[0]);
-    } else {
-      utils.writeVulong(output, literals[0]);
-    }
-
-    if (isFixedDelta) {
-      // if delta is fixed then we don't need to store delta blob
-      utils.writeVslong(output, fixedDelta);
-    } else {
-      // store the first value as delta value using zigzag encoding
-      utils.writeVslong(output, adjDeltas[0]);
-
-      // adjacent delta values are bit packed. The length of adjDeltas array is
-      // always one less than the number of literals (delta difference for n
-      // elements is n-1). We have already written one element, write the
-      // remaining numLiterals - 2 elements here
-      utils.writeInts(adjDeltas, 1, numLiterals - 2, fb, output);
-    }
-  }
-
-  /**
-   * Writes the buffered literals as a PATCHED_BASE run. The layout is:
-   * <ul>
-   * <li>a 4-byte header;</li>
-   * <li>the base value (min) as big-endian bytes, with its sign in the most
-   * significant bit;</li>
-   * <li>the base-reduced literals, bit-packed at
-   * getClosestFixedBits(brBits95p) bits;</li>
-   * <li>the gap/patch list.</li>
-   * </ul>
-   * Side effects: overwrites {@code min} with its sign-magnitude form and
-   * resets variableRunLength to 0.
-   */
-  private void writePatchedBaseValues() throws IOException {
-
-    // NOTE: Aligned bit packing cannot be applied for PATCHED_BASE encoding
-    // because patch is applied to MSB bits. For example: If fixed bit width of
-    // base value is 7 bits and if patch is 3 bits, the actual value is
-    // constructed by shifting the patch to left by 7 positions.
-    // actual_value = patch << 7 | base_value
-    // So, if we align base_value then actual_value can not be reconstructed.
-
-    // write the number of fixed bits required in next 5 bits
-    final int fb = brBits95p;
-    final int efb = utils.encodeBitWidth(fb) << 1;
-
-    // adjust variable run length, they are one off
-    variableRunLength -= 1;
-
-    // extract the 9th bit of run length
-    final int tailBits = (variableRunLength & 0x100) >>> 8;
-
-    // create first byte of the header
-    final int headerFirstByte = getOpcode() | efb | tailBits;
-
-    // second byte of the header stores the remaining 8 bits of runlength
-    final int headerSecondByte = variableRunLength & 0xff;
-
-    // if the min value is negative toggle the sign; the sign is re-applied
-    // below as the MSB of the fixed-width base.
-    // NOTE(review): -Long.MIN_VALUE overflows and stays negative.
-    final boolean isNegative = min < 0;
-    if (isNegative) {
-      min = -min;
-    }
-
-    // find the number of bytes required for base and shift it by 5 bits
-    // to accommodate patch width. The additional bit is used to store the sign
-    // of the base value.
-    final int baseWidth = utils.findClosestNumBits(min) + 1;
-    final int baseBytes = baseWidth % 8 == 0 ? baseWidth / 8 : (baseWidth / 8) + 1;
-    final int bb = (baseBytes - 1) << 5;
-
-    // if the base value is negative then set MSB to 1
-    if (isNegative) {
-      min |= (1L << ((baseBytes * 8) - 1));
-    }
-
-    // third byte contains 3 bits for number of bytes occupied by base
-    // and 5 bits for patchWidth
-    final int headerThirdByte = bb | utils.encodeBitWidth(patchWidth);
-
-    // fourth byte contains 3 bits for patch gap width and 5 bits for
-    // patch length
-    final int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength;
-
-    // write header
-    output.write(headerFirstByte);
-    output.write(headerSecondByte);
-    output.write(headerThirdByte);
-    output.write(headerFourthByte);
-
-    // write the base value using fixed bytes in big endian order
-    for(int i = baseBytes - 1; i >= 0; i--) {
-      byte b = (byte) ((min >>> (i * 8)) & 0xff);
-      output.write(b);
-    }
-
-    // base reduced literals are bit packed
-    int closestFixedBits = utils.getClosestFixedBits(fb);
-
-    utils.writeInts(baseRedLiterals, 0, numLiterals, closestFixedBits,
-        output);
-
-    // write patch list
-    closestFixedBits = utils.getClosestFixedBits(patchGapWidth + patchWidth);
-
-    utils.writeInts(gapVsPatchList, 0, gapVsPatchList.length, closestFixedBits,
-        output);
-
-    // reset run length
-    variableRunLength = 0;
-  }
-
-  /**
-   * Store the opcode in 2 MSB bits
-   * @return opcode
-   */
-  private int getOpcode() {
-    return encoding.ordinal() << 6;
-  }
-
-  /**
-   * Writes the buffered literals as a DIRECT run. The layout is:
-   * <ul>
-   * <li>a 2-byte header: opcode, encoded bit width, and run length - 1 as
-   * 9 bits;</li>
-   * <li>the zigzag-encoded literals, bit-packed at zzBits100p bits (rounded to
-   * an aligned width when alignedBitpacking is set).</li>
-   * </ul>
-   * Resets variableRunLength to 0.
-   */
-  private void writeDirectValues() throws IOException {
-
-    // write the number of fixed bits required in next 5 bits
-    int fb = zzBits100p;
-
-    if (alignedBitpacking) {
-      fb = utils.getClosestAlignedFixedBits(fb);
-    }
-
-    final int efb = utils.encodeBitWidth(fb) << 1;
-
-    // adjust variable run length (the header stores length - 1)
-    variableRunLength -= 1;
-
-    // extract the 9th bit of run length
-    final int tailBits = (variableRunLength & 0x100) >>> 8;
-
-    // create first byte of the header
-    final int headerFirstByte = getOpcode() | efb | tailBits;
-
-    // second byte of the header stores the remaining 8 bits of runlength
-    final int headerSecondByte = variableRunLength & 0xff;
-
-    // write header
-    output.write(headerFirstByte);
-    output.write(headerSecondByte);
-
-    // bit packing the zigzag encoded literals
-    utils.writeInts(zigzagLiterals, 0, numLiterals, fb, output);
-
-    // reset run length
-    variableRunLength = 0;
-  }
-
-  /**
-   * Writes a SHORT_REPEAT run: a one byte header followed by the repeated
-   * value in big endian order, using as many whole bytes as its bit width
-   * requires. Resets fixedRunLength to 0 afterwards.
-   *
-   * @throws IOException if writing to the output fails
-   */
-  private void writeShortRepeatValues() throws IOException {
-    // signed streams store the zigzag form of the repeated value
-    final long value = signed ? utils.zigzagEncode(literals[0]) : literals[0];
-
-    // round the bit width up to whole bytes
-    final int bitCount = utils.findClosestNumBits(value);
-    int byteCount = bitCount >>> 3;
-    if (bitCount % 8 != 0) {
-      byteCount++;
-    }
-
-    // header: opcode in the top 2 bits, (byte count - 1) shifted left by 3,
-    // and the run length minus MIN_REPEAT in the low bits
-    final int opcode = getOpcode();
-    fixedRunLength -= MIN_REPEAT;
-    output.write(opcode | ((byteCount - 1) << 3) | fixedRunLength);
-
-    // emit the value most significant byte first
-    int idx = byteCount;
-    while (idx-- > 0) {
-      output.write((int) ((value >>> (idx * 8)) & 0xff));
-    }
-
-    fixedRunLength = 0;
-  }
-
-  /**
-   * Chooses the encoding (DIRECT, DELTA or PATCHED_BASE) for the literals
-   * currently buffered and records the statistics the chosen writer needs.
-   *
-   * Always fills zigzagLiterals and zzBits100p and sets {@code encoding}.
-   * For runs longer than MIN_REPEAT it also computes min, isFixedDelta and
-   * adjDeltas; the DELTA path may set fixedDelta or bitsDeltaMax, and the
-   * PATCHED_BASE check sets zzBits90p and may fill baseRedLiterals,
-   * brBits95p and brBits100p and call preparePatchedBlob().
-   */
-  private void determineEncoding() {
-
-    // we need to compute zigzag values for DIRECT encoding if we decide to
-    // break early for delta overflows or for shorter runs
-    computeZigZagLiterals();
-
-    zzBits100p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 1.0);
-
-    // not a big win for shorter runs to determine encoding
-    if (numLiterals <= MIN_REPEAT) {
-      encoding = EncodingType.DIRECT;
-      return;
-    }
-
-    // DELTA encoding check
-
-    // for identifying monotonic sequences
-    boolean isIncreasing = true;
-    boolean isDecreasing = true;
-    this.isFixedDelta = true;
-
-    this.min = literals[0];
-    long max = literals[0];
-    final long initialDelta = literals[1] - literals[0];
-    long currDelta = initialDelta;
-    long deltaMax = initialDelta;
-    this.adjDeltas[0] = initialDelta;
-
-    for (int i = 1; i < numLiterals; i++) {
-      final long l1 = literals[i];
-      final long l0 = literals[i - 1];
-      currDelta = l1 - l0;
-      min = Math.min(min, l1);
-      max = Math.max(max, l1);
-
-      isIncreasing &= (l0 <= l1);
-      isDecreasing &= (l0 >= l1);
-
-      isFixedDelta &= (currDelta == initialDelta);
-      if (i > 1) {
-        adjDeltas[i - 1] = Math.abs(currDelta);
-        deltaMax = Math.max(deltaMax, adjDeltas[i - 1]);
-      }
-    }
-
-    // its faster to exit under delta overflow condition without checking for
-    // PATCHED_BASE condition as encoding using DIRECT is faster and has less
-    // overhead than PATCHED_BASE
-    if (!utils.isSafeSubtract(max, min)) {
-      encoding = EncodingType.DIRECT;
-      return;
-    }
-
-    // invariant - subtracting any number from any other in the literals after
-    // this point won't overflow
-
-    // if initialDelta is 0 then we cannot delta encode as we cannot identify
-    // the sign of deltas (increasing or decreasing).
-    //
-    // A nonzero initialDelta means literals[0] != literals[1], so max > min
-    // inside this branch; the former "min == max => DELTA with fixedDelta 0"
-    // check that lived here could never run and has been removed. Constant
-    // runs (min == max) always have initialDelta == 0, skip this branch and
-    // end up DIRECT below, because all their zigzag widths are equal.
-    // NOTE(review): if DELTA is preferred for long constant runs, select it
-    // in the caller before calling this method, as flush() already does.
-    if (initialDelta != 0) {
-
-      if (isFixedDelta) {
-        assert currDelta == initialDelta
-            : "currDelta should be equal to initialDelta for fixed delta encoding";
-        encoding = EncodingType.DELTA;
-        fixedDelta = currDelta;
-        return;
-      }
-
-      // stores the number of bits required for packing delta blob in
-      // delta encoding
-      bitsDeltaMax = utils.findClosestNumBits(deltaMax);
-
-      // monotonic condition
-      if (isIncreasing || isDecreasing) {
-        encoding = EncodingType.DELTA;
-        return;
-      }
-    }
-
-    // PATCHED_BASE encoding check
-
-    // percentile values are computed for the zigzag encoded values. if the
-    // number of bit requirement between 90th and 100th percentile varies
-    // beyond a threshold then we need to patch the values. if the variation
-    // is not significant then we can use direct encoding
-
-    zzBits90p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 0.9);
-    int diffBitsLH = zzBits100p - zzBits90p;
-
-    // if the difference between 90th percentile and 100th percentile fixed
-    // bits is > 1 then we need patch the values
-    if (diffBitsLH > 1) {
-
-      // patching is done only on base reduced values.
-      // remove base from literals
-      for (int i = 0; i < numLiterals; i++) {
-        baseRedLiterals[i] = literals[i] - min;
-      }
-
-      // 95th percentile width is used to determine max allowed value
-      // after which patching will be done
-      brBits95p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 0.95);
-
-      // 100th percentile is used to compute the max patch width
-      brBits100p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 1.0);
-
-      // after base reducing the values, if the difference in bits between
-      // 95th percentile and 100th percentile value is zero then there
-      // is no point in patching the values, in which case we will
-      // fallback to DIRECT encoding.
-      // The decision to use patched base was based on zigzag values, but the
-      // actual patching is done on base reduced literals.
-      if ((brBits100p - brBits95p) != 0) {
-        encoding = EncodingType.PATCHED_BASE;
-        preparePatchedBlob();
-      } else {
-        encoding = EncodingType.DIRECT;
-      }
-    } else {
-      // the widest zigzag literal needs at most one bit more than the 90th
-      // percentile, so patching would not pay off: fall back to DIRECT
-      encoding = EncodingType.DIRECT;
-    }
-  }
-
-  private void computeZigZagLiterals() {
-    // populate zigzag encoded literals
-    long zzEncVal = 0;
-    for (int i = 0; i < numLiterals; i++) {
-      if (signed) {
-        zzEncVal = utils.zigzagEncode(literals[i]);
-      } else {
-        zzEncVal = literals[i];
-      }
-      zigzagLiterals[i] = zzEncVal;
-    }
-  }
-
-  /**
-   * Prepares the patch data for PATCHED_BASE encoding. Every base reduced
-   * literal larger than {@code (1L << brBits95p) - 1} has its high bits
-   * moved into a patch entry and is masked down to its low brBits95p bits.
-   *
-   * On return patchWidth, patchGapWidth, patchLength and gapVsPatchList
-   * describe the patches; brBits95p is raised to 8 when the patch width
-   * would otherwise be 64.
-   */
-  private void preparePatchedBlob() {
-    // mask will be max value beyond which patch will be generated
-    long mask = (1L << brBits95p) - 1;
-
-    // since we are considering only the 95th percentile, the gap and patch
-    // arrays only need room for 5% of the values
-    // NOTE(review): this relies on percentileBits() leaving at most ceil(5%)
-    // of the literals above mask; otherwise gapList/patchList overflow
-    patchLength = (int) Math.ceil((numLiterals * 0.05));
-
-    int[] gapList = new int[patchLength];
-    long[] patchList = new long[patchLength];
-
-    // number of bits per patch value, rounded with getClosestFixedBits()
-    patchWidth = brBits100p - brBits95p;
-    patchWidth = utils.getClosestFixedBits(patchWidth);
-
-    // if the patch bit requirement is 64 then it is not possible to pack
-    // gap and patch together in a long. To make sure gap and patch can be
-    // packed together, cap the patch at 56 bits and keep 8 low bits in the
-    // literal itself (which widens the mask)
-    if (patchWidth == 64) {
-      patchWidth = 56;
-      brBits95p = 8;
-      mask = (1L << brBits95p) - 1;
-    }
-
-    int gapIdx = 0;
-    int patchIdx = 0;
-    int prev = 0;
-    int gap = 0;
-    int maxGap = 0;
-
-    for(int i = 0; i < numLiterals; i++) {
-      // if value is above mask then create the patch and record the gap
-      if (baseRedLiterals[i] > mask) {
-        gap = i - prev;
-        if (gap > maxGap) {
-          maxGap = gap;
-        }
-
-        // gaps are relative, so store the previous patched value index
-        prev = i;
-        gapList[gapIdx++] = gap;
-
-        // extract the most significant bits that are over mask bits
-        long patch = baseRedLiterals[i] >>> brBits95p;
-        patchList[patchIdx++] = patch;
-
-        // strip off the MSB to enable safe bit packing
-        baseRedLiterals[i] &= mask;
-      }
-    }
-
-    // adjust the patch length to number of entries in gap list
-    patchLength = gapIdx;
-
-    // if the only element to be patched is the first one then max gap
-    // will be 0, but to store a gap of 0 we still need at least 1 bit
-    if (maxGap == 0 && patchLength != 0) {
-      patchGapWidth = 1;
-    } else {
-      patchGapWidth = utils.findClosestNumBits(maxGap);
-    }
-
-    // special case: if the largest gap needs more than 8 bits (gap > 255),
-    // its width cannot be recorded because the header only has 3 bits for
-    // the gap width. Such a gap is split across extra entries in the patch
-    // list in the following way
-    // 255 gap => 0 for patch value
-    // actual gap - 255 => actual patch value
-    // A gap of 511 needs two extra entries. If the element to be patched is
-    // the last element in the scope then the gap will be 511. In this case we
-    // will have 3 entries in the patch list in the following way
-    // 255 gap => 0 for patch value
-    // 255 gap => 0 for patch value
-    // 1 gap => actual patch value
-    // NOTE(review): only one extra entry is reserved for gaps 256..510, which
-    // presumably relies on at most one gap per run exceeding 255
-    if (patchGapWidth > 8) {
-      patchGapWidth = 8;
-      // for gap = 511, we need two additional entries in patch list
-      if (maxGap == 511) {
-        patchLength += 2;
-      } else {
-        patchLength += 1;
-      }
-    }
-
-    // create gap vs patch list: each entry holds the gap above patchWidth
-    // bits and the patch value in the low patchWidth bits
-    gapIdx = 0;
-    patchIdx = 0;
-    gapVsPatchList = new long[patchLength];
-    for(int i = 0; i < patchLength; i++) {
-      long g = gapList[gapIdx++];
-      long p = patchList[patchIdx++];
-      // gaps over 255 emit filler entries (gap 255, patch 0); i advances
-      // here too so the fillers occupy their own slots
-      while (g > 255) {
-        gapVsPatchList[i++] = (255L << patchWidth);
-        g -= 255;
-      }
-
-      // store patch value in LSBs and gap in MSBs
-      gapVsPatchList[i] = (g << patchWidth) | p;
-    }
-  }
-
-  /**
-   * Resets the buffered literal count and the per-run encoding state
-   * (selected encoding, delta, percentile and patch fields) so the next run
-   * starts fresh. fixedRunLength and variableRunLength are not touched here.
-   */
-  private void clear() {
-    numLiterals = 0;
-    min = 0;
-    encoding = null;
-    isFixedDelta = true;
-
-    // delta state
-    prevDelta = 0;
-    fixedDelta = 0;
-    bitsDeltaMax = 0;
-
-    // percentile bit widths
-    zzBits90p = 0;
-    zzBits100p = 0;
-    brBits95p = 0;
-    brBits100p = 0;
-
-    // patch state
-    patchWidth = 0;
-    patchGapWidth = 0;
-    patchLength = 0;
-    gapVsPatchList = null;
-  }
-
-  /**
-   * Encodes and writes any buffered literals, then flushes the underlying
-   * output.
-   *
-   * @throws IOException if writing or flushing the output fails
-   */
-  @Override
-  public void flush() throws IOException {
-    final boolean hasPending = numLiterals != 0;
-    if (hasPending && variableRunLength != 0) {
-      determineEncoding();
-      writeValues();
-    } else if (hasPending && fixedRunLength != 0) {
-      if (fixedRunLength < MIN_REPEAT) {
-        // too short for a repeat run: treat it as a variable run instead
-        variableRunLength = fixedRunLength;
-        fixedRunLength = 0;
-        determineEncoding();
-      } else if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) {
-        encoding = EncodingType.SHORT_REPEAT;
-      } else {
-        // repeat run too long for SHORT_REPEAT: write it as a fixed delta
-        encoding = EncodingType.DELTA;
-        isFixedDelta = true;
-      }
-      writeValues();
-    }
-    output.flush();
-  }
-
-  /**
-   * Buffers one value and, when a run boundary is detected, encodes and
-   * writes the buffered literals.
-   *
-   * fixedRunLength tracks the current run of repeated values and
-   * variableRunLength the current run of non-repeating values. A repeat run
-   * is written as SHORT_REPEAT when its length is between MIN_REPEAT and
-   * MAX_SHORT_REPEAT_LENGTH, and as fixed DELTA when it is longer. A
-   * variable run is written using whatever determineEncoding() selects.
-   *
-   * @param val the value to append
-   * @throws IOException if writing encoded values to the output fails
-   */
-  @Override
-  public void write(long val) throws IOException {
-    if (numLiterals == 0) {
-      initializeLiterals(val);
-    } else {
-      if (numLiterals == 1) {
-        prevDelta = val - literals[0];
-        literals[numLiterals++] = val;
-        // if both values are same count as fixed run else variable run
-        if (val == literals[0]) {
-          fixedRunLength = 2;
-          variableRunLength = 0;
-        } else {
-          fixedRunLength = 0;
-          variableRunLength = 2;
-        }
-      } else {
-        long currentDelta = val - literals[numLiterals - 1];
-        if (prevDelta == 0 && currentDelta == 0) {
-          // repeated value: val equals the previous literal and the previous
-          // delta was also zero (prevDelta is not updated on this path)
-
-          literals[numLiterals++] = val;
-
-          // if variable run is non-zero then we are seeing repeating
-          // values at the end of variable run in which case keep
-          // updating variable and fixed runs
-          if (variableRunLength > 0) {
-            fixedRunLength = 2;
-          }
-          fixedRunLength += 1;
-
-          // if fixed run met the minimum condition and if variable
-          // run is non-zero then flush the variable run and shift the
-          // tail fixed runs to start of the buffer
-          if (fixedRunLength >= MIN_REPEAT && variableRunLength > 0) {
-            // drop the last MIN_REPEAT values from the buffered variable run
-            numLiterals -= MIN_REPEAT;
-            variableRunLength -= MIN_REPEAT - 1;
-            // copy the tail fixed runs
-            long[] tailVals = new long[MIN_REPEAT];
-            System.arraycopy(literals, numLiterals, tailVals, 0, MIN_REPEAT);
-
-            // determine variable encoding and flush values
-            determineEncoding();
-            writeValues();
-
-            // shift tail fixed runs to beginning of the buffer
-            // NOTE(review): this assumes writeValues() reset numLiterals to 0
-            // (clear() does so) -- confirm in writeValues()
-            for(long l : tailVals) {
-              literals[numLiterals++] = l;
-            }
-          }
-
-          // if fixed runs reached max repeat length then write values
-          if (fixedRunLength == MAX_SCOPE) {
-            determineEncoding();
-            writeValues();
-          }
-        } else {
-          // variable delta run
-
-          // if fixed run length is non-zero and if it satisfies the
-          // short repeat conditions then write the values as short repeats
-          // else use delta encoding
-          if (fixedRunLength >= MIN_REPEAT) {
-            if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) {
-              encoding = EncodingType.SHORT_REPEAT;
-              writeValues();
-            } else {
-              encoding = EncodingType.DELTA;
-              isFixedDelta = true;
-              writeValues();
-            }
-          }
-
-          // if fixed run length is <MIN_REPEAT and current value is
-          // different from previous then treat it as variable run
-          if (fixedRunLength > 0 && fixedRunLength < MIN_REPEAT) {
-            if (val != literals[numLiterals - 1]) {
-              variableRunLength = fixedRunLength;
-              fixedRunLength = 0;
-            }
-          }
-
-          // after writing values re-initialize the variables
-          if (numLiterals == 0) {
-            initializeLiterals(val);
-          } else {
-            // keep updating variable run lengths
-            prevDelta = val - literals[numLiterals - 1];
-            literals[numLiterals++] = val;
-            variableRunLength += 1;
-
-            // if variable run length reach the max scope, write it
-            if (variableRunLength == MAX_SCOPE) {
-              determineEncoding();
-              writeValues();
-            }
-          }
-        }
-      }
-    }
-  }
-
-  private void initializeLiterals(long val) {
-    literals[numLiterals++] = val;
-    fixedRunLength = 1;
-    variableRunLength = 1;
-  }
-
-  /**
-   * Records the position of the underlying output, followed by the number of
-   * literals currently held in this writer's buffer.
-   *
-   * @param recorder receives the position entries
-   * @throws IOException if the underlying output fails to report its position
-   */
-  @Override
-  public void getPosition(PositionRecorder recorder) throws IOException {
-    output.getPosition(recorder);
-    final int buffered = numLiterals;
-    recorder.addPosition(buffered);
-  }
-}


Mime
View raw message