Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A65FE18702 for ; Wed, 30 Sep 2015 15:52:15 +0000 (UTC) Received: (qmail 64328 invoked by uid 500); 30 Sep 2015 15:52:15 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 64228 invoked by uid 500); 30 Sep 2015 15:52:15 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 64201 invoked by uid 99); 30 Sep 2015 15:52:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 30 Sep 2015 15:52:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1C4D8E08CE; Wed, 30 Sep 2015 15:52:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tylerhobbs@apache.org To: commits@cassandra.apache.org Date: Wed, 30 Sep 2015 15:52:16 -0000 Message-Id: <013e92b1720d4d42a6b675a9d52e6f12@git.apache.org> In-Reply-To: <4ec49ac641624f92b9fae4ce2ceb8147@git.apache.org> References: <4ec49ac641624f92b9fae4ce2ceb8147@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/4] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2 Merge branch 'cassandra-2.1' into cassandra-2.2 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7452b205 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7452b205 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7452b205 Branch: refs/heads/trunk Commit: 
7452b20503c376c9ea15fdfac8da0c78381b3f73 Parents: fc67545 f6cab37 Author: Tyler Hobbs Authored: Wed Sep 30 10:49:42 2015 -0500 Committer: Tyler Hobbs Committed: Wed Sep 30 10:49:42 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../org/apache/cassandra/transport/Event.java | 29 +++++--- .../org/apache/cassandra/transport/Server.java | 71 +++++++++++++------- 3 files changed, 70 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7452b205/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index c36c6f5,0ad2b36..45070b2 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,16 -1,6 +1,18 @@@ -2.1.10 +2.2.2 + * cqlsh prompt includes name of keyspace after failed `use` statement (CASSANDRA-10369) + * Configurable page size in cqlsh (CASSANDRA-9855) + * Defer default role manager setup until all nodes are on 2.2+ (CASSANDRA-9761) + * Cancel transaction for sstables we wont redistribute index summary + for (CASSANDRA-10270) + * Handle missing RoleManager in config after upgrade to 2.2 (CASSANDRA-10209) + * Retry snapshot deletion after compaction and gc on Windows (CASSANDRA-10222) + * Fix failure to start with space in directory path on Windows (CASSANDRA-10239) + * Fix repair hang when snapshot failed (CASSANDRA-10057) + * Fall back to 1/4 commitlog volume for commitlog_total_space on small disks + (CASSANDRA-10199) +Merged from 2.1: + * Avoid misleading pushed notifications when multiple nodes + share an rpc_address (CASSANDRA-10052) * Fix dropping undroppable when message queue is full (CASSANDRA-10113) * Fix potential ClassCastException during paging (CASSANDRA-10352) * Prevent ALTER TYPE from creating circular references (CASSANDRA-10339) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7452b205/src/java/org/apache/cassandra/transport/Event.java 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7452b205/src/java/org/apache/cassandra/transport/Server.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/transport/Server.java index 72a1b60,c21a669..d610bff --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@@ -55,6 -50,12 +55,7 @@@ import org.apache.cassandra.metrics.Cli import org.apache.cassandra.security.SSLFactory; import org.apache.cassandra.service.*; import org.apache.cassandra.transport.messages.EventMessage; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.*; -import io.netty.channel.group.ChannelGroup; -import io.netty.channel.group.DefaultChannelGroup; -import io.netty.handler.ssl.SslHandler; + import org.apache.cassandra.utils.FBUtilities; public class Server implements CassandraDaemon.Server { @@@ -409,9 -382,28 +410,31 @@@ } } + private void send(InetAddress endpoint, Event.NodeEvent event) + { ++ if (logger.isTraceEnabled()) ++ logger.trace("Sending event for endpoint {}, rpc address {}", endpoint, event.nodeAddress()); ++ + // If the endpoint is not the local node, extract the node address + // and if it is the same as our own RPC broadcast address (which defaults to the rpc address) + // then don't send the notification. 
This covers the case of rpc_address set to "localhost", + // which is not useful to any driver and in fact may cause serious problems to some drivers, + // see CASSANDRA-10052 + if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && + event.nodeAddress().equals(DatabaseDescriptor.getBroadcastRpcAddress())) + return; + + send(event); + } + + private void send(Event event) + { + server.connectionTracker.send(event); + } + public void onJoinCluster(InetAddress endpoint) { - send(endpoint, Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort())); + onTopologyChange(endpoint, Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort())); } public void onLeaveCluster(InetAddress endpoint) @@@ -431,35 -425,9 +454,35 @@@ public void onDown(InetAddress endpoint) { - Event.StatusChange.Status prev = lastStatusChange.put(endpoint, Event.StatusChange.Status.DOWN); - if (prev == null || prev != Event.StatusChange.Status.DOWN) - send(endpoint, Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort())); + onStatusChange(endpoint, Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort())); + } + + private void onTopologyChange(InetAddress endpoint, Event.TopologyChange event) + { + if (logger.isTraceEnabled()) + logger.trace("Topology changed event : {}, {}", endpoint, event.change); + + LatestEvent prev = latestEvents.get(endpoint); + if (prev == null || prev.topology != event.change) + { + LatestEvent ret = latestEvents.put(endpoint, LatestEvent.forTopologyChange(event.change, prev)); + if (ret == prev) - server.connectionTracker.send(event); ++ send(endpoint, event); + } + } + + private void onStatusChange(InetAddress endpoint, Event.StatusChange event) + { + if (logger.isTraceEnabled()) + logger.trace("Status changed event : {}, {}", endpoint, event.status); + + LatestEvent prev = latestEvents.get(endpoint); + if (prev == null || prev.status != event.status) + { - LatestEvent ret = 
latestEvents.put(endpoint, LatestEvent.forStatusChange(event.status, prev)); ++ LatestEvent ret = latestEvents.put(endpoint, LatestEvent.forStatusChange(event.status, null)); + if (ret == prev) - server.connectionTracker.send(event); ++ send(endpoint, event); + } } public void onCreateKeyspace(String ksName) @@@ -474,24 -442,12 +497,24 @@@ public void onCreateUserType(String ksName, String typeName) { - server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TYPE, ksName, typeName)); + send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TYPE, ksName, typeName)); } + public void onCreateFunction(String ksName, String functionName, List> argTypes) + { - server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.FUNCTION, - ksName, functionName, AbstractType.asCQLTypeStringList(argTypes))); ++ send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.FUNCTION, ++ ksName, functionName, AbstractType.asCQLTypeStringList(argTypes))); + } + + public void onCreateAggregate(String ksName, String aggregateName, List> argTypes) + { - server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.AGGREGATE, - ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes))); ++ send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.AGGREGATE, ++ ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes))); + } + public void onUpdateKeyspace(String ksName) { - server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName)); + send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName)); } public void onUpdateColumnFamily(String ksName, String cfName, boolean columnsDidChange) @@@ -501,24 -457,12 +524,24 @@@ public void onUpdateUserType(String ksName, String 
typeName) { - server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TYPE, ksName, typeName)); + send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TYPE, ksName, typeName)); } + public void onUpdateFunction(String ksName, String functionName, List> argTypes) + { - server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.FUNCTION, - ksName, functionName, AbstractType.asCQLTypeStringList(argTypes))); ++ send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.FUNCTION, ++ ksName, functionName, AbstractType.asCQLTypeStringList(argTypes))); + } + + public void onUpdateAggregate(String ksName, String aggregateName, List> argTypes) + { - server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.AGGREGATE, - ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes))); ++ send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.AGGREGATE, ++ ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes))); + } + public void onDropKeyspace(String ksName) { - server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName)); + send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName)); } public void onDropColumnFamily(String ksName, String cfName) @@@ -528,19 -472,7 +551,19 @@@ public void onDropUserType(String ksName, String typeName) { - server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TYPE, ksName, typeName)); + send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TYPE, ksName, typeName)); } + + public void onDropFunction(String ksName, String functionName, List> argTypes) + { - server.connectionTracker.send(new 
Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.FUNCTION, - ksName, functionName, AbstractType.asCQLTypeStringList(argTypes))); ++ send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.FUNCTION, ++ ksName, functionName, AbstractType.asCQLTypeStringList(argTypes))); + } + + public void onDropAggregate(String ksName, String aggregateName, List> argTypes) + { - server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.AGGREGATE, - ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes))); ++ send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.AGGREGATE, ++ ksName, aggregateName, AbstractType.asCQLTypeStringList(argTypes))); + } } }