From commits-return-213820-archive-asf-public=cust-asf.ponee.io@cassandra.apache.org Sat Sep 1 23:54:10 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id E7FC2180630 for ; Sat, 1 Sep 2018 23:54:08 +0200 (CEST) Received: (qmail 42861 invoked by uid 500); 1 Sep 2018 21:54:08 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 42850 invoked by uid 99); 1 Sep 2018 21:54:07 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 01 Sep 2018 21:54:07 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A094FDF9AB; Sat, 1 Sep 2018 21:54:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: samt@apache.org To: commits@cassandra.apache.org Date: Sat, 01 Sep 2018 21:54:08 -0000 Message-Id: <70f2541665cb4cbb94db1596eea7bee1@git.apache.org> In-Reply-To: <2420199dd26248d283d9b753acd6ed5d@git.apache.org> References: <2420199dd26248d283d9b753acd6ed5d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] cassandra git commit: Add checksumming to the native protocol Add checksumming to the native protocol Patch my Michael Kjellman and Sam Tunnicliffe; reviewed by Dinesh Joshi and Jordan West for CASSANDRA-13304 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/65fb17a8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/65fb17a8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/65fb17a8 Branch: refs/heads/trunk Commit: 65fb17a88bd096b1e952ccca31ad709759644a1b Parents: 960174d Author: Sam Tunnicliffe Authored: Fri Mar 10 15:18:33 2017 +0000 Committer: Sam Tunnicliffe Committed: Sat Sep 1 22:41:37 2018 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + bin/debug-cql | 2 +- build.xml | 2 + conf/cassandra.yaml | 4 + .../org/apache/cassandra/config/Config.java | 1 + .../cassandra/config/DatabaseDescriptor.java | 5 + .../org/apache/cassandra/transport/CBUtil.java | 6 + .../org/apache/cassandra/transport/Client.java | 57 ++- .../apache/cassandra/transport/Connection.java | 11 +- .../org/apache/cassandra/transport/Frame.java | 42 ++- .../cassandra/transport/FrameCompressor.java | 211 ----------- .../cassandra/transport/ProtocolVersion.java | 5 + .../org/apache/cassandra/transport/Server.java | 8 +- .../cassandra/transport/SimpleClient.java | 29 +- .../transport/frame/FrameBodyTransformer.java | 57 +++ .../frame/checksum/ChecksummingTransformer.java | 361 +++++++++++++++++++ .../frame/compress/CompressingTransformer.java | 164 +++++++++ .../transport/frame/compress/Compressor.java | 62 ++++ .../transport/frame/compress/LZ4Compressor.java | 68 ++++ .../frame/compress/SnappyCompressor.java | 79 ++++ .../transport/messages/OptionsMessage.java | 20 +- .../transport/messages/StartupMessage.java | 70 +++- .../org/apache/cassandra/cql3/CQLTester.java | 4 +- .../cassandra/cql3/PreparedStatementsTest.java | 2 +- .../cassandra/service/ClientWarningsTest.java | 8 +- .../service/ProtocolBetaVersionTest.java | 4 +- .../cassandra/transport/MessagePayloadTest.java | 6 +- .../checksum/ChecksummingTransformerTest.java | 224 ++++++++++++ .../stress/settings/StressSettings.java | 2 +- 29 files changed, 1234 insertions(+), 281 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index aca31fe..301f97f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Add checksumming to the native protocol (CASSANDRA-13304) * Make AuthCache more easily extendable (CASSANDRA-14662) * Extend RolesCache to include detailed role info (CASSANDRA-14497) * Add fqltool compare (CASSANDRA-14619) http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/bin/debug-cql ---------------------------------------------------------------------- diff --git a/bin/debug-cql b/bin/debug-cql index c184df9..9550ddf 100755 --- a/bin/debug-cql +++ b/bin/debug-cql @@ -46,7 +46,7 @@ esac class="org.apache.cassandra.transport.Client" cassandra_parms="-Dlogback.configurationFile=logback-tools.xml" -"$JAVA" $JVM_OPTS $cassandra_parms -cp "$CLASSPATH" "$class" $1 $2 +"$JAVA" $JVM_OPTS $cassandra_parms -cp "$CLASSPATH" "$class" $@ exit $? http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index 1c957d3..86462f7 100644 --- a/build.xml +++ b/build.xml @@ -435,6 +435,7 @@ + @@ -560,6 +561,7 @@ artifactId="cassandra-parent" version="${version}"/> + http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 503a0fa..995a520 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -656,6 +656,10 @@ native_transport_port: 9042 # you may want to adjust max_value_size_in_mb accordingly. This should be positive and less than 2048. # native_transport_max_frame_size_in_mb: 256 +# If checksumming is enabled as a protocol option, denotes the size of the chunks into which frame +# are bodies will be broken and checksummed. +# native_transport_frame_block_size_in_kb: 32 + # The maximum number of concurrent client connections. # The default is -1, which means unlimited. # native_transport_max_concurrent_connections: -1 http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 783dcc1..b04f9ec 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -154,6 +154,7 @@ public class Config public volatile long native_transport_max_concurrent_connections_per_ip = -1L; public boolean native_transport_flush_in_batches_legacy = false; public volatile boolean native_transport_allow_older_protocols = true; + public int native_transport_frame_block_size_in_kb = 32; /** * Max size of values in SSTables, in MegaBytes. http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 75b3fc3..ddea8f4 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1878,6 +1878,11 @@ public class DatabaseDescriptor conf.native_transport_allow_older_protocols = isEnabled; } + public static int getNativeTransportFrameBlockSize() + { + return conf.native_transport_frame_block_size_in_kb * 1024; + } + public static double getCommitLogSyncGroupWindow() { return conf.commitlog_sync_group_window_in_ms; http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/CBUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java index 80b80b4..f7490c3 100644 --- a/src/java/org/apache/cassandra/transport/CBUtil.java +++ b/src/java/org/apache/cassandra/transport/CBUtil.java @@ -580,4 +580,10 @@ public abstract class CBUtil return bytes; } + public static int readUnsignedShort(ByteBuf buf) + { + int ch1 = buf.readByte() & 0xFF; + int ch2 = buf.readByte() & 0xFF; + return (ch1 << 8) + (ch2); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/Client.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java index 3632175..f7ed272 100644 --- a/src/java/org/apache/cassandra/transport/Client.java +++ b/src/java/org/apache/cassandra/transport/Client.java @@ -33,7 +33,13 @@ import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.transport.frame.checksum.ChecksummingTransformer; +import org.apache.cassandra.transport.frame.compress.CompressingTransformer; +import org.apache.cassandra.transport.frame.compress.Compressor; +import org.apache.cassandra.transport.frame.compress.LZ4Compressor; +import org.apache.cassandra.transport.frame.compress.SnappyCompressor; import org.apache.cassandra.transport.messages.*; +import org.apache.cassandra.utils.ChecksumType; import org.apache.cassandra.utils.Hex; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.MD5Digest; @@ -44,7 +50,7 @@ public class Client extends SimpleClient public Client(String host, int port, ProtocolVersion version, EncryptionOptions encryptionOptions) { - super(host, port, version, encryptionOptions); + super(host, port, version, version.isBeta(), encryptionOptions); setEventHandler(eventHandler); } @@ -105,15 +111,56 @@ public class Client extends SimpleClient { Map options = new HashMap(); options.put(StartupMessage.CQL_VERSION, "3.0.0"); + Compressor compressor = null; + ChecksumType checksumType = null; while (iter.hasNext()) { - String next = iter.next(); - if (next.toLowerCase().equals("snappy")) + String next = iter.next().toLowerCase(); + switch (next) { - options.put(StartupMessage.COMPRESSION, "snappy"); - connection.setCompressor(FrameCompressor.SnappyCompressor.instance); + case "snappy": { + if (options.containsKey(StartupMessage.COMPRESSION)) + throw new RuntimeException("Multiple compression types supplied"); + options.put(StartupMessage.COMPRESSION, "snappy"); + compressor = SnappyCompressor.INSTANCE; + break; + } + case "lz4": { + if (options.containsKey(StartupMessage.COMPRESSION)) + throw new RuntimeException("Multiple compression types supplied"); + options.put(StartupMessage.COMPRESSION, "lz4"); + compressor = LZ4Compressor.INSTANCE; + break; + } + case "crc32": { + if (options.containsKey(StartupMessage.CHECKSUM)) + throw new RuntimeException("Multiple checksum types supplied"); + options.put(StartupMessage.CHECKSUM, ChecksumType.CRC32.name()); + checksumType = ChecksumType.CRC32; + break; + } + case "adler32": { + if (options.containsKey(StartupMessage.CHECKSUM)) + throw new RuntimeException("Multiple checksum types supplied"); + options.put(StartupMessage.CHECKSUM, ChecksumType.Adler32.name()); + checksumType = ChecksumType.Adler32; + break; + } } } + + if (checksumType == null) + { + if (compressor != null) + { + connection.setTransformer(CompressingTransformer.getTransformer(compressor)); + } + } + else + { + connection.setTransformer(ChecksummingTransformer.getTransformer(checksumType, compressor)); + } + return new StartupMessage(options); } else if (msgType.equals("QUERY")) http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/Connection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Connection.java b/src/java/org/apache/cassandra/transport/Connection.java index a04a055..908e7e9 100644 --- a/src/java/org/apache/cassandra/transport/Connection.java +++ b/src/java/org/apache/cassandra/transport/Connection.java @@ -19,6 +19,7 @@ package org.apache.cassandra.transport; import io.netty.channel.Channel; import io.netty.util.AttributeKey; +import org.apache.cassandra.transport.frame.FrameBodyTransformer; public class Connection { @@ -28,7 +29,7 @@ public class Connection private final ProtocolVersion version; private final Tracker tracker; - private volatile FrameCompressor frameCompressor; + private volatile FrameBodyTransformer transformer; public Connection(Channel channel, ProtocolVersion version, Tracker tracker) { @@ -39,14 +40,14 @@ public class Connection tracker.addConnection(channel, this); } - public void setCompressor(FrameCompressor compressor) + public void setTransformer(FrameBodyTransformer transformer) { - this.frameCompressor = compressor; + this.transformer = transformer; } - public FrameCompressor getCompressor() + public FrameBodyTransformer getTransformer() { - return frameCompressor; + return transformer; } public Tracker getTracker() http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/Frame.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java index 41e64f9..d6a1cbc 100644 --- a/src/java/org/apache/cassandra/transport/Frame.java +++ b/src/java/org/apache/cassandra/transport/Frame.java @@ -32,6 +32,7 @@ import io.netty.handler.codec.MessageToMessageEncoder; import io.netty.util.Attribute; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.transport.frame.FrameBodyTransformer; import org.apache.cassandra.transport.messages.ErrorMessage; public class Frame @@ -102,7 +103,8 @@ public class Frame TRACING, CUSTOM_PAYLOAD, WARNING, - USE_BETA; + USE_BETA, + CHECKSUMMED; private static final Flag[] ALL_VALUES = values(); @@ -301,54 +303,70 @@ public class Frame } @ChannelHandler.Sharable - public static class Decompressor extends MessageToMessageDecoder + public static class InboundBodyTransformer extends MessageToMessageDecoder { public void decode(ChannelHandlerContext ctx, Frame frame, List results) throws IOException { Connection connection = ctx.channel().attr(Connection.attributeKey).get(); - if (!frame.header.flags.contains(Header.Flag.COMPRESSED) || connection == null) + if ((!frame.header.flags.contains(Header.Flag.COMPRESSED) && !frame.header.flags.contains(Header.Flag.CHECKSUMMED)) || connection == null) { results.add(frame); return; } - FrameCompressor compressor = connection.getCompressor(); - if (compressor == null) + FrameBodyTransformer transformer = connection.getTransformer(); + if (transformer == null) { results.add(frame); return; } - results.add(compressor.decompress(frame)); + try + { + results.add(frame.with(transformer.transformInbound(frame.body, frame.header.flags))); + } + finally + { + // release the old frame + frame.release(); + } } } @ChannelHandler.Sharable - public static class Compressor extends MessageToMessageEncoder + public static class OutboundBodyTransformer extends MessageToMessageEncoder { public void encode(ChannelHandlerContext ctx, Frame frame, List results) throws IOException { Connection connection = ctx.channel().attr(Connection.attributeKey).get(); - // Never compress STARTUP messages + // Never transform STARTUP messages if (frame.header.type == Message.Type.STARTUP || connection == null) { results.add(frame); return; } - FrameCompressor compressor = connection.getCompressor(); - if (compressor == null) + FrameBodyTransformer transformer = connection.getTransformer(); + if (transformer == null) { results.add(frame); return; } - frame.header.flags.add(Header.Flag.COMPRESSED); - results.add(compressor.compress(frame)); + try + { + results.add(frame.with(transformer.transformOutbound(frame.body))); + frame.header.flags.addAll(transformer.getOutboundHeaderFlags()); + } + finally + { + // release the old frame + frame.release(); + } } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/FrameCompressor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/FrameCompressor.java b/src/java/org/apache/cassandra/transport/FrameCompressor.java deleted file mode 100644 index 01c0c31..0000000 --- a/src/java/org/apache/cassandra/transport/FrameCompressor.java +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.transport; - -import java.io.IOException; - -import io.netty.buffer.ByteBuf; -import org.xerial.snappy.Snappy; -import org.xerial.snappy.SnappyError; - -import net.jpountz.lz4.LZ4Factory; - -import org.apache.cassandra.utils.JVMStabilityInspector; - -public interface FrameCompressor -{ - public Frame compress(Frame frame) throws IOException; - public Frame decompress(Frame frame) throws IOException; - - /* - * TODO: We can probably do more efficient, like by avoiding copy. - * Also, we don't reuse ICompressor because the API doesn't expose enough. - */ - public static class SnappyCompressor implements FrameCompressor - { - public static final SnappyCompressor instance; - static - { - SnappyCompressor i; - try - { - i = new SnappyCompressor(); - } - catch (Exception e) - { - JVMStabilityInspector.inspectThrowable(e); - i = null; - } - catch (NoClassDefFoundError | SnappyError | UnsatisfiedLinkError e) - { - i = null; - } - instance = i; - } - - private SnappyCompressor() - { - // this would throw java.lang.NoClassDefFoundError if Snappy class - // wasn't found at runtime which should be processed by the calling method - Snappy.getNativeLibraryVersion(); - } - - public Frame compress(Frame frame) throws IOException - { - byte[] input = CBUtil.readRawBytes(frame.body); - ByteBuf output = CBUtil.allocator.heapBuffer(Snappy.maxCompressedLength(input.length)); - - try - { - int written = Snappy.compress(input, 0, input.length, output.array(), output.arrayOffset()); - output.writerIndex(written); - } - catch (final Throwable e) - { - output.release(); - throw e; - } - finally - { - //release the old frame - frame.release(); - } - - return frame.with(output); - } - - public Frame decompress(Frame frame) throws IOException - { - byte[] input = CBUtil.readRawBytes(frame.body); - - if (!Snappy.isValidCompressedBuffer(input, 0, input.length)) - throw new ProtocolException("Provided frame does not appear to be Snappy compressed"); - - ByteBuf output = CBUtil.allocator.heapBuffer(Snappy.uncompressedLength(input)); - - try - { - int size = Snappy.uncompress(input, 0, input.length, output.array(), output.arrayOffset()); - output.writerIndex(size); - } - catch (final Throwable e) - { - output.release(); - throw e; - } - finally - { - //release the old frame - frame.release(); - } - - return frame.with(output); - } - } - - /* - * This is very close to the ICompressor implementation, and in particular - * it also layout the uncompressed size at the beginning of the message to - * make uncompression faster, but contrarly to the ICompressor, that length - * is written in big-endian. The native protocol is entirely big-endian, so - * it feels like putting little-endian here would be a annoying trap for - * client writer. - */ - public static class LZ4Compressor implements FrameCompressor - { - public static final LZ4Compressor instance = new LZ4Compressor(); - - private static final int INTEGER_BYTES = 4; - private final net.jpountz.lz4.LZ4Compressor compressor; - private final net.jpountz.lz4.LZ4Decompressor decompressor; - - private LZ4Compressor() - { - final LZ4Factory lz4Factory = LZ4Factory.fastestInstance(); - compressor = lz4Factory.fastCompressor(); - decompressor = lz4Factory.decompressor(); - } - - public Frame compress(Frame frame) throws IOException - { - byte[] input = CBUtil.readRawBytes(frame.body); - - int maxCompressedLength = compressor.maxCompressedLength(input.length); - ByteBuf outputBuf = CBUtil.allocator.heapBuffer(INTEGER_BYTES + maxCompressedLength); - - byte[] output = outputBuf.array(); - int outputOffset = outputBuf.arrayOffset(); - - output[outputOffset + 0] = (byte) (input.length >>> 24); - output[outputOffset + 1] = (byte) (input.length >>> 16); - output[outputOffset + 2] = (byte) (input.length >>> 8); - output[outputOffset + 3] = (byte) (input.length); - - try - { - int written = compressor.compress(input, 0, input.length, output, outputOffset + INTEGER_BYTES, maxCompressedLength); - outputBuf.writerIndex(INTEGER_BYTES + written); - - return frame.with(outputBuf); - } - catch (final Throwable e) - { - outputBuf.release(); - throw e; - } - finally - { - //release the old frame - frame.release(); - } - } - - public Frame decompress(Frame frame) throws IOException - { - byte[] input = CBUtil.readRawBytes(frame.body); - - int uncompressedLength = ((input[0] & 0xFF) << 24) - | ((input[1] & 0xFF) << 16) - | ((input[2] & 0xFF) << 8) - | ((input[3] & 0xFF)); - - ByteBuf output = CBUtil.allocator.heapBuffer(uncompressedLength); - - try - { - int read = decompressor.decompress(input, INTEGER_BYTES, output.array(), output.arrayOffset(), uncompressedLength); - if (read != input.length - INTEGER_BYTES) - throw new IOException("Compressed lengths mismatch"); - - output.writerIndex(uncompressedLength); - - return frame.with(output); - } - catch (final Throwable e) - { - output.release(); - throw e; - } - finally - { - //release the old frame - frame.release(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/ProtocolVersion.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/ProtocolVersion.java b/src/java/org/apache/cassandra/transport/ProtocolVersion.java index ceeeca7..e1f634c 100644 --- a/src/java/org/apache/cassandra/transport/ProtocolVersion.java +++ b/src/java/org/apache/cassandra/transport/ProtocolVersion.java @@ -138,6 +138,11 @@ public enum ProtocolVersion implements Comparable return num; } + public boolean supportsChecksums() + { + return num >= V5.asInt(); + } + @Override public String toString() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/Server.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index 0c4b7b8..67532ac 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -342,8 +342,8 @@ public class Server implements CassandraDaemon.Server // Stateless handlers private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder(); private static final Message.ProtocolEncoder messageEncoder = new Message.ProtocolEncoder(); - private static final Frame.Decompressor frameDecompressor = new Frame.Decompressor(); - private static final Frame.Compressor frameCompressor = new Frame.Compressor(); + private static final Frame.InboundBodyTransformer inboundFrameTransformer = new Frame.InboundBodyTransformer(); + private static final Frame.OutboundBodyTransformer outboundFrameTransformer = new Frame.OutboundBodyTransformer(); private static final Frame.Encoder frameEncoder = new Frame.Encoder(); private static final Message.ExceptionHandler exceptionHandler = new Message.ExceptionHandler(); private static final Message.Dispatcher dispatcher = new Message.Dispatcher(DatabaseDescriptor.useNativeTransportLegacyFlusher()); @@ -373,8 +373,8 @@ public class Server implements CassandraDaemon.Server pipeline.addLast("frameDecoder", new Frame.Decoder(server.connectionFactory)); pipeline.addLast("frameEncoder", frameEncoder); - pipeline.addLast("frameDecompressor", frameDecompressor); - pipeline.addLast("frameCompressor", frameCompressor); + pipeline.addLast("inboundFrameTransformer", inboundFrameTransformer); + pipeline.addLast("outboundFrameTransformer", outboundFrameTransformer); pipeline.addLast("messageDecoder", messageDecoder); pipeline.addLast("messageEncoder", messageEncoder); http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/SimpleClient.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java index db7de8d..1334448 100644 --- a/src/java/org/apache/cassandra/transport/SimpleClient.java +++ b/src/java/org/apache/cassandra/transport/SimpleClient.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; @@ -45,6 +46,10 @@ import org.apache.cassandra.config.EncryptionOptions; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.security.SSLFactory; +import org.apache.cassandra.transport.frame.checksum.ChecksummingTransformer; +import org.apache.cassandra.transport.frame.compress.CompressingTransformer; +import org.apache.cassandra.transport.frame.compress.Compressor; +import org.apache.cassandra.transport.frame.compress.LZ4Compressor; import org.apache.cassandra.transport.messages.ErrorMessage; import org.apache.cassandra.transport.messages.EventMessage; import org.apache.cassandra.transport.messages.ExecuteMessage; @@ -56,6 +61,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; +import org.apache.cassandra.utils.ChecksumType; public class SimpleClient implements Closeable { @@ -117,19 +123,24 @@ public class SimpleClient implements Closeable this(host, port, new EncryptionOptions()); } - public SimpleClient connect(boolean useCompression) throws IOException + public SimpleClient connect(boolean useCompression, boolean useChecksums) throws IOException { establishConnection(); Map options = new HashMap<>(); options.put(StartupMessage.CQL_VERSION, "3.0.0"); - if (useCompression) + + if (useChecksums) { - options.put(StartupMessage.COMPRESSION, "snappy"); - connection.setCompressor(FrameCompressor.SnappyCompressor.instance); + Compressor compressor = useCompression ? LZ4Compressor.INSTANCE : null; + connection.setTransformer(ChecksummingTransformer.getTransformer(ChecksumType.CRC32, compressor)); + } + else if (useCompression) + { + connection.setTransformer(CompressingTransformer.getTransformer(LZ4Compressor.INSTANCE)); } - execute(new StartupMessage(options)); + execute(new StartupMessage(options)); return this; } @@ -241,8 +252,8 @@ public class SimpleClient implements Closeable // Stateless handlers private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder(); private static final Message.ProtocolEncoder messageEncoder = new Message.ProtocolEncoder(); - private static final Frame.Decompressor frameDecompressor = new Frame.Decompressor(); - private static final Frame.Compressor frameCompressor = new Frame.Compressor(); + private static final Frame.InboundBodyTransformer inboundFrameTransformer = new Frame.InboundBodyTransformer(); + private static final Frame.OutboundBodyTransformer outboundFrameTransformer = new Frame.OutboundBodyTransformer(); private static final Frame.Encoder frameEncoder = new Frame.Encoder(); private static class ConnectionTracker implements Connection.Tracker @@ -266,8 +277,8 @@ public class SimpleClient implements Closeable pipeline.addLast("frameDecoder", new Frame.Decoder(connectionFactory)); pipeline.addLast("frameEncoder", frameEncoder); - pipeline.addLast("frameDecompressor", frameDecompressor); - pipeline.addLast("frameCompressor", frameCompressor); + pipeline.addLast("inboundFrameTransformer", inboundFrameTransformer); + pipeline.addLast("outboundFrameTransformer", outboundFrameTransformer); pipeline.addLast("messageDecoder", messageDecoder); pipeline.addLast("messageEncoder", messageEncoder); http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/frame/FrameBodyTransformer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/frame/FrameBodyTransformer.java b/src/java/org/apache/cassandra/transport/frame/FrameBodyTransformer.java new file mode 100644 index 0000000..0a6b22f --- /dev/null +++ b/src/java/org/apache/cassandra/transport/frame/FrameBodyTransformer.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.transport.frame; + +import java.io.IOException; +import java.util.EnumSet; + +import io.netty.buffer.ByteBuf; +import org.apache.cassandra.transport.Frame; + +public interface FrameBodyTransformer +{ + /** + * Accepts the input buffer representing the frame body of an incoming message and applies a transformation. + * Example transformations include decompression and recombining checksummed chunks into a single, serialized + * message body. + * @param inputBuf the frame body from an inbound message + * @return the new frame body bytes + * @throws IOException if the transformation failed for any reason + */ + ByteBuf transformInbound(ByteBuf inputBuf, EnumSet flags) throws IOException; + + /** + * Accepts an input buffer representing the frame body of an outbound message and applies a transformation. + * Example transformations include compression and splitting into checksummed chunks. + + * @param inputBuf the frame body from an outgoing message + * @return the new frame body bytes + * @throws IOException if the transformation failed for any reason + */ + ByteBuf transformOutbound(ByteBuf inputBuf) throws IOException; + + /** + * Returns an EnumSet of the flags that should be added to the header for any message whose frame body has been + * modified by the transformer. E.g. it may add perform chunking & checksumming to the frame body, + * compress it, or both. + * @return EnumSet containing the header flags to set on messages transformed + */ + EnumSet getOutboundHeaderFlags(); + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformer.java b/src/java/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformer.java new file mode 100644 index 0000000..3b15cee --- /dev/null +++ b/src/java/org/apache/cassandra/transport/frame/checksum/ChecksummingTransformer.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.transport.frame.checksum; + +import java.io.IOException; +import java.util.EnumSet; + +import com.google.common.collect.ImmutableTable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.transport.Frame; +import org.apache.cassandra.transport.ProtocolException; +import org.apache.cassandra.transport.frame.FrameBodyTransformer; +import org.apache.cassandra.transport.frame.compress.Compressor; +import org.apache.cassandra.transport.frame.compress.LZ4Compressor; +import org.apache.cassandra.transport.frame.compress.SnappyCompressor; +import org.apache.cassandra.utils.ChecksumType; + +import static org.apache.cassandra.transport.CBUtil.readUnsignedShort; + +/** + * Provides a format that implements chunking and checksumming logic + * that maybe used in conjunction with a frame Compressor if required + *

