cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jasobr...@apache.org
Subject [1/2] cassandra git commit: Route gossip messages over dedicated socket
Date Thu, 13 Aug 2015 12:42:05 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 0ff2a5b71 -> 6e2cf5720


Route gossip messages over dedicated socket

patch by awiesberg; reviewed by jasobrown for CASSANDRA-9237


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9a6ee93f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9a6ee93f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9a6ee93f

Branch: refs/heads/cassandra-3.0
Commit: 9a6ee93f63301d03ff1d20032e6d80f806a14d5d
Parents: 85d550f
Author: Jason Brown <jasedbrown@gmail.com>
Authored: Thu Aug 13 05:38:45 2015 -0700
Committer: Jason Brown <jasedbrown@gmail.com>
Committed: Thu Aug 13 05:38:45 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/metrics/ConnectionMetrics.java    | 30 ++++++++++++++++++++
 .../org/apache/cassandra/net/MessageOut.java    |  2 +-
 .../apache/cassandra/net/MessagingService.java  | 24 ++++++++++++++++
 .../cassandra/net/MessagingServiceMBean.java    | 15 ++++++++++
 .../net/OutboundTcpConnectionPool.java          | 14 +++++++--
 .../cassandra/tools/nodetool/NetStats.java      |  8 ++++++
 7 files changed, 90 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a6ee93f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cff477b..3b548e5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.1
+ * Route gossip messages over dedicated socket (CASSANDRA-9237)
  * Add checksum to saved cache files (CASSANDRA-9265)
  * Log warning when using an aggregate without partition key (CASSANDRA-9737)
  * Avoid grouping sstables for anticompaction with DTCS (CASSANDRA-9900)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a6ee93f/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java b/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
index 9661c48..f01c06d 100644
--- a/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ConnectionMetrics.java
@@ -50,6 +50,12 @@ public class ConnectionMetrics
     public final Gauge<Long> smallMessageCompletedTasks;
     /** Dropped tasks for small message TCP Connections */
     public final Gauge<Long> smallMessageDroppedTasks;
+    /** Pending tasks for gossip message TCP Connections */
+    public final Gauge<Integer> gossipMessagePendingTasks;
+    /** Completed tasks for gossip message TCP Connections */
+    public final Gauge<Long> gossipMessageCompletedTasks;
+    /** Dropped tasks for gossip message TCP Connections */
+    public final Gauge<Long> gossipMessageDroppedTasks;
 
     /** Number of timeouts for specific IP */
     public final Meter timeouts;
@@ -111,6 +117,27 @@ public class ConnectionMetrics
                 return connectionPool.smallMessages.getDroppedMessages();
             }
         });
+        gossipMessagePendingTasks = Metrics.register(factory.createMetricName("GossipMessagePendingTasks"), new Gauge<Integer>()
+        {
+            public Integer getValue()
+            {
+                return connectionPool.gossipMessages.getPendingMessages();
+            }
+        });
+        gossipMessageCompletedTasks = Metrics.register(factory.createMetricName("GossipMessageCompletedTasks"), new Gauge<Long>()
+        {
+            public Long getValue()
+            {
+                return connectionPool.gossipMessages.getCompletedMesssages();
+            }
+        });
+        gossipMessageDroppedTasks = Metrics.register(factory.createMetricName("GossipMessageDroppedTasks"), new Gauge<Long>()
+        {
+            public Long getValue()
+            {
+                return connectionPool.gossipMessages.getDroppedMessages();
+            }
+        });
         timeouts = Metrics.meter(factory.createMetricName("Timeouts"));
     }
 
@@ -122,6 +149,9 @@ public class ConnectionMetrics
         Metrics.remove(factory.createMetricName("SmallMessagePendingTasks"));
         Metrics.remove(factory.createMetricName("SmallMessageCompletedTasks"));
         Metrics.remove(factory.createMetricName("SmallMessageDroppedTasks"));
+        Metrics.remove(factory.createMetricName("GossipMessagePendingTasks"));
+        Metrics.remove(factory.createMetricName("GossipMessageCompletedTasks"));
+        Metrics.remove(factory.createMetricName("GossipMessageDroppedTasks"));
         Metrics.remove(factory.createMetricName("Timeouts"));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a6ee93f/src/java/org/apache/cassandra/net/MessageOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java
index 28038b3..1e291c2 100644
--- a/src/java/org/apache/cassandra/net/MessageOut.java
+++ b/src/java/org/apache/cassandra/net/MessageOut.java
@@ -88,7 +88,7 @@ public class MessageOut<T>
         return new MessageOut<T>(verb, payload, serializer, builder.build());
     }
 
-    private Stage getStage()
+    public Stage getStage()
     {
         return MessagingService.verbStages.get(verb);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a6ee93f/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index f8fd6fd..944dced 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -1066,6 +1066,30 @@ public final class MessagingService implements MessagingServiceMBean
         return droppedTasks;
     }
 
