cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brandonwilli...@apache.org
Subject [13/15] cassandra git commit: Log when messages are dropped due to cross_node_timeout (CASSANDRA-9793)
Date Fri, 24 Jul 2015 20:51:54 GMT
Log when messages are dropped due to cross_node_timeout (CASSANDRA-9793)

Patch by Stefania, reviewed by brandonwilliams for CASSANDRA-9793


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6f0c7d9f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6f0c7d9f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6f0c7d9f

Branch: refs/heads/trunk
Commit: 6f0c7d9f451c653f3a728bd8ee597fc32bd48d2c
Parents: 14ef5a4
Author: Brandon Williams <brandonwilliams@apache.org>
Authored: Fri Jul 24 15:46:03 2015 -0500
Committer: Brandon Williams <brandonwilliams@apache.org>
Committed: Fri Jul 24 15:46:03 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../cassandra/net/IncomingTcpConnection.java    |   9 +-
 .../cassandra/net/MessageDeliveryTask.java      |  10 +-
 .../apache/cassandra/net/MessagingService.java  | 137 +++++++++++++------
 .../cassandra/net/MessagingServiceTest.java     |  37 +++++
 5 files changed, 147 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c7d9f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b8593c0..17ec705 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -2,6 +2,8 @@
  * UDF / UDA execution time in trace (CASSANDRA-9723)
  * Remove repair snapshot leftover on startup (CASSANDRA-7357)
  * Use random nodes for batch log when only 2 racks (CASSANDRA-8735)
+Merged from 2.0:
+ * Log when messages are dropped due to cross_node_timeout (CASSANDRA-9793)
 
 2.2.0
  * Fix cqlsh copy methods and other windows specific issues (CASSANDRA-9795) 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c7d9f/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index c325717..375da64 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -181,10 +181,15 @@ public class IncomingTcpConnection extends Thread implements Closeable
             id = input.readInt();
 
         long timestamp = System.currentTimeMillis();
+        boolean isCrossNodeTimestamp = false;
         // make sure to readInt, even if cross_node_to is not enabled
         int partial = input.readInt();
         if (DatabaseDescriptor.hasCrossNodeTimeout())
-            timestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2);
+        {
+            long crossNodeTimestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2);
+            isCrossNodeTimestamp = (timestamp != crossNodeTimestamp);
+            timestamp = crossNodeTimestamp;
+        }
 
         MessageIn message = MessageIn.read(input, version, id);
         if (message == null)
@@ -194,7 +199,7 @@ public class IncomingTcpConnection extends Thread implements Closeable
         }
         if (version <= MessagingService.current_version)
         {
-            MessagingService.instance().receive(message, id, timestamp);
+            MessagingService.instance().receive(message, id, timestamp, isCrossNodeTimestamp);
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c7d9f/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index f160464..7f30797 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -31,15 +31,17 @@ public class MessageDeliveryTask implements Runnable
     private static final Logger logger = LoggerFactory.getLogger(MessageDeliveryTask.class);
 
     private final MessageIn message;
-    private final long constructionTime;
     private final int id;
+    private final long constructionTime;
+    private final boolean isCrossNodeTimestamp;
 
-    public MessageDeliveryTask(MessageIn message, int id, long timestamp)
+    public MessageDeliveryTask(MessageIn message, int id, long timestamp, boolean isCrossNodeTimestamp)
     {
         assert message != null;
         this.message = message;
         this.id = id;
-        constructionTime = timestamp;
+        this.constructionTime = timestamp;
+        this.isCrossNodeTimestamp = isCrossNodeTimestamp;
     }
 
     public void run()
@@ -48,7 +50,7 @@ public class MessageDeliveryTask implements Runnable
         if (MessagingService.DROPPABLE_VERBS.contains(verb)
             && System.currentTimeMillis() > constructionTime + message.getTimeout())
         {
-            MessagingService.instance().incrementDroppedMessages(verb);
+            MessagingService.instance().incrementDroppedMessages(verb, isCrossNodeTimestamp);
             return;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c7d9f/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index dec7550..e3ad8c0 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -125,7 +126,7 @@ public final class MessagingService implements MessagingServiceMBean
         SNAPSHOT, // Similar to nt snapshot
         MIGRATION_REQUEST,
         GOSSIP_SHUTDOWN,
-        _TRACE, // dummy verb so we can use MS.droppedMessages
+        _TRACE, // dummy verb so we can use MS.droppedMessagesMap
         ECHO,
         REPAIR_MESSAGE,
         // use as padding for backwards compatability where a previous version needs to validate a verb from the future.
@@ -297,10 +298,23 @@ public final class MessagingService implements MessagingServiceMBean
                                                                    Verb.PAGED_RANGE,
                                                                    Verb.REQUEST_RESPONSE);
 
+
+    private static final class DroppedMessages
+    {
+        final DroppedMessageMetrics metrics;
+        final AtomicInteger droppedInternalTimeout;
+        final AtomicInteger droppedCrossNodeTimeout;
+
+        DroppedMessages(Verb verb)
+        {
+            this.metrics = new DroppedMessageMetrics(verb);
+            this.droppedInternalTimeout = new AtomicInteger(0);
+            this.droppedCrossNodeTimeout = new AtomicInteger(0);
+        }
+
+    }
     // total dropped message counts for server lifetime
-    private final Map<Verb, DroppedMessageMetrics> droppedMessages = new EnumMap<Verb, DroppedMessageMetrics>(Verb.class);
-    // dropped count when last requested for the Recent api.  high concurrency isn't necessary here.
-    private final Map<Verb, Integer> lastDroppedInternal = new EnumMap<Verb, Integer>(Verb.class);
+    private final Map<Verb, DroppedMessages> droppedMessagesMap = new EnumMap<>(Verb.class);
 
     private final List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>();
 
@@ -322,7 +336,7 @@ public final class MessagingService implements MessagingServiceMBean
 
     private static class MSHandle
     {
-        public static final MessagingService instance = new MessagingService();
+        public static final MessagingService instance = new MessagingService(false);
     }
 
     public static MessagingService instance()
@@ -330,24 +344,34 @@ public final class MessagingService implements MessagingServiceMBean
         return MSHandle.instance;
     }
 
