cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aweisb...@apache.org
Subject cassandra git commit: Outbound TCP connections should consult internode authenticator. Patch by Ariel Weisberg; Reviewed by Marcus Eriksson for CASSANDRA-13324
Date Fri, 24 Mar 2017 19:27:04 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 60e2e9826 -> 732d1af86


Outbound TCP connections should consult internode authenticator.
Patch by Ariel Weisberg; Reviewed by Marcus Eriksson for CASSANDRA-13324


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/732d1af8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/732d1af8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/732d1af8

Branch: refs/heads/trunk
Commit: 732d1af866b91e5ba63e7e2a467d99d4cb90e11f
Parents: 60e2e98
Author: Ariel Weisberg <aweisberg@apple.com>
Authored: Fri Mar 24 15:26:50 2017 -0400
Committer: Ariel Weisberg <aweisberg@apple.com>
Committed: Fri Mar 24 15:26:50 2017 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/auth/AuthConfig.java   | 10 +---
 .../cassandra/config/DatabaseDescriptor.java    |  5 +-
 .../locator/ReconnectableSnitchHelper.java      | 21 +++++--
 .../apache/cassandra/net/MessagingService.java  | 44 ++++++++++++--
 .../cassandra/net/OutboundTcpConnection.java    | 33 +++++++---
 .../net/OutboundTcpConnectionPool.java          |  9 ++-
 .../config/DatabaseDescriptorRefTest.java       |  1 +
 .../locator/ReconnectableSnitchHelperTest.java  | 63 ++++++++++++++++++++
 .../cassandra/net/MessagingServiceTest.java     | 60 +++++++++++++++++++
 10 files changed, 218 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fb9b8c4..b42bde6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Outbound TCP connections ignore internode authenticator (CASSANDRA-13324)
  * Upgrade junit from 4.6 to 4.12 (CASSANDRA-13360)
  * Cleanup ParentRepairSession after repairs (CASSANDRA-13359)
  * Incremental repair not streaming correct sstables (CASSANDRA-13328)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/src/java/org/apache/cassandra/auth/AuthConfig.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/AuthConfig.java b/src/java/org/apache/cassandra/auth/AuthConfig.java
index c389ae4..2ca1522 100644
--- a/src/java/org/apache/cassandra/auth/AuthConfig.java
+++ b/src/java/org/apache/cassandra/auth/AuthConfig.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.utils.FBUtilities;
+import org.hsqldb.Database;
 
 /**
  * Only purpose is to Initialize authentication/authorization via {@link #applyAuth()}.
@@ -94,13 +95,8 @@ public final class AuthConfig
 
         // authenticator
 
-        IInternodeAuthenticator internodeAuthenticator;
         if (conf.internode_authenticator != null)
-            internodeAuthenticator = FBUtilities.construct(conf.internode_authenticator,
"internode_authenticator");
-        else
-            internodeAuthenticator = new AllowAllInternodeAuthenticator();
-
-        DatabaseDescriptor.setInternodeAuthenticator(internodeAuthenticator);
+            DatabaseDescriptor.setInternodeAuthenticator(FBUtilities.construct(conf.internode_authenticator,
"internode_authenticator"));
 
         // Validate at last to have authenticator, authorizer, role-manager and internode-auth
setup
         // in case these rely on each other.
@@ -108,6 +104,6 @@ public final class AuthConfig
         authenticator.validateConfiguration();
         authorizer.validateConfiguration();
         roleManager.validateConfiguration();
-        internodeAuthenticator.validateConfiguration();
+        DatabaseDescriptor.getInternodeAuthenticator().validateConfiguration();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 4fb742c..465cd8a 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -29,6 +29,7 @@ import java.nio.file.Paths;
 import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.primitives.Ints;
 import com.google.common.primitives.Longs;
@@ -36,6 +37,7 @@ import com.google.common.primitives.Longs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.auth.AllowAllInternodeAuthenticator;
 import org.apache.cassandra.auth.AuthConfig;
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.auth.IAuthorizer;
@@ -79,7 +81,7 @@ public class DatabaseDescriptor
     private static InetAddress rpcAddress;
     private static InetAddress broadcastRpcAddress;
     private static SeedProvider seedProvider;
-    private static IInternodeAuthenticator internodeAuthenticator;
+    private static IInternodeAuthenticator internodeAuthenticator = new AllowAllInternodeAuthenticator();
 
     /* Hashing strategy Random or OPHF */
     private static IPartitioner partitioner;
