cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brandonwilli...@apache.org
Subject [3/6] cassandra git commit: Close incoming connections when MessagingService is stopped
Date Wed, 29 Apr 2015 21:06:27 GMT
Close incoming connections when MessagingService is stopped

Patch by Sergio Bossa, reviewed by brandonwilliams for CASSANDRA-9238


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cefaa4eb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cefaa4eb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cefaa4eb

Branch: refs/heads/trunk
Commit: cefaa4eb7168a63bbe5b04ad7b9bc093a34f4bb8
Parents: 58f25d5
Author: Brandon Williams <brandonwilliams@apache.org>
Authored: Wed Apr 29 16:03:36 2015 -0500
Committer: Brandon Williams <brandonwilliams@apache.org>
Committed: Wed Apr 29 16:03:36 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +-
 .../gms/GossipDigestSynVerbHandler.java         |  8 ----
 src/java/org/apache/cassandra/gms/Gossiper.java | 26 -------------
 .../net/IncomingStreamingConnection.java        | 31 ++++++++++++----
 .../cassandra/net/IncomingTcpConnection.java    | 39 +++++++++++++-------
 .../apache/cassandra/net/MessagingService.java  | 11 +++++-
 6 files changed, 59 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cefaa4eb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 213f226..3df91ce 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,6 @@
 2.0.15:
  * IncomingTcpConnection thread is not named (CASSANDRA-9262)
- * Ignore gossip SYNs after shutdown (CASSANDRA-9238)
+ * Close incoming connections when MessagingService is stopped (CASSANDRA-9238)
  * Avoid overflow when calculating max sstable size in LCS (CASSANDRA-9235)
  * Make sstable blacklisting work with compression (CASSANDRA-9138)
  * Do not attempt to rebuild indexes if no index accepts any column (CASSANDRA-9196)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cefaa4eb/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
index 4454c46..df74808 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
@@ -36,22 +36,14 @@ public class GossipDigestSynVerbHandler implements IVerbHandler<GossipDigestSyn>
     public void doVerb(MessageIn<GossipDigestSyn> message, int id)
     {
         InetAddress from = message.from;
-        
         if (logger.isTraceEnabled())
             logger.trace("Received a GossipDigestSynMessage from {}", from);
-        
         if (!Gossiper.instance.isEnabled())
         {
             if (logger.isTraceEnabled())
                 logger.trace("Ignoring GossipDigestSynMessage because gossip is disabled");
             return;
         }
-        
-        if (!Gossiper.instance.shouldAckAfterShutdown(from))
-        {
-            logger.debug("Temporarily ignoring SYN from shutdown node {}", from);
-            return;
-        }
 
         GossipDigestSyn gDigestMessage = message.payload;
         /* If the message is from a different cluster throw it away. */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cefaa4eb/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 4665e74..b77064d 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -103,9 +103,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
     /* unreachable member set */
     private final Map<InetAddress, Long> unreachableEndpoints = new ConcurrentHashMap<InetAddress,
Long>();
-    
-    /* shutdown member set */
-    private final Map<InetAddress, Long> shutdownEndpoints = new ConcurrentHashMap<InetAddress,
Long>();
 
     /* initial seeds for joining the cluster */
     private final Set<InetAddress> seeds = new ConcurrentSkipListSet<InetAddress>(inetcomparator);
@@ -305,24 +302,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
             return 0L;
     }
 
-    public boolean shouldAckAfterShutdown(InetAddress endpoint)
-    {
-        Long shutdownTimestamp = shutdownEndpoints.get(endpoint);
-        Integer shutdownAnnounce = Integer.getInteger("cassandra.shutdown_announce_in_ms",
2000);
-        // Do not temporarily answer to SYN messages coming from shutdown nodes, to avoid
reconnecting:
-        if (shutdownTimestamp != null && (System.currentTimeMillis() - shutdownTimestamp)
< (shutdownAnnounce * 2))
-        {
-            return false;
-        }
-        // Otherwise, if allowed to answer, remove the node from the shutdown set
-        // (and do this only here, rather than in sendGossip too, to avoid racing between
different calling threads):
-        else
-        {
-            shutdownEndpoints.remove(endpoint);
-            return true;
-        }
-    }
-    
     private boolean isShutdown(InetAddress endpoint)
     {
         EndpointState epState = endpointStateMap.get(endpoint);
@@ -371,7 +350,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
             return;
         epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.shutdown(true));
         epState.getHeartBeatState().forceHighestPossibleVersionUnsafe();
-        shutdownEndpoints.put(endpoint, System.currentTimeMillis());
         markDead(endpoint, epState);
         FailureDetector.instance.forceConviction(endpoint);
     }
@@ -644,10 +622,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         /* Generate a random number from 0 -> size */
         int index = (size == 1) ? 0 : random.nextInt(size);
         InetAddress to = liveEndpoints.get(index);
-        Long shutdownTimestamp = shutdownEndpoints.get(to);
-        Integer shutdownAnnounce = Integer.getInteger("cassandra.shutdown_announce_in_ms",
2000);
-        if (shutdownTimestamp != null && (System.currentTimeMillis() - shutdownTimestamp)
< (shutdownAnnounce * 2))
-            return false;
         if (logger.isTraceEnabled())
             logger.trace("Sending a GossipDigestSyn to {} ...", to);
         MessagingService.instance().sendOneWay(message, to);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cefaa4eb/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
