cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jmcken...@apache.org
Subject cassandra git commit: Add latency logging for dropped messages
Date Wed, 23 Dec 2015 18:16:49 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 04a99ab84 -> 3c8d87f43


Add latency logging for dropped messages

Patch by akale; reviewed by pmotta for CASSANDRA-10580


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3c8d87f4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3c8d87f4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3c8d87f4

Branch: refs/heads/trunk
Commit: 3c8d87f4324e5ff8bf6b1c3652e9c5eacf03bc20
Parents: 04a99ab
Author: anubhavkale <anubhavk@microsoft.com>
Authored: Thu Dec 10 12:28:45 2015 -0800
Committer: Joshua McKenzie <jmckenzie@apache.org>
Committed: Wed Dec 23 13:15:10 2015 -0500

----------------------------------------------------------------------
 .../cassandra/net/MessageDeliveryTask.java      | 42 +++++++++++++++++--
 .../apache/cassandra/service/StorageProxy.java  | 44 ++++++++++++++++++--
 2 files changed, 79 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c8d87f4/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index 818cfc6..bede3d8 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -18,11 +18,13 @@
 package org.apache.cassandra.net;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.EnumSet;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.index.IndexNotAvailableException;
@@ -43,10 +45,11 @@ public class MessageDeliveryTask implements Runnable
 
     public void run()
     {
+        long timeTaken = System.currentTimeMillis() - message.constructionTime.timestamp;
         MessagingService.Verb verb = message.verb;
-        if (MessagingService.DROPPABLE_VERBS.contains(verb)
-            && System.currentTimeMillis() > message.constructionTime.timestamp + message.getTimeout())
+        if (MessagingService.DROPPABLE_VERBS.contains(verb)&& message.getTimeout() > timeTaken)
         {
+            LogDroppedMessageDetails(timeTaken);
             MessagingService.instance().incrementDroppedMessages(message);
             return;
         }
@@ -82,6 +85,37 @@ public class MessageDeliveryTask implements Runnable
             Gossiper.instance.setLastProcessedMessageAt(message.constructionTime.timestamp);
     }
 
+    private void LogDroppedMessageDetails(long timeTaken)
+    {
+        logger.debug("MessageDeliveryTask ran after {} ms, allowed time was {} ms. Dropping message {}",
+                timeTaken, message.getTimeout(), message.toString());
+        // Print KS and CF if Payload is mutation or a list of mutations (sent due to schema announcements)
+        IMutation mutation;
+        if (message.payload instanceof IMutation)
+        {
+            mutation = (IMutation)message.payload;
+            if (mutation != null)
+            {
+                logger.debug("MessageDeliveryTask dropped mutation of KS {}, CF {}", mutation.getKeyspaceName(), Arrays.toString(mutation.getColumnFamilyIds().toArray()));
+            }
+        }
+        else if (message.payload instanceof Collection<?>)
+        {
+            Collection<?> payloadItems = (Collection<?>)message.payload;
+            for (Object payloadItem : payloadItems)
+            {
+                if (payloadItem instanceof IMutation)
+                {
+                    mutation = (IMutation)payloadItem;
+                    if (mutation != null)
+                    {
+                        logger.debug("MessageDeliveryTask dropped mutation of KS {}, CF {}", mutation.getKeyspaceName(), Arrays.toString(mutation.getColumnFamilyIds().toArray()));
+                    }
+                }
+            }
+        }
+    }
+
     private void handleFailure(Throwable t)
     {
         if (message.doCallbackOnFailure())
@@ -95,4 +129,4 @@ public class MessageDeliveryTask implements Runnable
     private static final EnumSet<MessagingService.Verb> GOSSIP_VERBS = EnumSet.of(MessagingService.Verb.GOSSIP_DIGEST_ACK,
                                                                                   MessagingService.Verb.GOSSIP_DIGEST_ACK2,
                                                                                   MessagingService.Verb.GOSSIP_DIGEST_SYN);
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c8d87f4/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index f161607..1c30cd7 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1198,7 +1198,7 @@ public class StorageProxy implements StorageProxyMBean
             submitHint(mutation, endpointsToHint, responseHandler);
 
         if (insertLocal)
-            performLocally(stage, mutation::apply, responseHandler);
+            performLocally(stage, mutation, mutation::apply, responseHandler);
 
         if (dcGroups != null)
         {
@@ -1286,6 +1286,27 @@ public class StorageProxy implements StorageProxyMBean
         });
     }
 
+    private static void performLocally(Stage stage, IMutation mutation, final Runnable runnable, final IAsyncCallbackWithFailure<?> handler)
+    {
+        StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable(mutation)
+        {
+            public void runMayThrow()
+            {
+                try
+                {
+                    runnable.run();
+                    handler.response(null);
+                }
+                catch (Exception ex)
+                {
+                    if (!(ex instanceof WriteTimeoutException))
+                        logger.error("Failed to apply mutation locally : {}", ex);
+                    handler.onFailure(FBUtilities.getBroadcastAddress());
+                }
+            }
+        });
+    }
+
     /**
      * Handle counter mutation on the coordinator host.
      *
@@ -2408,11 +2429,28 @@ public class StorageProxy implements StorageProxyMBean
     private static abstract class LocalMutationRunnable implements Runnable
     {
         private final long constructionTime = System.currentTimeMillis();
+        private IMutation mutation;
+
+        public LocalMutationRunnable(IMutation mutation)
+        {
+            this.mutation = mutation;
+        }
+
+        public LocalMutationRunnable()
+        {
+        }
 
         public final void run()
         {
-            if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION))
+            long mutationTimeout = DatabaseDescriptor.getTimeout(MessagingService.Verb.MUTATION);
+            if (System.currentTimeMillis() > constructionTime + mutationTimeout)
             {
+                long timeTaken = System.currentTimeMillis() - constructionTime;
+                logger.debug("LocalMutationRunnable thread ran after {} ms, allowed time was {} ms. ", timeTaken, mutationTimeout);
+                if (this.mutation != null)
+                {
+                    logger.debug("MessageDeliveryTask dropped mutation of KS {}, CF {}", this.mutation.getKeyspaceName(), Arrays.toString(this.mutation.getColumnFamilyIds().toArray()));
+                }
                 MessagingService.instance().incrementDroppedMessages(MessagingService.Verb.MUTATION);
                 HintRunnable runnable = new HintRunnable(Collections.singleton(FBUtilities.getBroadcastAddress()))
                 {
@@ -2596,4 +2634,4 @@ public class StorageProxy implements StorageProxyMBean
     public long getReadRepairRepairedBackground() {
         return ReadRepairMetrics.repairedBackground.getCount();
     }
-}
+}
\ No newline at end of file


Mime
View raw message