@@ -1538,6 +1540,7 @@ public class DatabaseDescriptor
 
     public static void setInternodeAuthenticator(IInternodeAuthenticator internodeAuthenticator)
     {
+        Preconditions.checkNotNull(internodeAuthenticator);
         DatabaseDescriptor.internodeAuthenticator = internodeAuthenticator;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
index a6bec0c..08f0a14 100644
--- a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
+++ b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
@@ -21,8 +21,12 @@ package org.apache.cassandra.locator;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.OutboundTcpConnectionPool;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +53,7 @@ public class ReconnectableSnitchHelper implements IEndpointStateChangeSubscriber
     {
         try
         {
-            reconnect(publicAddress, InetAddress.getByName(localAddressValue.value));
+            reconnect(publicAddress, InetAddress.getByName(localAddressValue.value), snitch,
localDc);
         }
         catch (UnknownHostException e)
         {
@@ -57,12 +61,21 @@ public class ReconnectableSnitchHelper implements IEndpointStateChangeSubscriber
         }
     }
 
-    private void reconnect(InetAddress publicAddress, InetAddress localAddress)
+    @VisibleForTesting
+    static void reconnect(InetAddress publicAddress, InetAddress localAddress, IEndpointSnitch
snitch, String localDc)
     {
+        OutboundTcpConnectionPool cp = MessagingService.instance().getConnectionPool(publicAddress);
+        //InternodeAuthenticator said don't connect
+        if (cp == null)
+        {
+            logger.debug("InternodeAuthenticator said don't reconnect to {} on {}", publicAddress,
localAddress);
+            return;
+        }
+
         if (snitch.getDatacenter(publicAddress).equals(localDc)
-                && !MessagingService.instance().getConnectionPool(publicAddress).endPoint().equals(localAddress))
+                && !cp.endPoint().equals(localAddress))
         {
-            MessagingService.instance().getConnectionPool(publicAddress).reset(localAddress);
+            cp.reset(localAddress);
             logger.debug("Initiated reconnect to an Internal IP {} for the {}", localAddress,
publicAddress);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 729c042..55604d0 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -416,7 +416,8 @@ public final class MessagingService implements MessagingServiceMBean
     /* Lookup table for registering message handlers based on the verb. */
     private final Map<Verb, IVerbHandler> verbHandlers;
 
-    private final ConcurrentMap<InetAddress, OutboundTcpConnectionPool> connectionManagers
= new NonBlockingHashMap<>();
+    @VisibleForTesting
+    final ConcurrentMap<InetAddress, OutboundTcpConnectionPool> connectionManagers
= new NonBlockingHashMap<>();
 
     private static final Logger logger = LoggerFactory.getLogger(MessagingService.class);
     private static final int LOG_DROPPED_INTERVAL_IN_MS = 5000;
@@ -531,6 +532,10 @@ public final class MessagingService implements MessagingServiceMBean
                 maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target,
pair.right.timeout);
 
                 ConnectionMetrics.totalTimeouts.mark();
+                OutboundTcpConnectionPool cp = getConnectionPool(expiredCallbackInfo.target);
+                if (cp != null)
+                    cp.incrementTimeout();
+
                 getConnectionPool(expiredCallbackInfo.target).incrementTimeout();
 
                 if (expiredCallbackInfo.callback.supportsBackPressure())
@@ -670,8 +675,16 @@ public final class MessagingService implements MessagingServiceMBean
      */
     public void convict(InetAddress ep)
     {
-        logger.trace("Resetting pool for {}", ep);
-        getConnectionPool(ep).reset();
+        OutboundTcpConnectionPool cp = getConnectionPool(ep);
+        if (cp != null)
+        {
+            logger.trace("Resetting pool for {}", ep);
+            getConnectionPool(ep).reset();
+        }
+        else
+        {
+            logger.debug("Not resetting pool for {} because internode authenticator said
not to connect", ep);
+        }
     }
 
     public void listen()
@@ -795,11 +808,22 @@ public final class MessagingService implements MessagingServiceMBean
         connectionManagers.remove(to);
     }
 
+    /**
+     * Get a connection pool to the specified endpoint. Constructs one if none exists.
+     *
+     * Can return null if the InternodeAuthenticator fails to authenticate the node.
+     * @param to
+     * @return The connection pool or null if internode authenticator says not to
+     */
     public OutboundTcpConnectionPool getConnectionPool(InetAddress to)
     {
         OutboundTcpConnectionPool cp = connectionManagers.get(to);
         if (cp == null)
         {
+            //Don't attempt to connect to nodes that won't (or shouldn't) authenticate anyways
+            if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(to, OutboundTcpConnectionPool.portFor(to)))
+                return null;
+
             cp = new OutboundTcpConnectionPool(to, backPressure.newState(to));
             OutboundTcpConnectionPool existingPool = connectionManagers.putIfAbsent(to, cp);
             if (existingPool != null)
@@ -811,10 +835,17 @@ public final class MessagingService implements MessagingServiceMBean
         return cp;
     }
 
-
+    /**
+     * Get a connection for a message to a specific endpoint. Constructs one if none exists.
+     *
+     * Can return null if the InternodeAuthenticator fails to authenticate the node.
+     * @param to
+     * @return The connection or null if internode authenticator says not to
+     */
     public OutboundTcpConnection getConnection(InetAddress to, MessageOut msg)
     {
-        return getConnectionPool(to).getConnection(msg);
+        OutboundTcpConnectionPool cp = getConnectionPool(to);
+        return cp == null ? null : cp.getConnection(msg);
     }
 
     /**
@@ -968,7 +999,8 @@ public final class MessagingService implements MessagingServiceMBean
         OutboundTcpConnection connection = getConnection(to, message);
 
         // write it
-        connection.enqueue(message, id);
+        if (connection != null)
+            connection.enqueue(message, id);
     }
 
     public <T> AsyncOneResponse<T> sendRR(MessageOut message, InetAddress to)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index feff527..9b19eab 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -249,6 +249,12 @@ public class OutboundTcpConnection extends FastThreadLocalThread
                         break inner;
                     }
                 }
+                catch (InternodeAuthFailed e)
+                {
+                    logger.warn("Internode auth failed connecting to " + poolReference.endPoint());
+                    //Remove the connection pool and other thread so messages aren't queued
+                    MessagingService.instance().destroyConnectionPool(poolReference.endPoint());
+                }
                 catch (Exception e)
                 {
                     JVMStabilityInspector.inspectThrowable(e);
@@ -394,20 +400,27 @@ public class OutboundTcpConnection extends FastThreadLocalThread
     }
 
     @SuppressWarnings("resource")
-    private boolean connect()
+    private boolean connect() throws InternodeAuthFailed
     {
-        logger.debug("Attempting to connect to {}", poolReference.endPoint());
+        InetAddress endpoint = poolReference.endPoint();
+        if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(endpoint, poolReference.portFor(endpoint)))
+        {
+            throw new InternodeAuthFailed();
+        }
+
+        logger.debug("Attempting to connect to {}", endpoint);
+
 
         long start = System.nanoTime();
         long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getRpcTimeout());
         while (System.nanoTime() - start < timeout)
         {
-            targetVersion = MessagingService.instance().getVersion(poolReference.endPoint());
+            targetVersion = MessagingService.instance().getVersion(endpoint);
             try
             {
                 socket = poolReference.newSocket();
                 socket.setKeepAlive(true);
-                if (isLocalDC(poolReference.endPoint()))
+                if (isLocalDC(endpoint))
                 {
                     socket.setTcpNoDelay(INTRADC_TCP_NODELAY);
                 }
@@ -446,7 +459,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
                 }
                 else
                 {
-                    MessagingService.instance().setVersion(poolReference.endPoint(), maxTargetVersion);
+                    MessagingService.instance().setVersion(endpoint, maxTargetVersion);
                 }
 
                 if (targetVersion > maxTargetVersion)
@@ -454,7 +467,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
                     logger.trace("Target max version is {}; will reconnect with that version",
maxTargetVersion);
                     try
                     {
-                        if (DatabaseDescriptor.getSeeds().contains(poolReference.endPoint()))
+                        if (DatabaseDescriptor.getSeeds().contains(endpoint))
                             logger.warn("Seed gossip version is {}; will not connect with
that version", maxTargetVersion);
                     }
                     catch (Throwable e)
@@ -484,7 +497,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
                 if (shouldCompressConnection())
                 {
                     out.flush();
-                    logger.trace("Upgrading OutputStream to {} to be compressed", poolReference.endPoint());
+                    logger.trace("Upgrading OutputStream to {} to be compressed", endpoint);
 
                     // TODO: custom LZ4 OS that supports BB write methods
                     LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor();
@@ -495,7 +508,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
                                                                         checksum,
                                                                         true)); // no async
flushing
                 }
-                logger.debug("Done connecting to {}", poolReference.endPoint());
+                logger.debug("Done connecting to {}", endpoint);
                 return true;
             }
             catch (SSLHandshakeException e)
@@ -508,7 +521,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
             catch (IOException e)
             {
                 socket = null;
-                logger.debug("Unable to connect to {}", poolReference.endPoint(), e);
+                logger.debug("Unable to connect to {}", endpoint, e);
                 Uninterruptibles.sleepUninterruptibly(OPEN_RETRY_DELAY, TimeUnit.MILLISECONDS);
             }
         }
@@ -613,4 +626,6 @@ public class OutboundTcpConnection extends FastThreadLocalThread
             return false;
         }
     }
+
+    private static class InternodeAuthFailed extends Exception {}
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
index 9f9ffee..20a8da6 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
@@ -148,6 +148,11 @@ public class OutboundTcpConnectionPool
         }
     }
 
+    public static int portFor(InetAddress endpoint)
+    {
+        return isEncryptedChannel(endpoint) ? DatabaseDescriptor.getSSLStoragePort() : DatabaseDescriptor.getStoragePort();
+    }
+
     public InetAddress endPoint()
     {
         if (id.equals(FBUtilities.getBroadcastAddress()))
@@ -218,7 +223,7 @@ public class OutboundTcpConnectionPool
             smallMessages.closeSocket(true);
         if (gossipMessages != null)
             gossipMessages.closeSocket(true);
-
-        metrics.release();
+        if (metrics != null)
+            metrics.release();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index 17cdd77..c8f8bc1 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -54,6 +54,7 @@ import static org.junit.Assert.fail;
 public class DatabaseDescriptorRefTest
 {
     static final String[] validClasses = {
+    "org.apache.cassandra.auth.AllowAllInternodeAuthenticator",
     "org.apache.cassandra.auth.IInternodeAuthenticator",
     "org.apache.cassandra.auth.IAuthenticator",
     "org.apache.cassandra.auth.IAuthorizer",

http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java b/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java
new file mode 100644
index 0000000..232865a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/locator/ReconnectableSnitchHelperTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.auth.IInternodeAuthenticator;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.MessagingServiceTest;
+
+public class ReconnectableSnitchHelperTest
+{
+    static final IInternodeAuthenticator originalAuthenticator = DatabaseDescriptor.getInternodeAuthenticator();
+
+    @BeforeClass
+    public static void beforeClass() throws UnknownHostException
+    {
+        DatabaseDescriptor.daemonInitialization();
+        DatabaseDescriptor.setBackPressureStrategy(new MessagingServiceTest.MockBackPressureStrategy(Collections.emptyMap()));
+        DatabaseDescriptor.setBroadcastAddress(InetAddress.getByName("127.0.0.1"));
+    }
+
+    /**
+     * Make sure that if a node fails internode authentication and MessagingService returns
a null
+     * pool that ReconnectableSnitchHelper fails gracefully.
+     */
+    @Test
+    public void failedAuthentication() throws Exception
+    {
+        DatabaseDescriptor.setInternodeAuthenticator(MessagingServiceTest.ALLOW_NOTHING_AUTHENTICATOR);
+        InetAddress address = InetAddress.getByName("127.0.0.250");
+        //Should tolerate null returns by MS for the connection
+        ReconnectableSnitchHelper.reconnect(address, address, null, null);
+    }
+
+    @After
+    public void replaceAuthenticator()
+    {
+        DatabaseDescriptor.setInternodeAuthenticator(originalAuthenticator);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/732d1af8/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
index d9a9915..e6b5cd0 100644
--- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -36,12 +36,16 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.collect.Iterables;
 import com.codahale.metrics.Timer;
 
+import org.apache.cassandra.auth.IInternodeAuthenticator;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.monitoring.ApproximateTime;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
 import org.caffinitas.ohc.histo.EstimatedHistogram;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -52,6 +56,20 @@ public class MessagingServiceTest
 {
     private final static long ONE_SECOND = TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS);
     private final static long[] bucketOffsets = new EstimatedHistogram(160).getBucketOffsets();
+    public static final IInternodeAuthenticator ALLOW_NOTHING_AUTHENTICATOR = new IInternodeAuthenticator()
+    {
+        public boolean authenticate(InetAddress remoteAddress, int remotePort)
+        {
+            return false;
+        }
+
+        public void validateConfiguration() throws ConfigurationException
+        {
+
+        }
+    };
+    static final IInternodeAuthenticator originalAuthenticator = DatabaseDescriptor.getInternodeAuthenticator();
+
     private final MessagingService messagingService = MessagingService.test();
 
     @BeforeClass
@@ -368,4 +386,46 @@ public class MessagingServiceTest
             throw new UnsupportedOperationException("Not supported.");
         }
     }