index 20392f2..e37299f 100644
--- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java
@@ -17,10 +17,12 @@
  */
 package org.apache.cassandra.net;
 
+import java.io.Closeable;
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.Socket;
+import java.util.Set;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,18 +34,20 @@ import org.apache.cassandra.streaming.messages.StreamMessage;
 /**
  * Thread to consume stream init messages.
  */
-public class IncomingStreamingConnection extends Thread
+public class IncomingStreamingConnection extends Thread implements Closeable
 {
     private static final Logger logger = LoggerFactory.getLogger(IncomingStreamingConnection.class);
 
     private final int version;
     private final Socket socket;
+    private final Set<Closeable> group;
 
-    public IncomingStreamingConnection(int version, Socket socket)
+    public IncomingStreamingConnection(int version, Socket socket, Set<Closeable> group)
     {
         super("STREAM-INIT-" + socket.getRemoteSocketAddress());
         this.version = version;
         this.socket = socket;
+        this.group = group;
     }
 
     @Override
@@ -67,14 +71,27 @@ public class IncomingStreamingConnection extends Thread
         catch (IOException e)
         {
             logger.debug("IOException reading from socket; closing", e);
-            try
+            close();
+        }
+    }
+    
+    @Override
+    public void close()
+    {
+        try
+        {
+            if (!socket.isClosed())
             {
                 socket.close();
             }
-            catch (IOException e2)
-            {
-                logger.debug("error closing socket", e2);
-            }
+        }
+        catch (IOException e)
+        {
+            logger.debug("Error closing socket", e);
+        }
+        finally
+        {
+            group.remove(this);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cefaa4eb/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index fbdd221..b61e82e 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -21,6 +21,7 @@ import java.io.*;
 import java.net.InetAddress;
 import java.net.Socket;
 import java.net.SocketException;
+import java.util.Set;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,21 +31,23 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.UnknownColumnFamilyException;
 import org.apache.cassandra.gms.Gossiper;
 
-public class IncomingTcpConnection extends Thread
+public class IncomingTcpConnection extends Thread implements Closeable
 {
     private static final Logger logger = LoggerFactory.getLogger(IncomingTcpConnection.class);
 
     private final int version;
     private final boolean compressed;
     private final Socket socket;
+    private final Set<Closeable> group;
     public InetAddress from;
 
-    public IncomingTcpConnection(int version, boolean compressed, Socket socket)
+    public IncomingTcpConnection(int version, boolean compressed, Socket socket, Set<Closeable>
group)
     {
         super("MessagingService-Incoming-" + socket.getInetAddress());
         this.version = version;
         this.compressed = compressed;
         this.socket = socket;
+        this.group = group;
         if (DatabaseDescriptor.getInternodeRecvBufferSize() != null)
         {
             try
@@ -91,6 +94,26 @@ public class IncomingTcpConnection extends Thread
             close();
         }
     }
+    
+    @Override
+    public void close()
+    {
+        try
+        {
+            if (!socket.isClosed())
+            {
+                socket.close();
+            }
+        }
+        catch (IOException e)
+        {
+            logger.debug("Error closing socket", e);
+        }
+        finally
+        {
+            group.remove(this);
+        }
+    }
 
     private void receiveMessages() throws IOException
     {
@@ -162,16 +185,4 @@ public class IncomingTcpConnection extends Thread
         }
         return message.from;
     }
-
-    private void close()
-    {
-        try
-        {
-            socket.close();
-        }
-        catch (IOException e)
-        {
-            logger.debug("Error closing socket", e);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cefaa4eb/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 117bd3c..d570faf 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -33,6 +33,7 @@ import javax.management.ObjectName;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
@@ -885,6 +886,7 @@ public final class MessagingService implements MessagingServiceMBean
     private static class SocketThread extends Thread
     {
         private final ServerSocket server;
+        private final Set<Closeable> connections = Sets.newConcurrentHashSet();
 
         SocketThread(ServerSocket server, String name)
         {
@@ -919,9 +921,10 @@ public final class MessagingService implements MessagingServiceMBean
                     socket.setSoTimeout(0);
 
                     Thread thread = isStream
-                                  ? new IncomingStreamingConnection(version, socket)
-                                  : new IncomingTcpConnection(version, MessagingService.getBits(header,
2, 1) == 1, socket);
+                                  ? new IncomingStreamingConnection(version, socket, connections)
+                                  : new IncomingTcpConnection(version, MessagingService.getBits(header,
2, 1) == 1, socket, connections);
                     thread.start();
+                    connections.add((Closeable) thread);
                 }
                 catch (AsynchronousCloseException e)
                 {
@@ -947,6 +950,10 @@ public final class MessagingService implements MessagingServiceMBean
         {
             logger.debug("Closing accept() thread");
             server.close();
+            for (Closeable connection : connections) 
+            {
+                connection.close();
+            }
         }
 
         private boolean authenticate(Socket socket)


Mime
View raw message