Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CDD6B186E7 for ; Wed, 30 Sep 2015 15:49:52 +0000 (UTC) Received: (qmail 58294 invoked by uid 500); 30 Sep 2015 15:49:52 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 58261 invoked by uid 500); 30 Sep 2015 15:49:52 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 58240 invoked by uid 99); 30 Sep 2015 15:49:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 30 Sep 2015 15:49:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3B213E08CE; Wed, 30 Sep 2015 15:49:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tylerhobbs@apache.org To: commits@cassandra.apache.org Date: Wed, 30 Sep 2015 15:49:52 -0000 Message-Id: <850567175ef34aefb2d815a4df1564b5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] cassandra git commit: Suppress some pushed events when rpc_address is shared Repository: cassandra Updated Branches: refs/heads/cassandra-2.2 fc675450e -> 7452b2050 Suppress some pushed events when rpc_address is shared Patch by Stefania Alborghetti; reviewed by Olivier Michallat and Tyler Hobbs for CASSANDRA-10052 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f6cab37d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f6cab37d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f6cab37d Branch: refs/heads/cassandra-2.2 Commit: f6cab37d5ee42313c7a5618c5d0694f312c9c194 Parents: 4c6411f Author: Stefania Alborghetti Authored: Wed Sep 30 10:46:34 2015 -0500 Committer: Tyler Hobbs Committed: Wed Sep 30 10:46:34 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../org/apache/cassandra/transport/Event.java | 29 ++++++++---- .../org/apache/cassandra/transport/Server.java | 48 ++++++++++++++------ 3 files changed, 57 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6cab37d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3364dcd..0ad2b36 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 2.1.10 + * Avoid misleading pushed notifications when multiple nodes + share an rpc_address (CASSANDRA-10052) * Fix dropping undroppable when message queue is full (CASSANDRA-10113) * Fix potential ClassCastException during paging (CASSANDRA-10352) * Prevent ALTER TYPE from creating circular references (CASSANDRA-10339) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6cab37d/src/java/org/apache/cassandra/transport/Event.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java index b7c5e68..12ad6e9 100644 --- a/src/java/org/apache/cassandra/transport/Event.java +++ b/src/java/org/apache/cassandra/transport/Event.java @@ -62,18 +62,32 @@ public abstract class Event protected abstract void serializeEvent(ByteBuf dest, int version); protected abstract int eventSerializedSize(int version); - public static class TopologyChange extends Event + public static abstract class NodeEvent extends Event + { + public final InetSocketAddress node; + + public InetAddress nodeAddress() + { + return node.getAddress(); + } + + private NodeEvent(Type type, InetSocketAddress node) + { + super(type); + this.node = node; + } + } + + public static class TopologyChange extends NodeEvent { public enum Change { NEW_NODE, REMOVED_NODE, MOVED_NODE } public final Change change; - public final InetSocketAddress node; private TopologyChange(Change change, InetSocketAddress node) { - super(Type.TOPOLOGY_CHANGE); + super(Type.TOPOLOGY_CHANGE, node); this.change = change; - this.node = node; } public static TopologyChange newNode(InetAddress host, int port) @@ -134,18 +148,17 @@ public abstract class Event } } - public static class StatusChange extends Event + + public static class StatusChange extends NodeEvent { public enum Status { UP, DOWN } public final Status status; - public final InetSocketAddress node; private StatusChange(Status status, InetSocketAddress node) { - super(Type.STATUS_CHANGE); + super(Type.STATUS_CHANGE, node); this.status = status; - this.node = node; } public static StatusChange nodeUp(InetAddress host, int port) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6cab37d/src/java/org/apache/cassandra/transport/Server.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index 8f0f89f..c21a669 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -55,6 +55,7 @@ import io.netty.channel.*; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.ssl.SslHandler; +import org.apache.cassandra.utils.FBUtilities; public class Server implements CassandraDaemon.Server { @@ -381,78 +382,97 @@ public class Server implements CassandraDaemon.Server } } + private void send(InetAddress endpoint, Event.NodeEvent event) + { + // If the endpoint is not the local node, extract the node address + // and if it is the same as our own RPC broadcast address (which defaults to the rcp address) + // then don't send the notification. This covers the case of rpc_address set to "localhost", + // which is not useful to any driver and in fact may cauase serious problems to some drivers, + // see CASSANDRA-10052 + if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && + event.nodeAddress().equals(DatabaseDescriptor.getBroadcastRpcAddress())) + return; + + send(event); + } + + private void send(Event event) + { + server.connectionTracker.send(event); + } + public void onJoinCluster(InetAddress endpoint) { - server.connectionTracker.send(Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort())); + send(endpoint, Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort())); } public void onLeaveCluster(InetAddress endpoint) { - server.connectionTracker.send(Event.TopologyChange.removedNode(getRpcAddress(endpoint), server.socket.getPort())); + send(endpoint, Event.TopologyChange.removedNode(getRpcAddress(endpoint), server.socket.getPort())); } public void onMove(InetAddress endpoint) { - server.connectionTracker.send(Event.TopologyChange.movedNode(getRpcAddress(endpoint), server.socket.getPort())); + send(endpoint, Event.TopologyChange.movedNode(getRpcAddress(endpoint), server.socket.getPort())); } public void onUp(InetAddress endpoint) { Event.StatusChange.Status prev = lastStatusChange.put(endpoint, Event.StatusChange.Status.UP); if (prev == null || prev != Event.StatusChange.Status.UP) - server.connectionTracker.send(Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort())); + send(endpoint, Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort())); } public void onDown(InetAddress endpoint) { Event.StatusChange.Status prev = lastStatusChange.put(endpoint, Event.StatusChange.Status.DOWN); if (prev == null || prev != Event.StatusChange.Status.DOWN) - server.connectionTracker.send(Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort())); + send(endpoint, Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort())); } public void onCreateKeyspace(String ksName) { - server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, ksName)); + send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, ksName)); } public void onCreateColumnFamily(String ksName, String cfName) { - server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TABLE, ksName, cfName)); + send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TABLE, ksName, cfName)); } public void onCreateUserType(String ksName, String typeName) { - server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TYPE, ksName, typeName)); + send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TYPE, ksName, typeName)); } public void onUpdateKeyspace(String ksName) { - server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName)); + send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName)); } public void onUpdateColumnFamily(String ksName, String cfName, boolean columnsDidChange) { - server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TABLE, ksName, cfName)); + send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TABLE, ksName, cfName)); } public void onUpdateUserType(String ksName, String typeName) { - server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TYPE, ksName, typeName)); + send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TYPE, ksName, typeName)); } public void onDropKeyspace(String ksName) { - server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName)); + send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName)); } public void onDropColumnFamily(String ksName, String cfName) { - server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TABLE, ksName, cfName)); + send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TABLE, ksName, cfName)); } public void onDropUserType(String ksName, String typeName) { - server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TYPE, ksName, typeName)); + send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TYPE, ksName, typeName)); } } }