Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1FE0C18610 for ; Thu, 2 Jul 2015 08:40:26 +0000 (UTC) Received: (qmail 8513 invoked by uid 500); 2 Jul 2015 08:40:25 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 8443 invoked by uid 500); 2 Jul 2015 08:40:25 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 8426 invoked by uid 99); 2 Jul 2015 08:40:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 02 Jul 2015 08:40:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7A1CAE02A2; Thu, 2 Jul 2015 08:40:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: benedict@apache.org To: commits@cassandra.apache.org Date: Thu, 02 Jul 2015 08:40:25 -0000 Message-Id: <893ace1e30d7406c934677f359463549@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/5] cassandra git commit: add vInt encoding to Data(Input|Output)Plus Repository: cassandra Updated Branches: refs/heads/trunk 6092b01e3 -> 03f72acd5 add vInt encoding to Data(Input|Output)Plus patch by ariel and benedict for CASSANDRA-9499 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1491a40b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1491a40b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1491a40b Branch: refs/heads/trunk Commit: 1491a40b7b4ea2723bcf22d870ee514b47ea901b 
Parents: 6092b01 Author: Ariel Weisberg Authored: Mon Jun 15 14:31:03 2015 -0400 Committer: Benedict Elliott Smith Committed: Thu Jul 2 09:39:57 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 2 +- NOTICE.txt | 5 + src/java/org/apache/cassandra/db/TypeSizes.java | 23 +-- .../cassandra/io/util/AbstractDataInput.java | 34 ++++ .../io/util/BufferedDataOutputStreamPlus.java | 23 ++- .../cassandra/io/util/DataOutputPlus.java | 19 ++ .../cassandra/io/util/NIODataInputStream.java | 86 +++++++-- .../io/util/UnbufferedDataOutputStreamPlus.java | 1 - .../utils/vint/EncodedDataInputStream.java | 47 +---- .../utils/vint/EncodedDataOutputStream.java | 35 +--- .../apache/cassandra/utils/vint/VIntCoding.java | 183 +++++++++++++++++++ .../io/util/BufferedDataOutputStreamTest.java | 95 +++++++++- .../io/util/NIODataInputStreamTest.java | 120 ++++++++++-- .../cassandra/utils/vint/VIntCodingTest.java | 85 +++++++++ 14 files changed, 632 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6895395..7561e4b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -10,7 +10,7 @@ * Change default garbage collector to G1 (CASSANDRA-7486) * Populate TokenMetadata early during startup (CASSANDRA-9317) * undeprecate cache recentHitRate (CASSANDRA-6591) - + * Add support for selectively varint encoding fields (CASSANDRA-9499) 2.2.0-rc2 * (cqlsh) Allow setting the initial connection timeout (CASSANDRA-9601) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/NOTICE.txt ---------------------------------------------------------------------- diff --git a/NOTICE.txt b/NOTICE.txt index a71d822..0ad792f 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -74,3 +74,8 @@ OHC (https://github.com/snazy/ohc) Java Off-Heap-Cache, 
licensed under APLv2 Copyright 2014-2015 Robert Stupp, Germany. + +Protocol buffers for varint encoding +https://developers.google.com/protocol-buffers/ +Copyright 2008 Google Inc. All rights reserved. +BSD 3-clause http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/src/java/org/apache/cassandra/db/TypeSizes.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/TypeSizes.java b/src/java/org/apache/cassandra/db/TypeSizes.java index efae762..79d5774 100644 --- a/src/java/org/apache/cassandra/db/TypeSizes.java +++ b/src/java/org/apache/cassandra/db/TypeSizes.java @@ -20,6 +20,8 @@ package org.apache.cassandra.db; import java.nio.ByteBuffer; import java.util.UUID; +import org.apache.cassandra.utils.vint.VIntCoding; + public abstract class TypeSizes { public static final TypeSizes NATIVE = new NativeDBTypeSizes(); @@ -106,26 +108,7 @@ public abstract class TypeSizes public int sizeofVInt(long i) { - if (i >= -112 && i <= 127) - return 1; - - int size = 0; - int len = -112; - if (i < 0) - { - i ^= -1L; // take one's complement' - len = -120; - } - long tmp = i; - while (tmp != 0) - { - tmp = tmp >> 8; - len--; - } - size++; - len = (len < -120) ? 
-(len + 120) : -(len + 112); - size += len; - return size; + return VIntCoding.computeVIntSize(i); } public int sizeof(long i) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/src/java/org/apache/cassandra/io/util/AbstractDataInput.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/AbstractDataInput.java b/src/java/org/apache/cassandra/io/util/AbstractDataInput.java index 588540d..935a06d 100644 --- a/src/java/org/apache/cassandra/io/util/AbstractDataInput.java +++ b/src/java/org/apache/cassandra/io/util/AbstractDataInput.java @@ -19,6 +19,8 @@ package org.apache.cassandra.io.util; import java.io.*; +import org.apache.cassandra.utils.vint.VIntCoding; + public abstract class AbstractDataInput extends InputStream implements DataInput { public abstract void seek(long position) throws IOException; @@ -265,6 +267,38 @@ public abstract class AbstractDataInput extends InputStream implements DataInput } /** + * Reads a varint encoded integer from the current position in this file. Blocks until + * the end of the varint is reached, the end of the file is reached, or an exception is + * thrown. + * + * @return the next varint value from this file. + * @throws EOFException + * if the end of this file is detected. + * @throws IOException + * if this file is closed or another I/O error occurs. + */ + public long readVInt() throws IOException + { + return VIntCoding.readVInt(this); + } + + /** + * Reads an unsigned varint encoded integer from the current position in this file. Blocks until + * the end of the varint is reached, the end of the file is reached, or an exception is + * thrown. + * + * @return the next unsigned varint value from this file. + * @throws EOFException + * if the end of this file is detected. + * @throws IOException + * if this file is closed or another I/O error occurs. 
+ */ + public long readUnsignedVInt() throws IOException + { + return VIntCoding.readUnsignedVInt(this); + } + + /** * Reads a 16-bit short from the current position in this file. Blocks until * two bytes have been read, the end of the file is reached or an exception * is thrown. http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java index 5669a8d..b6f3231 100644 --- a/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java +++ b/src/java/org/apache/cassandra/io/util/BufferedDataOutputStreamPlus.java @@ -29,7 +29,7 @@ import com.google.common.base.Preconditions; import org.apache.cassandra.config.Config; import org.apache.cassandra.utils.memory.MemoryUtil; - +import org.apache.cassandra.utils.vint.VIntCoding; /** * An implementation of the DataOutputStreamPlus interface using a ByteBuffer to stage writes @@ -214,6 +214,27 @@ public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus } @Override + public void writeVInt(long value) throws IOException + { + writeUnsignedVInt(VIntCoding.encodeZigZag64(value)); + } + + @Override + public void writeUnsignedVInt(long value) throws IOException + { + int size = VIntCoding.computeUnsignedVIntSize(value); + if (size == 1) + { + ensureRemaining(1); + buffer.put((byte) value); + return; + } + + ensureRemaining(size); + buffer.put(VIntCoding.encodeVInt(value, size), 0, size); + } + + @Override public void writeFloat(float v) throws IOException { ensureRemaining(4); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java index f63c1e5..f6a3648 100644 --- a/src/java/org/apache/cassandra/io/util/DataOutputPlus.java +++ b/src/java/org/apache/cassandra/io/util/DataOutputPlus.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; +import org.apache.cassandra.utils.vint.VIntCoding; + import com.google.common.base.Function; /** @@ -40,4 +42,21 @@ public interface DataOutputPlus extends DataOutput * and forget to flush */ R applyToChannel(Function c) throws IOException; + + default void writeVInt(long i) throws IOException + { + VIntCoding.writeVInt(i, this); + } + + /** + * Think hard before opting for an unsigned encoding. Is this going to bite someone because some day + * they might need to pass in a sentinel value using negative numbers? Is the risk worth it + * to save a few bytes? + * + * Signed, not a fan of unsigned values in protocols and formats + */ + default void writeUnsignedVInt(long i) throws IOException + { + VIntCoding.writeUnsignedVInt(i, this); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/NIODataInputStream.java b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java index 94ba9ed..4816379 100644 --- a/src/java/org/apache/cassandra/io/util/NIODataInputStream.java +++ b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java @@ -27,6 +27,8 @@ import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; import java.nio.channels.SeekableByteChannel; +import org.apache.cassandra.utils.vint.VIntCoding; + import com.google.common.base.Preconditions; /** @@ -50,7 +52,7 @@ public class NIODataInputStream extends InputStream implements DataInput, Closea public 
NIODataInputStream(ReadableByteChannel rbc, int bufferSize) { Preconditions.checkNotNull(rbc); - Preconditions.checkArgument(bufferSize >= 8, "Buffer size must be large enough to accomadate a long/double"); + Preconditions.checkArgument(bufferSize >= 9, "Buffer size must be large enough to accomadate a varint"); this.rbc = rbc; buf = ByteBuffer.allocateDirect(bufferSize); buf.position(0); @@ -116,7 +118,7 @@ public class NIODataInputStream extends InputStream implements DataInput, Closea private int readNext() throws IOException { Preconditions.checkState(buf.remaining() != buf.capacity()); - assert(buf.remaining() < 8); + assert(buf.remaining() < 9); /* * If there is data already at the start of the buffer, move the position to the end @@ -151,31 +153,55 @@ public class NIODataInputStream extends InputStream implements DataInput, Closea return read; } - /* - * Read at least minimum bytes and throw EOF if that fails - */ - private void readMinimum(int minimum) throws IOException + /* + * Read at least minimum bytes and throw EOF if that fails + */ + private void readMinimum(int attempt, int require) throws IOException { assert(buf.remaining() < 8); - while (buf.remaining() < minimum) + int remaining; + while ((remaining = buf.remaining()) < attempt) { int read = readNext(); if (read == -1) { - //DataInputStream consumes the bytes even if it doesn't get the entire value, match the behavior here - buf.position(0); - buf.limit(0); - throw new EOFException(); + if (remaining < require) + { + //DataInputStream consumes the bytes even if it doesn't get the entire value, match the behavior here + buf.position(0); + buf.limit(0); + throw new EOFException(); + } } } } /* - * Ensure the buffer contains the minimum number of readable bytes + * Ensure the buffer contains the minimum number of readable bytes, throws EOF if enough bytes aren't available + * Add padding if requested and return the limit of the buffer without any padding that is added. 
+ */ + private int prepareReadPaddedPrimitive(int minimum) throws IOException + { + int limitToSet = buf.limit(); + int position = buf.position(); + if (limitToSet - position < minimum) + { + readMinimum(minimum, 1); + limitToSet = buf.limit(); + position = buf.position(); + if (limitToSet - position < minimum) + buf.limit(position + minimum); + } + return limitToSet; + } + + /* + * Ensure the buffer contains the minimum number of readable bytes, throws EOF if enough bytes aren't available */ private void prepareReadPrimitive(int minimum) throws IOException { - if (buf.remaining() < minimum) readMinimum(minimum); + if (buf.remaining() < minimum) + readMinimum(minimum, minimum); } @Override @@ -248,6 +274,40 @@ public class NIODataInputStream extends InputStream implements DataInput, Closea return buf.getLong(); } + public long readVInt() throws IOException + { + return VIntCoding.decodeZigZag64(readUnsignedVInt()); + } + + public long readUnsignedVInt() throws IOException + { + byte firstByte = readByte(); + + //Bail out early if this is one byte, necessary or it fails later + if (firstByte >= 0) + return firstByte; + + //If padding was added, the limit to set after to get rid of the padding + int limitToSet = prepareReadPaddedPrimitive(8); + + int position = buf.position(); + int extraBytes = VIntCoding.numberOfExtraBytesToRead(firstByte); + int extraBits = extraBytes * 8; + + long retval = buf.getLong(position); + buf.position(position + extraBytes); + buf.limit(limitToSet); + + + // truncate the bytes we read in excess of those we needed + retval >>>= 64 - extraBits; + // remove the non-value bits from the first byte + firstByte &= VIntCoding.firstByteValueMask(extraBytes); + // shift the first byte up to its correct position + retval |= (long) firstByte << extraBits; + return retval; + } + @Override public float readFloat() throws IOException { 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java index 9137ba2..b8f0884 100644 --- a/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java +++ b/src/java/org/apache/cassandra/io/util/UnbufferedDataOutputStreamPlus.java @@ -23,7 +23,6 @@ import java.io.UTFDataFormatException; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; -import org.apache.cassandra.config.Config; import org.apache.cassandra.utils.memory.MemoryUtil; import com.google.common.base.Function; http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java b/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java index bee8ab0..663e176 100644 --- a/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java +++ b/src/java/org/apache/cassandra/utils/vint/EncodedDataInputStream.java @@ -27,6 +27,9 @@ import org.apache.cassandra.io.util.AbstractDataInput; * https://developers.google.com/protocol-buffers/docs/encoding#varints * * Should be used with EncodedDataOutputStream + * + * @deprecated Where possible use NIODataInputStream which has a more efficient implementation of buffered input + * for most read methods */ public class EncodedDataInputStream extends AbstractDataInput implements DataInput { @@ -71,55 +74,21 @@ public class EncodedDataInputStream extends AbstractDataInput implements DataInp public int readInt() throws IOException { - return (int) vintDecode(); + return (int) 
VIntCoding.readVInt(input); } public long readLong() throws IOException { - return vintDecode(); + return VIntCoding.readVInt(input); } public int readUnsignedShort() throws IOException { - return (short) vintDecode(); - } - - public short readShort() throws IOException - { - return (short) vintDecode(); + return (short) VIntCoding.readVInt(input); } - private long vintDecode() throws IOException - { - byte firstByte = input.readByte(); - int len = vintDecodeSize(firstByte); - if (len == 1) - return firstByte; - long i = 0; - for (int idx = 0; idx < len - 1; idx++) - { - byte b = input.readByte(); - i = i << 8; - i = i | (b & 0xFF); - } - return (vintIsNegative(firstByte) ? (i ^ -1L) : i); - } - - private int vintDecodeSize(byte value) - { - if (value >= -112) - { - return 1; - } - else if (value < -120) - { - return -119 - value; - } - return -111 - value; - } - - private boolean vintIsNegative(byte value) + public short readShort() throws IOException { - return value < -120 || (value >= -112 && value < 0); + return (short) VIntCoding.readVInt(input); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java b/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java index fe43ff2..7f7613f 100644 --- a/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java +++ b/src/java/org/apache/cassandra/utils/vint/EncodedDataOutputStream.java @@ -54,45 +54,16 @@ public class EncodedDataOutputStream extends UnbufferedDataOutputStreamPlus public void writeInt(int v) throws IOException { - vintEncode(v); + writeVInt(v); } public void writeLong(long v) throws IOException { - vintEncode(v); + writeVInt(v); } public void writeShort(int v) throws IOException { - vintEncode(v); - } - - private void vintEncode(long i) throws 
IOException - { - if (i >= -112 && i <= 127) - { - writeByte((byte) i); - return; - } - int len = -112; - if (i < 0) - { - i ^= -1L; // take one's complement' - len = -120; - } - long tmp = i; - while (tmp != 0) - { - tmp = tmp >> 8; - len--; - } - writeByte((byte) len); - len = (len < -120) ? -(len + 120) : -(len + 112); - for (int idx = len; idx != 0; idx--) - { - int shiftbits = (idx - 1) * 8; - long mask = 0xFFL << shiftbits; - writeByte((byte) ((i & mask) >> shiftbits)); - } + writeVInt(v); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/vint/VIntCoding.java b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java new file mode 100644 index 0000000..0ac4124 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/vint/VIntCoding.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +// Protocol Buffers - Google's data interchange format +// Copyright 2008 Google Inc. All rights reserved. 
+// https://developers.google.com/protocol-buffers/ +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+package org.apache.cassandra.utils.vint; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import net.nicoulaj.compilecommand.annotations.Inline; + +/** + * Borrows idea from + * https://developers.google.com/protocol-buffers/docs/encoding#varints + */ +public class VIntCoding +{ + + public static long readUnsignedVInt(DataInput input) throws IOException { + int firstByte = input.readByte(); + + //Bail out early if this is one byte, necessary or it fails later + if (firstByte >= 0) + return firstByte; + + int size = numberOfExtraBytesToRead(firstByte); + long retval = firstByte & firstByteValueMask(size);; + for (int ii = 0; ii < size; ii++) + { + byte b = input.readByte(); + retval <<= 8; + retval |= b & 0xff; + } + + return retval; + } + + public static long readVInt(DataInput input) throws IOException { + return decodeZigZag64(readUnsignedVInt(input)); + } + + // & this with the first byte to give the value part for a given extraBytesToRead encoded in the byte + public static int firstByteValueMask(int extraBytesToRead) + { + // by including the known 0bit in the mask, we can use this for encodeExtraBytesToRead + return 0xff >> extraBytesToRead; + } + + public static int encodeExtraBytesToRead(int extraBytesToRead) + { + // because we have an extra bit in the value mask, we just need to invert it + return ~firstByteValueMask(extraBytesToRead); + } + + public static int numberOfExtraBytesToRead(int firstByte) + { + // we count number of set upper bits; so if we simply invert all of the bits, we're golden + // this is aided by the fact that we only work with negative numbers, so when upcast to an int all + // of the new upper bits are also set, so by inverting we set all of them to zero + return Integer.numberOfLeadingZeros(~firstByte) - 24; + } + + protected static final ThreadLocal encodingBuffer = new ThreadLocal() + { + @Override + public byte[] initialValue() + { + return new byte[9]; + } + }; + + public static void 
writeUnsignedVInt(long value, DataOutput output) throws IOException { + int size = VIntCoding.computeUnsignedVIntSize(value); + if (size == 1) + { + output.write((int)value); + return; + } + + output.write(VIntCoding.encodeVInt(value, size), 0, size); + } + + @Inline + public static byte[] encodeVInt(long value, int size) { + byte encodingSpace[] = encodingBuffer.get(); + int extraBytes = size - 1; + + for (int i = extraBytes ; i >= 0; --i) + { + encodingSpace[i] = (byte) value; + value >>= 8; + } + encodingSpace[0] |= VIntCoding.encodeExtraBytesToRead(extraBytes); + return encodingSpace; + } + + public static void writeVInt(long value, DataOutput output) throws IOException { + writeUnsignedVInt(encodeZigZag64(value), output); + } + + /** + * Decode a ZigZag-encoded 64-bit value. ZigZag encodes signed integers + * into values that can be efficiently encoded with varint. (Otherwise, + * negative values must be sign-extended to 64 bits to be varint encoded, + * thus always taking 10 bytes on the wire.) + * + * @param n An unsigned 64-bit integer, stored in a signed int because + * Java has no explicit unsigned support. + * @return A signed 64-bit integer. + */ + public static long decodeZigZag64(final long n) { + return (n >>> 1) ^ -(n & 1); + } + + /** + * Encode a ZigZag-encoded 64-bit value. ZigZag encodes signed integers + * into values that can be efficiently encoded with varint. (Otherwise, + * negative values must be sign-extended to 64 bits to be varint encoded, + * thus always taking 10 bytes on the wire.) + * + * @param n A signed 64-bit integer. + * @return An unsigned 64-bit integer, stored in a signed int because + * Java has no explicit unsigned support. + */ + public static long encodeZigZag64(final long n) { + // Note: the right-shift must be arithmetic + return (n << 1) ^ (n >> 63); + } + + /** Compute the number of bytes that would be needed to encode a varint. 
*/ + public static int computeVIntSize(final long param) { + return computeUnsignedVIntSize(encodeZigZag64(param)); + } + + /** Compute the number of bytes that would be needed to encode an unsigned varint. */ + public static int computeUnsignedVIntSize(final long value) { + int magnitude = Long.numberOfLeadingZeros(value | 1); // | with 1 to ensure magntiude <= 63, so (63 - 1) / 7 <= 8 + return 9 - ((magnitude - 1) / 7); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java index d0819fe..acef1ec 100644 --- a/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java +++ b/test/unit/org/apache/cassandra/io/util/BufferedDataOutputStreamTest.java @@ -8,10 +8,16 @@ import java.lang.reflect.Field; import java.nio.BufferOverflowException; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; +import java.util.Arrays; import java.util.Random; +import org.apache.cassandra.utils.vint.VIntCoding; import org.junit.Test; +import com.google.common.primitives.UnsignedBytes; +import com.google.common.primitives.UnsignedInteger; +import com.google.common.primitives.UnsignedLong; + import static org.junit.Assert.*; public class BufferedDataOutputStreamTest @@ -185,7 +191,7 @@ public class BufferedDataOutputStreamTest int action = 0; while (generated.size() < 1024 * 1024 * 8) { - action = r.nextInt(19); + action = r.nextInt(21); //System.out.println("Action " + action + " iteration " + iteration); iteration++; @@ -362,6 +368,20 @@ public class BufferedDataOutputStreamTest } break; } + case 19: + { + long val = r.nextLong(); + VIntCoding.writeVInt(val, dosp); + ndosp.writeVInt(val); + break; + } + case 20: + { + long val = 
r.nextLong(); + VIntCoding.writeUnsignedVInt(val, dosp); + ndosp.writeUnsignedVInt(val); + break; + } default: fail("Shouldn't reach here"); } @@ -442,4 +462,77 @@ public class BufferedDataOutputStreamTest } return count; } + + /* + * Add values to the array with a bit set in every position + */ + public static long[] enrich(long vals[]) + { + long retval[] = Arrays.copyOf(vals, vals.length + 64); + for (int ii = 0; ii < 64; ii++) + retval[vals.length + ii] = 1L << ii; + return retval; + } + + @Test + public void testVInt() throws Exception + { + setUp(); + long testValues[] = new long[] { + 0, 1, -1 + ,Long.MIN_VALUE, Long.MIN_VALUE + 1, Long.MAX_VALUE, Long.MAX_VALUE - 1 + ,Integer.MIN_VALUE, Integer.MIN_VALUE + 1, Integer.MAX_VALUE, Integer.MAX_VALUE - 1 + ,Short.MIN_VALUE, Short.MIN_VALUE + 1, Short.MAX_VALUE, Short.MAX_VALUE - 1 + ,Byte.MIN_VALUE, Byte.MIN_VALUE + 1, Byte.MAX_VALUE, Byte.MAX_VALUE - 1 }; + testValues = enrich(testValues); + + int expectedSize = 0; + for (long v : testValues) + { + expectedSize += VIntCoding.computeVIntSize(v); + ndosp.writeVInt(v); + } + + ndosp.flush(); + + @SuppressWarnings("resource") + ByteBufferDataInput bbdi = new ByteBufferDataInput(ByteBuffer.wrap(generated.toByteArray()), "", 0, 0); + + assertEquals(expectedSize, generated.toByteArray().length); + + for (long v : testValues) + { + assertEquals(v, bbdi.readVInt()); + } + } + + @Test + public void testUnsignedVInt() throws Exception + { + setUp(); + long testValues[] = new long[] { //-1 }; + 0, 1 + , UnsignedLong.MAX_VALUE.longValue(), UnsignedLong.MAX_VALUE.longValue() - 1, UnsignedLong.MAX_VALUE.longValue() + 1 + , UnsignedInteger.MAX_VALUE.longValue(), UnsignedInteger.MAX_VALUE.longValue() - 1, UnsignedInteger.MAX_VALUE.longValue() + 1 + , UnsignedBytes.MAX_VALUE, UnsignedBytes.MAX_VALUE - 1, UnsignedBytes.MAX_VALUE + 1 + , 65536, 65536 - 1, 65536 + 1 }; + testValues = enrich(testValues); + + int expectedSize = 0; + for (long v : testValues) + { + expectedSize += 
VIntCoding.computeUnsignedVIntSize(v); + ndosp.writeUnsignedVInt(v); + } + + ndosp.flush(); + + @SuppressWarnings("resource") + ByteBufferDataInput bbdi = new ByteBufferDataInput(ByteBuffer.wrap(generated.toByteArray()), "", 0, 0); + + assertEquals(expectedSize, generated.toByteArray().length); + + for (long v : testValues) + assertEquals(v, bbdi.readUnsignedVInt()); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java b/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java index a19346b..11ff23a 100644 --- a/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java +++ b/test/unit/org/apache/cassandra/io/util/NIODataInputStreamTest.java @@ -18,6 +18,9 @@ import org.apache.cassandra.io.util.NIODataInputStream; import org.junit.Test; import com.google.common.base.Charsets; +import com.google.common.primitives.UnsignedBytes; +import com.google.common.primitives.UnsignedInteger; +import com.google.common.primitives.UnsignedLong; import static org.junit.Assert.*; @@ -126,7 +129,7 @@ public class NIODataInputStreamTest } - NIODataInputStream fakeStream = new NIODataInputStream(new FakeChannel(), 8); + NIODataInputStream fakeStream = new NIODataInputStream(new FakeChannel(), 9); /* NOTE(review): second argument raised from 8 to 9 by this change; presumably so the stream can hold a maximal 9-byte vint - confirm against NIODataInputStream */ @Test(expected = IOException.class) public void testResetThrows() throws Exception @@ -212,7 +215,7 @@ public class NIODataInputStreamTest fos.write(new byte[10]); fos.seek(0); - is = new NIODataInputStream(fos.getChannel(), 8); + is = new NIODataInputStream(fos.getChannel(), 9); int remaining = 10; assertEquals(10, is.available()); @@ -226,6 +229,31 @@ public class NIODataInputStreamTest assertEquals(0, is.available()); } + /** Builds a ReadableByteChannel backed by a ByteBuffer that wraps the given bytes; each read copies at most dst.remaining() bytes and returns -1 when nothing was copied. */ private static ReadableByteChannel wrap(final byte bytes[]) + { + final ByteBuffer buf = ByteBuffer.wrap(bytes); + return 
new ReadableByteChannel() + { + + @Override + public boolean isOpen() {return false;} /* always reports the channel as not open; NOTE(review): presumably the stream under test never consults isOpen() - confirm */ + + @Override + public void close() throws IOException {} + + @Override + public int read(ByteBuffer dst) throws IOException + { + int read = Math.min(dst.remaining(), buf.remaining()); /* copy no more than the destination can take or the source still holds */ + buf.limit(buf.position() + read); /* cap the source limit so put(buf) transfers exactly that many bytes */ + dst.put(buf); + buf.limit(buf.capacity()); /* restore the limit so later reads see the remaining bytes */ + return read == 0 ? -1 : read; /* -1 when no bytes were transferred; NOTE(review): this also happens when dst has no room, where the ReadableByteChannel contract would suggest 0 - confirm intent */ + } + + }; + } + @SuppressWarnings("resource") @Test public void testReadUTF() throws Exception @@ -244,28 +272,84 @@ public class NIODataInputStreamTest daos.writeUTF(BufferedDataOutputStreamTest.threeByte); daos.writeUTF(BufferedDataOutputStreamTest.fourByte); - NIODataInputStream is = new NIODataInputStream(new ReadableByteChannel() + NIODataInputStream is = new NIODataInputStream(wrap(baos.toByteArray()), 4096); + + assertEquals(simple, is.readUTF()); + assertEquals(BufferedDataOutputStreamTest.twoByte, is.readUTF()); + assertEquals(BufferedDataOutputStreamTest.threeByte, is.readUTF()); + assertEquals(BufferedDataOutputStreamTest.fourByte, is.readUTF()); + } + + /** Round-trips signed vints: writes the boundary values below (as expanded by BufferedDataOutputStreamTest.enrich) with writeVInt, reads them back through an NIODataInputStream constructed with a second argument of 9, and expects EOFException on the read after the last value. */ @SuppressWarnings("resource") + @Test + public void testReadVInt() throws Exception { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStreamPlus daos = new WrappedDataOutputStreamPlus(baos); + + long values[] = new long[] { + 0, 1, -1, + Long.MIN_VALUE, Long.MIN_VALUE + 1, Long.MAX_VALUE, Long.MAX_VALUE - 1, + Integer.MIN_VALUE, Integer.MIN_VALUE + 1, Integer.MAX_VALUE, Integer.MAX_VALUE - 1, + Short.MIN_VALUE, Short.MIN_VALUE + 1, Short.MAX_VALUE, Short.MAX_VALUE - 1, + Byte.MIN_VALUE, Byte.MIN_VALUE + 1, Byte.MAX_VALUE, Byte.MAX_VALUE - 1 }; + values = BufferedDataOutputStreamTest.enrich(values); + + for (long v : values) + daos.writeVInt(v); + + daos.flush(); + + NIODataInputStream is = new NIODataInputStream(wrap(baos.toByteArray()), 9); + + for (long v : values) + assertEquals(v, is.readVInt()); + + boolean threw = false; + try { + is.readVInt(); /* every written value has been consumed, so this extra read must hit end of input */ + } + catch (EOFException e) + { + threw = 
true; + } + assertTrue(threw); + } - @Override - public boolean isOpen() {return false;} + /** Round-trips unsigned vints: writes the values below (as expanded by BufferedDataOutputStreamTest.enrich) with writeUnsignedVInt, reads them back with readUnsignedVInt, and expects EOFException on the read after the last value. */ @SuppressWarnings("resource") + @Test + public void testReadUnsignedVInt() throws Exception { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStreamPlus daos = new WrappedDataOutputStreamPlus(baos); - @Override - public void close() throws IOException {} + long values[] = new long[] { + 0, 1 + , UnsignedLong.MAX_VALUE.longValue(), UnsignedLong.MAX_VALUE.longValue() - 1, UnsignedLong.MAX_VALUE.longValue() + 1 /* NOTE(review): longValue() of the unsigned maximum is presumably -1L, which would make the +1 entry a second 0 - confirm */ + , UnsignedInteger.MAX_VALUE.longValue(), UnsignedInteger.MAX_VALUE.longValue() - 1, UnsignedInteger.MAX_VALUE.longValue() + 1 + , UnsignedBytes.MAX_VALUE, UnsignedBytes.MAX_VALUE - 1, UnsignedBytes.MAX_VALUE + 1 + , 65536, 65536 - 1, 65536 + 1 }; + values = BufferedDataOutputStreamTest.enrich(values); - @Override - public int read(ByteBuffer dst) throws IOException - { - dst.put(baos.toByteArray()); - return baos.toByteArray().length; - } + for (long v : values) + daos.writeUnsignedVInt(v); - }, 4096); + daos.flush(); - assertEquals(simple, is.readUTF()); - assertEquals(BufferedDataOutputStreamTest.twoByte, is.readUTF()); - assertEquals(BufferedDataOutputStreamTest.threeByte, is.readUTF()); - assertEquals(BufferedDataOutputStreamTest.fourByte, is.readUTF()); + NIODataInputStream is = new NIODataInputStream(wrap(baos.toByteArray()), 9); + + for (long v : values) + assertEquals(v, is.readUnsignedVInt()); + + boolean threw = false; + try + { + is.readUnsignedVInt(); /* every written value has been consumed, so this extra read must hit end of input */ + } + catch (EOFException e) + { + threw = true; + } + assertTrue(threw); } @Test http://git-wip-us.apache.org/repos/asf/cassandra/blob/1491a40b/test/unit/org/apache/cassandra/utils/vint/VIntCodingTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/vint/VIntCodingTest.java b/test/unit/org/apache/cassandra/utils/vint/VIntCodingTest.java new file mode 100644 index 0000000..f08b181 --- /dev/null +++ 
b/test/unit/org/apache/cassandra/utils/vint/VIntCodingTest.java @@ -0,0 +1,85 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.cassandra.utils.vint; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; + +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.junit.Test; + +import junit.framework.Assert; /* NOTE(review): NIODataInputStreamTest in the same change statically imports org.junit.Assert instead - confirm which Assert class the project prefers */ + +/** Unit tests for the unsigned vint helpers of VIntCoding: reported size, actual encoded length, and the extra-byte count derived from a first byte. */ public class VIntCodingTest +{ + + /** Checks that 0 takes 1 byte, that for size 1..7 the value (1L << 7 * size) - 1 takes size bytes while 1L << 7 * size takes size + 1 bytes, and that Long.MAX_VALUE is reported as 9 bytes. */ @Test + public void testComputeSize() throws Exception + { + assertEncodedAtExpectedSize(0L, 1); + + for (int size = 1 ; size < 8 ; size++) + { + assertEncodedAtExpectedSize((1L << 7 * size) - 1, size); + assertEncodedAtExpectedSize(1L << 7 * size, size + 1); + } + Assert.assertEquals(9, VIntCoding.computeUnsignedVIntSize(Long.MAX_VALUE)); + } + + /** Asserts that computeUnsignedVIntSize(value) equals expectedSize, and that encoding value through both VIntCoding.writeUnsignedVInt and DataOutputBuffer.writeUnsignedVInt yields exactly expectedSize bytes. */ private void assertEncodedAtExpectedSize(long value, int expectedSize) throws Exception + { + Assert.assertEquals(expectedSize, VIntCoding.computeUnsignedVIntSize(value)); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + VIntCoding.writeUnsignedVInt(value, dos); /* first path: the static encoder writing to a plain DataOutputStream */ + dos.flush(); + Assert.assertEquals( expectedSize, baos.toByteArray().length); + + DataOutputBuffer dob = new DataOutputBuffer(); + dob.writeUnsignedVInt(value); /* second path: the writeUnsignedVInt method of DataOutputBuffer */ + 
Assert.assertEquals( expectedSize, dob.buffer().remaining()); + dob.close(); + } + + /** For i in 1..7, builds a first byte whose top i bits are set and whose lower bits are clear, and expects numberOfExtraBytesToRead to return i. */ @Test + public void testReadExtraBytesCount() + { + for (int i = 1 ; i < 8 ; i++) + Assert.assertEquals(i, VIntCoding.numberOfExtraBytesToRead((byte) ((0xFF << (8 - i)) & 0xFF))); + } + + /* + * Quick sanity check that 1 byte encodes up to 127 as expected + */ + @Test + public void testOneByteCapacity() throws Exception { + int biggestOneByte = 127; + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + VIntCoding.writeUnsignedVInt(biggestOneByte, dos); /* first path: the static encoder writing to a plain DataOutputStream */ + dos.flush(); + Assert.assertEquals( 1, baos.toByteArray().length); + + DataOutputBuffer dob = new DataOutputBuffer(); + dob.writeUnsignedVInt(biggestOneByte); /* second path: the writeUnsignedVInt method of DataOutputBuffer */ + Assert.assertEquals( 1, dob.buffer().remaining()); + dob.close(); + } +}