cassandra-commits mailing list archives

From "Christian Esken (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (CASSANDRA-13265) Expiration in OutboundTcpConnection can block the reader Thread
Date Mon, 10 Apr 2017 16:17:41 GMT

     [ https://issues.apache.org/jira/browse/CASSANDRA-13265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Christian Esken updated CASSANDRA-13265:
----------------------------------------
    Status: Patch Available  (was: Open)

From 6bd3f3fc3b2da3a66b53a94a819446a9ea8ea2cf Mon Sep 17 00:00:00 2001
From: Christian Esken <Christian.Esken@trivago.com>
Date: Wed, 1 Mar 2017 15:56:36 +0100
Subject: [PATCH] Expire OTC messages by a single Thread

This patch consists of the following aspects related to OutboundTcpConnection:
- Backlog queue expiration by a single Thread
- Drop count statistics
- QueuedMessage.isTimedOut() fix

When backlog queue expiration is needed, a single Thread is elected to do the
work. Previously, every enqueuing Thread would iterate the queue and do the
same work, producing high lock contention. The Thread reading from the Queue
could even be starved by not being able to acquire the read lock.
The backlog queue is inspected only if its size exceeds BACKLOG_PURGE_SIZE and
at least otc_backlog_expiration_interval_ms milliseconds have passed since the
previous run. Added unit tests for OutboundTcpConnection.
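The election described above can be sketched outside Cassandra as follows. This is a minimal, hypothetical stand-alone version (the class `BacklogExpiryGate` and method `maybeExpire` are illustrative names, not Cassandra API) of the pattern used by the patched `expireMessages(long)`: a size check against a purge threshold, a wrap-safe due-time check, and `AtomicBoolean.compareAndSet` so that only one Thread iterates the queue at a time. One deliberate difference is assumed: the next expiration time starts at construction time instead of 0.

```java
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

/** Hypothetical sketch of a single-Thread expiration gate; not Cassandra's API. */
class BacklogExpiryGate<T> {
    private final AtomicBoolean active = new AtomicBoolean(false);
    // Assumption: start at "now" rather than 0, so the wrap-safe due check
    // behaves the same even if System.nanoTime() happens to be negative.
    private volatile long nextExpirationNanos = System.nanoTime();
    private final int purgeSize;
    private final long intervalNanos;
    final AtomicInteger scans = new AtomicInteger(); // number of full scans performed

    BacklogExpiryGate(int purgeSize, long intervalNanos) {
        this.purgeSize = purgeSize;
        this.intervalNanos = intervalNanos;
    }

    /** Removes expired elements if a run is due; returns how many this call removed. */
    int maybeExpire(Queue<T> queue, long nowNanos, Predicate<T> isExpired) {
        if (queue.size() <= purgeSize)
            return 0;                                   // plenty of space
        if (nextExpirationNanos - nowNanos > 0)
            return 0;                                   // not due yet (wrap-safe comparison)
        if (!active.compareAndSet(false, true))
            return 0;                                   // another Thread is already scanning
        int removed = 0;
        try {
            scans.incrementAndGet();
            Iterator<T> it = queue.iterator();
            while (it.hasNext()) {
                if (isExpired.test(it.next())) {
                    it.remove();
                    removed++;
                }
            }
        } finally {
            nextExpirationNanos = nowNanos + intervalNanos; // schedule the next run
            active.set(false);                              // release the gate
        }
        return removed;
    }

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<Long> backlog = new LinkedBlockingQueue<>();
        long t0 = System.nanoTime();
        for (int i = 0; i < 2000; i++)
            backlog.add(t0);                            // each element is its enqueue time
        long timeout = TimeUnit.SECONDS.toNanos(2);
        long now = t0 + TimeUnit.SECONDS.toNanos(5);    // pretend 5 s have passed
        BacklogExpiryGate<Long> gate = new BacklogExpiryGate<>(1024, TimeUnit.SECONDS.toNanos(10));

        int threads = 32;
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(threads);
        for (int i = 0; i < threads; i++) {
            new Thread(() -> {
                try {
                    start.await();
                    gate.maybeExpire(backlog, now, enq -> now - enq > timeout);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }
        start.countDown();
        done.await();
        System.out.println("scans=" + gate.scans.get() + ", remaining=" + backlog.size());
    }
}
```

In the demo, 32 Threads race for the same expiration run and usually exactly one of them scans. A Thread that passed the due-time check just before the winner released the gate can still win the CAS and run a second, cheap scan; the patch's code has the same benign race, since it also checks the time before the `compareAndSet`.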

Timed-out messages are counted in the dropped statistics. Messages are
now also counted as dropped when they cannot be written to the socket,
e.g. if there is no connection because the target node is down.
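When the connection is down, the patch counts the backlog with `backlog.size()` and then calls `backlog.clear()`; its own code comment notes that messages enqueued between the two calls are discarded without being counted. The sketch below contrasts that pattern with an alternative that is not part of the patch: `BlockingQueue.drainTo` returns exactly the number of elements it removed. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of two ways to discard a backlog and count the drops; not Cassandra code. */
class DropCounting {
    // Pattern used in the patch: read size(), then clear(). A message enqueued
    // between the two calls is discarded without being counted.
    static void dropAllSizeThenClear(BlockingQueue<?> backlog, AtomicLong dropped) {
        dropped.addAndGet(backlog.size());
        backlog.clear();
    }

    // Alternative (not in the patch): drainTo() returns the number of elements it
    // removed, so every discarded element is counted; concurrently enqueued ones
    // are either drained and counted or left in the queue.
    static <T> void dropAllExact(BlockingQueue<T> backlog, AtomicLong dropped) {
        List<T> discarded = new ArrayList<>();
        dropped.addAndGet(backlog.drainTo(discarded));
    }

    public static void main(String[] args) {
        BlockingQueue<String> backlog = new LinkedBlockingQueue<>(Arrays.asList("a", "b", "c"));
        AtomicLong dropped = new AtomicLong();
        dropAllExact(backlog, dropped);
        System.out.println("dropped=" + dropped.get() + ", remaining=" + backlog.size()); // dropped=3, remaining=0
    }
}
```

The trade-off is that the drained messages are briefly held in a temporary list before they become garbage, while `clear()` releases them immediately.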

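Fix QueuedMessage.isTimedOut(), which used an "a < b" comparison on
System.nanoTime() values. Such a comparison can give the wrong result
once the nanoTime() counter wraps around; the fix compares elapsed
time (now - timestamp > timeout) instead.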
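The difference between the two comparisons can be shown with a small, hypothetical demo (class and method names are illustrative). It assumes a message enqueued exactly when the nanoTime() counter has wrapped to `Long.MIN_VALUE` and checked 1 s later with a 2 s timeout. The old style computes `now - timeout`, which underflows to a large positive value, so the message looks expired; the patched style compares elapsed time and stays correct.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of the isTimedOut() fix; method names are illustrative. */
class NanoTimeComparison {
    // Old style: compares absolute nanoTime values; "now - timeout" can wrap around.
    static boolean timedOutNaive(long enqueuedNanos, long nowNanos, long timeoutNanos) {
        return enqueuedNanos < nowNanos - timeoutNanos;
    }

    // Patched style: compares elapsed time, which stays correct across a wrap as
    // long as real intervals are shorter than about 292 years (2^63 ns).
    static boolean timedOutWrapSafe(long enqueuedNanos, long nowNanos, long timeoutNanos) {
        return nowNanos - enqueuedNanos > timeoutNanos;
    }

    public static void main(String[] args) {
        long timeout = TimeUnit.SECONDS.toNanos(2);
        // Assumed values: the counter has just wrapped when the message is enqueued,
        // and only 1 s has elapsed since then.
        long enqueued = Long.MIN_VALUE;
        long now = enqueued + TimeUnit.SECONDS.toNanos(1);
        System.out.println("naive:     " + timedOutNaive(enqueued, now, timeout));    // true (wrong)
        System.out.println("wrap-safe: " + timedOutWrapSafe(enqueued, now, timeout)); // false
    }
}
```

With the naive check, a droppable message would be dropped after waiting only 1 s of its 2 s timeout.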

CASSANDRA-13265
---
 conf/cassandra.yaml                                |   9 ++
 src/java/org/apache/cassandra/config/Config.java   |   6 +
 .../cassandra/config/DatabaseDescriptor.java       |  10 ++
 .../cassandra/net/OutboundTcpConnection.java       | 113 +++++++++++---
 .../org/apache/cassandra/service/StorageProxy.java |  10 +-
 .../cassandra/service/StorageProxyMBean.java       |   3 +
 .../cassandra/net/OutboundTcpConnectionTest.java   | 170 +++++++++++++++++++++
 7 files changed, 294 insertions(+), 27 deletions(-)
 create mode 100644 test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java

diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 790dfd743b..9c1510b66a 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -985,3 +985,12 @@ windows_timer_interval: 1
 
 # Do not try to coalesce messages if we already got that many messages. This should be more than 2 and less than 128.
 # otc_coalescing_enough_coalesced_messages: 8
+
+# How many milliseconds to wait between two expiration runs on the backlog (queue) of the OutboundTcpConnection.
+# Expiration is done if messages are piling up in the backlog. Droppable messages are expired to free the memory
+# taken by expired messages. The interval should be between 0 and 1000, and in most installations the default value
+# will be appropriate. A smaller value could potentially expire messages slightly sooner at the expense of more CPU
+# time and queue contention while iterating the backlog of messages.
+# An interval of 0 disables any wait time, which is the behavior of former Cassandra versions.
+#
+# otc_backlog_expiration_interval_ms: 200
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 9aaf7ae33e..6a99cd3cbd 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -298,6 +298,12 @@ public class Config
     public int otc_coalescing_window_us = otc_coalescing_window_us_default;
     public int otc_coalescing_enough_coalesced_messages = 8;
 
+    /**
+     * Backlog expiration interval in milliseconds for the OutboundTcpConnection.
+     */
+    public static final int otc_backlog_expiration_interval_ms_default = 200;
+    public volatile int otc_backlog_expiration_interval_ms = otc_backlog_expiration_interval_ms_default;
+ 
     public int windows_timer_interval = 0;
 
     public boolean enable_user_defined_functions = false;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 602214f3c6..e9e54c3e20 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1967,6 +1967,16 @@ public class DatabaseDescriptor
         conf.otc_coalescing_enough_coalesced_messages = otc_coalescing_enough_coalesced_messages;
     }
 
+    public static int getOtcBacklogExpirationInterval()
+    {
+        return conf.otc_backlog_expiration_interval_ms;
+    }
+
+    public static void setOtcBacklogExpirationInterval(int intervalInMillis)
+    {
+        conf.otc_backlog_expiration_interval_ms = intervalInMillis;
+    }
+ 
     public static int getWindowsTimerInterval()
     {
         return conf.windows_timer_interval;
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 46083994df..99ad194b94 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -31,6 +31,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.Checksum;
@@ -62,6 +63,7 @@ import org.xerial.snappy.SnappyOutputStream;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.Uninterruptibles;
 
 public class OutboundTcpConnection extends Thread
@@ -116,9 +118,14 @@ public class OutboundTcpConnection extends Thread
         if (coalescingWindow < 0)
             throw new ExceptionInInitializerError(
                     "Value provided for coalescing window must be greather than 0: " + coalescingWindow);
+
+        int otc_backlog_expiration_interval_in_ms = DatabaseDescriptor.getOtcBacklogExpirationInterval();
+        if (otc_backlog_expiration_interval_in_ms != Config.otc_backlog_expiration_interval_ms_default)
+            logger.info("OutboundTcpConnection backlog expiration interval set to {}ms", otc_backlog_expiration_interval_in_ms);
+
     }
 
-    private static final MessageOut CLOSE_SENTINEL = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE);
+    private static final MessageOut<?> CLOSE_SENTINEL = new MessageOut<MessagingService.Verb>(MessagingService.Verb.INTERNAL_RESPONSE);
     private volatile boolean isStopped = false;
 
     private static final int OPEN_RETRY_DELAY = 100; // ms between retries
@@ -128,6 +135,11 @@ public class OutboundTcpConnection extends Thread
     static final int LZ4_HASH_SEED = 0x9747b28c;
 
     private final BlockingQueue<QueuedMessage> backlog = new LinkedBlockingQueue<>();
+    private static final String BACKLOG_PURGE_SIZE_PROPERTY = PREFIX + "otc_backlog_purge_size";
+    @VisibleForTesting
+    static final int BACKLOG_PURGE_SIZE = Integer.getInteger(BACKLOG_PURGE_SIZE_PROPERTY, 1024);
+    private final AtomicBoolean backlogExpirationActive = new AtomicBoolean(false);
+    private volatile long backlogNextExpirationTime;
 
     private final OutboundTcpConnectionPool poolReference;
 
@@ -164,11 +176,11 @@ public class OutboundTcpConnection extends Thread
 
     public void enqueue(MessageOut<?> message, int id)
     {
-        if (backlog.size() > 1024)
-            expireMessages();
+        long nanoTime = System.nanoTime();
+        expireMessages(nanoTime);
         try
         {
-            backlog.put(new QueuedMessage(message, id));
+            backlog.put(new QueuedMessage(message, id, nanoTime));
         }
         catch (InterruptedException e)
         {
@@ -176,6 +188,18 @@ public class OutboundTcpConnection extends Thread
         }
     }
 
+    /**
+     * This is a helper method for unit testing. Disclaimer: Do not use this method outside unit tests, as
+     * it iterates the queue, which can be an expensive operation (CPU time, queue locking).
+     * 
+     * @return true, if the queue contains at least one expired element
+     */
+    @VisibleForTesting // (otherwise = VisibleForTesting.NONE)
+    boolean backlogContainsExpiredMessages(long nowNanos)
+    {
+        return backlog.stream().anyMatch(entry -> entry.isTimedOut(nowNanos));
+    }
+
     void closeSocket(boolean destroyThread)
     {
         isStopped = destroyThread; // Exit loop to stop the thread
@@ -214,9 +238,8 @@ public class OutboundTcpConnection extends Thread
                 throw new AssertionError(e);
             }
 
-            currentMsgBufferCount = drainedMessages.size();
+            int count = currentMsgBufferCount = drainedMessages.size();
 
-            int count = drainedMessages.size();
             //The timestamp of the first message has already been provided to the coalescing strategy
             //so skip logging it.
             inner:
@@ -233,14 +256,16 @@ public class OutboundTcpConnection extends Thread
                         continue;
                     }
 
-                    if (qm.isTimedOut())
+                    if (qm.isTimedOut(System.nanoTime()))
                         dropped.incrementAndGet();
                     else if (socket != null || connect())
                         writeConnected(qm, count == 1 && backlog.isEmpty());
                     else
                     {
-                        // clear out the queue, else gossip messages back up.
-                        drainedMessages.clear();
+                        // Not connected! Clear out the queue, else gossip messages back up. Update dropped
+                        // statistics accordingly. Hint: The statistics may be slightly too low, if messages
+                        // are added between the calls of backlog.size() and backlog.clear()
+                        dropped.addAndGet(backlog.size());
                         backlog.clear();
                         break inner;
                     }
@@ -254,6 +279,8 @@ public class OutboundTcpConnection extends Thread
                 }
                 currentMsgBufferCount = --count;
             }
+            // Update dropped statistics by the number of unprocessed drainedMessages
+            dropped.addAndGet(currentMsgBufferCount);
             drainedMessages.clear();
         }
     }
@@ -343,7 +370,7 @@ public class OutboundTcpConnection extends Thread
         }
     }
 
-    private void writeInternal(MessageOut message, int id, long timestamp) throws IOException
+    private void writeInternal(MessageOut<?> message, int id, long timestamp) throws IOException
     {
         out.writeInt(MessagingService.PROTOCOL_MAGIC);
 
@@ -563,18 +590,53 @@ public class OutboundTcpConnection extends Thread
         return version.get();
     }
 
-    private void expireMessages()
+    /**
+     * Expire elements from the queue if it holds more than BACKLOG_PURGE_SIZE messages, a run is due and no other Thread is already expiring.
+     * This method will only remove droppable expired entries. If no such element exists, nothing is removed from the queue.
+     * 
+     * @param timestampNanos The current time as from System.nanoTime()
+     */
+    @VisibleForTesting
+    void expireMessages(long timestampNanos)
     {
-        Iterator<QueuedMessage> iter = backlog.iterator();
-        while (iter.hasNext())
+        if (backlog.size() <= BACKLOG_PURGE_SIZE)
+            return; // Plenty of space
+
+        if (backlogNextExpirationTime - timestampNanos > 0)
+            return; // Expiration is not due.
+
+        /*
+         * Expiration is an expensive process. Iterating the queue locks the queue for both writes and
+         * reads during iter.next() and iter.remove(). Thus only a single Thread is allowed to do expiration.
+         */
+        if (backlogExpirationActive.compareAndSet(false, true))
         {
-            QueuedMessage qm = iter.next();
-            if (!qm.droppable)
-                continue;
-            if (!qm.isTimedOut())
-                return;
-            iter.remove();
-            dropped.incrementAndGet();
+            try
+            {
+                Iterator<QueuedMessage> iter = backlog.iterator();
+                while (iter.hasNext())
+                {
+                    QueuedMessage qm = iter.next();
+                    if (!qm.droppable)
+                        continue;
+                    if (!qm.isTimedOut(timestampNanos))
+                        continue;
+                    iter.remove();
+                    dropped.incrementAndGet();
+                }
+
+                if (logger.isTraceEnabled())
+                {
+                    long duration = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - timestampNanos);
+                    logger.trace("Expiration of {} took {}μs", getName(), duration);
+                }
+            }
+            finally
+            {
+                long backlogExpirationIntervalNanos = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getOtcBacklogExpirationInterval());
+                backlogNextExpirationTime = timestampNanos + backlogExpirationIntervalNanos;
+                backlogExpirationActive.set(false);
+            }
         }
     }
 
@@ -586,18 +648,19 @@ public class OutboundTcpConnection extends Thread
         final long timestampNanos;
         final boolean droppable;
 
-        QueuedMessage(MessageOut<?> message, int id)
+        QueuedMessage(MessageOut<?> message, int id, long timestampNanos)
         {
             this.message = message;
             this.id = id;
-            this.timestampNanos = System.nanoTime();
+            this.timestampNanos = timestampNanos;
             this.droppable = MessagingService.DROPPABLE_VERBS.contains(message.verb);
         }
 
         /** don't drop a non-droppable message just because it's timestamp is expired */
-        boolean isTimedOut()
+        boolean isTimedOut(long nowNanos)
         {
-            return droppable && timestampNanos < System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(message.getTimeout());
+            long messageTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(message.getTimeout());
+            return droppable && nowNanos - timestampNanos  > messageTimeoutNanos;
         }
 
         boolean shouldRetry()
@@ -615,7 +678,7 @@ public class OutboundTcpConnection extends Thread
     {
         RetriedQueuedMessage(QueuedMessage msg)
         {
-            super(msg.message, msg.id);
+            super(msg.message, msg.id, msg.timestampNanos);
         }
 
         boolean shouldRetry()
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index cffd63cd8d..ea082d5f20 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -72,8 +72,6 @@ import org.apache.cassandra.triggers.TriggerExecutor;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.AbstractIterator;
 
-import static com.google.common.collect.Iterables.contains;
-
 public class StorageProxy implements StorageProxyMBean
 {
     public static final String MBEAN_NAME = "org.apache.cassandra.db:type=StorageProxy";
@@ -2683,4 +2681,12 @@ public class StorageProxy implements StorageProxyMBean
     public long getReadRepairRepairedBackground() {
         return ReadRepairMetrics.repairedBackground.getCount();
     }
+
+    public int getOtcBacklogExpirationInterval() {
+        return DatabaseDescriptor.getOtcBacklogExpirationInterval();
+    }
+
+    public void setOtcBacklogExpirationInterval(int intervalInMillis) {
+        DatabaseDescriptor.setOtcBacklogExpirationInterval(intervalInMillis);
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 0db0ca60ff..ee82a5b1dd 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -59,6 +59,9 @@ public interface StorageProxyMBean
     public long getReadRepairRepairedBlocking();
     public long getReadRepairRepairedBackground();
 
+    public int getOtcBacklogExpirationInterval();
+    public void setOtcBacklogExpirationInterval(int intervalInMillis);
+
     /** Returns each live node's schema version */
     public Map<String, List<String>> getSchemaVersions();
 }
diff --git a/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java b/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java
new file mode 100644
index 0000000000..c09ae0f07e
--- /dev/null
+++ b/test/unit/org/apache/cassandra/net/OutboundTcpConnectionTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.net;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.MessagingService.Verb;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The tests check whether Queue expiration in the OutboundTcpConnection behaves properly for droppable and
+ * non-droppable messages.
+ */
+public class OutboundTcpConnectionTest
+{
+    AtomicInteger messageId = new AtomicInteger(0);
+
+    final static Verb VERB_DROPPABLE = Verb.MUTATION; // Droppable, 2s timeout
+    final static Verb VERB_NONDROPPABLE = Verb.GOSSIP_DIGEST_ACK; // Not droppable
+
+    final static long NANOS_FOR_TIMEOUT = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getTimeout(VERB_DROPPABLE)*2);
+
+    
+    /**
+     * Verifies our assumptions about whether a Verb can be dropped or not. The tests rely on droppability and
+     * may produce wrong results if the droppability of these Verbs changes.
+     */
+    @BeforeClass
+    public static void assertDroppability()
+    {
+        if (!MessagingService.DROPPABLE_VERBS.contains(VERB_DROPPABLE))
+            throw new AssertionError("Expected " + VERB_DROPPABLE + " to be droppable");
+        if (MessagingService.DROPPABLE_VERBS.contains(VERB_NONDROPPABLE))
+            throw new AssertionError("Expected " + VERB_NONDROPPABLE + " not to be droppable");
+    }
+
+    /**
+     * Tests that non-droppable messages are never expired
+     */
+    @Test
+    public void testNondroppable() throws UnknownHostException
+    {
+        OutboundTcpConnection otc = getOutboundTcpConnectionForLocalhost();
+        long nanoTimeBeforeEnqueue = System.nanoTime();
+
+        assertFalse("Fresh OutboundTcpConnection contains expired messages",
+                otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue));
+
+        fillToPurgeSize(otc, VERB_NONDROPPABLE);
+        fillToPurgeSize(otc, VERB_NONDROPPABLE);
+        otc.expireMessages(expirationTimeNanos());
+
+        assertFalse("OutboundTcpConnection with non-droppable verbs should not expire",
+                otc.backlogContainsExpiredMessages(expirationTimeNanos()));
+    }
+
+    /**
+     * Tests that droppable messages will be dropped after they expire, but not before.
+     * 
+     * @throws UnknownHostException
+     */
+    @Test
+    public void testDroppable() throws UnknownHostException
+    {
+        OutboundTcpConnection otc = getOutboundTcpConnectionForLocalhost();
+        long nanoTimeBeforeEnqueue = System.nanoTime();
+
+        initialFill(otc, VERB_DROPPABLE);
+        assertFalse("OutboundTcpConnection with droppable verbs should not expire immediately",
+                otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue));
+
+        otc.expireMessages(nanoTimeBeforeEnqueue);
+        assertFalse("OutboundTcpConnection with droppable verbs should not expire with enqueue-time expiration",
+                otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue));
+
+        // Let's presume the expiration time has passed => at that time there shall be expired messages in the Queue
+        long nanoTimeWhenExpired = expirationTimeNanos();
+        assertTrue("OutboundTcpConnection with droppable verbs should have expired",
+                otc.backlogContainsExpiredMessages(nanoTimeWhenExpired));
+
+        // Using the same timestamp, let's expire them and check whether they are gone
+        otc.expireMessages(nanoTimeWhenExpired);
+        assertFalse("OutboundTcpConnection should not have expired entries",
+                otc.backlogContainsExpiredMessages(nanoTimeWhenExpired));
+
+        // Actually the previous check can be made stricter: as expireMessages() has run, there cannot be
+        // ANY expired entries, thus let's also test against nanoTimeBeforeEnqueue
+        assertFalse("OutboundTcpConnection should not have any expired entries",
+                otc.backlogContainsExpiredMessages(nanoTimeBeforeEnqueue));
+
+    }
+
+    /**
+     * Fills the given OutboundTcpConnection with (1 + BACKLOG_PURGE_SIZE) elements. The first
+     * BACKLOG_PURGE_SIZE elements are non-droppable, the last one is a message with the given Verb and can be
+     * droppable or non-droppable.
+     */
+    private void initialFill(OutboundTcpConnection otc, Verb verb)
+    {
+        assertFalse("Fresh OutboundTcpConnection contains expired messages",
+                otc.backlogContainsExpiredMessages(System.nanoTime()));
+
+        fillToPurgeSize(otc, VERB_NONDROPPABLE);
+        MessageOut<?> messageDroppable10s = new MessageOut<>(verb);
+        otc.enqueue(messageDroppable10s, nextMessageId());
+        otc.expireMessages(System.nanoTime());
+    }
+
+    /**
+     * Returns a nano timestamp in the future, when expiration should have been performed for VERB_DROPPABLE.
+     * The offset is chosen as twice the timeout of VERB_DROPPABLE.
+     * 
+     * @return The future nano timestamp
+     */
+    private long expirationTimeNanos()
+    {
+        return System.nanoTime() + NANOS_FOR_TIMEOUT;
+    }
+
+    private int nextMessageId()
+    {
+        return messageId.incrementAndGet();
+    }
+
+    /**
+     * Adds BACKLOG_PURGE_SIZE messages to the queue. Hint: Expiration starts once the queue holds more than BACKLOG_PURGE_SIZE messages.
+     * 
+     * @param otc
+     *            The OutboundTcpConnection
+     * @param verb
+     *            The verb that defines the message type
+     */
+    private void fillToPurgeSize(OutboundTcpConnection otc, Verb verb)
+    {
+        for (int i = 0; i < OutboundTcpConnection.BACKLOG_PURGE_SIZE; i++)
+        {
+            otc.enqueue(new MessageOut<>(verb), nextMessageId());
+        }
+    }
+
+    private OutboundTcpConnection getOutboundTcpConnectionForLocalhost() throws UnknownHostException
+    {
+        InetAddress lo = InetAddress.getByName("127.0.0.1");
+        OutboundTcpConnectionPool otcPool = new OutboundTcpConnectionPool(lo);
+        OutboundTcpConnection otc = new OutboundTcpConnection(otcPool);
+        return otc;
+    }
+}
-- 
2.12.0



> Expiration in OutboundTcpConnection can block the reader Thread
> ---------------------------------------------------------------
>
>                 Key: CASSANDRA-13265
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-13265
>             Project: Cassandra
>          Issue Type: Bug
>         Environment: Cassandra 3.0.9
> Java HotSpot(TM) 64-Bit Server VM version 25.112-b15 (Java version 1.8.0_112-b15)
> Linux 3.16
>            Reporter: Christian Esken
>            Assignee: Christian Esken
>             Fix For: 3.0.x
>
>         Attachments: cassandra.pb-cache4-dus.2017-02-17-19-36-26.chist.xz, cassandra.pb-cache4-dus.2017-02-17-19-36-26.td.xz
>
>
> I observed that sometimes a single node in a Cassandra cluster fails to communicate with the other nodes. This can happen at any time, during peak load or low load. Restarting that single node fixes the issue.
> Before going into details, I want to state that I have analyzed the situation and am already developing a possible fix. Here is the analysis so far:
> - A thread dump in this situation showed 324 Threads in the OutboundTcpConnection class that want to lock the backlog queue for doing expiration.
> - A class histogram shows 262508 instances of OutboundTcpConnection$QueuedMessage.
> What is the effect of it? As soon as the Cassandra node has reached a certain number of queued messages, it starts thrashing itself to death. Each of these Threads fully locks the Queue for reading and writing by calling iterator.next(), making the situation worse and worse.
> - Writing: Only after 262508 locking operations can it progress with actually writing to the Queue.
> - Reading: Also blocked, as 324 Threads try to do iterator.next() and fully lock the Queue.
> This means: Writing blocks the Queue for reading, and readers might even be starved, which makes the situation even worse.
> -----
> The setup is:
>  - 3-node cluster
>  - replication factor 2
>  - Consistency LOCAL_ONE
>  - No remote DCs
>  - high write throughput (100000 INSERT statements per second and more during peak times).
>  



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