+ * 1.1. Checksummed/Compression Serialized Format + *

+ *

+ * {@code
+ *                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
+ *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |  Number of Compressed Chunks  |     Compressed Length (e1)    /
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * /  Compressed Length cont. (e1) |    Uncompressed Length (e1)   /
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Uncompressed Length cont. (e1)|    Checksum of Lengths (e1)   |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * | Checksum of Lengths cont. (e1)|    Compressed Bytes (e1)    +//
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                         Checksum (e1)                        ||
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                    Compressed Length (e2)                     |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                   Uncompressed Length (e2)                    |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                   Checksum of Lengths (e2)                    |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                     Compressed Bytes (e2)                   +//
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                         Checksum (e2)                        ||
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                    Compressed Length (en)                     |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                   Uncompressed Length (en)                    |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                   Checksum of Lengths (en)                    |
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                      Compressed Bytes (en)                  +//
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * |                         Checksum (en)                        ||
+ * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ * }
+ * 
+ *

+ *

+ * 1.2. Checksum Compression Description + *

+ * The entire payload is broken into n chunks each with a pair of checksums: + *

    + *
  • [int]: compressed length of serialized bytes for this chunk (e.g. the length post compression) + *
  • [int]: expected length of the decompressed bytes (e.g. the length after decompression) + *
  • [int]: digest of decompressed and compressed length components above + *
  • [k bytes]: compressed payload for this chunk + *
  • [int]: digest of the decompressed result of the payload above for this chunk + *
