Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 780B41860B for ; Fri, 24 Jul 2015 20:51:44 +0000 (UTC) Received: (qmail 76182 invoked by uid 500); 24 Jul 2015 20:51:42 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 76101 invoked by uid 500); 24 Jul 2015 20:51:42 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 75651 invoked by uid 99); 24 Jul 2015 20:51:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Jul 2015 20:51:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7DEAEE6855; Fri, 24 Jul 2015 20:51:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: brandonwilliams@apache.org To: commits@cassandra.apache.org Date: Fri, 24 Jul 2015 20:51:50 -0000 Message-Id: <9db87b0ac2c7429bb5db241190b1652e@git.apache.org> In-Reply-To: <260f1f80e5d44e189c6c495220f2d5b3@git.apache.org> References: <260f1f80e5d44e189c6c495220f2d5b3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [09/15] cassandra git commit: Log when messages are dropped due to cross_node_timeout Log when messages are dropped due to cross_node_timeout Patch by Stefania, reviewed by brandonwilliams for CASSANDRA-9793 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9ed0f30a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9ed0f30a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9ed0f30a Branch: refs/heads/cassandra-2.2 Commit: 9ed0f30a4df488e644cb048d39e31e7ae8c23c5b Parents: 8a0fcb8 Author: Brandon Williams Authored: Fri Jul 24 15:43:37 2015 -0500 Committer: Brandon Williams Committed: Fri Jul 24 15:43:37 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/net/IncomingTcpConnection.java | 9 +- .../cassandra/net/MessageDeliveryTask.java | 10 +- .../apache/cassandra/net/MessagingService.java | 150 +++++++++++++------ .../cassandra/net/MessagingServiceTest.java | 39 +++++ 5 files changed, 155 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed0f30a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 69a7b31..9cb7b83 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -8,6 +8,7 @@ * Remove repair snapshot leftover on startup (CASSANDRA-7357) * Use random nodes for batch log when only 2 racks (CASSANDRA-8735) Merged from 2.0: + * Log when messages are dropped due to cross_node_timeout (CASSANDRA-9793) * checkForEndpointCollision fails for legitimate collisions (CASSANDRA-9765) * Complete CASSANDRA-8448 fix (CASSANDRA-9519) * Don't include auth credentials in debug log (CASSANDRA-9682) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed0f30a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java index 2456050..09ce717 100644 --- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java @@ -180,10 +180,15 @@ public class IncomingTcpConnection extends Thread implements Closeable id = input.readInt(); long timestamp = System.currentTimeMillis(); + boolean isCrossNodeTimestamp = false; // make sure to readInt, even if cross_node_to is not enabled int partial = input.readInt(); if (DatabaseDescriptor.hasCrossNodeTimeout()) - timestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2); + { + long crossNodeTimestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2); + isCrossNodeTimestamp = (timestamp != crossNodeTimestamp); + timestamp = crossNodeTimestamp; + } MessageIn message = MessageIn.read(input, version, id); if (message == null) @@ -193,7 +198,7 @@ public class IncomingTcpConnection extends Thread implements Closeable } if (version <= MessagingService.current_version) { - MessagingService.instance().receive(message, id, timestamp); + MessagingService.instance().receive(message, id, timestamp, isCrossNodeTimestamp); } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed0f30a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java index 982f17e..06caf94 100644 --- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java +++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java @@ -29,15 +29,17 @@ public class MessageDeliveryTask implements Runnable private static final Logger logger = LoggerFactory.getLogger(MessageDeliveryTask.class); private final MessageIn message; - private final long constructionTime; private final int id; + private final long constructionTime; + private final boolean isCrossNodeTimestamp; - public MessageDeliveryTask(MessageIn message, int id, long timestamp) + public MessageDeliveryTask(MessageIn message, int id, long timestamp, boolean isCrossNodeTimestamp) { assert message != null; this.message = message; this.id = id; - constructionTime = timestamp; + this.constructionTime = timestamp; + this.isCrossNodeTimestamp = isCrossNodeTimestamp; } public void run() @@ -46,7 +48,7 @@ public class MessageDeliveryTask implements Runnable if (MessagingService.DROPPABLE_VERBS.contains(verb) && System.currentTimeMillis() > constructionTime + message.getTimeout()) { - MessagingService.instance().incrementDroppedMessages(verb); + MessagingService.instance().incrementDroppedMessages(verb, isCrossNodeTimestamp); return; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed0f30a/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 1820c5c..c65c44c 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.management.MBeanServer; import javax.management.ObjectName; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -123,7 +124,7 @@ public final class MessagingService implements MessagingServiceMBean SNAPSHOT, // Similar to nt snapshot MIGRATION_REQUEST, GOSSIP_SHUTDOWN, - _TRACE, // dummy verb so we can use MS.droppedMessages + _TRACE, // dummy verb so we can use MS.droppedMessagesMap ECHO, REPAIR_MESSAGE, // use as padding for backwards compatability where a previous version needs to validate a verb from the future. @@ -296,10 +297,23 @@ public final class MessagingService implements MessagingServiceMBean Verb.PAGED_RANGE, Verb.REQUEST_RESPONSE); + + private static final class DroppedMessages + { + final DroppedMessageMetrics metrics; + final AtomicInteger droppedInternalTimeout; + final AtomicInteger droppedCrossNodeTimeout; + + DroppedMessages(Verb verb) + { + this.metrics = new DroppedMessageMetrics(verb); + this.droppedInternalTimeout = new AtomicInteger(0); + this.droppedCrossNodeTimeout = new AtomicInteger(0); + } + + } // total dropped message counts for server lifetime - private final Map droppedMessages = new EnumMap(Verb.class); - // dropped count when last requested for the Recent api. high concurrency isn't necessary here. - private final Map lastDroppedInternal = new EnumMap(Verb.class); + private final Map droppedMessagesMap = new EnumMap<>(Verb.class); private final List subscribers = new ArrayList(); @@ -308,31 +322,42 @@ public final class MessagingService implements MessagingServiceMBean private static class MSHandle { - public static final MessagingService instance = new MessagingService(); + public static final MessagingService instance = new MessagingService(false); } + public static MessagingService instance() { return MSHandle.instance; } - private MessagingService() + private static class MSTestHandle + { + public static final MessagingService instance = new MessagingService(true); + } + + static MessagingService test() + { + return MSTestHandle.instance; + } + + private MessagingService(boolean testOnly) { for (Verb verb : DROPPABLE_VERBS) - { - droppedMessages.put(verb, new DroppedMessageMetrics(verb)); - lastDroppedInternal.put(verb, 0); - } + droppedMessagesMap.put(verb, new DroppedMessages(verb)); listenGate = new SimpleCondition(); verbHandlers = new EnumMap(Verb.class); - Runnable logDropped = new Runnable() + if (!testOnly) { - public void run() + Runnable logDropped = new Runnable() { - logDroppedMessages(); - } - }; - ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS); + public void run() + { + logDroppedMessages(); + } + }; + ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS); + } Function>, ?> timeoutReporter = new Function>, Object>() { @@ -363,16 +388,19 @@ public final class MessagingService implements MessagingServiceMBean } }; - callbacks = new ExpiringMap(DatabaseDescriptor.getMinRpcTimeout(), timeoutReporter); + callbacks = new ExpiringMap<>(DatabaseDescriptor.getMinRpcTimeout(), timeoutReporter); - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - try + if (!testOnly) { - mbs.registerMBean(this, new ObjectName(MBEAN_NAME)); - } - catch (Exception e) - { - throw new RuntimeException(e); + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + try + { + mbs.registerMBean(this, new ObjectName(MBEAN_NAME)); + } + catch (Exception e) + { + throw new RuntimeException(e); + } } } @@ -724,7 +752,7 @@ public final class MessagingService implements MessagingServiceMBean } } - public void receive(MessageIn message, int id, long timestamp) + public void receive(MessageIn message, int id, long timestamp, boolean isCrossNodeTimestamp) { TraceState state = Tracing.instance.initializeFromMessage(message); if (state != null) @@ -738,7 +766,7 @@ public final class MessagingService implements MessagingServiceMBean return; } - Runnable runnable = new MessageDeliveryTask(message, id, timestamp); + Runnable runnable = new MessageDeliveryTask(message, id, timestamp, isCrossNodeTimestamp); TracingAwareExecutorService stage = StageManager.getStage(message.getMessageType()); assert stage != null : "No stage for message type " + message.verb; @@ -856,8 +884,13 @@ public final class MessagingService implements MessagingServiceMBean public void incrementDroppedMessages(Verb verb) { + incrementDroppedMessages(verb, false); + } + + public void incrementDroppedMessages(Verb verb, boolean isCrossNodeTimeout) + { assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped"; - droppedMessages.get(verb).dropped.mark(); + incrementDroppedMessages(droppedMessagesMap.get(verb), isCrossNodeTimeout); } /** @@ -865,34 +898,55 @@ public final class MessagingService implements MessagingServiceMBean */ private void incrementRejectedMessages(Verb verb) { - DroppedMessageMetrics metrics = droppedMessages.get(verb); - if (metrics == null) + DroppedMessages droppedMessages = droppedMessagesMap.get(verb); + if (droppedMessages == null) { - metrics = new DroppedMessageMetrics(verb); - droppedMessages.put(verb, metrics); + droppedMessages = new DroppedMessages(verb); + droppedMessagesMap.put(verb, droppedMessages); } - metrics.dropped.mark(); + incrementDroppedMessages(droppedMessagesMap.get(verb), false); + } + + private void incrementDroppedMessages(DroppedMessages droppedMessages, boolean isCrossNodeTimeout) + { + droppedMessages.metrics.dropped.mark(); + if (isCrossNodeTimeout) + droppedMessages.droppedCrossNodeTimeout.incrementAndGet(); + else + droppedMessages.droppedInternalTimeout.incrementAndGet(); } private void logDroppedMessages() { - boolean logTpstats = false; - for (Map.Entry entry : droppedMessages.entrySet()) + List logs = getDroppedMessagesLogs(); + for (String log : logs) + logger.error(log); + + if (logs.size() > 0) + StatusLogger.log(); + } + + @VisibleForTesting + List getDroppedMessagesLogs() + { + List ret = new ArrayList<>(); + for (Map.Entry entry : droppedMessagesMap.entrySet()) { - int dropped = (int) entry.getValue().dropped.count(); Verb verb = entry.getKey(); - int recent = dropped - lastDroppedInternal.get(verb); - if (recent > 0) + DroppedMessages droppedMessages = entry.getValue(); + + int droppedInternalTimeout = droppedMessages.droppedInternalTimeout.getAndSet(0); + int droppedCrossNodeTimeout = droppedMessages.droppedCrossNodeTimeout.getAndSet(0); + if (droppedInternalTimeout > 0 || droppedCrossNodeTimeout > 0) { - logTpstats = true; - logger.info("{} {} messages dropped in last {}ms", - new Object[] {recent, verb, LOG_DROPPED_INTERVAL_IN_MS}); - lastDroppedInternal.put(verb, dropped); + ret.add(String.format("%s messages were dropped in last %d ms: %d for internal timeout and %d for cross node timeout", + verb, + LOG_DROPPED_INTERVAL_IN_MS, + droppedInternalTimeout, + droppedCrossNodeTimeout)); } } - - if (logTpstats) - StatusLogger.log(); + return ret; } private static class SocketThread extends Thread @@ -1023,16 +1077,16 @@ public final class MessagingService implements MessagingServiceMBean public Map getDroppedMessages() { Map map = new HashMap(); - for (Map.Entry entry : droppedMessages.entrySet()) - map.put(entry.getKey().toString(), (int) entry.getValue().dropped.count()); + for (Map.Entry entry : droppedMessagesMap.entrySet()) + map.put(entry.getKey().toString(), (int) entry.getValue().metrics.dropped.count()); return map; } public Map getRecentlyDroppedMessages() { Map map = new HashMap(); - for (Map.Entry entry : droppedMessages.entrySet()) - map.put(entry.getKey().toString(), entry.getValue().getRecentlyDropped()); + for (Map.Entry entry : droppedMessagesMap.entrySet()) + map.put(entry.getKey().toString(), entry.getValue().metrics.getRecentlyDropped()); return map; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9ed0f30a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java new file mode 100644 index 0000000..04dacf3 --- /dev/null +++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java @@ -0,0 +1,39 @@ +package org.apache.cassandra.net; + +import java.util.List; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class MessagingServiceTest +{ + private final MessagingService messagingService = MessagingService.test(); + + @Test + public void testDroppedMessages() + { + MessagingService.Verb verb = MessagingService.Verb.READ; + + for (int i = 0; i < 5000; i++) + messagingService.incrementDroppedMessages(verb, i % 2 == 0); + + List logs = messagingService.getDroppedMessagesLogs(); + assertEquals(1, logs.size()); + assertEquals("READ messages were dropped in last 5000 ms: 2500 for internal timeout and 2500 for cross node timeout", logs.get(0)); + assertEquals(5000, (int)messagingService.getDroppedMessages().get(verb.toString())); + assertEquals(5000, (int) messagingService.getRecentlyDroppedMessages().get(verb.toString())); + + logs = messagingService.getDroppedMessagesLogs(); + assertEquals(0, logs.size()); + + for (int i = 0; i < 2500; i++) + messagingService.incrementDroppedMessages(verb, i % 2 == 0); + + logs = messagingService.getDroppedMessagesLogs(); + assertEquals("READ messages were dropped in last 5000 ms: 1250 for internal timeout and 1250 for cross node timeout", logs.get(0)); + assertEquals(7500, (int)messagingService.getDroppedMessages().get(verb.toString())); + assertEquals(2500, (int) messagingService.getRecentlyDroppedMessages().get(verb.toString())); + } + +}