cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [4/4] git commit: refactor reconnecting snitches patch by jasobrown; reviewed by jbellis for CASSANDRA-5681
Date Fri, 21 Jun 2013 15:40:42 GMT
refactor reconnecting snitches
patch by jasobrown; reviewed by jbellis for CASSANDRA-5681


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e75e33fa
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e75e33fa
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e75e33fa

Branch: refs/heads/cassandra-1.2
Commit: e75e33fa6dc5e2a3fe061d747cc98679a65ef960
Parents: 18f3a79
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Fri Jun 21 10:40:31 2013 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Fri Jun 21 10:40:31 2013 -0500

----------------------------------------------------------------------
 .../cassandra/locator/Ec2MultiRegionSnitch.java | 71 +---------------
 .../locator/GossipingPropertyFileSnitch.java    | 63 +-------------
 .../locator/ReconnectableSnitchHelper.java      | 88 ++++++++++++++++++++
 3 files changed, 94 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e75e33fa/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java b/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
index 9317941..bd5e091 100644
--- a/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
+++ b/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
@@ -19,16 +19,11 @@ package org.apache.cassandra.locator;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.net.UnknownHostException;
 
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
-import org.apache.cassandra.gms.VersionedValue;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 
 /**
@@ -36,16 +31,13 @@ import org.apache.cassandra.service.StorageService;
  *
  * 2) Snitch will set the private IP as a Gossip application state.
  *
- * 3) Snitch implements IESCS and will reset the connection if it is within the
+ * 3) Uses a helper class that implements IESCS and will reset the public IP connection if it is within the
  * same region to communicate via private IP.
  *
- * Implements Ec2Snitch to inherit its functionality and extend it for
- * Multi-Region.
- *
  * Operational: All the nodes in this cluster needs to be able to (modify the
  * Security group settings in AWS) communicate via Public IP's.
  */
-public class Ec2MultiRegionSnitch extends Ec2Snitch implements IEndpointStateChangeSubscriber
+public class Ec2MultiRegionSnitch extends Ec2Snitch
 {
     private static final String PUBLIC_IP_QUERY_URL = "http://169.254.169.254/latest/meta-data/public-ipv4";
     private static final String PRIVATE_IP_QUERY_URL = "http://169.254.169.254/latest/meta-data/local-ipv4";
@@ -62,67 +54,10 @@ public class Ec2MultiRegionSnitch extends Ec2Snitch implements IEndpointStateCha
         DatabaseDescriptor.setBroadcastAddress(localPublicAddress);
     }
 
-    public void onJoin(InetAddress endpoint, EndpointState epState)
-    {
-        if (epState.getApplicationState(ApplicationState.INTERNAL_IP) != null)
-            reconnect(endpoint, epState.getApplicationState(ApplicationState.INTERNAL_IP));
-    }
-
-    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
-    {
-        if (state == ApplicationState.INTERNAL_IP)
-            reconnect(endpoint, value);
-    }
-
-    public void onAlive(InetAddress endpoint, EndpointState state)
-    {
-        if (state.getApplicationState(ApplicationState.INTERNAL_IP) != null)
-            reconnect(endpoint, state.getApplicationState(ApplicationState.INTERNAL_IP));
-    }
-
-    public void onDead(InetAddress endpoint, EndpointState state)
-    {
-        // do nothing
-    }
-
-    public void onRestart(InetAddress endpoint, EndpointState state)
-    {
-        // do nothing
-    }
-
-    public void onRemove(InetAddress endpoint)
-    {
-        // do nothing.
-    }
-
-    private void reconnect(InetAddress publicAddress, VersionedValue localAddressValue)
-    {
-        try
-        {
-            reconnect(publicAddress, InetAddress.getByName(localAddressValue.value));
-        }
-        catch (UnknownHostException e)
-        {
-            logger.error("Error in getting the IP address resolved: ", e);
-        }
-    }
-
-    private void reconnect(InetAddress publicAddress, InetAddress localAddress)
-    {
-        if (getDatacenter(publicAddress).equals(getDatacenter(localPublicAddress))
-            && MessagingService.instance().getVersion(publicAddress) == MessagingService.current_version
-            && !MessagingService.instance().getConnectionPool(publicAddress).endPoint().equals(localAddress))
-        {
-            MessagingService.instance().getConnectionPool(publicAddress).reset(localAddress);
-            logger.debug(String.format("Intiated reconnect to an Internal IP %s for the %s", localAddress, publicAddress));
-        }
-    }
-
-    @Override
     public void gossiperStarting()
     {
         super.gossiperStarting();
         Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP, StorageService.instance.valueFactory.internalIP(localPrivateAddress));
-        Gossiper.instance.register(this);
+        Gossiper.instance.register(new ReconnectableSnitchHelper(this, ec2region, true));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e75e33fa/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java b/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
index 071cd09..e00239e 100644
--- a/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
+++ b/src/java/org/apache/cassandra/locator/GossipingPropertyFileSnitch.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.locator;
 
 import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.Map;
 
 import org.apache.cassandra.db.SystemTable;
