cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tylerho...@apache.org
Subject [1/3] cassandra git commit: Suppress some pushed events when rpc_address is shared
Date Wed, 30 Sep 2015 15:51:53 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 3ae01ddcc -> a039b7df5


Suppress some pushed events when rpc_address is shared

Patch by Stefania Alborghetti; reviewed by Olivier Michallat and Tyler Hobbs for
CASSANDRA-10052


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f6cab37d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f6cab37d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f6cab37d

Branch: refs/heads/cassandra-3.0
Commit: f6cab37d5ee42313c7a5618c5d0694f312c9c194
Parents: 4c6411f
Author: Stefania Alborghetti <stefania.alborghetti@datastax.com>
Authored: Wed Sep 30 10:46:34 2015 -0500
Committer: Tyler Hobbs <tylerlhobbs@gmail.com>
Committed: Wed Sep 30 10:46:34 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../org/apache/cassandra/transport/Event.java   | 29 ++++++++----
 .../org/apache/cassandra/transport/Server.java  | 48 ++++++++++++++------
 3 files changed, 57 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6cab37d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3364dcd..0ad2b36 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 2.1.10
+ * Avoid misleading pushed notifications when multiple nodes
+   share an rpc_address (CASSANDRA-10052)
  * Fix dropping undroppable when message queue is full (CASSANDRA-10113)
  * Fix potential ClassCastException during paging (CASSANDRA-10352)
  * Prevent ALTER TYPE from creating circular references (CASSANDRA-10339)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6cab37d/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java
index b7c5e68..12ad6e9 100644
--- a/src/java/org/apache/cassandra/transport/Event.java
+++ b/src/java/org/apache/cassandra/transport/Event.java
@@ -62,18 +62,32 @@ public abstract class Event
     protected abstract void serializeEvent(ByteBuf dest, int version);
     protected abstract int eventSerializedSize(int version);
 
-    public static class TopologyChange extends Event
+    public static abstract class NodeEvent extends Event
+    {
+        public final InetSocketAddress node;
+
+        public InetAddress nodeAddress()
+        {
+            return node.getAddress();
+        }
+
+        private NodeEvent(Type type, InetSocketAddress node)
+        {
+            super(type);
+            this.node = node;
+        }
+    }
+
+    public static class TopologyChange extends NodeEvent
     {
         public enum Change { NEW_NODE, REMOVED_NODE, MOVED_NODE }
 
         public final Change change;
-        public final InetSocketAddress node;
 
         private TopologyChange(Change change, InetSocketAddress node)
         {
-            super(Type.TOPOLOGY_CHANGE);
+            super(Type.TOPOLOGY_CHANGE, node);
             this.change = change;
-            this.node = node;
         }
 
         public static TopologyChange newNode(InetAddress host, int port)
@@ -134,18 +148,17 @@ public abstract class Event
         }
     }
 
-    public static class StatusChange extends Event
+
+    public static class StatusChange extends NodeEvent
     {
         public enum Status { UP, DOWN }
 
         public final Status status;
-        public final InetSocketAddress node;
 
         private StatusChange(Status status, InetSocketAddress node)
         {
-            super(Type.STATUS_CHANGE);
+            super(Type.STATUS_CHANGE, node);
             this.status = status;
-            this.node = node;
         }
 
         public static StatusChange nodeUp(InetAddress host, int port)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f6cab37d/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 8f0f89f..c21a669 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -55,6 +55,7 @@ import io.netty.channel.*;
 import io.netty.channel.group.ChannelGroup;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.handler.ssl.SslHandler;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class Server implements CassandraDaemon.Server
 {
@@ -381,78 +382,97 @@ public class Server implements CassandraDaemon.Server
             }
         }
 
