cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tylerho...@apache.org
Subject cassandra git commit: Retry all MS messages once after reopening a connection
Date Wed, 24 Aug 2016 22:31:58 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 732b10183 -> c26bd9185


Retry all MS messages once after reopening a connection

Patch by Tyler Hobbs; reviewed by Yuki Morishita for CASSANDRA-12192


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c26bd918
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c26bd918
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c26bd918

Branch: refs/heads/trunk
Commit: c26bd91852cbf19d7dba9f62078f5da31b04dbe0
Parents: 732b101
Author: Tyler Hobbs <tylerlhobbs@gmail.com>
Authored: Wed Aug 24 17:31:07 2016 -0500
Committer: Tyler Hobbs <tylerlhobbs@gmail.com>
Committed: Wed Aug 24 17:31:07 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../apache/cassandra/db/ColumnFamilyStore.java  |  4 +--
 .../cassandra/net/OutboundTcpConnection.java    | 33 +++++++++-----------
 .../net/OutboundTcpConnectionPool.java          |  6 ++--
 4 files changed, 22 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c26bd918/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index af7d0dd..eadb4a4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 3.10
+ * Retry all internode messages once after a connection is
+   closed and reopened (CASSANDRA-12192)
  * Add support to rebuild from targeted replica (CASSANDRA-9875)
  * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
  * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c26bd918/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 14d6440..94d71ff 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2144,7 +2144,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         // beginning if we restart before they [the CL segments] are discarded for
         // normal reasons post-truncate.  To prevent this, we store truncation
         // position in the System keyspace.
-        logger.trace("truncating {}", name);
+        logger.info("Truncating {}.{}", keyspace.getName(), name);
 
         final long truncatedAt;
         final CommitLogPosition replayAfter;
@@ -2197,7 +2197,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         };
 
         runWithCompactionsDisabled(Executors.callable(truncateRunnable), true, true);
-        logger.trace("truncate complete");
+        logger.info("Truncate of {}.{} is complete", keyspace.getName(), name);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c26bd918/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 155cd1a..9a47fda 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -113,7 +113,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
 
         if (coalescingWindow < 0)
             throw new ExceptionInInitializerError(
-                    "Value provided for coalescing window must be greather than 0: " + coalescingWindow);
+                    "Value provided for coalescing window must be greater than 0: " + coalescingWindow);
     }
 
     private static final MessageOut CLOSE_SENTINEL = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE);
@@ -137,9 +137,9 @@ public class OutboundTcpConnection extends FastThreadLocalThread
     private volatile int currentMsgBufferCount = 0;
     private volatile int targetVersion;
 
-    public OutboundTcpConnection(OutboundTcpConnectionPool pool)
+    public OutboundTcpConnection(OutboundTcpConnectionPool pool, String name)
     {
-        super("MessagingService-Outgoing-" + pool.endPoint());
+        super("MessagingService-Outgoing-" + pool.endPoint() + "-" + name);
         this.poolReference = pool;
         cs = newCoalescingStrategy(pool.endPoint().getHostAddress());
 
@@ -176,6 +176,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
 
     void closeSocket(boolean destroyThread)
     {
+        logger.debug("Enqueuing socket close for {}", poolReference.endPoint());
         backlog.clear();
         isStopped = destroyThread; // Exit loop to stop the thread
         enqueue(CLOSE_SENTINEL, -1);
@@ -308,11 +309,10 @@ public class OutboundTcpConnection extends FastThreadLocalThread
             disconnect();
             if (e instanceof IOException || e.getCause() instanceof IOException)
             {
-                if (logger.isTraceEnabled())
-                    logger.trace("error writing to {}", poolReference.endPoint(), e);
+                logger.debug("Error writing to {}", poolReference.endPoint(), e);
 
-                // if the message was important, such as a repair acknowledgement, put it
back on the queue
-                // to retry after re-connecting.  See CASSANDRA-5393
+                // If we haven't retried this message yet, put it back on the queue to retry
after re-connecting.
+                // See CASSANDRA-5393 and CASSANDRA-12192.
                 if (qm.shouldRetry())
                 {
                     try
@@ -370,13 +370,11 @@ public class OutboundTcpConnection extends FastThreadLocalThread
             try
             {
                 socket.close();
-                if (logger.isTraceEnabled())
-                    logger.trace("Socket to {} closed", poolReference.endPoint());
+                logger.debug("Socket to {} closed", poolReference.endPoint());
             }
             catch (IOException e)
             {
-                if (logger.isTraceEnabled())
-                    logger.trace("exception closing connection to " + poolReference.endPoint(),
e);
+                logger.debug("Exception closing connection to {}", poolReference.endPoint(),
e);
             }
             out = null;
             socket = null;
@@ -386,8 +384,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
     @SuppressWarnings("resource")
     private boolean connect()
     {
-        if (logger.isTraceEnabled())
-            logger.trace("attempting to connect to {}", poolReference.endPoint());
+        logger.debug("Attempting to connect to {}", poolReference.endPoint());
 
         long start = System.nanoTime();
         long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getRpcTimeout());
@@ -463,7 +460,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
                 if (shouldCompressConnection())
                 {
                     out.flush();
-                    logger.trace("Upgrading OutputStream to be compressed");
+                    logger.trace("Upgrading OutputStream to {} to be compressed", poolReference.endPoint());
                     if (targetVersion < MessagingService.VERSION_21)
                     {
                         // Snappy is buffered, so no need for extra buffering output stream
@@ -481,7 +478,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
                                                                             true)); // no
async flushing
                     }
                 }
-
+                logger.debug("Done connecting to {}", poolReference.endPoint());
                 return true;
             }
             catch (SSLHandshakeException e)
@@ -494,8 +491,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
             catch (IOException e)
             {
                 socket = null;
-                if (logger.isTraceEnabled())
-                    logger.trace("unable to connect to " + poolReference.endPoint(), e);
+                logger.debug("Unable to connect to {}", poolReference.endPoint(), e);
                 Uninterruptibles.sleepUninterruptibly(OPEN_RETRY_DELAY, TimeUnit.MILLISECONDS);
             }
         }
@@ -582,7 +578,8 @@ public class OutboundTcpConnection extends FastThreadLocalThread
 
         boolean shouldRetry()
         {
-            return !droppable;
+            // retry all messages once
+            return true;
         }
 
         public long timestampNanos()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c26bd918/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
index ecadf89..0418ff6 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
@@ -56,9 +56,9 @@ public class OutboundTcpConnectionPool
         resetEndpoint = SystemKeyspace.getPreferredIP(remoteEp);
         started = new CountDownLatch(1);
 
-        smallMessages = new OutboundTcpConnection(this);
-        largeMessages = new OutboundTcpConnection(this);
-        gossipMessages = new OutboundTcpConnection(this);
+        smallMessages = new OutboundTcpConnection(this, "Small");
+        largeMessages = new OutboundTcpConnection(this, "Large");
+        gossipMessages = new OutboundTcpConnection(this, "Gossip");
     }
 
     /**


Mime
View raw message