-    private MessagingService()
+    private static class MSTestHandle
+    {
+        public static final MessagingService instance = new MessagingService(true);
+    }
+
+    static MessagingService test()
+    {
+        return MSTestHandle.instance;
+    }
+
+    private MessagingService(boolean testOnly)
     {
         for (Verb verb : DROPPABLE_VERBS)
-        {
-            droppedMessages.put(verb, new DroppedMessageMetrics(verb));
-            lastDroppedInternal.put(verb, 0);
-        }
+            droppedMessagesMap.put(verb, new DroppedMessages(verb));
 
         listenGate = new SimpleCondition();
         verbHandlers = new EnumMap<Verb, IVerbHandler>(Verb.class);
-        Runnable logDropped = new Runnable()
+        if (!testOnly)
         {
-            public void run()
+            Runnable logDropped = new Runnable()
             {
-                logDroppedMessages();
-            }
-        };
-        ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
+                public void run()
+                {
+                    logDroppedMessages();
+                }
+            };
+            ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
+        }
 
         Function<Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>>, ?> timeoutReporter = new Function<Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>>, Object>()
         {
@@ -378,16 +402,19 @@ public final class MessagingService implements MessagingServiceMBean
             }
         };
 
-        callbacks = new ExpiringMap<Integer, CallbackInfo>(DatabaseDescriptor.getMinRpcTimeout(), timeoutReporter);
+        callbacks = new ExpiringMap<>(DatabaseDescriptor.getMinRpcTimeout(), timeoutReporter);
 
-        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
-        try
-        {
-            mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
-        }
-        catch (Exception e)
+        if (!testOnly)
         {
-            throw new RuntimeException(e);
+            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+            try
+            {
+                mbs.registerMBean(this, new ObjectName(MBEAN_NAME));
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
         }
     }
 
@@ -741,7 +768,7 @@ public final class MessagingService implements MessagingServiceMBean
         }
     }
 