+    public Map<String, Integer> getGossipMessagePendingTasks()
+    {
+        Map<String, Integer> pendingTasks = new HashMap<String, Integer>(connectionManagers.size());
+        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet())
+            pendingTasks.put(entry.getKey().getHostAddress(), entry.getValue().gossipMessages.getPendingMessages());
+        return pendingTasks;
+    }
+
+    public Map<String, Long> getGossipMessageCompletedTasks()
+    {
+        Map<String, Long> completedTasks = new HashMap<String, Long>(connectionManagers.size());
+        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet())
+            completedTasks.put(entry.getKey().getHostAddress(), entry.getValue().gossipMessages.getCompletedMesssages());
+        return completedTasks;
+    }
+
+    public Map<String, Long> getGossipMessageDroppedTasks()
+    {
+        Map<String, Long> droppedTasks = new HashMap<String, Long>(connectionManagers.size());
+        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet())
+            droppedTasks.put(entry.getKey().getHostAddress(), entry.getValue().gossipMessages.getDroppedMessages());
+        return droppedTasks;
+    }
+
     public Map<String, Integer> getDroppedMessages()
     {
         Map<String, Integer> map = new HashMap<>(droppedMessagesMap.size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a6ee93f/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
index f1b418c..3bcb0d5 100644
--- a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
+++ b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java
@@ -59,6 +59,21 @@ public interface MessagingServiceMBean
     public Map<String, Long> getSmallMessageDroppedTasks();
 
     /**
+     * Pending tasks for gossip message TCP Connections
+     */
+    public Map<String, Integer> getGossipMessagePendingTasks();
+
+    /**
+     * Completed tasks for gossip message TCP Connections
+     */
+    public Map<String, Long> getGossipMessageCompletedTasks();
+
+    /**
+     * Dropped tasks for gossip message TCP Connections
+     */
+    public Map<String, Long> getGossipMessageDroppedTasks();
+
+    /**
      * dropped message counts for server lifetime
      */
     public Map<String, Integer> getDroppedMessages();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a6ee93f/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
index d388f29..0e6d2cc 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
@@ -44,6 +44,8 @@ public class OutboundTcpConnectionPool
     private final CountDownLatch started;
     public final OutboundTcpConnection smallMessages;
     public final OutboundTcpConnection largeMessages;
+    public final OutboundTcpConnection gossipMessages;
+
     // pointer to the reset Address.
     private InetAddress resetEndpoint;
     private ConnectionMetrics metrics;
@@ -56,6 +58,7 @@ public class OutboundTcpConnectionPool
 
         smallMessages = new OutboundTcpConnection(this);
         largeMessages = new OutboundTcpConnection(this);
+        gossipMessages = new OutboundTcpConnection(this);
     }
 
     /**
@@ -64,6 +67,8 @@ public class OutboundTcpConnectionPool
      */
     OutboundTcpConnection getConnection(MessageOut msg)
     {
+        if (Stage.GOSSIP == msg.getStage())
+            return gossipMessages;
         return msg.payloadSize(smallMessages.getTargetVersion()) > LARGE_MESSAGE_THRESHOLD
                ? largeMessages
                : smallMessages;
@@ -71,13 +76,13 @@ public class OutboundTcpConnectionPool
 
     void reset()
     {
-        for (OutboundTcpConnection conn : new OutboundTcpConnection[] { smallMessages, largeMessages })
+        for (OutboundTcpConnection conn : new OutboundTcpConnection[] { smallMessages, largeMessages, gossipMessages })
             conn.closeSocket(false);
     }
 
     public void resetToNewerVersion(int version)
     {
-        for (OutboundTcpConnection conn : new OutboundTcpConnection[] { smallMessages, largeMessages })
+        for (OutboundTcpConnection conn : new OutboundTcpConnection[] { smallMessages, largeMessages, gossipMessages })
         {
             if (version > conn.getTargetVersion())
                 conn.softCloseSocket();
@@ -93,7 +98,7 @@ public class OutboundTcpConnectionPool
     {
         SystemKeyspace.updatePreferredIP(id, remoteEP);
         resetEndpoint = remoteEP;
-        for (OutboundTcpConnection conn : new OutboundTcpConnection[] { smallMessages, largeMessages })
+        for (OutboundTcpConnection conn : new OutboundTcpConnection[] { smallMessages, largeMessages, gossipMessages })
             conn.softCloseSocket();
 
         // release previous metrics and create new one with reset address
@@ -170,6 +175,7 @@ public class OutboundTcpConnectionPool
     {
         smallMessages.start();
         largeMessages.start();
+        gossipMessages.start();
 
         metrics = new ConnectionMetrics(id, this);
 
@@ -203,6 +209,8 @@ public class OutboundTcpConnectionPool
             largeMessages.closeSocket(true);
         if (smallMessages != null)
             smallMessages.closeSocket(true);
+        if (gossipMessages != null)
+            gossipMessages.closeSocket(true);
 
         metrics.release();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9a6ee93f/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/NetStats.java b/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
index cd2f1a2..3e06ca0 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
@@ -110,6 +110,14 @@ public class NetStats extends NodeToolCmd
             for (long n : ms.getSmallMessageCompletedTasks().values())
                 completed += n;
             System.out.printf("%-25s%10s%10s%15s%n", "Small messages", "n/a", pending, completed);
+
+            pending = 0;
+            for (int n : ms.getGossipMessagePendingTasks().values())
+                pending += n;
+            completed = 0;
+            for (long n : ms.getGossipMessageCompletedTasks().values())
+                completed += n;
+            System.out.printf("%-25s%10s%10s%15s%n", "Gossip messages", "n/a", pending, completed);
         }
     }
 }
\ No newline at end of file


Mime
View raw message