+ *

+ */ +public class ChecksummingTransformer implements FrameBodyTransformer +{ + private static final Logger logger = LoggerFactory.getLogger(ChecksummingTransformer.class); + + private static final EnumSet CHECKSUMS_ONLY = EnumSet.of(Frame.Header.Flag.CHECKSUMMED); + private static final EnumSet CHECKSUMS_AND_COMPRESSION = EnumSet.of(Frame.Header.Flag.CHECKSUMMED, Frame.Header.Flag.COMPRESSED); + + private static final int CHUNK_HEADER_OVERHEAD = Integer.BYTES + Integer.BYTES + Integer.BYTES + Integer.BYTES; + + private static final ChecksummingTransformer CRC32_NO_COMPRESSION = new ChecksummingTransformer(ChecksumType.CRC32, null); + private static final ChecksummingTransformer ADLER32_NO_COMPRESSION = new ChecksummingTransformer(ChecksumType.Adler32, null); + private static final ImmutableTable transformers; + static + { + ImmutableTable.Builder builder = ImmutableTable.builder(); + builder.put(ChecksumType.CRC32, LZ4Compressor.INSTANCE, new ChecksummingTransformer(ChecksumType.CRC32, LZ4Compressor.INSTANCE)); + builder.put(ChecksumType.CRC32, SnappyCompressor.INSTANCE, new ChecksummingTransformer(ChecksumType.CRC32, SnappyCompressor.INSTANCE)); + builder.put(ChecksumType.Adler32, LZ4Compressor.INSTANCE, new ChecksummingTransformer(ChecksumType.Adler32, LZ4Compressor.INSTANCE)); + builder.put(ChecksumType.Adler32, SnappyCompressor.INSTANCE, new ChecksummingTransformer(ChecksumType.Adler32, SnappyCompressor.INSTANCE)); + transformers = builder.build(); + } + + private final int blockSize; + private final Compressor compressor; + private final ChecksumType checksum; + + public static ChecksummingTransformer getTransformer(ChecksumType checksumType, Compressor compressor) + { + ChecksummingTransformer transformer = compressor == null + ? checksumType == ChecksumType.CRC32 ? CRC32_NO_COMPRESSION : ADLER32_NO_COMPRESSION + : transformers.get(checksumType, compressor); + + if (transformer == null) + { + logger.warn("Invalid compression/checksum options supplied. %s / %s", checksumType, compressor.getClass().getName()); + throw new RuntimeException("Invalid compression / checksum options supplied"); + } + + return transformer; + } + + ChecksummingTransformer(ChecksumType checksumType, Compressor compressor) + { + this(checksumType, DatabaseDescriptor.getNativeTransportFrameBlockSize(), compressor); + } + + ChecksummingTransformer(ChecksumType checksumType, int blockSize, Compressor compressor) + { + this.checksum = checksumType; + this.blockSize = blockSize; + this.compressor = compressor; + } + + public EnumSet getOutboundHeaderFlags() + { + return null == compressor ? CHECKSUMS_ONLY : CHECKSUMS_AND_COMPRESSION; + } + + public ByteBuf transformOutbound(ByteBuf inputBuf) + { + // be pessimistic about life and assume the compressed output will be the same size as the input bytes + int maxTotalCompressedLength = maxCompressedLength(inputBuf.readableBytes()); + int expectedChunks = (int) Math.ceil((double) maxTotalCompressedLength / blockSize); + int expectedMaxSerializedLength = Short.BYTES + (expectedChunks * CHUNK_HEADER_OVERHEAD) + maxTotalCompressedLength; + byte[] retBuf = new byte[expectedMaxSerializedLength]; + ByteBuf ret = Unpooled.wrappedBuffer(retBuf); + ret.writerIndex(0); + ret.readerIndex(0); + + // write out bogus short to start with as we'll encode one at the end + // when we finalize the number of compressed chunks to expect and this + // sets the writer index correctly for starting the first chunk + ret.writeShort((short) 0); + + byte[] inBuf = new byte[blockSize]; + byte[] outBuf = new byte[maxCompressedLength(blockSize)]; + byte[] chunkLengths = new byte[8]; + + int numCompressedChunks = 0; + int readableBytes; + int lengthsChecksum; + while ((readableBytes = inputBuf.readableBytes()) > 0) + { + int lengthToRead = Math.min(blockSize, readableBytes); + inputBuf.readBytes(inBuf, 0, lengthToRead); + int uncompressedChunkChecksum = (int) checksum.of(inBuf, 0, lengthToRead); + int compressedSize = maybeCompress(inBuf, lengthToRead, outBuf); + + if (compressedSize < lengthToRead) + { + // there was some benefit to compression so write out the compressed + // and uncompressed sizes of the chunk + ret.writeInt(compressedSize); + ret.writeInt(lengthToRead); + putInt(compressedSize, chunkLengths, 0); + } + else + { + // if no compression was possible, there's no need to write two lengths, so + // just write the size of the original content (or block size), with its + // sign flipped to signal no compression. + ret.writeInt(-lengthToRead); + putInt(-lengthToRead, chunkLengths, 0); + } + + putInt(lengthToRead, chunkLengths, 4); + + // calculate the checksum of the compressed and decompressed lengths + // protect us against a bogus length causing potential havoc on deserialization + lengthsChecksum = (int) checksum.of(chunkLengths, 0, chunkLengths.length); + ret.writeInt(lengthsChecksum); + + // figure out how many actual bytes we're going to write and make sure we have capacity + int toWrite = Math.min(compressedSize, lengthToRead); + if (ret.writableBytes() < (CHUNK_HEADER_OVERHEAD + toWrite)) + { + // this really shouldn't ever happen -- it means we either mis-calculated the number of chunks we + // expected to create, we gave some input to the compressor that caused the output to be much + // larger than the input.. or some other edge condition. Regardless -- resize if necessary. + byte[] resizedRetBuf = new byte[(retBuf.length + (CHUNK_HEADER_OVERHEAD + toWrite)) * 3 / 2]; + System.arraycopy(retBuf, 0, resizedRetBuf, 0, retBuf.length); + retBuf = resizedRetBuf; + ByteBuf resizedRetByteBuf = Unpooled.wrappedBuffer(retBuf); + resizedRetByteBuf.writerIndex(ret.writerIndex()); + ret = resizedRetByteBuf; + } + + // write the bytes, either compressed or uncompressed + if (compressedSize < lengthToRead) + ret.writeBytes(outBuf, 0, toWrite); // compressed + else + ret.writeBytes(inBuf, 0, toWrite); // uncompressed + + // checksum of the uncompressed chunk + ret.writeInt(uncompressedChunkChecksum); + + numCompressedChunks++; + } + + // now update the number of chunks + ret.setShort(0, (short) numCompressedChunks); + return ret; + } + + public ByteBuf transformInbound(ByteBuf inputBuf, EnumSet flags) + { + int numChunks = readUnsignedShort(inputBuf); + + int currentPosition = 0; + int decompressedLength; + int lengthsChecksum; + + byte[] buf = null; + byte[] retBuf = new byte[inputBuf.readableBytes()]; + byte[] chunkLengths = new byte[8]; + for (int i = 0; i < numChunks; i++) + { + int compressedLength = inputBuf.readInt(); + // if the input was actually compressed, then the writer should have written a decompressed + // length. If not, then we can infer that the compressed length has had its sign bit flipped + // and can derive the decompressed length from that + decompressedLength = compressedLength >= 0 ? inputBuf.readInt() : Math.abs(compressedLength); + + putInt(compressedLength, chunkLengths, 0); + putInt(decompressedLength, chunkLengths, 4); + lengthsChecksum = inputBuf.readInt(); + // calculate checksum on lengths (decompressed and compressed) and make sure it matches + int calculatedLengthsChecksum = (int) checksum.of(chunkLengths, 0, chunkLengths.length); + if (lengthsChecksum != calculatedLengthsChecksum) + { + throw new ProtocolException(String.format("Checksum invalid on chunk bytes lengths. Deserialized compressed " + + "length: %d decompressed length: %d. %d != %d", compressedLength, + decompressedLength, lengthsChecksum, calculatedLengthsChecksum)); + } + + // do we have enough space in the decompression buffer? + if (currentPosition + decompressedLength > retBuf.length) + { + byte[] resizedBuf = new byte[retBuf.length + decompressedLength * 3 / 2]; + System.arraycopy(retBuf, 0, resizedBuf, 0, retBuf.length); + retBuf = resizedBuf; + } + + // now we've validated the lengths checksum, we can abs the compressed length + // to figure out the actual number of bytes we're going to read + int toRead = Math.abs(compressedLength); + if (buf == null || buf.length < toRead) + buf = new byte[toRead]; + + // get the (possibly) compressed bytes for this chunk + inputBuf.readBytes(buf, 0, toRead); + + // decompress using the original compressed length, so it's a no-op if that's < 0 + byte[] decompressedChunk = maybeDecompress(buf, compressedLength, decompressedLength, flags); + + // add the decompressed bytes into the ret buf + System.arraycopy(decompressedChunk, 0, retBuf, currentPosition, decompressedLength); + currentPosition += decompressedLength; + + // get the checksum of the original source bytes and compare against what we read + int expectedDecompressedChecksum = inputBuf.readInt(); + int calculatedDecompressedChecksum = (int) checksum.of(decompressedChunk, 0, decompressedLength); + if (expectedDecompressedChecksum != calculatedDecompressedChecksum) + { + throw new ProtocolException("Decompressed checksum for chunk does not match expected checksum"); + } + } + + ByteBuf ret = Unpooled.wrappedBuffer(retBuf, 0, currentPosition); + ret.writerIndex(currentPosition); + return ret; + } + + private int maxCompressedLength(int uncompressedLength) + { + return null == compressor ? uncompressedLength : compressor.maxCompressedLength(uncompressedLength); + + } + + private int maybeCompress(byte[] input, int length, byte[] output) + { + if (null == compressor) + { + System.arraycopy(input, 0, output, 0, length); + return length; + } + + try + { + return compressor.compress(input, 0, length, output, 0); + } + catch (IOException e) + { + logger.info("IO error during compression of frame body chunk", e); + throw new ProtocolException("Error compressing frame body chunk"); + } + } + + private byte[] maybeDecompress(byte[] input, int length, int expectedLength, EnumSet flags) + { + if (null == compressor || !flags.contains(Frame.Header.Flag.COMPRESSED) || length < 0) + return input; + + try + { + return compressor.decompress(input, 0, length, expectedLength); + } + catch (IOException e) + { + logger.info("IO error during decompression of frame body chunk", e); + throw new ProtocolException("Error decompressing frame body chunk"); + } + } + + private void putInt(int val, byte[] dest, int offset) + { + dest[offset] = (byte) (val >>> 24); + dest[offset + 1] = (byte) (val >>> 16); + dest[offset + 2] = (byte) (val >>> 8); + dest[offset + 3] = (byte) (val); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/frame/compress/CompressingTransformer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/frame/compress/CompressingTransformer.java b/src/java/org/apache/cassandra/transport/frame/compress/CompressingTransformer.java new file mode 100644 index 0000000..db99edf --- /dev/null +++ b/src/java/org/apache/cassandra/transport/frame/compress/CompressingTransformer.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.transport.frame.compress; + +import java.io.IOException; +import java.util.EnumSet; + +import io.netty.buffer.ByteBuf; +import org.apache.cassandra.transport.CBUtil; +import org.apache.cassandra.transport.Frame; +import org.apache.cassandra.transport.ProtocolException; +import org.apache.cassandra.transport.frame.FrameBodyTransformer; + +public abstract class CompressingTransformer implements FrameBodyTransformer +{ + private static final CompressingTransformer LZ4 = new LZ4(); + private static final CompressingTransformer SNAPPY = new Snappy(); + + private static final EnumSet headerFlags = EnumSet.of(Frame.Header.Flag.COMPRESSED); + + public static final CompressingTransformer getTransformer(Compressor compressor) + { + if (compressor instanceof LZ4Compressor) + return LZ4; + + if (compressor instanceof SnappyCompressor) + { + if (SnappyCompressor.INSTANCE == null) + throw new ProtocolException("This instance does not support Snappy compression"); + + return SNAPPY; + } + + throw new ProtocolException("Unsupported compression implementation: " + compressor.getClass().getCanonicalName()); + } + + CompressingTransformer() {} + + public EnumSet getOutboundHeaderFlags() + { + return headerFlags; + } + + public ByteBuf transformInbound(ByteBuf inputBuf, EnumSet flags) throws IOException + { + return transformInbound(inputBuf); + } + + abstract ByteBuf transformInbound(ByteBuf inputBuf) throws IOException; + + // Simple LZ4 encoding prefixes the compressed bytes with the + // length of the uncompressed bytes. This length is explicitly big-endian + // as the native protocol is entirely big-endian, so it feels like putting + // little-endian here would be a annoying trap for client writer + private static class LZ4 extends CompressingTransformer + { + public ByteBuf transformOutbound(ByteBuf inputBuf) throws IOException + { + byte[] input = CBUtil.readRawBytes(inputBuf); + int maxCompressedLength = LZ4Compressor.INSTANCE.maxCompressedLength(input.length); + ByteBuf outputBuf = CBUtil.allocator.heapBuffer(Integer.BYTES + maxCompressedLength); + byte[] output = outputBuf.array(); + int outputOffset = outputBuf.arrayOffset(); + output[outputOffset] = (byte) (input.length >>> 24); + output[outputOffset + 1] = (byte) (input.length >>> 16); + output[outputOffset + 2] = (byte) (input.length >>> 8); + output[outputOffset + 3] = (byte) (input.length); + try + { + int written = LZ4Compressor.INSTANCE.compress(input, 0, input.length, output, Integer.BYTES + outputOffset); + outputBuf.writerIndex(Integer.BYTES + written); + return outputBuf; + } + catch (IOException e) + { + outputBuf.release(); + throw e; + } + } + + ByteBuf transformInbound(ByteBuf inputBuf) throws IOException + { + byte[] input = CBUtil.readRawBytes(inputBuf); + int uncompressedLength = ((input[0] & 0xFF) << 24) + | ((input[1] & 0xFF) << 16) + | ((input[2] & 0xFF) << 8) + | ((input[3] & 0xFF)); + ByteBuf outputBuf = CBUtil.allocator.heapBuffer(uncompressedLength); + try + { + outputBuf.writeBytes(LZ4Compressor.INSTANCE.decompress(input, + Integer.BYTES, + input.length - Integer.BYTES, + uncompressedLength)); + return outputBuf; + } + catch (IOException e) + { + outputBuf.release(); + throw e; + } + } + } + + // Simple Snappy encoding simply writes the compressed bytes, without the preceding length + private static class Snappy extends CompressingTransformer + { + public ByteBuf transformOutbound(ByteBuf inputBuf) throws IOException + { + byte[] input = CBUtil.readRawBytes(inputBuf); + int uncompressedLength = input.length; + int maxCompressedLength = SnappyCompressor.INSTANCE.maxCompressedLength(uncompressedLength); + ByteBuf outputBuf = CBUtil.allocator.heapBuffer(maxCompressedLength); + try + { + int written = SnappyCompressor.INSTANCE.compress(input, + 0, + uncompressedLength, + outputBuf.array(), + outputBuf.arrayOffset()); + outputBuf.writerIndex(written); + return outputBuf; + } + catch (IOException e) + { + outputBuf.release(); + throw e; + } + } + + ByteBuf transformInbound(ByteBuf inputBuf) throws IOException + { + byte[] input = CBUtil.readRawBytes(inputBuf); + int uncompressedLength = org.xerial.snappy.Snappy.uncompressedLength(input); + ByteBuf outputBuf = CBUtil.allocator.heapBuffer(uncompressedLength); + try + { + outputBuf.writeBytes(SnappyCompressor.INSTANCE.decompress(input, 0, input.length, uncompressedLength)); + return outputBuf; + } + catch (IOException e) + { + outputBuf.release(); + throw e; + } + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/frame/compress/Compressor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/frame/compress/Compressor.java b/src/java/org/apache/cassandra/transport/frame/compress/Compressor.java new file mode 100644 index 0000000..e458bdb --- /dev/null +++ b/src/java/org/apache/cassandra/transport/frame/compress/Compressor.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.transport.frame.compress; + +import java.io.IOException; + +/** + * Analogous to {@link org.apache.cassandra.io.compress.ICompressor}, but different enough that + * it's worth specializing: + *

    + *
  • disk IO is mostly oriented around ByteBuffers, whereas with Frames raw byte arrays are + * primarily used
  • + *
  • our LZ4 compression format is opionated about the endianness of the preceding length + * bytes, big for protocol, little for disk
  • + *
  • ICompressor doesn't make it easy to pre-allocate the output buffer/array
  • + *
+ * + * In future it may be worth revisiting to unify the interfaces. + */ +public interface Compressor +{ + /** + * @param length the decompressed length being compressed + * @return the maximum length output possible for an input of the provided length + */ + int maxCompressedLength(int length); + + /** + * @param src the input bytes to be compressed + * @param srcOffset the offset to start compressing src from + * @param length the total number of bytes from srcOffset to pass to the compressor implementation + * @param dest the output buffer to write the compressed bytes to + * @param destOffset the offset into the dest buffer to start writing the compressed bytes + * @return the length of resulting compressed bytes written into the dest buffer + * @throws IOException if the compression implementation failed while compressing the input bytes + */ + int compress(byte[] src, int srcOffset, int length, byte[] dest, int destOffset) throws IOException; + + /** + * @param src the compressed bytes to be decompressed + * @param expectedDecompressedLength the expected length the input bytes will decompress to + * @return a byte[] containing the resuling decompressed bytes + * @throws IOException thrown if the compression implementation failed to decompress the provided input bytes + */ + byte[] decompress(byte[] src, int srcOffset, int length, int expectedDecompressedLength) throws IOException; +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/frame/compress/LZ4Compressor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/frame/compress/LZ4Compressor.java b/src/java/org/apache/cassandra/transport/frame/compress/LZ4Compressor.java new file mode 100644 index 0000000..8ac42e2 --- /dev/null +++ b/src/java/org/apache/cassandra/transport/frame/compress/LZ4Compressor.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.transport.frame.compress; + +import java.io.IOException; + +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FastDecompressor; + +public class LZ4Compressor implements Compressor +{ + public static final LZ4Compressor INSTANCE = new LZ4Compressor(); + + private final net.jpountz.lz4.LZ4Compressor compressor; + private final LZ4FastDecompressor decompressor; + + private LZ4Compressor() + { + final LZ4Factory lz4Factory = LZ4Factory.fastestInstance(); + compressor = lz4Factory.fastCompressor(); + decompressor = lz4Factory.fastDecompressor(); + } + + public int maxCompressedLength(int length) + { + return compressor.maxCompressedLength(length); + } + + public int compress(byte[] src, int srcOffset, int length, byte[] dest, int destOffset) throws IOException + { + try + { + return compressor.compress(src, srcOffset, length, dest, destOffset); + } + catch (Throwable t) + { + throw new IOException("Error caught during LZ4 compression", t); + } + } + + public byte[] decompress(byte[] src, int offset, int length, int expectedDecompressedLength) throws IOException + { + try + { + return decompressor.decompress(src, offset, expectedDecompressedLength); + } + catch (Throwable t) + { + throw new IOException("Error caught during LZ4 decompression", t); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/frame/compress/SnappyCompressor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/frame/compress/SnappyCompressor.java b/src/java/org/apache/cassandra/transport/frame/compress/SnappyCompressor.java new file mode 100644 index 0000000..27ea4c3 --- /dev/null +++ b/src/java/org/apache/cassandra/transport/frame/compress/SnappyCompressor.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.transport.frame.compress; + +import java.io.IOException; + +import org.apache.cassandra.utils.JVMStabilityInspector; +import org.xerial.snappy.Snappy; +import org.xerial.snappy.SnappyError; + +public class SnappyCompressor implements Compressor +{ + public static final SnappyCompressor INSTANCE; + static + { + SnappyCompressor i; + try + { + i = new SnappyCompressor(); + } + catch (Exception e) + { + JVMStabilityInspector.inspectThrowable(e); + i = null; + } + catch (NoClassDefFoundError | SnappyError | UnsatisfiedLinkError e) + { + i = null; + } + INSTANCE = i; + } + + private SnappyCompressor() + { + // this would throw java.lang.NoClassDefFoundError if Snappy class + // wasn't found at runtime which should be processed by the calling method + Snappy.getNativeLibraryVersion(); + } + + @Override + public int maxCompressedLength(int length) + { + return Snappy.maxCompressedLength(length); + } + + @Override + public int compress(byte[] src, int srcOffset, int length, byte[] dest, int destOffset) throws IOException + { + return Snappy.compress(src, 0, src.length, dest, destOffset); + } + + @Override + public byte[] decompress(byte[] src, int offset, int length, int expectedDecompressedLength) throws IOException + { + if (!Snappy.isValidCompressedBuffer(src, 0, length)) + throw new IOException("Provided frame does not appear to be Snappy compressed"); + + int uncompressedLength = Snappy.uncompressedLength(src); + byte[] output = new byte[uncompressedLength]; + Snappy.uncompress(src, offset, length, output, 0); + return output; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java index 2b8e695..a29145a 100644 --- a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java @@ -26,9 +26,10 @@ import io.netty.buffer.ByteBuf; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.service.QueryState; -import org.apache.cassandra.transport.FrameCompressor; import org.apache.cassandra.transport.Message; import org.apache.cassandra.transport.ProtocolVersion; +import org.apache.cassandra.transport.frame.compress.SnappyCompressor; +import org.apache.cassandra.utils.ChecksumType; /** * Message to indicate that the server is ready to receive requests. @@ -60,20 +61,29 @@ public class OptionsMessage extends Message.Request @Override protected Message.Response execute(QueryState state, long queryStartNanoTime, boolean traceRequest) { - List cqlVersions = new ArrayList(); + List cqlVersions = new ArrayList<>(); cqlVersions.add(QueryProcessor.CQL_VERSION.toString()); - List compressions = new ArrayList(); - if (FrameCompressor.SnappyCompressor.instance != null) + List compressions = new ArrayList<>(); + if (SnappyCompressor.INSTANCE != null) compressions.add("snappy"); // LZ4 is always available since worst case scenario it default to a pure JAVA implem. compressions.add("lz4"); - Map> supported = new HashMap>(); + Map> supported = new HashMap<>(); supported.put(StartupMessage.CQL_VERSION, cqlVersions); supported.put(StartupMessage.COMPRESSION, compressions); supported.put(StartupMessage.PROTOCOL_VERSIONS, ProtocolVersion.supportedVersions()); + if (connection.getVersion().supportsChecksums()) + { + ChecksumType[] types = ChecksumType.values(); + List checksumImpls = new ArrayList<>(types.length); + for (ChecksumType type : types) + checksumImpls.add(type.toString()); + supported.put(StartupMessage.CHECKSUM, checksumImpls); + } + return new SupportedMessage(supported); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/src/java/org/apache/cassandra/transport/messages/StartupMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java index 01b9331..92c9764 100644 --- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java @@ -26,7 +26,13 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.transport.*; +import org.apache.cassandra.transport.frame.checksum.ChecksummingTransformer; +import org.apache.cassandra.transport.frame.compress.CompressingTransformer; +import org.apache.cassandra.transport.frame.compress.Compressor; +import org.apache.cassandra.transport.frame.compress.LZ4Compressor; +import org.apache.cassandra.transport.frame.compress.SnappyCompressor; import org.apache.cassandra.utils.CassandraVersion; +import org.apache.cassandra.utils.ChecksumType; /** * The initial message of the protocol. @@ -39,6 +45,7 @@ public class StartupMessage extends Message.Request public static final String PROTOCOL_VERSIONS = "PROTOCOL_VERSIONS"; public static final String DRIVER_NAME = "DRIVER_NAME"; public static final String DRIVER_VERSION = "DRIVER_VERSION"; + public static final String CHECKSUM = "CONTENT_CHECKSUM"; public static final Message.Codec codec = new Message.Codec() { @@ -83,23 +90,18 @@ public class StartupMessage extends Message.Request throw new ProtocolException(e.getMessage()); } - if (options.containsKey(COMPRESSION)) + ChecksumType checksumType = getChecksumType(); + Compressor compressor = getCompressor(); + + if (null != checksumType) { - String compression = options.get(COMPRESSION).toLowerCase(); - if (compression.equals("snappy")) - { - if (FrameCompressor.SnappyCompressor.instance == null) - throw new ProtocolException("This instance does not support Snappy compression"); - connection.setCompressor(FrameCompressor.SnappyCompressor.instance); - } - else if (compression.equals("lz4")) - { - connection.setCompressor(FrameCompressor.LZ4Compressor.instance); - } - else - { - throw new ProtocolException(String.format("Unknown compression algorithm: %s", compression)); - } + if (!connection.getVersion().supportsChecksums()) + throw new ProtocolException(String.format("Invalid message flag. Protocol version %s does not support frame body checksums", connection.getVersion().toString())); + connection.setTransformer(ChecksummingTransformer.getTransformer(checksumType, compressor)); + } + else if (null != compressor) + { + connection.setTransformer(CompressingTransformer.getTransformer(compressor)); } ClientState clientState = state.getClientState(); @@ -124,6 +126,42 @@ public class StartupMessage extends Message.Request return newMap; } + private ChecksumType getChecksumType() throws ProtocolException + { + String name = options.get(CHECKSUM); + try + { + return name != null ? ChecksumType.valueOf(name) : null; + } + catch (IllegalArgumentException e) + { + throw new ProtocolException(String.format("Requested checksum type %s is not known or supported by " + + "this version of Cassandra", name)); + } + } + + private Compressor getCompressor() throws ProtocolException + { + String name = options.get(COMPRESSION); + if (null == name) + return null; + + switch (name.toLowerCase()) + { + case "snappy": + { + if (SnappyCompressor.INSTANCE == null) + throw new ProtocolException("This instance does not support Snappy compression"); + + return SnappyCompressor.INSTANCE; + } + case "lz4": + return LZ4Compressor.INSTANCE; + default: + throw new ProtocolException(String.format("Unknown compression algorithm: %s", name)); + } + } + @Override public String toString() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/test/unit/org/apache/cassandra/cql3/CQLTester.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index 2fbbc28..adadb9c 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -871,9 +871,9 @@ public abstract class CQLTester return sessions.get(protocolVersion); } - protected SimpleClient newSimpleClient(ProtocolVersion version, boolean compression) throws IOException + protected SimpleClient newSimpleClient(ProtocolVersion version, boolean compression, boolean checksums) throws IOException { - return new SimpleClient(nativeAddr.getHostAddress(), nativePort, version, version.isBeta(), new EncryptionOptions()).connect(compression); + return new SimpleClient(nativeAddr.getHostAddress(), nativePort, version, version.isBeta(), new EncryptionOptions()).connect(compression, checksums); } protected String formatQuery(String query) http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java index 0a314da..11df055 100644 --- a/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java +++ b/test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java @@ -292,7 +292,7 @@ public class PreparedStatementsTest extends CQLTester createTable("CREATE TABLE %s (pk int, v1 int, v2 int, PRIMARY KEY (pk))"); execute("INSERT INTO %s (pk, v1, v2) VALUES (1,1,1)"); - try (SimpleClient simpleClient = newSimpleClient(ProtocolVersion.BETA.orElse(ProtocolVersion.CURRENT), false)) + try (SimpleClient simpleClient = newSimpleClient(ProtocolVersion.BETA.orElse(ProtocolVersion.CURRENT), false, false)) { ResultMessage.Prepared prepUpdate = simpleClient.prepare(String.format("UPDATE %s.%s SET v1 = ?, v2 = ? WHERE pk = 1 IF v1 = ?", keyspace(), currentTable())); http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/test/unit/org/apache/cassandra/service/ClientWarningsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java index e939df0..3ae49ed 100644 --- a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java +++ b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java @@ -51,7 +51,7 @@ public class ClientWarningsTest extends CQLTester try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, ProtocolVersion.V4)) { - client.connect(false); + client.connect(false, false); QueryMessage query = new QueryMessage(createBatchStatement2(1), QueryOptions.DEFAULT); Message.Response resp = client.execute(query); @@ -70,7 +70,7 @@ public class ClientWarningsTest extends CQLTester try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, ProtocolVersion.V4)) { - client.connect(false); + client.connect(false, false); QueryMessage query = new QueryMessage(createBatchStatement2(DatabaseDescriptor.getBatchSizeWarnThreshold() / 2 + 1), QueryOptions.DEFAULT); Message.Response resp = client.execute(query); @@ -90,7 +90,7 @@ public class ClientWarningsTest extends CQLTester try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, ProtocolVersion.V4)) { - client.connect(false); + client.connect(false, false); for (int i = 0; i < iterations; i++) { @@ -130,7 +130,7 @@ public class ClientWarningsTest extends CQLTester try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, ProtocolVersion.V3)) { - client.connect(false); + client.connect(false, false); QueryMessage query = new QueryMessage(createBatchStatement(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT); Message.Response resp = client.execute(query); http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java b/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java index 4ade4ad..7f08cf2 100644 --- a/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java +++ b/test/unit/org/apache/cassandra/service/ProtocolBetaVersionTest.java @@ -70,7 +70,7 @@ public class ProtocolBetaVersionTest extends CQLTester try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, betaVersion, true, new EncryptionOptions())) { - client.connect(false); + client.connect(false, false); for (int i = 0; i < 10; i++) { QueryMessage query = new QueryMessage(String.format("INSERT INTO %s.%s (pk, v) VALUES (%s, %s)", @@ -105,7 +105,7 @@ public class ProtocolBetaVersionTest extends CQLTester assertTrue(betaVersion.isBeta()); // change to another beta version or remove test if no beta version try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, betaVersion, false, new EncryptionOptions())) { - client.connect(false); + client.connect(false, false); fail("Exception should have been thrown"); } catch (Exception e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/65fb17a8/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java index 0c15bca..a2ee6fb 100644 --- a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java +++ b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java @@ -129,7 +129,7 @@ public class MessagePayloadTest extends CQLTester new EncryptionOptions()); try { - client.connect(false); + client.connect(false, false); Map reqMap; Map respMap; @@ -205,7 +205,7 @@ public class MessagePayloadTest extends CQLTester SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort); try { - client.connect(false); + client.connect(false, false); Map reqMap; Map respMap; @@ -274,7 +274,7 @@ public class MessagePayloadTest extends CQLTester SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, ProtocolVersion.V3); try { - client.connect(false); + client.connect(false, false); Map reqMap; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org