Return-Path: X-Original-To: apmail-tajo-commits-archive@minotaur.apache.org Delivered-To: apmail-tajo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BB0C218FEF for ; Wed, 23 Mar 2016 01:41:46 +0000 (UTC) Received: (qmail 19306 invoked by uid 500); 23 Mar 2016 01:41:46 -0000 Delivered-To: apmail-tajo-commits-archive@tajo.apache.org Received: (qmail 19229 invoked by uid 500); 23 Mar 2016 01:41:46 -0000 Mailing-List: contact commits-help@tajo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tajo.apache.org Delivered-To: mailing list commits@tajo.apache.org Received: (qmail 19079 invoked by uid 99); 23 Mar 2016 01:41:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 23 Mar 2016 01:41:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 609C2E97A6; Wed, 23 Mar 2016 01:41:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jihoonson@apache.org To: commits@tajo.apache.org Date: Wed, 23 Mar 2016 01:41:49 -0000 Message-Id: <3d746f9da2b14e84baa987a15ce3c32f@git.apache.org> In-Reply-To: <2920c5652dd84ceaa2bf594806b32d07@git.apache.org> References: <2920c5652dd84ceaa2bf594806b32d07@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [4/7] tajo git commit: TAJO-2102: Migrate to Apache Orc from Presto's one. 
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OutStream.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OutStream.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OutStream.java deleted file mode 100644 index f6cfd57..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OutStream.java +++ /dev/null @@ -1,286 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.tajo.storage.thirdparty.orc; - -import java.io.IOException; -import java.nio.ByteBuffer; - -class OutStream extends PositionedOutputStream { - - interface OutputReceiver { - /** - * Output the given buffer to the final destination - * @param buffer the buffer to output - * @throws IOException - */ - void output(ByteBuffer buffer) throws IOException; - } - - static final int HEADER_SIZE = 3; - private final String name; - private final OutputReceiver receiver; - // if enabled the stream will be suppressed when writing stripe - private boolean suppress; - - /** - * Stores the uncompressed bytes that have been serialized, but not - * compressed yet. When this fills, we compress the entire buffer. - */ - private ByteBuffer current = null; - - /** - * Stores the compressed bytes until we have a full buffer and then outputs - * them to the receiver. If no compression is being done, this (and overflow) - * will always be null and the current buffer will be sent directly to the - * receiver. - */ - private ByteBuffer compressed = null; - - /** - * Since the compressed buffer may start with contents from previous - * compression blocks, we allocate an overflow buffer so that the - * output of the codec can be split between the two buffers. After the - * compressed buffer is sent to the receiver, the overflow buffer becomes - * the new compressed buffer. - */ - private ByteBuffer overflow = null; - private final int bufferSize; - private final CompressionCodec codec; - private long compressedBytes = 0; - private long uncompressedBytes = 0; - - OutStream(String name, - int bufferSize, - CompressionCodec codec, - OutputReceiver receiver) throws IOException { - this.name = name; - this.bufferSize = bufferSize; - this.codec = codec; - this.receiver = receiver; - this.suppress = false; - } - - public void clear() throws IOException { - flush(); - suppress = false; - } - - /** - * Write the length of the compressed bytes. 
Life is much easier if the - * header is constant length, so just use 3 bytes. Considering most of the - * codecs want between 32k (snappy) and 256k (lzo, zlib), 3 bytes should - * be plenty. We also use the low bit for whether it is the original or - * compressed bytes. - * @param buffer the buffer to write the header to - * @param position the position in the buffer to write at - * @param val the size in the file - * @param original is it uncompressed - */ - private static void writeHeader(ByteBuffer buffer, - int position, - int val, - boolean original) { - buffer.put(position, (byte) ((val << 1) + (original ? 1 : 0))); - buffer.put(position + 1, (byte) (val >> 7)); - buffer.put(position + 2, (byte) (val >> 15)); - } - - private void getNewInputBuffer() throws IOException { - if (codec == null) { - current = ByteBuffer.allocate(bufferSize); - } else { - current = ByteBuffer.allocate(bufferSize + HEADER_SIZE); - writeHeader(current, 0, bufferSize, true); - current.position(HEADER_SIZE); - } - } - - /** - * Allocate a new output buffer if we are compressing. - */ - private ByteBuffer getNewOutputBuffer() throws IOException { - return ByteBuffer.allocate(bufferSize + HEADER_SIZE); - } - - private void flip() throws IOException { - current.limit(current.position()); - current.position(codec == null ? 
0 : HEADER_SIZE); - } - - @Override - public void write(int i) throws IOException { - if (current == null) { - getNewInputBuffer(); - } - if (current.remaining() < 1) { - spill(); - } - uncompressedBytes += 1; - current.put((byte) i); - } - - @Override - public void write(byte[] bytes, int offset, int length) throws IOException { - if (current == null) { - getNewInputBuffer(); - } - int remaining = Math.min(current.remaining(), length); - current.put(bytes, offset, remaining); - uncompressedBytes += remaining; - length -= remaining; - while (length != 0) { - spill(); - offset += remaining; - remaining = Math.min(current.remaining(), length); - current.put(bytes, offset, remaining); - uncompressedBytes += remaining; - length -= remaining; - } - } - - private void spill() throws IOException { - // if there isn't anything in the current buffer, don't spill - if (current == null || - current.position() == (codec == null ? 0 : HEADER_SIZE)) { - return; - } - flip(); - if (codec == null) { - receiver.output(current); - getNewInputBuffer(); - } else { - if (compressed == null) { - compressed = getNewOutputBuffer(); - } else if (overflow == null) { - overflow = getNewOutputBuffer(); - } - int sizePosn = compressed.position(); - compressed.position(compressed.position() + HEADER_SIZE); - if (codec.compress(current, compressed, overflow)) { - uncompressedBytes = 0; - // move position back to after the header - current.position(HEADER_SIZE); - current.limit(current.capacity()); - // find the total bytes in the chunk - int totalBytes = compressed.position() - sizePosn - HEADER_SIZE; - if (overflow != null) { - totalBytes += overflow.position(); - } - compressedBytes += totalBytes + HEADER_SIZE; - writeHeader(compressed, sizePosn, totalBytes, false); - // if we have less than the next header left, spill it. 
- if (compressed.remaining() < HEADER_SIZE) { - compressed.flip(); - receiver.output(compressed); - compressed = overflow; - overflow = null; - } - } else { - compressedBytes += uncompressedBytes + HEADER_SIZE; - uncompressedBytes = 0; - // we are using the original, but need to spill the current - // compressed buffer first. So back up to where we started, - // flip it and add it to done. - if (sizePosn != 0) { - compressed.position(sizePosn); - compressed.flip(); - receiver.output(compressed); - compressed = null; - // if we have an overflow, clear it and make it the new compress - // buffer - if (overflow != null) { - overflow.clear(); - compressed = overflow; - overflow = null; - } - } else { - compressed.clear(); - if (overflow != null) { - overflow.clear(); - } - } - - // now add the current buffer into the done list and get a new one. - current.position(0); - // update the header with the current length - writeHeader(current, 0, current.limit() - HEADER_SIZE, true); - receiver.output(current); - getNewInputBuffer(); - } - } - } - - void getPosition(PositionRecorder recorder) throws IOException { - if (codec == null) { - recorder.addPosition(uncompressedBytes); - } else { - recorder.addPosition(compressedBytes); - recorder.addPosition(uncompressedBytes); - } - } - - @Override - public void flush() throws IOException { - spill(); - if (compressed != null && compressed.position() != 0) { - compressed.flip(); - receiver.output(compressed); - compressed = null; - } - uncompressedBytes = 0; - compressedBytes = 0; - overflow = null; - current = null; - } - - @Override - public String toString() { - return name; - } - - @Override - public long getBufferSize() { - long result = 0; - if (current != null) { - result += current.capacity(); - } - if (compressed != null) { - result += compressed.capacity(); - } - if (overflow != null) { - result += overflow.capacity(); - } - return result; - } - - /** - * Set suppress flag - */ - public void suppress() { - suppress = 
true; - } - - /** - * Returns the state of suppress flag - * @return value of suppress flag - */ - public boolean isSuppressed() { - return suppress; - } -} - http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionRecorder.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionRecorder.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionRecorder.java deleted file mode 100644 index a39926e..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionRecorder.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tajo.storage.thirdparty.orc; - -/** - * An interface for recording positions in a stream. 
- */ -interface PositionRecorder { - void addPosition(long offset); -} http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionedOutputStream.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionedOutputStream.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionedOutputStream.java deleted file mode 100644 index 748c98c..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/PositionedOutputStream.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tajo.storage.thirdparty.orc; - -import java.io.IOException; -import java.io.OutputStream; - -abstract class PositionedOutputStream extends OutputStream { - - /** - * Record the current position to the recorder. 
- * @param recorder the object that receives the position - * @throws IOException - */ - abstract void getPosition(PositionRecorder recorder) throws IOException; - - /** - * Get the memory size currently allocated as buffer associated with this - * stream. - * @return the number of bytes used by buffers. - */ - abstract long getBufferSize(); -} http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RecordReaderUtils.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RecordReaderUtils.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RecordReaderUtils.java new file mode 100644 index 0000000..bc882e0 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RecordReaderUtils.java @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.thirdparty.orc; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.io.DiskRange; +import org.apache.hadoop.hive.common.io.DiskRangeList; +import org.apache.orc.CompressionCodec; +import org.apache.orc.DataReader; +import org.apache.orc.OrcProto; +import org.apache.orc.impl.BufferChunk; +import org.apache.orc.impl.DirectDecompressionCodec; +import org.apache.orc.impl.OutStream; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class RecordReaderUtils { + + public static class DefaultDataReader implements DataReader { + private FSDataInputStream file; + private ByteBufferAllocatorPool pool; + private ZeroCopyAdapter zcr; + private FileSystem fs; + private Path path; + private boolean useZeroCopy; + private CompressionCodec codec; + private long readBytes = 0; + + public DefaultDataReader( + FileSystem fs, Path path, boolean useZeroCopy, CompressionCodec codec) { + this.fs = fs; + this.path = path; + this.useZeroCopy = useZeroCopy; + this.codec = codec; + } + + @Override + public void open() throws IOException { + this.file = fs.open(path); + if (useZeroCopy) { + pool = new ByteBufferAllocatorPool(); + zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool); + } else { + pool = null; + zcr = null; + } + } + + @Override + public DiskRangeList readFileData( + DiskRangeList range, long baseOffset, boolean doForceDirect) throws IOException { + return readDiskRanges(file, zcr, baseOffset, range, doForceDirect); + } + + @Override + public void close() throws IOException { + if (file != null) { + file.close(); + } + if (pool != null) { + pool.clear(); + } + } + + @Override + public boolean isTrackingDiskRanges() { + return zcr != null; + } + + @Override + public void releaseBuffer(ByteBuffer buffer) { + zcr.releaseBuffer(buffer); + } + + public 
long getReadBytes() { + return readBytes; + } + + /** + * Read the list of ranges from the file. + * @param file the file to read + * @param base the base of the stripe + * @param range the disk ranges within the stripe to read + * @return the bytes read for each disk range, which is the same length as + * ranges + * @throws IOException + */ + private DiskRangeList readDiskRanges(FSDataInputStream file, + ZeroCopyAdapter zcr, + long base, + DiskRangeList range, + boolean doForceDirect) throws IOException { + if (range == null) return null; + DiskRangeList prev = range.prev; + if (prev == null) { + prev = new DiskRangeList.MutateHelper(range); + } + while (range != null) { + if (range.hasData()) { + range = range.next; + continue; + } + int len = (int) (range.getEnd() - range.getOffset()); + long off = range.getOffset(); + if (zcr != null) { + file.seek(base + off); + boolean hasReplaced = false; + while (len > 0) { + ByteBuffer partial = zcr.readBuffer(len, false); + readBytes += partial.remaining(); + BufferChunk bc = new BufferChunk(partial, off); + if (!hasReplaced) { + range.replaceSelfWith(bc); + hasReplaced = true; + } else { + range.insertAfter(bc); + } + range = bc; + int read = partial.remaining(); + len -= read; + off += read; + } + } else { + // Don't use HDFS ByteBuffer API because it has no readFully, and is buggy and pointless. 
+ byte[] buffer = new byte[len]; + file.readFully((base + off), buffer, 0, buffer.length); + readBytes += buffer.length; + ByteBuffer bb = null; + if (doForceDirect) { + bb = ByteBuffer.allocateDirect(len); + bb.put(buffer); + bb.position(0); + bb.limit(len); + } else { + bb = ByteBuffer.wrap(buffer); + } + range = range.replaceSelfWith(new BufferChunk(bb, range.getOffset())); + } + range = range.next; + } + return prev.next; + } + } + + public static DataReader createDefaultDataReader( + FileSystem fs, Path path, boolean useZeroCopy, CompressionCodec codec) { + return new DefaultDataReader(fs, path, useZeroCopy, codec); + } + + public static boolean[] findPresentStreamsByColumn( + List<OrcProto.Stream> streamList, List<OrcProto.Type> types) { + boolean[] hasNull = new boolean[types.size()]; + for(OrcProto.Stream stream: streamList) { + if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.PRESENT)) { + hasNull[stream.getColumn()] = true; + } + } + return hasNull; + } + + /** + * Does region A overlap region B? The end points are inclusive on both sides. + * @param leftA A's left point + * @param rightA A's right point + * @param leftB B's left point + * @param rightB B's right point + * @return Does region A overlap region B? 
+ */ + static boolean overlap(long leftA, long rightA, long leftB, long rightB) { + if (leftA <= leftB) { + return rightA >= leftB; + } + return rightB >= leftA; + } + + public static void addEntireStreamToRanges( + long offset, long length, DiskRangeList.CreateHelper list, boolean doMergeBuffers) { + list.addOrMerge(offset, offset + length, doMergeBuffers, false); + } + + public static void addRgFilteredStreamToRanges(OrcProto.Stream stream, + boolean[] includedRowGroups, boolean isCompressed, OrcProto.RowIndex index, + OrcProto.ColumnEncoding encoding, OrcProto.Type type, int compressionSize, boolean hasNull, + long offset, long length, DiskRangeList.CreateHelper list, boolean doMergeBuffers) { + for (int group = 0; group < includedRowGroups.length; ++group) { + if (!includedRowGroups[group]) continue; + int posn = getIndexPosition( + encoding.getKind(), type.getKind(), stream.getKind(), isCompressed, hasNull); + long start = index.getEntry(group).getPositions(posn); + final long nextGroupOffset; + boolean isLast = group == (includedRowGroups.length - 1); + nextGroupOffset = isLast ? length : index.getEntry(group + 1).getPositions(posn); + + start += offset; + long end = offset + estimateRgEndOffset( + isCompressed, isLast, nextGroupOffset, length, compressionSize); + list.addOrMerge(start, end, doMergeBuffers, true); + } + } + + public static long estimateRgEndOffset(boolean isCompressed, boolean isLast, + long nextGroupOffset, long streamLength, int bufferSize) { + // figure out the worst case last location + // if adjacent groups have the same compressed block offset then stretch the slop + // by factor of 2 to safely accommodate the next compression block. + // One for the current compression block and another for the next compression block. + long slop = isCompressed ? 2 * (OutStream.HEADER_SIZE + bufferSize) : WORST_UNCOMPRESSED_SLOP; + return isLast ? 
streamLength : Math.min(streamLength, nextGroupOffset + slop); + } + + private static final int BYTE_STREAM_POSITIONS = 1; + private static final int RUN_LENGTH_BYTE_POSITIONS = BYTE_STREAM_POSITIONS + 1; + private static final int BITFIELD_POSITIONS = RUN_LENGTH_BYTE_POSITIONS + 1; + private static final int RUN_LENGTH_INT_POSITIONS = BYTE_STREAM_POSITIONS + 1; + + /** + * Get the offset in the index positions for the column that the given + * stream starts. + * @param columnEncoding the encoding of the column + * @param columnType the type of the column + * @param streamType the kind of the stream + * @param isCompressed is the file compressed + * @param hasNulls does the column have a PRESENT stream? + * @return the number of positions that will be used for that stream + */ + public static int getIndexPosition(OrcProto.ColumnEncoding.Kind columnEncoding, + OrcProto.Type.Kind columnType, + OrcProto.Stream.Kind streamType, + boolean isCompressed, + boolean hasNulls) { + if (streamType == OrcProto.Stream.Kind.PRESENT) { + return 0; + } + int compressionValue = isCompressed ? 1 : 0; + int base = hasNulls ? 
(BITFIELD_POSITIONS + compressionValue) : 0; + switch (columnType) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case DATE: + case STRUCT: + case MAP: + case LIST: + case UNION: + return base; + case CHAR: + case VARCHAR: + case STRING: + if (columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY || + columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) { + return base; + } else { + if (streamType == OrcProto.Stream.Kind.DATA) { + return base; + } else { + return base + BYTE_STREAM_POSITIONS + compressionValue; + } + } + case BINARY: + if (streamType == OrcProto.Stream.Kind.DATA) { + return base; + } + return base + BYTE_STREAM_POSITIONS + compressionValue; + case DECIMAL: + if (streamType == OrcProto.Stream.Kind.DATA) { + return base; + } + return base + BYTE_STREAM_POSITIONS + compressionValue; + case TIMESTAMP: + if (streamType == OrcProto.Stream.Kind.DATA) { + return base; + } + return base + RUN_LENGTH_INT_POSITIONS + compressionValue; + default: + throw new IllegalArgumentException("Unknown type " + columnType); + } + } + + // for uncompressed streams, what is the most overlap with the following set + // of rows (long vint literal group). + static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512; + + /** + * Is this stream part of a dictionary? + * @return is this part of a dictionary? + */ + public static boolean isDictionary(OrcProto.Stream.Kind kind, + OrcProto.ColumnEncoding encoding) { + assert kind != OrcProto.Stream.Kind.DICTIONARY_COUNT; + OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind(); + return kind == OrcProto.Stream.Kind.DICTIONARY_DATA || + (kind == OrcProto.Stream.Kind.LENGTH && + (encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY || + encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2)); + } + + /** + * Build a string representation of a list of disk ranges. 
+ * @param range ranges to stringify + * @return the resulting string + */ + public static String stringifyDiskRanges(DiskRangeList range) { + StringBuilder buffer = new StringBuilder(); + buffer.append("["); + boolean isFirst = true; + while (range != null) { + if (!isFirst) { + buffer.append(", {"); + } else { + buffer.append("{"); + } + isFirst = false; + buffer.append(range.toString()); + buffer.append("}"); + range = range.next; + } + buffer.append("]"); + return buffer.toString(); + } + + public static List<DiskRange> getStreamBuffers(DiskRangeList range, long offset, long length) { + // This assumes sorted ranges (as do many other parts of ORC code. + ArrayList<DiskRange> buffers = new ArrayList<DiskRange>(); + if (length == 0) return buffers; + long streamEnd = offset + length; + boolean inRange = false; + while (range != null) { + if (!inRange) { + if (range.getEnd() <= offset) { + range = range.next; + continue; // Skip until we are in range. + } + inRange = true; + if (range.getOffset() < offset) { + // Partial first buffer, add a slice of it. + buffers.add(range.sliceAndShift(offset, Math.min(streamEnd, range.getEnd()), -offset)); + if (range.getEnd() >= streamEnd) break; // Partial first buffer is also partial last buffer. + range = range.next; + continue; + } + } else if (range.getOffset() >= streamEnd) { + break; + } + if (range.getEnd() > streamEnd) { + // Partial last buffer (may also be the first buffer), add a slice of it. + buffers.add(range.sliceAndShift(range.getOffset(), streamEnd, -offset)); + break; + } + // Buffer that belongs entirely to one stream. + // TODO: ideally we would want to reuse the object and remove it from the list, but we cannot + // because bufferChunks is also used by clearStreams for zcr. Create a useless dup. 
+ buffers.add(range.sliceAndShift(range.getOffset(), range.getEnd(), -offset)); + if (range.getEnd() == streamEnd) break; + range = range.next; + } + return buffers; + } + + static ZeroCopyAdapter createZeroCopyShim(FSDataInputStream file, + CompressionCodec codec, ByteBufferAllocatorPool pool) throws IOException { + if ((codec == null || ((codec instanceof DirectDecompressionCodec) + && ((DirectDecompressionCodec) codec).isAvailable()))) { + /* codec is null or is available */ + return new ZeroCopyAdapter(file, pool); + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RedBlackTree.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RedBlackTree.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RedBlackTree.java deleted file mode 100644 index 2482f93..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RedBlackTree.java +++ /dev/null @@ -1,309 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.thirdparty.orc; - -/** - * A memory efficient red-black tree that does not allocate any objects per - * element. This class is abstract and assumes that the child class - * handles the key and comparisons with the key. - */ -abstract class RedBlackTree { - public static final int NULL = -1; - - // Various values controlling the offset of the data within the array. - private static final int LEFT_OFFSET = 0; - private static final int RIGHT_OFFSET = 1; - private static final int ELEMENT_SIZE = 2; - - protected int size = 0; - private final DynamicIntArray data; - protected int root = NULL; - protected int lastAdd = 0; - private boolean wasAdd = false; - - /** - * Create a set with the given initial capacity. - */ - public RedBlackTree(int initialCapacity) { - data = new DynamicIntArray(initialCapacity * ELEMENT_SIZE); - } - - /** - * Insert a new node into the data array, growing the array as necessary. - * - * @return the position of the new node. - */ - private int insert(int left, int right, boolean isRed) { - int position = size; - size += 1; - setLeft(position, left, isRed); - setRight(position, right); - return position; - } - - /** - * Compare the value at the given position to the new value. - * @return 0 if the values are the same, -1 if the new value is smaller and - * 1 if the new value is larger. - */ - protected abstract int compareValue(int position); - - /** - * Is the given node red as opposed to black? To prevent having an extra word - * in the data array, we just use the low bit of the left child index. - */ - protected boolean isRed(int position) { - return position != NULL && - (data.get(position * ELEMENT_SIZE + LEFT_OFFSET) & 1) == 1; - } - - /** - * Set the red bit true or false. 
- */ - private void setRed(int position, boolean isRed) { - int offset = position * ELEMENT_SIZE + LEFT_OFFSET; - if (isRed) { - data.set(offset, data.get(offset) | 1); - } else { - data.set(offset, data.get(offset) & ~1); - } - } - - /** - * Get the left field of the given position (the color bit is shifted out). - */ - protected int getLeft(int position) { - return data.get(position * ELEMENT_SIZE + LEFT_OFFSET) >> 1; - } - - /** - * Get the right field of the given position. - */ - protected int getRight(int position) { - return data.get(position * ELEMENT_SIZE + RIGHT_OFFSET); - } - - /** - * Set the left field of the given position, keeping the node's current color. - * Note that we are storing the node color in the low bit of the left pointer. - */ - private void setLeft(int position, int left) { - int offset = position * ELEMENT_SIZE + LEFT_OFFSET; - data.set(offset, (left << 1) | (data.get(offset) & 1)); - } - - /** - * Set the left field and the color of the given position. - * Note that we are storing the node color in the low bit of the left pointer. - */ - private void setLeft(int position, int left, boolean isRed) { - int offset = position * ELEMENT_SIZE + LEFT_OFFSET; - data.set(offset, (left << 1) | (isRed ? 1 : 0)); - } - - /** - * Set the right field of the given position. - */ - private void setRight(int position, int right) { - data.set(position * ELEMENT_SIZE + RIGHT_OFFSET, right); - } - - /** - * Insert or find a given key in the tree and rebalance the tree correctly. - * Rebalancing restores the red-black aspect of the tree to maintain the - * invariants: - * 1. If a node is red, both of its children are black. - * 2. Each child of a node has the same black height (the number of black - * nodes between it and the leaves of the tree). - * - * Inserted nodes are at the leaves and are red, therefore there is at most a - * violation of rule 1 at the node we just put in. Instead of always keeping - * the parents, this routine passes the context down. 
- * - * The fix is broken down into 6 cases (1.{1,2,3} and 2.{1,2,3} that are - * left-right mirror images of each other). See Introduction to Algorithms by Cormen, - * Leiserson, and Rivest for the explanation of the subcases. - * - * @param node The node that we are fixing right now. - * @param fromLeft Did we come down from the left? - * @param parent Node's parent - * @param grandparent Parent's parent - * @param greatGrandparent Grandparent's parent - * @return Does parent also need to be checked and/or fixed? - */ - private boolean add(int node, boolean fromLeft, int parent, - int grandparent, int greatGrandparent) { - if (node == NULL) { - if (root == NULL) { - lastAdd = insert(NULL, NULL, false); - root = lastAdd; - wasAdd = true; - return false; - } else { - lastAdd = insert(NULL, NULL, true); - node = lastAdd; - wasAdd = true; - // connect the new node into the tree - if (fromLeft) { - setLeft(parent, node); - } else { - setRight(parent, node); - } - } - } else { - int compare = compareValue(node); - boolean keepGoing; - - // Recurse down to find where the node needs to be added - if (compare < 0) { - keepGoing = add(getLeft(node), true, node, parent, grandparent); - } else if (compare > 0) { - keepGoing = add(getRight(node), false, node, parent, grandparent); - } else { - lastAdd = node; - wasAdd = false; - return false; - } - - // we don't need to fix the root (because it is always set to black) - if (node == root || !keepGoing) { - return false; - } - } - - - // Do we need to fix this node? Only if there are two reds right under each - // other. 
- if (isRed(node) && isRed(parent)) { - if (parent == getLeft(grandparent)) { - int uncle = getRight(grandparent); - if (isRed(uncle)) { - // case 1.1 - setRed(parent, false); - setRed(uncle, false); - setRed(grandparent, true); - return true; - } else { - if (node == getRight(parent)) { - // case 1.2 - // swap node and parent - int tmp = node; - node = parent; - parent = tmp; - // left-rotate on node - setLeft(grandparent, parent); - setRight(node, getLeft(parent)); - setLeft(parent, node); - } - - // case 1.2 and 1.3 - setRed(parent, false); - setRed(grandparent, true); - - // right-rotate on grandparent - if (greatGrandparent == NULL) { - root = parent; - } else if (getLeft(greatGrandparent) == grandparent) { - setLeft(greatGrandparent, parent); - } else { - setRight(greatGrandparent, parent); - } - setLeft(grandparent, getRight(parent)); - setRight(parent, grandparent); - return false; - } - } else { - int uncle = getLeft(grandparent); - if (isRed(uncle)) { - // case 2.1 - setRed(parent, false); - setRed(uncle, false); - setRed(grandparent, true); - return true; - } else { - if (node == getLeft(parent)) { - // case 2.2 - // swap node and parent - int tmp = node; - node = parent; - parent = tmp; - // right-rotate on node - setRight(grandparent, parent); - setLeft(node, getRight(parent)); - setRight(parent, node); - } - // case 2.2 and 2.3 - setRed(parent, false); - setRed(grandparent, true); - // left-rotate on grandparent - if (greatGrandparent == NULL) { - root = parent; - } else if (getRight(greatGrandparent) == grandparent) { - setRight(greatGrandparent, parent); - } else { - setLeft(greatGrandparent, parent); - } - setRight(grandparent, getLeft(parent)); - setLeft(parent, grandparent); - return false; - } - } - } else { - return true; - } - } - - /** - * Add the new key to the tree. - * @return true if the element is a new one. 
- */ - protected boolean add() { - add(root, false, NULL, NULL, NULL); - if (wasAdd) { - setRed(root, false); - return true; - } else { - return false; - } - } - - /** - * Get the number of elements in the set. - */ - public int size() { - return size; - } - - /** - * Reset the tree to empty. - */ - public void clear() { - root = NULL; - size = 0; - data.clear(); - } - - /** - * Get the buffer size in bytes. - */ - public long getSizeInBytes() { - return data.getSizeInBytes(); - } -} - http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthByteWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthByteWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthByteWriter.java deleted file mode 100644 index 0953cdd..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthByteWriter.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.tajo.storage.thirdparty.orc; - -import java.io.IOException; - -/** - * A writer that run-length encodes a sequence of bytes. A control byte is written before - * each run with positive values 0 to 127 meaning 3 to 130 repetitions. If the - * control byte is -1 to -128, 1 to 128 literal byte values follow. - */ -class RunLengthByteWriter { - static final int MIN_REPEAT_SIZE = 3; - static final int MAX_LITERAL_SIZE = 128; - static final int MAX_REPEAT_SIZE= 127 + MIN_REPEAT_SIZE; - private final PositionedOutputStream output; - private final byte[] literals = new byte[MAX_LITERAL_SIZE]; - private int numLiterals = 0; - private boolean repeat = false; - private int tailRunLength = 0; - - RunLengthByteWriter(PositionedOutputStream output) { - this.output = output; - } - /** - * Write the buffered values, if any, as a single run and reset the run state. - * A repeat run is the control byte (numLiterals - MIN_REPEAT_SIZE) followed by - * the repeated byte; a literal run is the control byte (-numLiterals) followed - * by the buffered bytes. - */ - private void writeValues() throws IOException { - if (numLiterals != 0) { - if (repeat) { - output.write(numLiterals - MIN_REPEAT_SIZE); - output.write(literals, 0, 1); - } else { - output.write(-numLiterals); - output.write(literals, 0, numLiterals); - } - repeat = false; - tailRunLength = 0; - numLiterals = 0; - } - } - - void flush() throws IOException { - writeValues(); - output.flush(); - } - /** - * Buffer one byte. Once MIN_REPEAT_SIZE equal bytes are seen in a row the - * writer switches to a repeat run, first writing out any literals buffered - * before them. A run is written as soon as it reaches MAX_REPEAT_SIZE - * repetitions or MAX_LITERAL_SIZE literals. 
- */ - void write(byte value) throws IOException { - if (numLiterals == 0) { - literals[numLiterals++] = value; - tailRunLength = 1; - } else if (repeat) { - if (value == literals[0]) { - numLiterals += 1; - if (numLiterals == MAX_REPEAT_SIZE) { - writeValues(); - } - } else { - writeValues(); - literals[numLiterals++] = value; - tailRunLength = 1; - } - } else { - if (value == literals[numLiterals - 1]) { - tailRunLength += 1; - } else { - tailRunLength = 1; - } - if (tailRunLength == MIN_REPEAT_SIZE) { - if (numLiterals + 1 == MIN_REPEAT_SIZE) { - repeat = true; - numLiterals += 1; - } else { - numLiterals -= MIN_REPEAT_SIZE - 1; - writeValues(); - literals[0] = value; - repeat = true; - numLiterals = MIN_REPEAT_SIZE; - } - } else { - literals[numLiterals++] = value; - if (numLiterals == 
MAX_LITERAL_SIZE) { - writeValues(); - } - } - } - } - - void getPosition(PositionRecorder recorder) throws IOException { - output.getPosition(recorder); - recorder.addPosition(numLiterals); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriter.java deleted file mode 100644 index 867f041..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriter.java +++ /dev/null @@ -1,143 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tajo.storage.thirdparty.orc; - -import java.io.IOException; - -/** - * A writer that run-length encodes a sequence of integers. A control byte is written before - * each run with positive values 0 to 127 meaning 3 to 130 repetitions, each - * repetition is offset by a delta. 
If the control byte is -1 to -128, 1 to 128 - * literal vint values follow. - */ -class RunLengthIntegerWriter implements IntegerWriter { - static final int MIN_REPEAT_SIZE = 3; - static final int MAX_DELTA = 127; - static final int MIN_DELTA = -128; - static final int MAX_LITERAL_SIZE = 128; - private static final int MAX_REPEAT_SIZE = 127 + MIN_REPEAT_SIZE; - private final PositionedOutputStream output; - private final boolean signed; - private final long[] literals = new long[MAX_LITERAL_SIZE]; - private int numLiterals = 0; - private long delta = 0; - private boolean repeat = false; - private int tailRunLength = 0; - private SerializationUtils utils; - - RunLengthIntegerWriter(PositionedOutputStream output, - boolean signed) { - this.output = output; - this.signed = signed; - this.utils = new SerializationUtils(); - } - /** - * Write the buffered values, if any, as a single run and reset the run state. - * A repeat run is the control byte (numLiterals - MIN_REPEAT_SIZE), the delta - * cast to one byte, and the base value literals[0]; a literal run is the - * control byte (-numLiterals) followed by every buffered value. Values go - * through writeVslong when signed and writeVulong otherwise. 
- */ - private void writeValues() throws IOException { - if (numLiterals != 0) { - if (repeat) { - output.write(numLiterals - MIN_REPEAT_SIZE); - output.write((byte) delta); - if (signed) { - utils.writeVslong(output, literals[0]); - } else { - utils.writeVulong(output, literals[0]); - } - } else { - output.write(-numLiterals); - for(int i=0; i < numLiterals; ++i) { - if (signed) { - utils.writeVslong(output, literals[i]); - } else { - utils.writeVulong(output, literals[i]); - } - } - } - repeat = false; - numLiterals = 0; - tailRunLength = 0; - } - } - - @Override - public void flush() throws IOException { - writeValues(); - output.flush(); - } - /** - * Buffer one value. Inside a repeat run the value extends the run only if it - * equals literals[0] + delta * numLiterals; otherwise the run is written and - * a new buffer is started. Outside a repeat run the length of the trailing - * fixed-delta sequence is tracked (only deltas from MIN_DELTA to MAX_DELTA - * count); when it reaches MIN_REPEAT_SIZE the preceding literals are written - * and a repeat run begins. A run is written as soon as it reaches - * MAX_REPEAT_SIZE repetitions or MAX_LITERAL_SIZE literals. 
- */ - @Override - public void write(long value) throws IOException { - if (numLiterals == 0) { - literals[numLiterals++] = value; - tailRunLength = 1; - } else if (repeat) { - if (value == literals[0] + delta * numLiterals) { - numLiterals += 1; - if (numLiterals == MAX_REPEAT_SIZE) { - writeValues(); - } - } else { - writeValues(); - literals[numLiterals++] = value; - tailRunLength = 1; - } - } else { - if (tailRunLength == 1) { - delta = value - literals[numLiterals - 1]; - if (delta < MIN_DELTA || delta > 
MAX_DELTA) { - tailRunLength = 1; - } else { - tailRunLength = 2; - } - } else if (value == literals[numLiterals - 1] + delta) { - tailRunLength += 1; - } else { - delta = value - literals[numLiterals - 1]; - if (delta < MIN_DELTA || delta > MAX_DELTA) { - tailRunLength = 1; - } else { - tailRunLength = 2; - } - } - if (tailRunLength == MIN_REPEAT_SIZE) { - if (numLiterals + 1 == MIN_REPEAT_SIZE) { - repeat = true; - numLiterals += 1; - } else { - numLiterals -= MIN_REPEAT_SIZE - 1; - long base = literals[numLiterals]; - writeValues(); - literals[0] = base; - repeat = true; - numLiterals = MIN_REPEAT_SIZE; - } - } else { - literals[numLiterals++] = value; - if (numLiterals == MAX_LITERAL_SIZE) { - writeValues(); - } - } - } - } - /** - * Record the position of the output stream followed by the number of values - * currently buffered. - */ - @Override - public void getPosition(PositionRecorder recorder) throws IOException { - output.getPosition(recorder); - recorder.addPosition(numLiterals); - } - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriterV2.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriterV2.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriterV2.java deleted file mode 100644 index 7237b2e..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriterV2.java +++ /dev/null @@ -1,832 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tajo.storage.thirdparty.orc; - -import java.io.IOException; - -/** - * A writer that performs light weight compression over sequence of integers. - *

<p>
- * There are four types of lightweight integer compression
- * <ul>
- * <li>SHORT_REPEAT</li>
- * <li>DIRECT</li>
- * <li>PATCHED_BASE</li>
- * <li>DELTA</li>
- * </ul>
- * </p>
- * The description and format for these types are as below:
- * <p>
- * <b>SHORT_REPEAT:</b> Used for short repeated integer sequences.
- * <ul>
- * <li>1 byte header
- * <ul>
- * <li>2 bits for encoding type</li>
- * <li>3 bits for bytes required for repeating value</li>
- * <li>3 bits for repeat count (MIN_REPEAT + run length)</li>
- * </ul>
- * </li>
- * <li>Blob - repeat value (fixed bytes)</li>
- * </ul>
- * </p>
- * <p>
- * <b>DIRECT:</b> Used for random integer sequences whose number of bit
- * requirement doesn't vary a lot.
- * <ul>
- * <li>2 bytes header
- * <ul>
- * 1st byte
- * <li>2 bits for encoding type</li>
- * <li>5 bits for fixed bit width of values in blob</li>
- * <li>1 bit for storing MSB of run length</li>
- * </ul>
- * <ul>
- * 2nd byte
- * <li>8 bits for lower run length bits</li>
- * </ul>
- * </li>
- * <li>Blob - stores the direct values using fixed bit width. The length of the
- * data blob is (fixed width * run length) bits long</li>
- * </ul>
- * </p>
- * <p>
- * <b>PATCHED_BASE:</b> Used for random integer sequences whose number of bit
- * requirement varies beyond a threshold.
- * <ul>
- * <li>4 bytes header
- * <ul>
- * 1st byte
- * <li>2 bits for encoding type</li>
- * <li>5 bits for fixed bit width of values in blob</li>
- * <li>1 bit for storing MSB of run length</li>
- * </ul>
- * <ul>
- * 2nd byte
- * <li>8 bits for lower run length bits</li>
- * </ul>
- * <ul>
- * 3rd byte
- * <li>3 bits for bytes required to encode base value</li>
- * <li>5 bits for patch width</li>
- * </ul>
- * <ul>
- * 4th byte
- * <li>3 bits for patch gap width</li>
- * <li>5 bits for patch length</li>
- * </ul>
- * </li>
- * <li>Base value - Stored using fixed number of bytes. If MSB is set, base
- * value is negative else positive. Length of base value is (base width * 8)
- * bits.</li>
- * <li>Data blob - Base reduced values as stored using fixed bit width. Length
- * of data blob is (fixed width * run length) bits.</li>
- * <li>Patch blob - Patch blob is a list of gap and patch value. Each entry in
- * the patch list is (patch width + patch gap width) bits long. Gap between the
- * subsequent elements to be patched are stored in upper part of entry whereas
- * patch values are stored in lower part of entry. Length of patch blob is
- * ((patch width + patch gap width) * patch length) bits.</li>
- * </ul>
- * </p>
- * <p>
- * <b>DELTA</b> Used for monotonically increasing or decreasing sequences,
- * sequences with fixed delta values or long repeated sequences.
- * <ul>
- * <li>2 bytes header
- * <ul>
- * 1st byte
- * <li>2 bits for encoding type</li>
- * <li>5 bits for fixed bit width of values in blob</li>
- * <li>1 bit for storing MSB of run length</li>
- * </ul>
- * <ul>
- * 2nd byte
- * <li>8 bits for lower run length bits</li>
- * </ul>
- * </li>
- * <li>Base value - encoded as varint</li>
- * <li>Delta base - encoded as varint</li>
- * <li>Delta blob - only positive values. monotonicity and orderness are decided
- * based on the sign of the base value and delta base</li>
- * </ul>
- * </p>

- */ -class RunLengthIntegerWriterV2 implements IntegerWriter { - - public enum EncodingType { - SHORT_REPEAT, DIRECT, PATCHED_BASE, DELTA - } - - static final int MAX_SCOPE = 512; - static final int MIN_REPEAT = 3; - private static final int MAX_SHORT_REPEAT_LENGTH = 10; - private long prevDelta = 0; - private int fixedRunLength = 0; - private int variableRunLength = 0; - private final long[] literals = new long[MAX_SCOPE]; - private final PositionedOutputStream output; - private final boolean signed; - private EncodingType encoding; - private int numLiterals; - private final long[] zigzagLiterals = new long[MAX_SCOPE]; - private final long[] baseRedLiterals = new long[MAX_SCOPE]; - private final long[] adjDeltas = new long[MAX_SCOPE]; - private long fixedDelta; - private int zzBits90p; - private int zzBits100p; - private int brBits95p; - private int brBits100p; - private int bitsDeltaMax; - private int patchWidth; - private int patchGapWidth; - private int patchLength; - private long[] gapVsPatchList; - private long min; - private boolean isFixedDelta; - private SerializationUtils utils; - private boolean alignedBitpacking; - - RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed) { - this(output, signed, true); - } - - RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed, - boolean alignedBitpacking) { - this.output = output; - this.signed = signed; - this.alignedBitpacking = alignedBitpacking; - this.utils = new SerializationUtils(); - clear(); - } - - private void writeValues() throws IOException { - if (numLiterals != 0) { - - if (encoding.equals(EncodingType.SHORT_REPEAT)) { - writeShortRepeatValues(); - } else if (encoding.equals(EncodingType.DIRECT)) { - writeDirectValues(); - } else if (encoding.equals(EncodingType.PATCHED_BASE)) { - writePatchedBaseValues(); - } else { - writeDeltaValues(); - } - - // clear all the variables - clear(); - } - } - - private void writeDeltaValues() throws IOException { - int len = 0; - 
int fb = bitsDeltaMax; - int efb = 0; - - if (alignedBitpacking) { - fb = utils.getClosestAlignedFixedBits(fb); - } - - if (isFixedDelta) { - // if fixed run length is greater than threshold then it will be fixed - // delta sequence with delta value 0 else fixed delta sequence with - // non-zero delta value - if (fixedRunLength > MIN_REPEAT) { - // ex. sequence: 2 2 2 2 2 2 2 2 - len = fixedRunLength - 1; - fixedRunLength = 0; - } else { - // ex. sequence: 4 6 8 10 12 14 16 - len = variableRunLength - 1; - variableRunLength = 0; - } - } else { - // fixed width 0 is used for long repeating values. - // sequences that require only 1 bit to encode will have an additional bit - if (fb == 1) { - fb = 2; - } - efb = utils.encodeBitWidth(fb); - efb = efb << 1; - len = variableRunLength - 1; - variableRunLength = 0; - } - - // extract the 9th bit of run length - final int tailBits = (len & 0x100) >>> 8; - - // create first byte of the header - final int headerFirstByte = getOpcode() | efb | tailBits; - - // second byte of the header stores the remaining 8 bits of runlength - final int headerSecondByte = len & 0xff; - - // write header - output.write(headerFirstByte); - output.write(headerSecondByte); - - // store the first value from zigzag literal array - if (signed) { - utils.writeVslong(output, literals[0]); - } else { - utils.writeVulong(output, literals[0]); - } - - if (isFixedDelta) { - // if delta is fixed then we don't need to store delta blob - utils.writeVslong(output, fixedDelta); - } else { - // store the first value as delta value using zigzag encoding - utils.writeVslong(output, adjDeltas[0]); - - // adjacent delta values are bit packed. The length of adjDeltas array is - // always one less than the number of literals (delta difference for n - // elements is n-1). 
We have already written one element, write the - // remaining numLiterals - 2 elements here - utils.writeInts(adjDeltas, 1, numLiterals - 2, fb, output); - } - } - - private void writePatchedBaseValues() throws IOException { - - // NOTE: Aligned bit packing cannot be applied for PATCHED_BASE encoding - // because patch is applied to MSB bits. For example: If fixed bit width of - // base value is 7 bits and if patch is 3 bits, the actual value is - // constructed by shifting the patch to left by 7 positions. - // actual_value = patch << 7 | base_value - // So, if we align base_value then actual_value can not be reconstructed. - - // write the number of fixed bits required in next 5 bits - final int fb = brBits95p; - final int efb = utils.encodeBitWidth(fb) << 1; - - // adjust variable run length, they are one off - variableRunLength -= 1; - - // extract the 9th bit of run length - final int tailBits = (variableRunLength & 0x100) >>> 8; - - // create first byte of the header - final int headerFirstByte = getOpcode() | efb | tailBits; - - // second byte of the header stores the remaining 8 bits of runlength - final int headerSecondByte = variableRunLength & 0xff; - - // if the min value is negative toggle the sign - final boolean isNegative = min < 0 ? true : false; - if (isNegative) { - min = -min; - } - - // find the number of bytes required for base and shift it by 5 bits - // to accommodate patch width. The additional bit is used to store the sign - // of the base value. - final int baseWidth = utils.findClosestNumBits(min) + 1; - final int baseBytes = baseWidth % 8 == 0 ? 
baseWidth / 8 : (baseWidth / 8) + 1; - final int bb = (baseBytes - 1) << 5; - - // if the base value is negative then set MSB to 1 - if (isNegative) { - min |= (1L << ((baseBytes * 8) - 1)); - } - - // third byte contains 3 bits for number of bytes occupied by base - // and 5 bits for patchWidth - final int headerThirdByte = bb | utils.encodeBitWidth(patchWidth); - - // fourth byte contains 3 bits for page gap width and 5 bits for - // patch length - final int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength; - - // write header - output.write(headerFirstByte); - output.write(headerSecondByte); - output.write(headerThirdByte); - output.write(headerFourthByte); - - // write the base value using fixed bytes in big endian order - for(int i = baseBytes - 1; i >= 0; i--) { - byte b = (byte) ((min >>> (i * 8)) & 0xff); - output.write(b); - } - - // base reduced literals are bit packed - int closestFixedBits = utils.getClosestFixedBits(fb); - - utils.writeInts(baseRedLiterals, 0, numLiterals, closestFixedBits, - output); - - // write patch list - closestFixedBits = utils.getClosestFixedBits(patchGapWidth + patchWidth); - - utils.writeInts(gapVsPatchList, 0, gapVsPatchList.length, closestFixedBits, - output); - - // reset run length - variableRunLength = 0; - } - - /** - * Store the opcode in 2 MSB bits - * @return opcode - */ - private int getOpcode() { - return encoding.ordinal() << 6; - } - - private void writeDirectValues() throws IOException { - - // write the number of fixed bits required in next 5 bits - int fb = zzBits100p; - - if (alignedBitpacking) { - fb = utils.getClosestAlignedFixedBits(fb); - } - - final int efb = utils.encodeBitWidth(fb) << 1; - - // adjust variable run length - variableRunLength -= 1; - - // extract the 9th bit of run length - final int tailBits = (variableRunLength & 0x100) >>> 8; - - // create first byte of the header - final int headerFirstByte = getOpcode() | efb | tailBits; - - // second byte of the header stores the remaining 
8 bits of runlength - final int headerSecondByte = variableRunLength & 0xff; - - // write header - output.write(headerFirstByte); - output.write(headerSecondByte); - - // bit packing the zigzag encoded literals - utils.writeInts(zigzagLiterals, 0, numLiterals, fb, output); - - // reset run length - variableRunLength = 0; - } - - private void writeShortRepeatValues() throws IOException { - // get the value that is repeating, compute the bits and bytes required - long repeatVal = 0; - if (signed) { - repeatVal = utils.zigzagEncode(literals[0]); - } else { - repeatVal = literals[0]; - } - - final int numBitsRepeatVal = utils.findClosestNumBits(repeatVal); - final int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? numBitsRepeatVal >>> 3 - : (numBitsRepeatVal >>> 3) + 1; - - // write encoding type in top 2 bits - int header = getOpcode(); - - // write the number of bytes required for the value - header |= ((numBytesRepeatVal - 1) << 3); - - // write the run length - fixedRunLength -= MIN_REPEAT; - header |= fixedRunLength; - - // write the header - output.write(header); - - // write the repeating value in big endian byte order - for(int i = numBytesRepeatVal - 1; i >= 0; i--) { - int b = (int) ((repeatVal >>> (i * 8)) & 0xff); - output.write(b); - } - - fixedRunLength = 0; - } - - private void determineEncoding() { - - // we need to compute zigzag values for DIRECT encoding if we decide to - // break early for delta overflows or for shorter runs - computeZigZagLiterals(); - - zzBits100p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 1.0); - - // not a big win for shorter runs to determine encoding - if (numLiterals <= MIN_REPEAT) { - encoding = EncodingType.DIRECT; - return; - } - - // DELTA encoding check - - // for identifying monotonic sequences - boolean isIncreasing = true; - boolean isDecreasing = true; - this.isFixedDelta = true; - - this.min = literals[0]; - long max = literals[0]; - final long initialDelta = literals[1] - literals[0]; - long currDelta 
= initialDelta; - long deltaMax = initialDelta; - this.adjDeltas[0] = initialDelta; - - for (int i = 1; i < numLiterals; i++) { - final long l1 = literals[i]; - final long l0 = literals[i - 1]; - currDelta = l1 - l0; - min = Math.min(min, l1); - max = Math.max(max, l1); - - isIncreasing &= (l0 <= l1); - isDecreasing &= (l0 >= l1); - - isFixedDelta &= (currDelta == initialDelta); - if (i > 1) { - adjDeltas[i - 1] = Math.abs(currDelta); - deltaMax = Math.max(deltaMax, adjDeltas[i - 1]); - } - } - - // its faster to exit under delta overflow condition without checking for - // PATCHED_BASE condition as encoding using DIRECT is faster and has less - // overhead than PATCHED_BASE - if (!utils.isSafeSubtract(max, min)) { - encoding = EncodingType.DIRECT; - return; - } - - // invariant - subtracting any number from any other in the literals after - // this point won't overflow - - // if initialDelta is 0 then we cannot delta encode as we cannot identify - // the sign of deltas (increasing or decreasing) - if (initialDelta != 0) { - - // if min is equal to max then the delta is 0, this condition happens for - // fixed values run >10 which cannot be encoded with SHORT_REPEAT - if (min == max) { - assert isFixedDelta : min + "==" + max + - ", isFixedDelta cannot be false"; - assert currDelta == 0 : min + "==" + max + ", currDelta should be zero"; - fixedDelta = 0; - encoding = EncodingType.DELTA; - return; - } - - if (isFixedDelta) { - assert currDelta == initialDelta - : "currDelta should be equal to initialDelta for fixed delta encoding"; - encoding = EncodingType.DELTA; - fixedDelta = currDelta; - return; - } - - // stores the number of bits required for packing delta blob in - // delta encoding - bitsDeltaMax = utils.findClosestNumBits(deltaMax); - - // monotonic condition - if (isIncreasing || isDecreasing) { - encoding = EncodingType.DELTA; - return; - } - } - - // PATCHED_BASE encoding check - - // percentile values are computed for the zigzag encoded values. 
if the - // number of bit requirement between 90th and 100th percentile varies - // beyond a threshold then we need to patch the values. if the variation - // is not significant then we can use direct encoding - - zzBits90p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 0.9); - int diffBitsLH = zzBits100p - zzBits90p; - - // if the difference between 90th percentile and 100th percentile fixed - // bits is > 1 then we need patch the values - if (diffBitsLH > 1) { - - // patching is done only on base reduced values. - // remove base from literals - for (int i = 0; i < numLiterals; i++) { - baseRedLiterals[i] = literals[i] - min; - } - - // 95th percentile width is used to determine max allowed value - // after which patching will be done - brBits95p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 0.95); - - // 100th percentile is used to compute the max patch width - brBits100p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 1.0); - - // after base reducing the values, if the difference in bits between - // 95th percentile and 100th percentile value is zero then there - // is no point in patching the values, in which case we will - // fallback to DIRECT encoding. - // The decision to use patched base was based on zigzag values, but the - // actual patching is done on base reduced literals. - if ((brBits100p - brBits95p) != 0) { - encoding = EncodingType.PATCHED_BASE; - preparePatchedBlob(); - return; - } else { - encoding = EncodingType.DIRECT; - return; - } - } else { - // if difference in bits between 95th percentile and 100th percentile is - // 0, then patch length will become 0. 
Hence we will fallback to direct - encoding = EncodingType.DIRECT; - return; - } - } - - private void computeZigZagLiterals() { - // populate zigzag encoded literals - long zzEncVal = 0; - for (int i = 0; i < numLiterals; i++) { - if (signed) { - zzEncVal = utils.zigzagEncode(literals[i]); - } else { - zzEncVal = literals[i]; - } - zigzagLiterals[i] = zzEncVal; - } - } - - private void preparePatchedBlob() { - // mask will be max value beyond which patch will be generated - long mask = (1L << brBits95p) - 1; - - // since we are considering only 95 percentile, the size of gap and - // patch array can contain only be 5% values - patchLength = (int) Math.ceil((numLiterals * 0.05)); - - int[] gapList = new int[patchLength]; - long[] patchList = new long[patchLength]; - - // #bit for patch - patchWidth = brBits100p - brBits95p; - patchWidth = utils.getClosestFixedBits(patchWidth); - - // if patch bit requirement is 64 then it will not possible to pack - // gap and patch together in a long. To make sure gap and patch can be - // packed together adjust the patch width - if (patchWidth == 64) { - patchWidth = 56; - brBits95p = 8; - mask = (1L << brBits95p) - 1; - } - - int gapIdx = 0; - int patchIdx = 0; - int prev = 0; - int gap = 0; - int maxGap = 0; - - for(int i = 0; i < numLiterals; i++) { - // if value is above mask then create the patch and record the gap - if (baseRedLiterals[i] > mask) { - gap = i - prev; - if (gap > maxGap) { - maxGap = gap; - } - - // gaps are relative, so store the previous patched value index - prev = i; - gapList[gapIdx++] = gap; - - // extract the most significant bits that are over mask bits - long patch = baseRedLiterals[i] >>> brBits95p; - patchList[patchIdx++] = patch; - - // strip off the MSB to enable safe bit packing - baseRedLiterals[i] &= mask; - } - } - - // adjust the patch length to number of entries in gap list - patchLength = gapIdx; - - // if the element to be patched is the first and only element then - // max gap will be 0, 
but to store the gap as 0 we need atleast 1 bit - if (maxGap == 0 && patchLength != 0) { - patchGapWidth = 1; - } else { - patchGapWidth = utils.findClosestNumBits(maxGap); - } - - // special case: if the patch gap width is greater than 256, then - // we need 9 bits to encode the gap width. But we only have 3 bits in - // header to record the gap width. To deal with this case, we will save - // two entries in patch list in the following way - // 256 gap width => 0 for patch value - // actual gap - 256 => actual patch value - // We will do the same for gap width = 511. If the element to be patched is - // the last element in the scope then gap width will be 511. In this case we - // will have 3 entries in the patch list in the following way - // 255 gap width => 0 for patch value - // 255 gap width => 0 for patch value - // 1 gap width => actual patch value - if (patchGapWidth > 8) { - patchGapWidth = 8; - // for gap = 511, we need two additional entries in patch list - if (maxGap == 511) { - patchLength += 2; - } else { - patchLength += 1; - } - } - - // create gap vs patch list - gapIdx = 0; - patchIdx = 0; - gapVsPatchList = new long[patchLength]; - for(int i = 0; i < patchLength; i++) { - long g = gapList[gapIdx++]; - long p = patchList[patchIdx++]; - while (g > 255) { - gapVsPatchList[i++] = (255L << patchWidth); - g -= 255; - } - - // store patch value in LSBs and gap in MSBs - gapVsPatchList[i] = (g << patchWidth) | p; - } - } - - /** - * clears all the variables - */ - private void clear() { - numLiterals = 0; - encoding = null; - prevDelta = 0; - fixedDelta = 0; - zzBits90p = 0; - zzBits100p = 0; - brBits95p = 0; - brBits100p = 0; - bitsDeltaMax = 0; - patchGapWidth = 0; - patchLength = 0; - patchWidth = 0; - gapVsPatchList = null; - min = 0; - isFixedDelta = true; - } - - @Override - public void flush() throws IOException { - if (numLiterals != 0) { - if (variableRunLength != 0) { - determineEncoding(); - writeValues(); - } else if (fixedRunLength != 0) { 
- if (fixedRunLength < MIN_REPEAT) { - variableRunLength = fixedRunLength; - fixedRunLength = 0; - determineEncoding(); - writeValues(); - } else if (fixedRunLength >= MIN_REPEAT - && fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) { - encoding = EncodingType.SHORT_REPEAT; - writeValues(); - } else { - encoding = EncodingType.DELTA; - isFixedDelta = true; - writeValues(); - } - } - } - output.flush(); - } - - @Override - public void write(long val) throws IOException { - if (numLiterals == 0) { - initializeLiterals(val); - } else { - if (numLiterals == 1) { - prevDelta = val - literals[0]; - literals[numLiterals++] = val; - // if both values are same count as fixed run else variable run - if (val == literals[0]) { - fixedRunLength = 2; - variableRunLength = 0; - } else { - fixedRunLength = 0; - variableRunLength = 2; - } - } else { - long currentDelta = val - literals[numLiterals - 1]; - if (prevDelta == 0 && currentDelta == 0) { - // fixed delta run - - literals[numLiterals++] = val; - - // if variable run is non-zero then we are seeing repeating - // values at the end of variable run in which case keep - // updating variable and fixed runs - if (variableRunLength > 0) { - fixedRunLength = 2; - } - fixedRunLength += 1; - - // if fixed run met the minimum condition and if variable - // run is non-zero then flush the variable run and shift the - // tail fixed runs to start of the buffer - if (fixedRunLength >= MIN_REPEAT && variableRunLength > 0) { - numLiterals -= MIN_REPEAT; - variableRunLength -= MIN_REPEAT - 1; - // copy the tail fixed runs - long[] tailVals = new long[MIN_REPEAT]; - System.arraycopy(literals, numLiterals, tailVals, 0, MIN_REPEAT); - - // determine variable encoding and flush values - determineEncoding(); - writeValues(); - - // shift tail fixed runs to beginning of the buffer - for(long l : tailVals) { - literals[numLiterals++] = l; - } - } - - // if fixed runs reached max repeat length then write values - if (fixedRunLength == MAX_SCOPE) { - 
determineEncoding(); - writeValues(); - } - } else { - // variable delta run - - // if fixed run length is non-zero and if it satisfies the - // short repeat conditions then write the values as short repeats - // else use delta encoding - if (fixedRunLength >= MIN_REPEAT) { - if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) { - encoding = EncodingType.SHORT_REPEAT; - writeValues(); - } else { - encoding = EncodingType.DELTA; - isFixedDelta = true; - writeValues(); - } - } - - // if fixed run length is 0 && fixedRunLength < MIN_REPEAT) { - if (val != literals[numLiterals - 1]) { - variableRunLength = fixedRunLength; - fixedRunLength = 0; - } - } - - // after writing values re-initialize the variables - if (numLiterals == 0) { - initializeLiterals(val); - } else { - // keep updating variable run lengths - prevDelta = val - literals[numLiterals - 1]; - literals[numLiterals++] = val; - variableRunLength += 1; - - // if variable run length reach the max scope, write it - if (variableRunLength == MAX_SCOPE) { - determineEncoding(); - writeValues(); - } - } - } - } - } - } - - private void initializeLiterals(long val) { - literals[numLiterals++] = val; - fixedRunLength = 1; - variableRunLength = 1; - } - - @Override - public void getPosition(PositionRecorder recorder) throws IOException { - output.getPosition(recorder); - recorder.addPosition(numLiterals); - } -}