@@ -30,14 +29,11 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
-import org.apache.cassandra.gms.VersionedValue;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.service.StorageService;
 
 
-public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch implements IEndpointStateChangeSubscriber
+public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch// implements IEndpointStateChangeSubscriber
 {
     private static final Logger logger = LoggerFactory.getLogger(GossipingPropertyFileSnitch.class);
 
@@ -47,7 +43,7 @@ public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch i
     private Map<InetAddress, Map<String, String>> savedEndpoints;
     private String DEFAULT_DC = "UNKNOWN_DC";
     private String DEFAULT_RACK = "UNKNOWN_RACK";
-    private boolean preferLocal;
+    private final boolean preferLocal;
 
     public GossipingPropertyFileSnitch() throws ConfigurationException
     {
@@ -126,64 +122,11 @@ public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch i
         return epState.getApplicationState(ApplicationState.RACK).value;
     }
 
-    // IEndpointStateChangeSubscriber methods
-
-    public void onJoin(InetAddress endpoint, EndpointState epState)
-    {
-        if (preferLocal && epState.getApplicationState(ApplicationState.INTERNAL_IP) != null)
-            reConnect(endpoint, epState.getApplicationState(ApplicationState.INTERNAL_IP));
-    }
-
-    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
-    {
-        if (preferLocal && state == ApplicationState.INTERNAL_IP)
-            reConnect(endpoint, value);
-    }
-
-    public void onAlive(InetAddress endpoint, EndpointState state)
-    {
-        if (preferLocal && state.getApplicationState(ApplicationState.INTERNAL_IP) != null)
-            reConnect(endpoint, state.getApplicationState(ApplicationState.INTERNAL_IP));
-    }
-
-    public void onDead(InetAddress endpoint, EndpointState state)
-    {
-        // do nothing
-    }
-
-    public void onRestart(InetAddress endpoint, EndpointState state)
-    {
-        // do nothing
-    }
-
-    public void onRemove(InetAddress endpoint)
-    {
-        // do nothing.
-    }
-
-    private void reConnect(InetAddress endpoint, VersionedValue versionedValue)
-    {
-        if (!getDatacenter(endpoint).equals(myDC))
-            return; // do nothing return back...
-
-        try
-        {
-            InetAddress remoteIP = InetAddress.getByName(versionedValue.value);
-            MessagingService.instance().getConnectionPool(endpoint).reset(remoteIP);
-            logger.debug(String.format("Intiated reconnect to an Internal IP %s for the endpoint %s", remoteIP, endpoint));
-        }
-        catch (UnknownHostException e)
-        {
-            logger.error("Error in getting the IP address resolved", e);
-        }
-    }
-
-    @Override
     public void gossiperStarting()
     {
         super.gossiperStarting();
         Gossiper.instance.addLocalApplicationState(ApplicationState.INTERNAL_IP,
                                                    StorageService.instance.valueFactory.internalIP(FBUtilities.getLocalAddress().getHostAddress()));
-        Gossiper.instance.register(this);
+        Gossiper.instance.register(new ReconnectableSnitchHelper(this, myDC, preferLocal));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e75e33fa/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
new file mode 100644
index 0000000..adec953
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/ReconnectableSnitchHelper.java
@@ -0,0 +1,88 @@
+package org.apache.cassandra.locator;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.net.MessagingService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sidekick helper for snitches that want to reconnect from one IP addr for a node to another.
+ * Typically, this is for situations like EC2 where a node will have a public address and a private address,
+ * where we connect on the public, discover the private, and reconnect on the private.
+ */
+public class ReconnectableSnitchHelper implements IEndpointStateChangeSubscriber
+{
+    private static final Logger logger = LoggerFactory.getLogger(ReconnectableSnitchHelper.class);
+    private final IEndpointSnitch snitch;
+    private final String localDc;
+    private final boolean preferLocal;
+
+    public ReconnectableSnitchHelper(IEndpointSnitch snitch, String localDc, boolean preferLocal)
+    {
+        this.snitch = snitch;
+        this.localDc = localDc;
+        this.preferLocal = preferLocal;
+    }
+
+    private void reconnect(InetAddress publicAddress, VersionedValue localAddressValue)
+    {
+        try
+        {
+            reconnect(publicAddress, InetAddress.getByName(localAddressValue.value));
+        }
+        catch (UnknownHostException e)
+        {
+            logger.error("Error in getting the IP address resolved: ", e);
+        }
+    }
+
+    private void reconnect(InetAddress publicAddress, InetAddress localAddress)
+    {
+        if (snitch.getDatacenter(publicAddress).equals(localDc)
+                && MessagingService.instance().getVersion(publicAddress) == MessagingService.current_version
+                && !MessagingService.instance().getConnectionPool(publicAddress).endPoint().equals(localAddress))
+        {
+            MessagingService.instance().getConnectionPool(publicAddress).reset(localAddress);
+            logger.debug(String.format("Intiated reconnect to an Internal IP %s for the %s", localAddress, publicAddress));
+        }
+    }
+
+    public void onJoin(InetAddress endpoint, EndpointState epState)
+    {
+        if (preferLocal && epState.getApplicationState(ApplicationState.INTERNAL_IP) != null)
+            reconnect(endpoint, epState.getApplicationState(ApplicationState.INTERNAL_IP));
+    }
+
+    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
+    {
+        if (preferLocal && state == ApplicationState.INTERNAL_IP)
+            reconnect(endpoint, value);
+    }
+
+    public void onAlive(InetAddress endpoint, EndpointState state)
+    {
+        if (preferLocal && state.getApplicationState(ApplicationState.INTERNAL_IP) != null)
+            reconnect(endpoint, state.getApplicationState(ApplicationState.INTERNAL_IP));
+    }
+
+    public void onDead(InetAddress endpoint, EndpointState state)
+    {
+        // do nothing.
+    }
+
+    public void onRemove(InetAddress endpoint)
+    {
+        // do nothing.
+    }
+
+    public void onRestart(InetAddress endpoint, EndpointState state)
+    {
+        // do nothing.
+    }
+}


Mime
View raw message