-    public void receive(MessageIn message, int id, long timestamp)
+    public void receive(MessageIn message, int id, long timestamp, boolean isCrossNodeTimestamp)
     {
         TraceState state = Tracing.instance.initializeFromMessage(message);
         if (state != null)
@@ -752,7 +779,7 @@ public final class MessagingService implements MessagingServiceMBean
             if (!ms.allowIncomingMessage(message, id))
                 return;
 
-        Runnable runnable = new MessageDeliveryTask(message, id, timestamp);
+        Runnable runnable = new MessageDeliveryTask(message, id, timestamp, isCrossNodeTimestamp);
         TracingAwareExecutorService stage = StageManager.getStage(message.getMessageType());
         assert stage != null : "No stage for message type " + message.verb;
 
@@ -842,29 +869,55 @@ public final class MessagingService implements MessagingServiceMBean
 
     public void incrementDroppedMessages(Verb verb)
     {
+        incrementDroppedMessages(verb, false);
+    }
+
+    public void incrementDroppedMessages(Verb verb, boolean isCrossNodeTimeout)
+    {
         assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped";
-        droppedMessages.get(verb).dropped.mark();
+        incrementDroppedMessages(droppedMessagesMap.get(verb), isCrossNodeTimeout);
+    }
+
+    private void incrementDroppedMessages(DroppedMessages droppedMessages, boolean isCrossNodeTimeout)
+    {
+        droppedMessages.metrics.dropped.mark();
+        if (isCrossNodeTimeout)
+            droppedMessages.droppedCrossNodeTimeout.incrementAndGet();
+        else
+            droppedMessages.droppedInternalTimeout.incrementAndGet();
     }
 
     private void logDroppedMessages()
     {
-        boolean logTpstats = false;
-        for (Map.Entry<Verb, DroppedMessageMetrics> entry : droppedMessages.entrySet())
+        List<String> logs = getDroppedMessagesLogs();
+        for (String log : logs)
+            logger.error(log);
+
+        if (logs.size() > 0)
+            StatusLogger.log();
+    }
+
+    @VisibleForTesting
+    List<String> getDroppedMessagesLogs()
+    {
+        List<String> ret = new ArrayList<>();
+        for (Map.Entry<Verb, DroppedMessages> entry : droppedMessagesMap.entrySet())
         {
-            int dropped = (int) entry.getValue().dropped.getCount();
             Verb verb = entry.getKey();
-            int recent = dropped - lastDroppedInternal.get(verb);
-            if (recent > 0)
+            DroppedMessages droppedMessages = entry.getValue();
+
+            int droppedInternalTimeout = droppedMessages.droppedInternalTimeout.getAndSet(0);
+            int droppedCrossNodeTimeout = droppedMessages.droppedCrossNodeTimeout.getAndSet(0);
+            if (droppedInternalTimeout > 0 || droppedCrossNodeTimeout > 0)
             {
-                logTpstats = true;
-                logger.info("{} {} messages dropped in last {}ms",
-                             new Object[] {recent, verb, LOG_DROPPED_INTERVAL_IN_MS});
-                lastDroppedInternal.put(verb, dropped);
+                ret.add(String.format("%s messages were dropped in last %d ms: %d for internal timeout and %d for cross node timeout",
+                                      verb,
+                                      LOG_DROPPED_INTERVAL_IN_MS,
+                                      droppedInternalTimeout,
+                                      droppedCrossNodeTimeout));
             }
         }
-
-        if (logTpstats)
-            StatusLogger.log();
+        return ret;
     }
 
     private static class SocketThread extends Thread
@@ -1016,9 +1069,9 @@ public final class MessagingService implements MessagingServiceMBean
 
     public Map<String, Integer> getDroppedMessages()
     {
-        Map<String, Integer> map = new HashMap<String, Integer>(droppedMessages.size());
-        for (Map.Entry<Verb, DroppedMessageMetrics> entry : droppedMessages.entrySet())
-            map.put(entry.getKey().toString(), (int) entry.getValue().dropped.getCount());
+        Map<String, Integer> map = new HashMap<>(droppedMessagesMap.size());
+        for (Map.Entry<Verb, DroppedMessages> entry : droppedMessagesMap.entrySet())
+            map.put(entry.getKey().toString(), (int) entry.getValue().metrics.dropped.getCount());
         return map;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6f0c7d9f/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
new file mode 100644
index 0000000..75c146e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -0,0 +1,37 @@
+package org.apache.cassandra.net;
+
+import java.util.List;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class MessagingServiceTest
+{
+    private final MessagingService messagingService = MessagingService.test();
+
+    @Test
+    public void testDroppedMessages()
+    {
+        MessagingService.Verb verb = MessagingService.Verb.READ;
+
+        for (int i = 0; i < 5000; i++)
+            messagingService.incrementDroppedMessages(verb, i % 2 == 0);
+
+        List<String> logs = messagingService.getDroppedMessagesLogs();
+        assertEquals(1, logs.size());
+        assertEquals("READ messages were dropped in last 5000 ms: 2500 for internal timeout and 2500 for cross node timeout", logs.get(0));
+        assertEquals(5000, (int)messagingService.getDroppedMessages().get(verb.toString()));
+
+        logs = messagingService.getDroppedMessagesLogs();
+        assertEquals(0, logs.size());
+
+        for (int i = 0; i < 2500; i++)
+            messagingService.incrementDroppedMessages(verb, i % 2 == 0);
+
+        logs = messagingService.getDroppedMessagesLogs();
+        assertEquals("READ messages were dropped in last 5000 ms: 1250 for internal timeout and 1250 for cross node timeout", logs.get(0));
+        assertEquals(7500, (int)messagingService.getDroppedMessages().get(verb.toString()));
+    }
+
+}


Mime
View raw message