+        private void send(InetAddress endpoint, Event.NodeEvent event)
+        {
+            // If the endpoint is not the local node, extract the node address
+            // and if it is the same as our own RPC broadcast address (which defaults to
the rcp address)
+            // then don't send the notification. This covers the case of rpc_address set
to "localhost",
+            // which is not useful to any driver and in fact may cauase serious problems
to some drivers,
+            // see CASSANDRA-10052
+            if (!endpoint.equals(FBUtilities.getBroadcastAddress()) &&
+                event.nodeAddress().equals(DatabaseDescriptor.getBroadcastRpcAddress()))
+                return;
+
+            send(event);
+        }
+
+        private void send(Event event)
+        {
+            server.connectionTracker.send(event);
+        }
+
         public void onJoinCluster(InetAddress endpoint)
         {
-            server.connectionTracker.send(Event.TopologyChange.newNode(getRpcAddress(endpoint),
server.socket.getPort()));
+            send(endpoint, Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort()));
         }
 
         public void onLeaveCluster(InetAddress endpoint)
         {
-            server.connectionTracker.send(Event.TopologyChange.removedNode(getRpcAddress(endpoint),
server.socket.getPort()));
+            send(endpoint, Event.TopologyChange.removedNode(getRpcAddress(endpoint), server.socket.getPort()));
         }
 
         public void onMove(InetAddress endpoint)
         {
-            server.connectionTracker.send(Event.TopologyChange.movedNode(getRpcAddress(endpoint),
server.socket.getPort()));
+            send(endpoint, Event.TopologyChange.movedNode(getRpcAddress(endpoint), server.socket.getPort()));
         }
 
         public void onUp(InetAddress endpoint)
         {
             Event.StatusChange.Status prev = lastStatusChange.put(endpoint, Event.StatusChange.Status.UP);
             if (prev == null || prev != Event.StatusChange.Status.UP)
-                server.connectionTracker.send(Event.StatusChange.nodeUp(getRpcAddress(endpoint),
server.socket.getPort()));
+                send(endpoint, Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort()));
         }
 
         public void onDown(InetAddress endpoint)
         {
             Event.StatusChange.Status prev = lastStatusChange.put(endpoint, Event.StatusChange.Status.DOWN);
             if (prev == null || prev != Event.StatusChange.Status.DOWN)
-                server.connectionTracker.send(Event.StatusChange.nodeDown(getRpcAddress(endpoint),
server.socket.getPort()));
+                send(endpoint, Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort()));
         }
 
         public void onCreateKeyspace(String ksName)
         {
-            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED,
ksName));
+            send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, ksName));
         }
 
         public void onCreateColumnFamily(String ksName, String cfName)
         {
-            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED,
Event.SchemaChange.Target.TABLE, ksName, cfName));
+            send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TABLE,
ksName, cfName));
         }
 
         public void onCreateUserType(String ksName, String typeName)
         {
-            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED,
Event.SchemaChange.Target.TYPE, ksName, typeName));
+            send(new Event.SchemaChange(Event.SchemaChange.Change.CREATED, Event.SchemaChange.Target.TYPE,
ksName, typeName));
         }
 
         public void onUpdateKeyspace(String ksName)
         {
-            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED,
ksName));
+            send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, ksName));
         }
 
         public void onUpdateColumnFamily(String ksName, String cfName, boolean columnsDidChange)
         {
-            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED,
Event.SchemaChange.Target.TABLE, ksName, cfName));
+            send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TABLE,
ksName, cfName));
         }
 
         public void onUpdateUserType(String ksName, String typeName)
         {
-            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED,
Event.SchemaChange.Target.TYPE, ksName, typeName));
+            send(new Event.SchemaChange(Event.SchemaChange.Change.UPDATED, Event.SchemaChange.Target.TYPE,
ksName, typeName));
         }
 
         public void onDropKeyspace(String ksName)
         {
-            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED,
ksName));
+            send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, ksName));
         }
 
         public void onDropColumnFamily(String ksName, String cfName)
         {
-            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED,
Event.SchemaChange.Target.TABLE, ksName, cfName));
+            send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TABLE,
ksName, cfName));
         }
 
         public void onDropUserType(String ksName, String typeName)
         {
-            server.connectionTracker.send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED,
Event.SchemaChange.Target.TYPE, ksName, typeName));
+            send(new Event.SchemaChange(Event.SchemaChange.Change.DROPPED, Event.SchemaChange.Target.TYPE,
ksName, typeName));
         }
     }
 }


Mime
View raw message