Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5450310ABE for ; Thu, 14 May 2015 14:25:06 +0000 (UTC) Received: (qmail 57814 invoked by uid 500); 14 May 2015 14:25:06 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 57778 invoked by uid 500); 14 May 2015 14:25:06 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 57766 invoked by uid 99); 14 May 2015 14:25:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 May 2015 14:25:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id F30DFDFF6A; Thu, 14 May 2015 14:25:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aleksey@apache.org To: commits@cassandra.apache.org Message-Id: <8d9c6016fcf94633b2da81728d6087c7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: cassandra git commit: Add client warnings to native protocol v4 Date: Thu, 14 May 2015 14:25:05 +0000 (UTC) Repository: cassandra Updated Branches: refs/heads/trunk 1684e08cf -> 68722e7e5 Add client warnings to native protocol v4 patch by Carl Yeksigan; reviewed by Tyler Hobbs and Aleksey Yeschenko for CASSANDRA-8930 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/68722e7e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/68722e7e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/68722e7e Branch: refs/heads/trunk 
Commit: 68722e7e594d228b4bf14c8cd8cbee19b50835ec Parents: 1684e08 Author: Carl Yeksigian Authored: Thu May 14 17:23:15 2015 +0300 Committer: Aleksey Yeschenko Committed: Thu May 14 17:24:57 2015 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + doc/native_protocol_v4.spec | 9 ++- .../org/apache/cassandra/config/Config.java | 4 +- .../cassandra/config/DatabaseDescriptor.java | 10 +-- .../cql3/statements/BatchStatement.java | 2 + .../cassandra/db/filter/SliceQueryFilter.java | 2 + .../apache/cassandra/service/ClientWarn.java | 73 ++++++++++++++++++ .../org/apache/cassandra/transport/Frame.java | 3 +- .../org/apache/cassandra/transport/Message.java | 38 ++++++++- .../cassandra/transport/SimpleClient.java | 3 +- .../cassandra/service/ClientWarningsTest.java | 81 ++++++++++++++++++++ 11 files changed, 215 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e83b385..325a5f3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2 + * Add client warnings to native protocol v4 (CASSANDRA-8930) * Allow roles cache to be invalidated (CASSANDRA-8967) * Upgrade Snappy (CASSANDRA-9063) * Don't start Thrift rpc by default (CASSANDRA-9319) http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/doc/native_protocol_v4.spec ---------------------------------------------------------------------- diff --git a/doc/native_protocol_v4.spec b/doc/native_protocol_v4.spec index 143fc4a..4014594 100644 --- a/doc/native_protocol_v4.spec +++ b/doc/native_protocol_v4.spec @@ -133,6 +133,12 @@ Table of Contents If both trace-flag and payload-flag are set, the generic key-value payload appears after trace's data. Type of custom payload is [bytes map] (see below). + 0x08: Warning flag. 
The response contains warnings from the server which + were generated by the server to go along with this response. + If a response frame has the warning flag set, its body will contain the + text of the warnings. The warnings are a [string list] and will be the + first value in the frame body if the tracing flag is not set, or directly + after the tracing ID if it is. The rest of the flags is currently unused and ignored. @@ -772,7 +778,7 @@ Table of Contents Clients are expected to answer the server challenge by an AUTH_RESPONSE message. -4.2.7. AUTH_SUCCESS +4.2.8. AUTH_SUCCESS Indicate the success of the authentication phase. See Section 4.2.3 for more details. @@ -1134,3 +1140,4 @@ Table of Contents * Function_failure error code was added. * Add custom payload to frames for custom QueryHandler implementations (ignored by Cassandra's standard QueryHandler) * Add "TRACE_COMPLETE" event (section 4.2.6). + * Add warnings to frames for responses for which the server generated a warning during processing, which the client needs to address. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 421794e..2ede76e 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -148,8 +148,8 @@ public class Config /* if the size of columns or super-columns are more than this, indexing will kick in */ public Integer column_index_size_in_kb = 64; - public Integer batch_size_warn_threshold_in_kb = 5; - public volatile Integer batch_size_fail_threshold_in_kb = 50; + public volatile int batch_size_warn_threshold_in_kb = 5; + public volatile int batch_size_fail_threshold_in_kb = 50; public Integer concurrent_compactors; public volatile Integer compaction_throughput_mb_per_sec = 16; http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index ec90be2..b5c5fb4 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -617,11 +617,6 @@ public class DatabaseDescriptor } if (seedProvider.getSeeds().size() == 0) throw new ConfigurationException("The seed provider lists no seeds.", false); - - if (conf.batch_size_fail_threshold_in_kb == null) - { - conf.batch_size_fail_threshold_in_kb = conf.batch_size_warn_threshold_in_kb * 10; - } } private static IEndpointSnitch createEndpointSnitch(String snitchClassName) throws ConfigurationException @@ -801,6 +796,11 @@ public class DatabaseDescriptor return conf.batch_size_fail_threshold_in_kb; } + public static void 
setBatchSizeWarnThresholdInKB(int threshold) + { + conf.batch_size_warn_threshold_in_kb = threshold; + } + public static void setBatchSizeFailThresholdInKB(int threshold) { conf.batch_size_fail_threshold_in_kb = threshold; http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 465b2d9..ddc46c1 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -32,6 +32,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.composites.Composite; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.tracing.Tracing; @@ -260,6 +261,7 @@ public class BatchStatement implements CQLStatement { logger.warn(format, ksCfPairs, size, warnThreshold, size - warnThreshold, ""); } + ClientWarn.warn(String.format(format, ksCfPairs, size, warnThreshold, size - warnThreshold, "")); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java index d914f51..697c715 100644 --- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java +++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java @@ -38,6 +38,7 @@ import org.apache.cassandra.io.IVersionedSerializer; import 
org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.tracing.Tracing; public class SliceQueryFilter implements IDiskAtomFilter @@ -241,6 +242,7 @@ public class SliceQueryFilter implements IDiskAtomFilter container.metadata().getKeyValidator().getString(key.getKey()), count, getSlicesInfo(container)); + ClientWarn.warn(msg); logger.warn(msg); } Tracing.trace("Read {} live and {} tombstone cells{}", http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/src/java/org/apache/cassandra/service/ClientWarn.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ClientWarn.java b/src/java/org/apache/cassandra/service/ClientWarn.java new file mode 100644 index 0000000..2ed0a6c --- /dev/null +++ b/src/java/org/apache/cassandra/service/ClientWarn.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.service; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.cassandra.utils.FBUtilities; + +public class ClientWarn +{ + private static final String TRUNCATED = " [truncated]"; + private static final ThreadLocal<ClientWarn> warnLocal = new ThreadLocal<>(); + + private final List<String> warnings = new ArrayList<>(); + + private ClientWarn() + { + } + + public static void warn(String text) + { + ClientWarn warner = warnLocal.get(); + if (warner != null) + warner.add(text); + } + + private void add(String warning) + { + if (warnings.size() < FBUtilities.MAX_UNSIGNED_SHORT) + warnings.add(maybeTruncate(warning)); + } + + private static String maybeTruncate(String warning) + { + return warning.length() > FBUtilities.MAX_UNSIGNED_SHORT + ? warning.substring(0, FBUtilities.MAX_UNSIGNED_SHORT - TRUNCATED.length()) + TRUNCATED + : warning; + } + + public static void captureWarnings() + { + warnLocal.set(new ClientWarn()); + } + + public static List<String> getWarnings() + { + ClientWarn warner = warnLocal.get(); + if (warner == null || warner.warnings.isEmpty()) + return null; + return warner.warnings; + } + + public static void resetWarnings() + { + warnLocal.remove(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/src/java/org/apache/cassandra/transport/Frame.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java index b72259d..0c038ea 100644 --- a/src/java/org/apache/cassandra/transport/Frame.java +++ b/src/java/org/apache/cassandra/transport/Frame.java @@ -112,7 +112,8 @@ public class Frame // The order of that enum matters!! 
COMPRESSED, TRACING, - CUSTOM_PAYLOAD; + CUSTOM_PAYLOAD, + WARNING; private static final Flag[] ALL_VALUES = values(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/src/java/org/apache/cassandra/transport/Message.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java index 3382593..b6d5a95 100644 --- a/src/java/org/apache/cassandra/transport/Message.java +++ b/src/java/org/apache/cassandra/transport/Message.java @@ -41,6 +41,7 @@ import com.google.common.collect.ImmutableSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.transport.messages.*; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.utils.JVMStabilityInspector; @@ -223,6 +224,7 @@ public abstract class Message public static abstract class Response extends Message { protected UUID tracingId; + protected List warnings; protected Response(Type type) { @@ -242,6 +244,17 @@ public abstract class Message { return tracingId; } + + public Message setWarnings(List warnings) + { + this.warnings = warnings; + return this; + } + + public List getWarnings() + { + return warnings; + } } @ChannelHandler.Sharable @@ -252,8 +265,10 @@ public abstract class Message boolean isRequest = frame.header.type.direction == Direction.REQUEST; boolean isTracing = frame.header.flags.contains(Frame.Header.Flag.TRACING); boolean isCustomPayload = frame.header.flags.contains(Frame.Header.Flag.CUSTOM_PAYLOAD); + boolean hasWarning = frame.header.flags.contains(Frame.Header.Flag.WARNING); UUID tracingId = isRequest || !isTracing ? null : CBUtil.readUUID(frame.body); + List warnings = isRequest || !hasWarning ? null : CBUtil.readStringList(frame.body); Map customPayload = !isCustomPayload ? 
null : CBUtil.readBytesMap(frame.body); try @@ -280,6 +295,8 @@ public abstract class Message assert message instanceof Response; if (isTracing) ((Response)message).setTracingId(tracingId); + if (hasWarning) + ((Response)message).setWarnings(warnings); } results.add(message); @@ -315,6 +332,13 @@ public abstract class Message Map customPayload = message.getCustomPayload(); if (tracingId != null) messageSize += CBUtil.sizeOfUUID(tracingId); + List warnings = ((Response)message).getWarnings(); + if (warnings != null) + { + if (version < Server.VERSION_4) + throw new ProtocolException("Must not send frame with WARNING flag for native protocol version < 4"); + messageSize += CBUtil.sizeOfStringList(warnings); + } if (customPayload != null) { if (version < Server.VERSION_4) @@ -327,6 +351,11 @@ public abstract class Message CBUtil.writeUUID(tracingId, body); flags.add(Frame.Header.Flag.TRACING); } + if (warnings != null) + { + CBUtil.writeStringList(warnings, body); + flags.add(Frame.Header.Flag.WARNING); + } if (customPayload != null) { CBUtil.writeBytesMap(customPayload, body); @@ -468,12 +497,15 @@ public abstract class Message { assert request.connection() instanceof ServerConnection; connection = (ServerConnection)request.connection(); + if (connection.getVersion() >= Server.VERSION_4) + ClientWarn.captureWarnings(); + QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion(), request.getStreamId()); logger.debug("Received: {}, v={}", request, connection.getVersion()); - response = request.execute(qstate); response.setStreamId(request.getStreamId()); + response.setWarnings(ClientWarn.getWarnings()); response.attach(connection); connection.applyStateTransition(request.type, response.type); } @@ -484,6 +516,10 @@ public abstract class Message flush(new FlushItem(ctx, ErrorMessage.fromException(t, handler).setStreamId(request.getStreamId()), request.getSourceFrame())); return; } + finally + { + ClientWarn.resetWarnings(); + } 
logger.debug("Responding: {}, v={}", response, connection.getVersion()); flush(new FlushItem(ctx, response, request.getSourceFrame())); http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/src/java/org/apache/cassandra/transport/SimpleClient.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java index b39f166..701a24c 100644 --- a/src/java/org/apache/cassandra/transport/SimpleClient.java +++ b/src/java/org/apache/cassandra/transport/SimpleClient.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.transport; +import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -60,7 +61,7 @@ import io.netty.channel.ChannelPipeline; import io.netty.handler.ssl.SslHandler; import static org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions; -public class SimpleClient +public class SimpleClient implements Closeable { static { http://git-wip-us.apache.org/repos/asf/cassandra/blob/68722e7e/test/unit/org/apache/cassandra/service/ClientWarningsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java new file mode 100644 index 0000000..ce35169 --- /dev/null +++ b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service; + +import org.apache.commons.lang3.StringUtils; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.transport.Message; +import org.apache.cassandra.transport.Server; +import org.apache.cassandra.transport.SimpleClient; +import org.apache.cassandra.transport.messages.QueryMessage; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNull; + +public class ClientWarningsTest extends CQLTester +{ + @BeforeClass + public static void setUp() + { + requireNetwork(); + DatabaseDescriptor.setBatchSizeWarnThresholdInKB(1); + } + + @Test + public void testLargeBatchWithProtoV4() throws Exception + { + createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)"); + + try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, Server.VERSION_4)) + { + client.connect(false); + + QueryMessage query = new QueryMessage(createBatchStatement(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT); + Message.Response resp = client.execute(query); + assertEquals(1, resp.getWarnings().size()); + } + } + + @Test + public void testLargeBatchWithProtoV2() throws Exception + { + createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)"); + + try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, Server.VERSION_2)) + { + client.connect(false); 
+ + QueryMessage query = new QueryMessage(createBatchStatement(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT); + Message.Response resp = client.execute(query); + assertNull(resp.getWarnings()); + } + } + + private String createBatchStatement(int minSize) + { + return String.format("BEGIN UNLOGGED BATCH INSERT INTO %s.%s (pk, v) VALUES (1, '%s') APPLY BATCH;", + KEYSPACE, + currentTable(), + StringUtils.repeat('1', minSize)); + } +}