Return-Path:
X-Original-To: apmail-tajo-commits-archive@minotaur.apache.org
Delivered-To: apmail-tajo-commits-archive@minotaur.apache.org
Received: from mail.apache.org (hermes.apache.org [140.211.11.3])
by minotaur.apache.org (Postfix) with SMTP id BB0C218FEF
for ;
Wed, 23 Mar 2016 01:41:46 +0000 (UTC)
Received: (qmail 19306 invoked by uid 500); 23 Mar 2016 01:41:46 -0000
Delivered-To: apmail-tajo-commits-archive@tajo.apache.org
Received: (qmail 19229 invoked by uid 500); 23 Mar 2016 01:41:46 -0000
Mailing-List: contact commits-help@tajo.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: dev@tajo.apache.org
Delivered-To: mailing list commits@tajo.apache.org
Received: (qmail 19079 invoked by uid 99); 23 Mar 2016 01:41:46 -0000
Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org)
(140.211.11.23)
by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 23 Mar 2016 01:41:46 +0000
Received: by git1-us-west.apache.org (ASF Mail Server at
git1-us-west.apache.org, from userid 33)
id 609C2E97A6; Wed, 23 Mar 2016 01:41:46 +0000 (UTC)
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
From: jihoonson@apache.org
To: commits@tajo.apache.org
Date: Wed, 23 Mar 2016 01:41:49 -0000
Message-Id: <3d746f9da2b14e84baa987a15ce3c32f@git.apache.org>
In-Reply-To: <2920c5652dd84ceaa2bf594806b32d07@git.apache.org>
References: <2920c5652dd84ceaa2bf594806b32d07@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [4/7] tajo git commit: TAJO-2102: Migrate to Apache Orc from Presto's
one.
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OutStream.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OutStream.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OutStream.java
deleted file mode 100644
index f6cfd57..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OutStream.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-class OutStream extends PositionedOutputStream {
-
- interface OutputReceiver {
- /**
- * Output the given buffer to the final destination
- * @param buffer the buffer to output
- * @throws IOException
- */
- void output(ByteBuffer buffer) throws IOException;
- }
-
- static final int HEADER_SIZE = 3;
- private final String name;
- private final OutputReceiver receiver;
- // if enabled the stream will be suppressed when writing stripe
- private boolean suppress;
-
- /**
- * Stores the uncompressed bytes that have been serialized, but not
- * compressed yet. When this fills, we compress the entire buffer.
- */
- private ByteBuffer current = null;
-
- /**
- * Stores the compressed bytes until we have a full buffer and then outputs
- * them to the receiver. If no compression is being done, this (and overflow)
- * will always be null and the current buffer will be sent directly to the
- * receiver.
- */
- private ByteBuffer compressed = null;
-
- /**
- * Since the compressed buffer may start with contents from previous
- * compression blocks, we allocate an overflow buffer so that the
- * output of the codec can be split between the two buffers. After the
- * compressed buffer is sent to the receiver, the overflow buffer becomes
- * the new compressed buffer.
- */
- private ByteBuffer overflow = null;
- private final int bufferSize;
- private final CompressionCodec codec;
- private long compressedBytes = 0;
- private long uncompressedBytes = 0;
-
- OutStream(String name,
- int bufferSize,
- CompressionCodec codec,
- OutputReceiver receiver) throws IOException {
- this.name = name;
- this.bufferSize = bufferSize;
- this.codec = codec;
- this.receiver = receiver;
- this.suppress = false;
- }
-
- public void clear() throws IOException {
- flush();
- suppress = false;
- }
-
- /**
- * Write the length of the compressed bytes. Life is much easier if the
- * header is constant length, so just use 3 bytes. Considering most of the
- * codecs want between 32k (snappy) and 256k (lzo, zlib), 3 bytes should
- * be plenty. We also use the low bit for whether it is the original or
- * compressed bytes.
- * @param buffer the buffer to write the header to
- * @param position the position in the buffer to write at
- * @param val the size in the file
- * @param original is it uncompressed
- */
- private static void writeHeader(ByteBuffer buffer,
- int position,
- int val,
- boolean original) {
- buffer.put(position, (byte) ((val << 1) + (original ? 1 : 0)));
- buffer.put(position + 1, (byte) (val >> 7));
- buffer.put(position + 2, (byte) (val >> 15));
- }
-
- private void getNewInputBuffer() throws IOException {
- if (codec == null) {
- current = ByteBuffer.allocate(bufferSize);
- } else {
- current = ByteBuffer.allocate(bufferSize + HEADER_SIZE);
- writeHeader(current, 0, bufferSize, true);
- current.position(HEADER_SIZE);
- }
- }
-
- /**
- * Allocate a new output buffer if we are compressing.
- */
- private ByteBuffer getNewOutputBuffer() throws IOException {
- return ByteBuffer.allocate(bufferSize + HEADER_SIZE);
- }
-
- private void flip() throws IOException {
- current.limit(current.position());
- current.position(codec == null ? 0 : HEADER_SIZE);
- }
-
- @Override
- public void write(int i) throws IOException {
- if (current == null) {
- getNewInputBuffer();
- }
- if (current.remaining() < 1) {
- spill();
- }
- uncompressedBytes += 1;
- current.put((byte) i);
- }
-
- @Override
- public void write(byte[] bytes, int offset, int length) throws IOException {
- if (current == null) {
- getNewInputBuffer();
- }
- int remaining = Math.min(current.remaining(), length);
- current.put(bytes, offset, remaining);
- uncompressedBytes += remaining;
- length -= remaining;
- while (length != 0) {
- spill();
- offset += remaining;
- remaining = Math.min(current.remaining(), length);
- current.put(bytes, offset, remaining);
- uncompressedBytes += remaining;
- length -= remaining;
- }
- }
-
- private void spill() throws IOException {
- // if there isn't anything in the current buffer, don't spill
- if (current == null ||
- current.position() == (codec == null ? 0 : HEADER_SIZE)) {
- return;
- }
- flip();
- if (codec == null) {
- receiver.output(current);
- getNewInputBuffer();
- } else {
- if (compressed == null) {
- compressed = getNewOutputBuffer();
- } else if (overflow == null) {
- overflow = getNewOutputBuffer();
- }
- int sizePosn = compressed.position();
- compressed.position(compressed.position() + HEADER_SIZE);
- if (codec.compress(current, compressed, overflow)) {
- uncompressedBytes = 0;
- // move position back to after the header
- current.position(HEADER_SIZE);
- current.limit(current.capacity());
- // find the total bytes in the chunk
- int totalBytes = compressed.position() - sizePosn - HEADER_SIZE;
- if (overflow != null) {
- totalBytes += overflow.position();
- }
- compressedBytes += totalBytes + HEADER_SIZE;
- writeHeader(compressed, sizePosn, totalBytes, false);
- // if we have less than the next header left, spill it.
- if (compressed.remaining() < HEADER_SIZE) {
- compressed.flip();
- receiver.output(compressed);
- compressed = overflow;
- overflow = null;
- }
- } else {
- compressedBytes += uncompressedBytes + HEADER_SIZE;
- uncompressedBytes = 0;
- // we are using the original, but need to spill the current
- // compressed buffer first. So back up to where we started,
- // flip it and add it to done.
- if (sizePosn != 0) {
- compressed.position(sizePosn);
- compressed.flip();
- receiver.output(compressed);
- compressed = null;
- // if we have an overflow, clear it and make it the new compress
- // buffer
- if (overflow != null) {
- overflow.clear();
- compressed = overflow;
- overflow = null;
- }
- } else {
- compressed.clear();
- if (overflow != null) {
- overflow.clear();
- }
- }
-
- // now add the current buffer into the done list and get a new one.
- current.position(0);
- // update the header with the current length
- writeHeader(current, 0, current.limit() - HEADER_SIZE, true);
- receiver.output(current);
- getNewInputBuffer();
- }
- }
- }
-
- void getPosition(PositionRecorder recorder) throws IOException {
- if (codec == null) {
- recorder.addPosition(uncompressedBytes);
- } else {
- recorder.addPosition(compressedBytes);
- recorder.addPosition(uncompressedBytes);
- }
- }
-
- @Override
- public void flush() throws IOException {
- spill();
- if (compressed != null && compressed.position() != 0) {
- compressed.flip();
- receiver.output(compressed);
- compressed = null;
- }
- uncompressedBytes = 0;
- compressedBytes = 0;
- overflow = null;
- current = null;
- }
-
- @Override
- public String toString() {
- return name;
- }
-
- @Override
- public long getBufferSize() {
- long result = 0;
- if (current != null) {
- result += current.capacity();
- }
- if (compressed != null) {
- result += compressed.capacity();
- }
- if (overflow != null) {
- result += overflow.capacity();
- }
- return result;
- }
-
- /**
- * Set suppress flag
- */
- public void suppress() {
- suppress = true;
- }
-
- /**
- * Returns the state of suppress flag
- * @return value of suppress flag
- */
- public boolean isSuppressed() {
- return suppress;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionRecorder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionRecorder.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionRecorder.java
deleted file mode 100644
index a39926e..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionRecorder.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-/**
- * An interface for recording positions in a stream.
- */
-interface PositionRecorder {
- void addPosition(long offset);
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionedOutputStream.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionedOutputStream.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionedOutputStream.java
deleted file mode 100644
index 748c98c..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionedOutputStream.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-abstract class PositionedOutputStream extends OutputStream {
-
- /**
- * Record the current position to the recorder.
- * @param recorder the object that receives the position
- * @throws IOException
- */
- abstract void getPosition(PositionRecorder recorder) throws IOException;
-
- /**
- * Get the memory size currently allocated as buffer associated with this
- * stream.
- * @return the number of bytes used by buffers.
- */
- abstract long getBufferSize();
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RecordReaderUtils.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RecordReaderUtils.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RecordReaderUtils.java
new file mode 100644
index 0000000..bc882e0
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RecordReaderUtils.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.orc;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.DataReader;
+import org.apache.orc.OrcProto;
+import org.apache.orc.impl.BufferChunk;
+import org.apache.orc.impl.DirectDecompressionCodec;
+import org.apache.orc.impl.OutStream;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class RecordReaderUtils {
+
+ public static class DefaultDataReader implements DataReader {
+ private FSDataInputStream file; // stream over `path`; assigned in open(), closed in close()
+ private ByteBufferAllocatorPool pool; // created in open() only when useZeroCopy is set; cleared in close()
+ private ZeroCopyAdapter zcr; // result of createZeroCopyShim(); null when zero-copy is off or the shim is unavailable
+ private FileSystem fs; // file system open() uses to open `path`
+ private Path path; // file this reader reads from
+ private boolean useZeroCopy; // whether open() should try to set up zero-copy reads
+ private CompressionCodec codec; // passed to createZeroCopyShim() to decide whether zero-copy is possible
+ private long readBytes = 0; // running total of bytes obtained by readDiskRanges()
+
+ public DefaultDataReader(
+     FileSystem fs, Path path, boolean useZeroCopy, CompressionCodec codec) {
+   this.codec = codec; // settings are only recorded here; open() performs the actual I/O
+   this.useZeroCopy = useZeroCopy;
+   this.path = path;
+   this.fs = fs;
+ }
+
+ @Override
+ public void open() throws IOException {
+   this.file = fs.open(path);
+   if (!useZeroCopy) {
+     pool = null; // plain reads: neither a buffer pool nor a zero-copy shim is kept
+     zcr = null;
+     return;
+   }
+   pool = new ByteBufferAllocatorPool();
+   zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool);
+ }
+
+ @Override
+ public DiskRangeList readFileData(
+ DiskRangeList range, long baseOffset, boolean doForceDirect) throws IOException {
+ return readDiskRanges(file, zcr, baseOffset, range, doForceDirect); // delegates, using the stream and zero-copy shim assigned by open()
+ }
+
+ @Override
+ public void close() throws IOException {
+   try {
+     if (file != null) file.close();
+   } finally {
+     // Runs even when file.close() throws, so the buffer pool is always cleared.
+     if (pool != null) pool.clear();
+   }
+ }
+
+ @Override
+ public boolean isTrackingDiskRanges() {
+ return zcr != null; // true only when open() obtained a zero-copy shim
+ }
+
+ @Override
+ public void releaseBuffer(ByteBuffer buffer) {
+   if (zcr != null) zcr.releaseBuffer(buffer); // zcr is null when zero-copy is off or unavailable; then the buffer did not come from it
+ }
+
+ public long getReadBytes() {
+ return readBytes; // total accumulated by readDiskRanges() since this reader was created
+ }
+
+ /**
+ * Read the list of ranges from the file.
+ * @param file the file to read
+ * @param base the base of the stripe
+ * @param range the disk ranges within the stripe to read
+ * @return the bytes read for each disk range, which is the same length as
+ * ranges
+ * @throws IOException
+ */
+ private DiskRangeList readDiskRanges(FSDataInputStream file,
+ ZeroCopyAdapter zcr, // null => read through heap byte[] instead of the zero-copy path
+ long base,
+ DiskRangeList range,
+ boolean doForceDirect) throws IOException { // doForceDirect: copy heap reads into a direct ByteBuffer
+ if (range == null) return null; // nothing requested
+ DiskRangeList prev = range.prev; // node before the head, so the (possibly replaced) head can be returned as prev.next
+ if (prev == null) {
+ prev = new DiskRangeList.MutateHelper(range); // presumably a placeholder node linked in front of the head - confirm in DiskRangeList
+ }
+ while (range != null) {
+ if (range.hasData()) { // this range already carries data: leave it alone
+ range = range.next;
+ continue;
+ }
+ int len = (int) (range.getEnd() - range.getOffset()); // range length, narrowed to int
+ long off = range.getOffset();
+ if (zcr != null) {
+ file.seek(base + off); // range offsets are relative to base
+ boolean hasReplaced = false;
+ while (len > 0) { // a single readBuffer() call may deliver fewer than len bytes
+ ByteBuffer partial = zcr.readBuffer(len, false);
+ readBytes += partial.remaining();
+ BufferChunk bc = new BufferChunk(partial, off);
+ if (!hasReplaced) {
+ range.replaceSelfWith(bc); // the first chunk takes the place of the empty range
+ hasReplaced = true;
+ } else {
+ range.insertAfter(bc); // later chunks are chained behind the previous chunk
+ }
+ range = bc;
+ int read = partial.remaining();
+ len -= read;
+ off += read;
+ }
+ } else {
+ // Don't use HDFS ByteBuffer API because it has no readFully, and is buggy and pointless.
+ byte[] buffer = new byte[len];
+ file.readFully((base + off), buffer, 0, buffer.length); // positioned read of the whole range
+ readBytes += buffer.length;
+ ByteBuffer bb = null;
+ if (doForceDirect) {
+ bb = ByteBuffer.allocateDirect(len);
+ bb.put(buffer);
+ bb.position(0); // rewind so the chunk is read from its start
+ bb.limit(len);
+ } else {
+ bb = ByteBuffer.wrap(buffer);
+ }
+ range = range.replaceSelfWith(new BufferChunk(bb, range.getOffset()));
+ }
+ range = range.next;
+ }
+ return prev.next; // head of the list after the replacements above
+ }
+ }
+
+ public static DataReader createDefaultDataReader(
+ FileSystem fs, Path path, boolean useZeroCopy, CompressionCodec codec) {
+ return new DefaultDataReader(fs, path, useZeroCopy, codec); // the constructor only stores its arguments; the file is opened by open()
+ }
+
+ public static boolean[] findPresentStreamsByColumn(
+     List<OrcProto.Stream> streamList, List<OrcProto.Type> types) {
+   boolean[] hasNull = new boolean[types.size()]; // one flag per entry of types, indexed by stream.getColumn()
+   for (OrcProto.Stream stream : streamList) {
+     if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.PRESENT)) {
+       hasNull[stream.getColumn()] = true; // this column has a PRESENT stream
+     }
+   }
+   return hasNull;
+ }
+
+ /**
+ * Does region A overlap region B? The end points are inclusive on both sides.
+ * @param leftA A's left point
+ * @param rightA A's right point
+ * @param leftB B's left point
+ * @param rightB B's right point
+ * @return Does region A overlap region B?
+ */
+ static boolean overlap(long leftA, long rightA, long leftB, long rightB) {
+ if (leftA <= leftB) {
+ return rightA >= leftB;
+ }
+ return rightB >= leftA;
+ }
+
+ public static void addEntireStreamToRanges(
+ long offset, long length, DiskRangeList.CreateHelper list, boolean doMergeBuffers) {
+ list.addOrMerge(offset, offset + length, doMergeBuffers, false); // whole stream, from offset to offset + length; the final flag is false here, true in addRgFilteredStreamToRanges()
+ }
+
+ public static void addRgFilteredStreamToRanges(OrcProto.Stream stream,
+     boolean[] includedRowGroups, boolean isCompressed, OrcProto.RowIndex index,
+     OrcProto.ColumnEncoding encoding, OrcProto.Type type, int compressionSize, boolean hasNull,
+     long offset, long length, DiskRangeList.CreateHelper list, boolean doMergeBuffers) {
+   final int lastGroup = includedRowGroups.length - 1;
+   for (int rg = 0; rg <= lastGroup; rg++) {
+     if (includedRowGroups[rg]) {
+       // index of this stream's start position within each row index entry
+       final int slot = getIndexPosition(
+           encoding.getKind(), type.getKind(), stream.getKind(), isCompressed, hasNull);
+       final long rgStart = offset + index.getEntry(rg).getPositions(slot);
+       final boolean finalGroup = (rg == lastGroup);
+       final long nextStart = finalGroup ? length : index.getEntry(rg + 1).getPositions(slot);
+       final long rgEnd = offset
+           + estimateRgEndOffset(isCompressed, finalGroup, nextStart, length, compressionSize);
+       list.addOrMerge(rgStart, rgEnd, doMergeBuffers, true);
+     }
+   }
+ }
+
+ public static long estimateRgEndOffset(boolean isCompressed, boolean isLast,
+     long nextGroupOffset, long streamLength, int bufferSize) {
+   if (isLast) return streamLength; // the last row group simply runs to the end of the stream
+   // Worst-case end of the group: when adjacent groups share a compressed block offset,
+   // allow two blocks of slop, one for the current compression block and one for the next.
+   final long slop =
+       isCompressed ? 2 * (OutStream.HEADER_SIZE + bufferSize) : WORST_UNCOMPRESSED_SLOP;
+   return Math.min(streamLength, nextGroupOffset + slop);
+ }
+
+ private static final int BYTE_STREAM_POSITIONS = 1; // used by getIndexPosition() for CHAR/VARCHAR/STRING, BINARY and DECIMAL
+ private static final int RUN_LENGTH_BYTE_POSITIONS = BYTE_STREAM_POSITIONS + 1; // = 2; in the visible code it only feeds BITFIELD_POSITIONS
+ private static final int BITFIELD_POSITIONS = RUN_LENGTH_BYTE_POSITIONS + 1; // = 3; slots getIndexPosition() skips when the column has a PRESENT stream
+ private static final int RUN_LENGTH_INT_POSITIONS = BYTE_STREAM_POSITIONS + 1; // = 2; used by getIndexPosition() for TIMESTAMP
+
+ /**
+ * Get the offset in the index positions for the column that the given
+ * stream starts.
+ * @param columnEncoding the encoding of the column
+ * @param columnType the type of the column
+ * @param streamType the kind of the stream
+ * @param isCompressed is the file compressed
+ * @param hasNulls does the column have a PRESENT stream?
+ * @return the index, within a row index entry's position list, at which that stream's positions begin
+ */
+ public static int getIndexPosition(OrcProto.ColumnEncoding.Kind columnEncoding,
+ OrcProto.Type.Kind columnType,
+ OrcProto.Stream.Kind streamType,
+ boolean isCompressed,
+ boolean hasNulls) {
+ if (streamType == OrcProto.Stream.Kind.PRESENT) {
+ return 0;
+ }
+ int compressionValue = isCompressed ? 1 : 0;
+ int base = hasNulls ? (BITFIELD_POSITIONS + compressionValue) : 0;
+ switch (columnType) {
+ case BOOLEAN:
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ case DATE:
+ case STRUCT:
+ case MAP:
+ case LIST:
+ case UNION:
+ return base;
+ case CHAR:
+ case VARCHAR:
+ case STRING:
+ if (columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
+ columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
+ return base;
+ } else {
+ if (streamType == OrcProto.Stream.Kind.DATA) {
+ return base;
+ } else {
+ return base + BYTE_STREAM_POSITIONS + compressionValue;
+ }
+ }
+ case BINARY:
+ if (streamType == OrcProto.Stream.Kind.DATA) {
+ return base;
+ }
+ return base + BYTE_STREAM_POSITIONS + compressionValue;
+ case DECIMAL:
+ if (streamType == OrcProto.Stream.Kind.DATA) {
+ return base;
+ }
+ return base + BYTE_STREAM_POSITIONS + compressionValue;
+ case TIMESTAMP:
+ if (streamType == OrcProto.Stream.Kind.DATA) {
+ return base;
+ }
+ return base + RUN_LENGTH_INT_POSITIONS + compressionValue;
+ default:
+ throw new IllegalArgumentException("Unknown type " + columnType);
+ }
+ }
+
+ // for uncompressed streams, what is the most overlap with the following set
+ // of rows (long vint literal group).
+ static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512; // = 4098; the slop estimateRgEndOffset() applies when isCompressed is false
+
+ /**
+ * Is this stream part of a dictionary?
+ * @return is this part of a dictionary?
+ */
+ public static boolean isDictionary(OrcProto.Stream.Kind kind,
+     OrcProto.ColumnEncoding encoding) {
+   assert kind != OrcProto.Stream.Kind.DICTIONARY_COUNT;
+   final OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind();
+   if (kind == OrcProto.Stream.Kind.DICTIONARY_DATA) return true;
+   if (kind != OrcProto.Stream.Kind.LENGTH) return false; // LENGTH counts only under a dictionary encoding
+   return encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY
+       || encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2;
+ }
+
+ /**
+ * Build a string representation of a list of disk ranges.
+ * @param range ranges to stringify
+ * @return the resulting string
+ */
+ public static String stringifyDiskRanges(DiskRangeList range) {
+   // Output shape: "[{r1}, {r2}, ...]"; a null list yields "[]".
+   final StringBuilder text = new StringBuilder();
+   text.append("[");
+   // Empty before the first element, ", " before every later one.
+   String separator = "";
+   DiskRangeList node = range;
+   while (node != null) {
+     text.append(separator);
+     text.append("{");
+     text.append(node.toString());
+     text.append("}");
+     separator = ", ";
+     node = node.next;
+   }
+   text.append("]");
+   return text.toString();
+ }
+
+ public static List<DiskRange> getStreamBuffers(DiskRangeList range, long offset, long length) {
+   // Collects slices of the list covering offset..offset+length, shifted by -offset. Assumes sorted ranges (as do many other parts of ORC code).
+   List<DiskRange> buffers = new ArrayList<DiskRange>();
+   if (length == 0) return buffers;
+   long streamEnd = offset + length;
+   boolean inRange = false;
+   while (range != null) {
+     if (!inRange) {
+       if (range.getEnd() <= offset) {
+         range = range.next;
+         continue; // Skip until we are in range.
+       }
+       inRange = true;
+       if (range.getOffset() < offset) {
+         // Partial first buffer, add a slice of it.
+         buffers.add(range.sliceAndShift(offset, Math.min(streamEnd, range.getEnd()), -offset));
+         if (range.getEnd() >= streamEnd) break; // Partial first buffer is also partial last buffer.
+         range = range.next;
+         continue;
+       }
+     } else if (range.getOffset() >= streamEnd) {
+       break;
+     }
+     if (range.getEnd() > streamEnd) {
+       // Partial last buffer (may also be the first buffer), add a slice of it.
+       buffers.add(range.sliceAndShift(range.getOffset(), streamEnd, -offset));
+       break;
+     }
+     // Buffer that belongs entirely to one stream.
+     // TODO: ideally we would want to reuse the object and remove it from the list, but we cannot
+     // because bufferChunks is also used by clearStreams for zcr. Create a useless dup.
+     buffers.add(range.sliceAndShift(range.getOffset(), range.getEnd(), -offset));
+     if (range.getEnd() == streamEnd) break;
+     range = range.next;
+   }
+   return buffers;
+ }
+
+ static ZeroCopyAdapter createZeroCopyShim(FSDataInputStream file,
+ CompressionCodec codec, ByteBufferAllocatorPool pool) throws IOException {
+ if ((codec == null || ((codec instanceof DirectDecompressionCodec)
+ && ((DirectDecompressionCodec) codec).isAvailable()))) {
+ /* codec is null or is available */
+ return new ZeroCopyAdapter(file, pool);
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RedBlackTree.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RedBlackTree.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RedBlackTree.java
deleted file mode 100644
index 2482f93..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RedBlackTree.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.orc;
-
-/**
- * A memory efficient red-black tree that does not allocate any objects per
- * an element. This class is abstract and assumes that the child class
- * handles the key and comparisons with the key.
- */
-abstract class RedBlackTree {
- public static final int NULL = -1;
-
- // Various values controlling the offset of the data within the array.
- private static final int LEFT_OFFSET = 0;
- private static final int RIGHT_OFFSET = 1;
- private static final int ELEMENT_SIZE = 2;
-
- protected int size = 0;
- private final DynamicIntArray data;
- protected int root = NULL;
- protected int lastAdd = 0;
- private boolean wasAdd = false;
-
- /**
- * Create a set with the given initial capacity.
- */
- public RedBlackTree(int initialCapacity) {
- data = new DynamicIntArray(initialCapacity * ELEMENT_SIZE);
- }
-
- /**
- * Insert a new node into the data array, growing the array as necessary.
- *
- * @return Returns the position of the new node.
- */
- private int insert(int left, int right, boolean isRed) {
- int position = size;
- size += 1;
- setLeft(position, left, isRed);
- setRight(position, right);
- return position;
- }
-
- /**
- * Compare the value at the given position to the new value.
- * @return 0 if the values are the same, -1 if the new value is smaller and
- * 1 if the new value is larger.
- */
- protected abstract int compareValue(int position);
-
- /**
- * Is the given node red as opposed to black? To prevent having an extra word
- * in the data array, we just the low bit on the left child index.
- */
- protected boolean isRed(int position) {
- return position != NULL &&
- (data.get(position * ELEMENT_SIZE + LEFT_OFFSET) & 1) == 1;
- }
-
- /**
- * Set the red bit true or false.
- */
- private void setRed(int position, boolean isRed) {
- int offset = position * ELEMENT_SIZE + LEFT_OFFSET;
- if (isRed) {
- data.set(offset, data.get(offset) | 1);
- } else {
- data.set(offset, data.get(offset) & ~1);
- }
- }
-
- /**
- * Get the left field of the given position.
- */
- protected int getLeft(int position) {
- return data.get(position * ELEMENT_SIZE + LEFT_OFFSET) >> 1;
- }
-
- /**
- * Get the right field of the given position.
- */
- protected int getRight(int position) {
- return data.get(position * ELEMENT_SIZE + RIGHT_OFFSET);
- }
-
- /**
- * Set the left field of the given position.
- * Note that we are storing the node color in the low bit of the left pointer.
- */
- private void setLeft(int position, int left) {
- int offset = position * ELEMENT_SIZE + LEFT_OFFSET;
- data.set(offset, (left << 1) | (data.get(offset) & 1));
- }
-
- /**
- * Set the left field of the given position.
- * Note that we are storing the node color in the low bit of the left pointer.
- */
- private void setLeft(int position, int left, boolean isRed) {
- int offset = position * ELEMENT_SIZE + LEFT_OFFSET;
- data.set(offset, (left << 1) | (isRed ? 1 : 0));
- }
-
- /**
- * Set the right field of the given position.
- */
- private void setRight(int position, int right) {
- data.set(position * ELEMENT_SIZE + RIGHT_OFFSET, right);
- }
-
- /**
- * Insert or find a given key in the tree and rebalance the tree correctly.
- * Rebalancing restores the red-black aspect of the tree to maintain the
- * invariants:
- * 1. If a node is red, both of its children are black.
- * 2. Each child of a node has the same black height (the number of black
- * nodes between it and the leaves of the tree).
- *
- * Inserted nodes are at the leaves and are red, therefore there is at most a
- * violation of rule 1 at the node we just put in. Instead of always keeping
- * the parents, this routine passing down the context.
- *
- * The fix is broken down into 6 cases (1.{1,2,3} and 2.{1,2,3} that are
- * left-right mirror images of each other). See Algorighms by Cormen,
- * Leiserson, and Rivest for the explaination of the subcases.
- *
- * @param node The node that we are fixing right now.
- * @param fromLeft Did we come down from the left?
- * @param parent Nodes' parent
- * @param grandparent Parent's parent
- * @param greatGrandparent Grandparent's parent
- * @return Does parent also need to be checked and/or fixed?
- */
- private boolean add(int node, boolean fromLeft, int parent,
- int grandparent, int greatGrandparent) {
- if (node == NULL) {
- if (root == NULL) {
- lastAdd = insert(NULL, NULL, false);
- root = lastAdd;
- wasAdd = true;
- return false;
- } else {
- lastAdd = insert(NULL, NULL, true);
- node = lastAdd;
- wasAdd = true;
- // connect the new node into the tree
- if (fromLeft) {
- setLeft(parent, node);
- } else {
- setRight(parent, node);
- }
- }
- } else {
- int compare = compareValue(node);
- boolean keepGoing;
-
- // Recurse down to find where the node needs to be added
- if (compare < 0) {
- keepGoing = add(getLeft(node), true, node, parent, grandparent);
- } else if (compare > 0) {
- keepGoing = add(getRight(node), false, node, parent, grandparent);
- } else {
- lastAdd = node;
- wasAdd = false;
- return false;
- }
-
- // we don't need to fix the root (because it is always set to black)
- if (node == root || !keepGoing) {
- return false;
- }
- }
-
-
- // Do we need to fix this node? Only if there are two reds right under each
- // other.
- if (isRed(node) && isRed(parent)) {
- if (parent == getLeft(grandparent)) {
- int uncle = getRight(grandparent);
- if (isRed(uncle)) {
- // case 1.1
- setRed(parent, false);
- setRed(uncle, false);
- setRed(grandparent, true);
- return true;
- } else {
- if (node == getRight(parent)) {
- // case 1.2
- // swap node and parent
- int tmp = node;
- node = parent;
- parent = tmp;
- // left-rotate on node
- setLeft(grandparent, parent);
- setRight(node, getLeft(parent));
- setLeft(parent, node);
- }
-
- // case 1.2 and 1.3
- setRed(parent, false);
- setRed(grandparent, true);
-
- // right-rotate on grandparent
- if (greatGrandparent == NULL) {
- root = parent;
- } else if (getLeft(greatGrandparent) == grandparent) {
- setLeft(greatGrandparent, parent);
- } else {
- setRight(greatGrandparent, parent);
- }
- setLeft(grandparent, getRight(parent));
- setRight(parent, grandparent);
- return false;
- }
- } else {
- int uncle = getLeft(grandparent);
- if (isRed(uncle)) {
- // case 2.1
- setRed(parent, false);
- setRed(uncle, false);
- setRed(grandparent, true);
- return true;
- } else {
- if (node == getLeft(parent)) {
- // case 2.2
- // swap node and parent
- int tmp = node;
- node = parent;
- parent = tmp;
- // right-rotate on node
- setRight(grandparent, parent);
- setLeft(node, getRight(parent));
- setRight(parent, node);
- }
- // case 2.2 and 2.3
- setRed(parent, false);
- setRed(grandparent, true);
- // left-rotate on grandparent
- if (greatGrandparent == NULL) {
- root = parent;
- } else if (getRight(greatGrandparent) == grandparent) {
- setRight(greatGrandparent, parent);
- } else {
- setLeft(greatGrandparent, parent);
- }
- setRight(grandparent, getLeft(parent));
- setLeft(parent, grandparent);
- return false;
- }
- }
- } else {
- return true;
- }
- }
-
- /**
- * Add the new key to the tree.
- * @return true if the element is a new one.
- */
- protected boolean add() {
- add(root, false, NULL, NULL, NULL);
- if (wasAdd) {
- setRed(root, false);
- return true;
- } else {
- return false;
- }
- }
-
- /**
- * Get the number of elements in the set.
- */
- public int size() {
- return size;
- }
-
- /**
- * Reset the table to empty.
- */
- public void clear() {
- root = NULL;
- size = 0;
- data.clear();
- }
-
- /**
- * Get the buffer size in bytes.
- */
- public long getSizeInBytes() {
- return data.getSizeInBytes();
- }
-}
-
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthByteWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthByteWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthByteWriter.java
deleted file mode 100644
index 0953cdd..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthByteWriter.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-import java.io.IOException;
-
-/**
- * A streamFactory that writes a sequence of bytes. A control byte is written before
- * each run with positive values 0 to 127 meaning 2 to 129 repetitions. If the
- * bytes is -1 to -128, 1 to 128 literal byte values follow.
- */
-class RunLengthByteWriter {
- static final int MIN_REPEAT_SIZE = 3;
- static final int MAX_LITERAL_SIZE = 128;
- static final int MAX_REPEAT_SIZE= 127 + MIN_REPEAT_SIZE;
- private final PositionedOutputStream output;
- private final byte[] literals = new byte[MAX_LITERAL_SIZE];
- private int numLiterals = 0;
- private boolean repeat = false;
- private int tailRunLength = 0;
-
- RunLengthByteWriter(PositionedOutputStream output) {
- this.output = output;
- }
-
- private void writeValues() throws IOException {
- if (numLiterals != 0) {
- if (repeat) {
- output.write(numLiterals - MIN_REPEAT_SIZE);
- output.write(literals, 0, 1);
- } else {
- output.write(-numLiterals);
- output.write(literals, 0, numLiterals);
- }
- repeat = false;
- tailRunLength = 0;
- numLiterals = 0;
- }
- }
-
- void flush() throws IOException {
- writeValues();
- output.flush();
- }
-
- void write(byte value) throws IOException {
- if (numLiterals == 0) {
- literals[numLiterals++] = value;
- tailRunLength = 1;
- } else if (repeat) {
- if (value == literals[0]) {
- numLiterals += 1;
- if (numLiterals == MAX_REPEAT_SIZE) {
- writeValues();
- }
- } else {
- writeValues();
- literals[numLiterals++] = value;
- tailRunLength = 1;
- }
- } else {
- if (value == literals[numLiterals - 1]) {
- tailRunLength += 1;
- } else {
- tailRunLength = 1;
- }
- if (tailRunLength == MIN_REPEAT_SIZE) {
- if (numLiterals + 1 == MIN_REPEAT_SIZE) {
- repeat = true;
- numLiterals += 1;
- } else {
- numLiterals -= MIN_REPEAT_SIZE - 1;
- writeValues();
- literals[0] = value;
- repeat = true;
- numLiterals = MIN_REPEAT_SIZE;
- }
- } else {
- literals[numLiterals++] = value;
- if (numLiterals == MAX_LITERAL_SIZE) {
- writeValues();
- }
- }
- }
- }
-
- void getPosition(PositionRecorder recorder) throws IOException {
- output.getPosition(recorder);
- recorder.addPosition(numLiterals);
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriter.java
deleted file mode 100644
index 867f041..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriter.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-import java.io.IOException;
-
-/**
- * A streamFactory that writes a sequence of integers. A control byte is written before
- * each run with positive values 0 to 127 meaning 3 to 130 repetitions, each
- * repetition is offset by a delta. If the control byte is -1 to -128, 1 to 128
- * literal vint values follow.
- */
-class RunLengthIntegerWriter implements IntegerWriter {
- static final int MIN_REPEAT_SIZE = 3;
- static final int MAX_DELTA = 127;
- static final int MIN_DELTA = -128;
- static final int MAX_LITERAL_SIZE = 128;
- private static final int MAX_REPEAT_SIZE = 127 + MIN_REPEAT_SIZE;
- private final PositionedOutputStream output;
- private final boolean signed;
- private final long[] literals = new long[MAX_LITERAL_SIZE];
- private int numLiterals = 0;
- private long delta = 0;
- private boolean repeat = false;
- private int tailRunLength = 0;
- private SerializationUtils utils;
-
- RunLengthIntegerWriter(PositionedOutputStream output,
- boolean signed) {
- this.output = output;
- this.signed = signed;
- this.utils = new SerializationUtils();
- }
-
- private void writeValues() throws IOException {
- if (numLiterals != 0) {
- if (repeat) {
- output.write(numLiterals - MIN_REPEAT_SIZE);
- output.write((byte) delta);
- if (signed) {
- utils.writeVslong(output, literals[0]);
- } else {
- utils.writeVulong(output, literals[0]);
- }
- } else {
- output.write(-numLiterals);
- for(int i=0; i < numLiterals; ++i) {
- if (signed) {
- utils.writeVslong(output, literals[i]);
- } else {
- utils.writeVulong(output, literals[i]);
- }
- }
- }
- repeat = false;
- numLiterals = 0;
- tailRunLength = 0;
- }
- }
-
- @Override
- public void flush() throws IOException {
- writeValues();
- output.flush();
- }
-
- @Override
- public void write(long value) throws IOException {
- if (numLiterals == 0) {
- literals[numLiterals++] = value;
- tailRunLength = 1;
- } else if (repeat) {
- if (value == literals[0] + delta * numLiterals) {
- numLiterals += 1;
- if (numLiterals == MAX_REPEAT_SIZE) {
- writeValues();
- }
- } else {
- writeValues();
- literals[numLiterals++] = value;
- tailRunLength = 1;
- }
- } else {
- if (tailRunLength == 1) {
- delta = value - literals[numLiterals - 1];
- if (delta < MIN_DELTA || delta > MAX_DELTA) {
- tailRunLength = 1;
- } else {
- tailRunLength = 2;
- }
- } else if (value == literals[numLiterals - 1] + delta) {
- tailRunLength += 1;
- } else {
- delta = value - literals[numLiterals - 1];
- if (delta < MIN_DELTA || delta > MAX_DELTA) {
- tailRunLength = 1;
- } else {
- tailRunLength = 2;
- }
- }
- if (tailRunLength == MIN_REPEAT_SIZE) {
- if (numLiterals + 1 == MIN_REPEAT_SIZE) {
- repeat = true;
- numLiterals += 1;
- } else {
- numLiterals -= MIN_REPEAT_SIZE - 1;
- long base = literals[numLiterals];
- writeValues();
- literals[0] = base;
- repeat = true;
- numLiterals = MIN_REPEAT_SIZE;
- }
- } else {
- literals[numLiterals++] = value;
- if (numLiterals == MAX_LITERAL_SIZE) {
- writeValues();
- }
- }
- }
- }
-
- @Override
- public void getPosition(PositionRecorder recorder) throws IOException {
- output.getPosition(recorder);
- recorder.addPosition(numLiterals);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriterV2.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriterV2.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriterV2.java
deleted file mode 100644
index 7237b2e..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriterV2.java
+++ /dev/null
@@ -1,832 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-import java.io.IOException;
-
-/**
- * A writer that performs light weight compression over sequence of integers.
- *
- * There are four types of lightweight integer compression
- *
- * - SHORT_REPEAT
- * - DIRECT
- * - PATCHED_BASE
- * - DELTA
- *
- *
- * The description and format for these types are as below:
- *
- * SHORT_REPEAT: Used for short repeated integer sequences.
- *
- * - 1 byte header
- *
- * - 2 bits for encoding type
- * - 3 bits for bytes required for repeating value
- * - 3 bits for repeat count (MIN_REPEAT + run length)
- *
- *
- * - Blob - repeat value (fixed bytes)
- *
- *
- *
- * DIRECT: Used for random integer sequences whose number of bit
- * requirement doesn't vary a lot.
- *
- * - 2 bytes header
- *
- * 1st byte
- * - 2 bits for encoding type
- * - 5 bits for fixed bit width of values in blob
- * - 1 bit for storing MSB of run length
- *
- *
- * 2nd byte
- * - 8 bits for lower run length bits
- *
- *
- * - Blob - stores the direct values using fixed bit width. The length of the
- * data blob is (fixed width * run length) bits long
- *
- *
- *
- * PATCHED_BASE: Used for random integer sequences whose number of bit
- * requirement varies beyond a threshold.
- *
- * - 4 bytes header
- *
- * 1st byte
- * - 2 bits for encoding type
- * - 5 bits for fixed bit width of values in blob
- * - 1 bit for storing MSB of run length
- *
- *
- * 2nd byte
- * - 8 bits for lower run length bits
- *
- *
- * 3rd byte
- * - 3 bits for bytes required to encode base value
- * - 5 bits for patch width
- *
- *
- * 4th byte
- * - 3 bits for patch gap width
- * - 5 bits for patch length
- *
- *
- * - Base value - Stored using fixed number of bytes. If MSB is set, base
- * value is negative else positive. Length of base value is (base width * 8)
- * bits.
- * - Data blob - Base reduced values as stored using fixed bit width. Length
- * of data blob is (fixed width * run length) bits.
- * - Patch blob - Patch blob is a list of gap and patch value. Each entry in
- * the patch list is (patch width + patch gap width) bits long. Gap between the
- * subsequent elements to be patched are stored in upper part of entry whereas
- * patch values are stored in lower part of entry. Length of patch blob is
- * ((patch width + patch gap width) * patch length) bits.
- *
- *
- *
- * DELTA Used for monotonically increasing or decreasing sequences,
- * sequences with fixed delta values or long repeated sequences.
- *
- * - 2 bytes header
- *
- * 1st byte
- * - 2 bits for encoding type
- * - 5 bits for fixed bit width of values in blob
- * - 1 bit for storing MSB of run length
- *
- *
- * 2nd byte
- * - 8 bits for lower run length bits
- *
- *
- * - Base value - encoded as varint
- * - Delta base - encoded as varint
- * - Delta blob - only positive values. monotonicity and orderness are decided
- * based on the sign of the base value and delta base
- *
- *
- */
-class RunLengthIntegerWriterV2 implements IntegerWriter {
-
- public enum EncodingType {
- SHORT_REPEAT, DIRECT, PATCHED_BASE, DELTA
- }
-
- static final int MAX_SCOPE = 512;
- static final int MIN_REPEAT = 3;
- private static final int MAX_SHORT_REPEAT_LENGTH = 10;
- private long prevDelta = 0;
- private int fixedRunLength = 0;
- private int variableRunLength = 0;
- private final long[] literals = new long[MAX_SCOPE];
- private final PositionedOutputStream output;
- private final boolean signed;
- private EncodingType encoding;
- private int numLiterals;
- private final long[] zigzagLiterals = new long[MAX_SCOPE];
- private final long[] baseRedLiterals = new long[MAX_SCOPE];
- private final long[] adjDeltas = new long[MAX_SCOPE];
- private long fixedDelta;
- private int zzBits90p;
- private int zzBits100p;
- private int brBits95p;
- private int brBits100p;
- private int bitsDeltaMax;
- private int patchWidth;
- private int patchGapWidth;
- private int patchLength;
- private long[] gapVsPatchList;
- private long min;
- private boolean isFixedDelta;
- private SerializationUtils utils;
- private boolean alignedBitpacking;
-
- RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed) {
- this(output, signed, true);
- }
-
- RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed,
- boolean alignedBitpacking) {
- this.output = output;
- this.signed = signed;
- this.alignedBitpacking = alignedBitpacking;
- this.utils = new SerializationUtils();
- clear();
- }
-
- private void writeValues() throws IOException {
- if (numLiterals != 0) {
-
- if (encoding.equals(EncodingType.SHORT_REPEAT)) {
- writeShortRepeatValues();
- } else if (encoding.equals(EncodingType.DIRECT)) {
- writeDirectValues();
- } else if (encoding.equals(EncodingType.PATCHED_BASE)) {
- writePatchedBaseValues();
- } else {
- writeDeltaValues();
- }
-
- // clear all the variables
- clear();
- }
- }
-
- private void writeDeltaValues() throws IOException {
- int len = 0;
- int fb = bitsDeltaMax;
- int efb = 0;
-
- if (alignedBitpacking) {
- fb = utils.getClosestAlignedFixedBits(fb);
- }
-
- if (isFixedDelta) {
- // if fixed run length is greater than threshold then it will be fixed
- // delta sequence with delta value 0 else fixed delta sequence with
- // non-zero delta value
- if (fixedRunLength > MIN_REPEAT) {
- // ex. sequence: 2 2 2 2 2 2 2 2
- len = fixedRunLength - 1;
- fixedRunLength = 0;
- } else {
- // ex. sequence: 4 6 8 10 12 14 16
- len = variableRunLength - 1;
- variableRunLength = 0;
- }
- } else {
- // fixed width 0 is used for long repeating values.
- // sequences that require only 1 bit to encode will have an additional bit
- if (fb == 1) {
- fb = 2;
- }
- efb = utils.encodeBitWidth(fb);
- efb = efb << 1;
- len = variableRunLength - 1;
- variableRunLength = 0;
- }
-
- // extract the 9th bit of run length
- final int tailBits = (len & 0x100) >>> 8;
-
- // create first byte of the header
- final int headerFirstByte = getOpcode() | efb | tailBits;
-
- // second byte of the header stores the remaining 8 bits of runlength
- final int headerSecondByte = len & 0xff;
-
- // write header
- output.write(headerFirstByte);
- output.write(headerSecondByte);
-
- // store the first value from zigzag literal array
- if (signed) {
- utils.writeVslong(output, literals[0]);
- } else {
- utils.writeVulong(output, literals[0]);
- }
-
- if (isFixedDelta) {
- // if delta is fixed then we don't need to store delta blob
- utils.writeVslong(output, fixedDelta);
- } else {
- // store the first value as delta value using zigzag encoding
- utils.writeVslong(output, adjDeltas[0]);
-
- // adjacent delta values are bit packed. The length of adjDeltas array is
- // always one less than the number of literals (delta difference for n
- // elements is n-1). We have already written one element, write the
- // remaining numLiterals - 2 elements here
- utils.writeInts(adjDeltas, 1, numLiterals - 2, fb, output);
- }
- }
-
- private void writePatchedBaseValues() throws IOException {
-
- // NOTE: Aligned bit packing cannot be applied for PATCHED_BASE encoding
- // because patch is applied to MSB bits. For example: If fixed bit width of
- // base value is 7 bits and if patch is 3 bits, the actual value is
- // constructed by shifting the patch to left by 7 positions.
- // actual_value = patch << 7 | base_value
- // So, if we align base_value then actual_value can not be reconstructed.
-
- // write the number of fixed bits required in next 5 bits
- final int fb = brBits95p;
- final int efb = utils.encodeBitWidth(fb) << 1;
-
- // adjust variable run length, they are one off
- variableRunLength -= 1;
-
- // extract the 9th bit of run length
- final int tailBits = (variableRunLength & 0x100) >>> 8;
-
- // create first byte of the header
- final int headerFirstByte = getOpcode() | efb | tailBits;
-
- // second byte of the header stores the remaining 8 bits of runlength
- final int headerSecondByte = variableRunLength & 0xff;
-
- // if the min value is negative toggle the sign
- final boolean isNegative = min < 0 ? true : false;
- if (isNegative) {
- min = -min;
- }
-
- // find the number of bytes required for base and shift it by 5 bits
- // to accommodate patch width. The additional bit is used to store the sign
- // of the base value.
- final int baseWidth = utils.findClosestNumBits(min) + 1;
- final int baseBytes = baseWidth % 8 == 0 ? baseWidth / 8 : (baseWidth / 8) + 1;
- final int bb = (baseBytes - 1) << 5;
-
- // if the base value is negative then set MSB to 1
- if (isNegative) {
- min |= (1L << ((baseBytes * 8) - 1));
- }
-
- // third byte contains 3 bits for number of bytes occupied by base
- // and 5 bits for patchWidth
- final int headerThirdByte = bb | utils.encodeBitWidth(patchWidth);
-
- // fourth byte contains 3 bits for page gap width and 5 bits for
- // patch length
- final int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength;
-
- // write header
- output.write(headerFirstByte);
- output.write(headerSecondByte);
- output.write(headerThirdByte);
- output.write(headerFourthByte);
-
- // write the base value using fixed bytes in big endian order
- for(int i = baseBytes - 1; i >= 0; i--) {
- byte b = (byte) ((min >>> (i * 8)) & 0xff);
- output.write(b);
- }
-
- // base reduced literals are bit packed
- int closestFixedBits = utils.getClosestFixedBits(fb);
-
- utils.writeInts(baseRedLiterals, 0, numLiterals, closestFixedBits,
- output);
-
- // write patch list
- closestFixedBits = utils.getClosestFixedBits(patchGapWidth + patchWidth);
-
- utils.writeInts(gapVsPatchList, 0, gapVsPatchList.length, closestFixedBits,
- output);
-
- // reset run length
- variableRunLength = 0;
- }
-
- /**
- * Store the opcode in 2 MSB bits
- * @return opcode
- */
- private int getOpcode() {
- return encoding.ordinal() << 6;
- }
-
- private void writeDirectValues() throws IOException {
-
- // write the number of fixed bits required in next 5 bits
- int fb = zzBits100p;
-
- if (alignedBitpacking) {
- fb = utils.getClosestAlignedFixedBits(fb);
- }
-
- final int efb = utils.encodeBitWidth(fb) << 1;
-
- // adjust variable run length
- variableRunLength -= 1;
-
- // extract the 9th bit of run length
- final int tailBits = (variableRunLength & 0x100) >>> 8;
-
- // create first byte of the header
- final int headerFirstByte = getOpcode() | efb | tailBits;
-
- // second byte of the header stores the remaining 8 bits of runlength
- final int headerSecondByte = variableRunLength & 0xff;
-
- // write header
- output.write(headerFirstByte);
- output.write(headerSecondByte);
-
- // bit packing the zigzag encoded literals
- utils.writeInts(zigzagLiterals, 0, numLiterals, fb, output);
-
- // reset run length
- variableRunLength = 0;
- }
-
- private void writeShortRepeatValues() throws IOException {
- // get the value that is repeating, compute the bits and bytes required
- long repeatVal = 0;
- if (signed) {
- repeatVal = utils.zigzagEncode(literals[0]);
- } else {
- repeatVal = literals[0];
- }
-
- final int numBitsRepeatVal = utils.findClosestNumBits(repeatVal);
- final int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? numBitsRepeatVal >>> 3
- : (numBitsRepeatVal >>> 3) + 1;
-
- // write encoding type in top 2 bits
- int header = getOpcode();
-
- // write the number of bytes required for the value
- header |= ((numBytesRepeatVal - 1) << 3);
-
- // write the run length
- fixedRunLength -= MIN_REPEAT;
- header |= fixedRunLength;
-
- // write the header
- output.write(header);
-
- // write the repeating value in big endian byte order
- for(int i = numBytesRepeatVal - 1; i >= 0; i--) {
- int b = (int) ((repeatVal >>> (i * 8)) & 0xff);
- output.write(b);
- }
-
- fixedRunLength = 0;
- }
-
- private void determineEncoding() {
-
- // we need to compute zigzag values for DIRECT encoding if we decide to
- // break early for delta overflows or for shorter runs
- computeZigZagLiterals();
-
- zzBits100p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 1.0);
-
- // not a big win for shorter runs to determine encoding
- if (numLiterals <= MIN_REPEAT) {
- encoding = EncodingType.DIRECT;
- return;
- }
-
- // DELTA encoding check
-
- // for identifying monotonic sequences
- boolean isIncreasing = true;
- boolean isDecreasing = true;
- this.isFixedDelta = true;
-
- this.min = literals[0];
- long max = literals[0];
- final long initialDelta = literals[1] - literals[0];
- long currDelta = initialDelta;
- long deltaMax = initialDelta;
- this.adjDeltas[0] = initialDelta;
-
- for (int i = 1; i < numLiterals; i++) {
- final long l1 = literals[i];
- final long l0 = literals[i - 1];
- currDelta = l1 - l0;
- min = Math.min(min, l1);
- max = Math.max(max, l1);
-
- isIncreasing &= (l0 <= l1);
- isDecreasing &= (l0 >= l1);
-
- isFixedDelta &= (currDelta == initialDelta);
- if (i > 1) {
- adjDeltas[i - 1] = Math.abs(currDelta);
- deltaMax = Math.max(deltaMax, adjDeltas[i - 1]);
- }
- }
-
- // its faster to exit under delta overflow condition without checking for
- // PATCHED_BASE condition as encoding using DIRECT is faster and has less
- // overhead than PATCHED_BASE
- if (!utils.isSafeSubtract(max, min)) {
- encoding = EncodingType.DIRECT;
- return;
- }
-
- // invariant - subtracting any number from any other in the literals after
- // this point won't overflow
-
- // if initialDelta is 0 then we cannot delta encode as we cannot identify
- // the sign of deltas (increasing or decreasing)
- if (initialDelta != 0) {
-
- // if min is equal to max then the delta is 0, this condition happens for
- // fixed values run >10 which cannot be encoded with SHORT_REPEAT
- if (min == max) {
- assert isFixedDelta : min + "==" + max +
- ", isFixedDelta cannot be false";
- assert currDelta == 0 : min + "==" + max + ", currDelta should be zero";
- fixedDelta = 0;
- encoding = EncodingType.DELTA;
- return;
- }
-
- if (isFixedDelta) {
- assert currDelta == initialDelta
- : "currDelta should be equal to initialDelta for fixed delta encoding";
- encoding = EncodingType.DELTA;
- fixedDelta = currDelta;
- return;
- }
-
- // stores the number of bits required for packing delta blob in
- // delta encoding
- bitsDeltaMax = utils.findClosestNumBits(deltaMax);
-
- // monotonic condition
- if (isIncreasing || isDecreasing) {
- encoding = EncodingType.DELTA;
- return;
- }
- }
-
- // PATCHED_BASE encoding check
-
- // percentile values are computed for the zigzag encoded values. if the
- // number of bit requirement between 90th and 100th percentile varies
- // beyond a threshold then we need to patch the values. if the variation
- // is not significant then we can use direct encoding
-
- zzBits90p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 0.9);
- int diffBitsLH = zzBits100p - zzBits90p;
-
- // if the difference between 90th percentile and 100th percentile fixed
- // bits is > 1 then we need patch the values
- if (diffBitsLH > 1) {
-
- // patching is done only on base reduced values.
- // remove base from literals
- for (int i = 0; i < numLiterals; i++) {
- baseRedLiterals[i] = literals[i] - min;
- }
-
- // 95th percentile width is used to determine max allowed value
- // after which patching will be done
- brBits95p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 0.95);
-
- // 100th percentile is used to compute the max patch width
- brBits100p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 1.0);
-
- // after base reducing the values, if the difference in bits between
- // 95th percentile and 100th percentile value is zero then there
- // is no point in patching the values, in which case we will
- // fallback to DIRECT encoding.
- // The decision to use patched base was based on zigzag values, but the
- // actual patching is done on base reduced literals.
- if ((brBits100p - brBits95p) != 0) {
- encoding = EncodingType.PATCHED_BASE;
- preparePatchedBlob();
- return;
- } else {
- encoding = EncodingType.DIRECT;
- return;
- }
- } else {
- // if difference in bits between 95th percentile and 100th percentile is
- // 0, then patch length will become 0. Hence we will fallback to direct
- encoding = EncodingType.DIRECT;
- return;
- }
- }
-
- private void computeZigZagLiterals() {
- // populate zigzag encoded literals
- long zzEncVal = 0;
- for (int i = 0; i < numLiterals; i++) {
- if (signed) {
- zzEncVal = utils.zigzagEncode(literals[i]);
- } else {
- zzEncVal = literals[i];
- }
- zigzagLiterals[i] = zzEncVal;
- }
- }
-
- private void preparePatchedBlob() {
- // mask will be max value beyond which patch will be generated
- long mask = (1L << brBits95p) - 1;
-
- // since we are considering only 95 percentile, the size of gap and
- // patch array can contain only be 5% values
- patchLength = (int) Math.ceil((numLiterals * 0.05));
-
- int[] gapList = new int[patchLength];
- long[] patchList = new long[patchLength];
-
- // #bit for patch
- patchWidth = brBits100p - brBits95p;
- patchWidth = utils.getClosestFixedBits(patchWidth);
-
- // if patch bit requirement is 64 then it will not possible to pack
- // gap and patch together in a long. To make sure gap and patch can be
- // packed together adjust the patch width
- if (patchWidth == 64) {
- patchWidth = 56;
- brBits95p = 8;
- mask = (1L << brBits95p) - 1;
- }
-
- int gapIdx = 0;
- int patchIdx = 0;
- int prev = 0;
- int gap = 0;
- int maxGap = 0;
-
- for(int i = 0; i < numLiterals; i++) {
- // if value is above mask then create the patch and record the gap
- if (baseRedLiterals[i] > mask) {
- gap = i - prev;
- if (gap > maxGap) {
- maxGap = gap;
- }
-
- // gaps are relative, so store the previous patched value index
- prev = i;
- gapList[gapIdx++] = gap;
-
- // extract the most significant bits that are over mask bits
- long patch = baseRedLiterals[i] >>> brBits95p;
- patchList[patchIdx++] = patch;
-
- // strip off the MSB to enable safe bit packing
- baseRedLiterals[i] &= mask;
- }
- }
-
- // adjust the patch length to number of entries in gap list
- patchLength = gapIdx;
-
- // if the element to be patched is the first and only element then
- // max gap will be 0, but to store the gap as 0 we need atleast 1 bit
- if (maxGap == 0 && patchLength != 0) {
- patchGapWidth = 1;
- } else {
- patchGapWidth = utils.findClosestNumBits(maxGap);
- }
-
- // special case: if the patch gap width is greater than 256, then
- // we need 9 bits to encode the gap width. But we only have 3 bits in
- // header to record the gap width. To deal with this case, we will save
- // two entries in patch list in the following way
- // 256 gap width => 0 for patch value
- // actual gap - 256 => actual patch value
- // We will do the same for gap width = 511. If the element to be patched is
- // the last element in the scope then gap width will be 511. In this case we
- // will have 3 entries in the patch list in the following way
- // 255 gap width => 0 for patch value
- // 255 gap width => 0 for patch value
- // 1 gap width => actual patch value
- if (patchGapWidth > 8) {
- patchGapWidth = 8;
- // for gap = 511, we need two additional entries in patch list
- if (maxGap == 511) {
- patchLength += 2;
- } else {
- patchLength += 1;
- }
- }
-
- // create gap vs patch list
- gapIdx = 0;
- patchIdx = 0;
- gapVsPatchList = new long[patchLength];
- for(int i = 0; i < patchLength; i++) {
- long g = gapList[gapIdx++];
- long p = patchList[patchIdx++];
- while (g > 255) {
- gapVsPatchList[i++] = (255L << patchWidth);
- g -= 255;
- }
-
- // store patch value in LSBs and gap in MSBs
- gapVsPatchList[i] = (g << patchWidth) | p;
- }
- }
-
- /**
-  * Resets the state kept for the run that is currently being encoded: the
-  * literal count, the chosen encoding, the delta bookkeeping, the percentile
-  * bit widths and the patch data.
-  */
- private void clear() {
-   // buffered run
-   numLiterals = 0;
-   encoding = null;
-   min = 0;
-
-   // delta bookkeeping
-   prevDelta = 0;
-   fixedDelta = 0;
-   isFixedDelta = true;
-   bitsDeltaMax = 0;
-
-   // percentile bit widths
-   zzBits90p = 0;
-   zzBits100p = 0;
-   brBits95p = 0;
-   brBits100p = 0;
-
-   // patch data
-   patchWidth = 0;
-   patchGapWidth = 0;
-   patchLength = 0;
-   gapVsPatchList = null;
- }
-
- /**
-  * Writes out whatever values are still buffered, choosing the encoding from
-  * the kind and length of the pending run, and then flushes {@code output}.
-  *
-  * @throws IOException propagated from writing the pending run or from
-  *         flushing the underlying output
-  */
- @Override
- public void flush() throws IOException {
-   if (numLiterals != 0) {
-     if (variableRunLength != 0) {
-       determineEncoding();
-       writeValues();
-     } else if (fixedRunLength != 0) {
-       if (fixedRunLength < MIN_REPEAT) {
-         // too short for a repeat encoding: hand the values over as a
-         // variable run and let determineEncoding() decide
-         variableRunLength = fixedRunLength;
-         fixedRunLength = 0;
-         determineEncoding();
-       } else if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) {
-         encoding = EncodingType.SHORT_REPEAT;
-       } else {
-         // longer fixed runs are written as a fixed-delta DELTA run
-         encoding = EncodingType.DELTA;
-         isFixedDelta = true;
-       }
-       writeValues();
-     }
-   }
-   output.flush();
- }
-
- /**
-  * Appends {@code val} to the literal buffer while tracking whether the
-  * buffered values currently form a fixed run (repeats of a single value) or
-  * a variable run, and writes a run out as soon as it is complete.
-  *
-  * @param val the next value of the stream
-  * @throws IOException propagated from writing out a completed run
-  */
- @Override
- public void write(long val) throws IOException {
-   if (numLiterals == 0) {
-     initializeLiterals(val);
-   } else if (numLiterals == 1) {
-     prevDelta = val - literals[0];
-     literals[numLiterals++] = val;
-     // if both values are same count as fixed run else variable run
-     if (val == literals[0]) {
-       fixedRunLength = 2;
-       variableRunLength = 0;
-     } else {
-       fixedRunLength = 0;
-       variableRunLength = 2;
-     }
-   } else {
-     long currentDelta = val - literals[numLiterals - 1];
-     if (prevDelta == 0 && currentDelta == 0) {
-       // fixed delta run
-
-       literals[numLiterals++] = val;
-
-       // if variable run is non-zero then we are seeing repeating
-       // values at the end of variable run in which case keep
-       // updating variable and fixed runs
-       if (variableRunLength > 0) {
-         fixedRunLength = 2;
-       }
-       fixedRunLength += 1;
-
-       // if fixed run met the minimum condition and if variable
-       // run is non-zero then flush the variable run and shift the
-       // tail fixed runs to start of the buffer
-       if (fixedRunLength >= MIN_REPEAT && variableRunLength > 0) {
-         numLiterals -= MIN_REPEAT;
-         variableRunLength -= MIN_REPEAT - 1;
-         // copy the tail fixed runs
-         long[] tailVals = new long[MIN_REPEAT];
-         System.arraycopy(literals, numLiterals, tailVals, 0, MIN_REPEAT);
-
-         // determine variable encoding and flush values
-         determineEncoding();
-         writeValues();
-
-         // shift tail fixed runs to beginning of the buffer
-         for (long l : tailVals) {
-           literals[numLiterals++] = l;
-         }
-       }
-
-       // if fixed runs reached max repeat length then write values
-       if (fixedRunLength == MAX_SCOPE) {
-         determineEncoding();
-         writeValues();
-       }
-     } else {
-       // variable delta run
-
-       // if fixed run length is non-zero and if it satisfies the
-       // short repeat conditions then write the values as short repeats
-       // else use delta encoding
-       if (fixedRunLength >= MIN_REPEAT) {
-         if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) {
-           encoding = EncodingType.SHORT_REPEAT;
-           writeValues();
-         } else {
-           encoding = EncodingType.DELTA;
-           isFixedDelta = true;
-           writeValues();
-         }
-       }
-
-       // if fixed run length is < MIN_REPEAT and current value is different
-       // from previous then treat it as variable run. The range check must
-       // come first: it leaves an ongoing variable run (fixedRunLength == 0)
-       // untouched, and it skips the literals[numLiterals - 1] lookup when
-       // the fixed run was just written out above (the code below expects
-       // numLiterals to be 0 in that case).
-       if (fixedRunLength > 0 && fixedRunLength < MIN_REPEAT
-           && val != literals[numLiterals - 1]) {
-         variableRunLength = fixedRunLength;
-         fixedRunLength = 0;
-       }
-
-       // after writing values re-initialize the variables
-       if (numLiterals == 0) {
-         initializeLiterals(val);
-       } else {
-         // keep updating variable run lengths
-         prevDelta = val - literals[numLiterals - 1];
-         literals[numLiterals++] = val;
-         variableRunLength += 1;
-
-         // if variable run length reach the max scope, write it
-         if (variableRunLength == MAX_SCOPE) {
-           determineEncoding();
-           writeValues();
-         }
-       }
-     }
-   }
- }
-
- private void initializeLiterals(long val) {
- literals[numLiterals++] = val;
- fixedRunLength = 1;
- variableRunLength = 1;
- }
-
- /**
-  * Lets {@code output} record its position first and then adds the number
-  * of values currently buffered in this writer.
-  *
-  * @param recorder receives the positions
-  * @throws IOException propagated from the underlying output
-  */
- @Override
- public void getPosition(PositionRecorder recorder) throws IOException {
-   output.getPosition(recorder);
-   final int bufferedCount = numLiterals;
-   recorder.addPosition(bufferedCount);
- }
-}