+
+    /**
+     * Make sure that if internode authenticatino fails for an outbound connection that all
the code that relies
+     * on getting the connection pool handles the null return
+     * @throws Exception
+     */
+    @Test
+    public void testFailedInternodeAuth() throws Exception
+    {
+        MessagingService ms = MessagingService.instance();
+        DatabaseDescriptor.setInternodeAuthenticator(ALLOW_NOTHING_AUTHENTICATOR);
+        InetAddress address = InetAddress.getByName("127.0.0.250");
+
+        //Should return null
+        assertNull(ms.getConnectionPool(address));
+        assertNull(ms.getConnection(address, new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK)));
+
+        //Should tolerate null
+        ms.convict(address);
+        ms.sendOneWay(new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK), address);
+    }
+
+    @Test
+    public void testOutboundTcpConnectionCleansUp() throws Exception
+    {
+        MessagingService ms = MessagingService.instance();
+        DatabaseDescriptor.setInternodeAuthenticator(ALLOW_NOTHING_AUTHENTICATOR);
+        InetAddress address = InetAddress.getByName("127.0.0.250");
+        OutboundTcpConnectionPool pool = new OutboundTcpConnectionPool(address, new MockBackPressureStrategy(null).newState(address));
+        ms.connectionManagers.put(address, pool);
+        pool.smallMessages.start();
+        pool.smallMessages.enqueue(new MessageOut(MessagingService.Verb.GOSSIP_DIGEST_ACK),
0);
+        pool.smallMessages.join();
+        assertFalse(ms.connectionManagers.containsKey(address));
+    }
+
+    @After
+    public void replaceAuthenticator()
+    {
+        DatabaseDescriptor.setInternodeAuthenticator(originalAuthenticator);
+    }
+
 }


Mime
View raw message