cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vi...@apache.org
Subject git commit: Save EC2Snitch topology information in system table - take 2 patch by Vijay; reviewed by Jason Brown for CASSANDRA-5171
Date Wed, 10 Jul 2013 02:11:18 GMT
Updated Branches:
  refs/heads/trunk 088d965ce -> db4da73e5


Save EC2Snitch topology information in system table - take 2
patch by Vijay; reviewed by Jason Brown for CASSANDRA-5171


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/db4da73e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/db4da73e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/db4da73e

Branch: refs/heads/trunk
Commit: db4da73e58424de64d56d61a7bca10d33fe213d1
Parents: 088d965
Author: Vijay Parthasarathy <vijay2win@gmail.com>
Authored: Tue Jul 9 19:08:42 2013 -0700
Committer: Vijay Parthasarathy <vijay2win@gmail.com>
Committed: Tue Jul 9 19:08:42 2013 -0700

----------------------------------------------------------------------
 .../org/apache/cassandra/config/CFMetaData.java |   1 +
 .../org/apache/cassandra/db/SystemKeyspace.java |  16 ++++++++
 .../org/apache/cassandra/locator/Ec2Snitch.java |  15 ++++++++
 .../net/OutboundTcpConnectionPool.java          |   4 ++
 .../Keyspace1-Standard1-ic-0-Summary.db         | Bin 194 -> 202 bytes
 test/data/serialization/2.0/db.RowMutation.bin  | Bin 3599 -> 3599 bytes
 .../apache/cassandra/locator/EC2SnitchTest.java |  38 +++++++++++++++++++
 7 files changed, 74 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/db4da73e/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index f131cda..fe984f9 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -185,6 +185,7 @@ public final class CFMetaData
                                                      + "schema_version uuid,"
                                                      + "release_version text,"
                                                      + "rpc_address inet,"
+                                                     + "preferred_ip inet,"
                                                      + "data_center text,"
                                                      + "rack text"
                                                      + ") WITH COMMENT='known peers in the cluster'");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/db4da73e/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index e686f16..ba8f63a 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -269,6 +269,13 @@ public class SystemKeyspace
         forceBlockingFlush(PEERS_CF);
     }
 
+    public static synchronized void updatePreferredIP(InetAddress ep, InetAddress preferred_ip)
+    {
+        String req = "INSERT INTO system.%s (peer, preferred_ip) VALUES ('%s', '%s')";
+        processInternal(String.format(req, PEERS_CF, ep.getHostAddress(), preferred_ip.getHostAddress()));
+        forceBlockingFlush(PEERS_CF);
+    }
+
     public static synchronized void updatePeerInfo(InetAddress ep, String columnName, String value)
     {
         if (ep.equals(FBUtilities.getBroadcastAddress()))
@@ -393,6 +400,15 @@ public class SystemKeyspace
         return hostIdMap;
     }
 
+    public static InetAddress getPreferredIP(InetAddress ep)
+    {
+        String req = "SELECT preferred_ip FROM system.%s WHERE peer='%s'";
+        UntypedResultSet result = processInternal(String.format(req, PEERS_CF, ep.getHostAddress()));
+        if (!result.isEmpty() && result.one().has("preferred_ip"))
+            return result.one().getInetAddress("preferred_ip");
+        return null;
+    }
+
     /**
      * Return a map of IP addresses containing a map of dc and rack info
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/db4da73e/src/java/org/apache/cassandra/locator/Ec2Snitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/Ec2Snitch.java b/src/java/org/apache/cassandra/locator/Ec2Snitch.java
index a28e2a6..5dc8638 100644
--- a/src/java/org/apache/cassandra/locator/Ec2Snitch.java
+++ b/src/java/org/apache/cassandra/locator/Ec2Snitch.java
@@ -23,11 +23,13 @@ import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.InetAddress;
 import java.net.URL;
+import java.util.Map;
 
 import com.google.common.base.Charsets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndpointState;
@@ -44,6 +46,7 @@ public class Ec2Snitch extends AbstractNetworkTopologySnitch
     protected static final String ZONE_NAME_QUERY_URL = "http://169.254.169.254/latest/meta-data/placement/availability-zone";
     private static final String DEFAULT_DC = "UNKNOWN-DC";
     private static final String DEFAULT_RACK = "UNKNOWN-RACK";
+    private Map<InetAddress, Map<String, String>> savedEndpoints;
     protected String ec2zone;
     protected String ec2region;
 
@@ -93,7 +96,13 @@ public class Ec2Snitch extends AbstractNetworkTopologySnitch
             return ec2zone;
         EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
         if (state == null || state.getApplicationState(ApplicationState.RACK) == null)
+        {
+            if (savedEndpoints == null)
+                savedEndpoints = SystemKeyspace.loadDcRackInfo();
+            if (savedEndpoints.containsKey(endpoint))
+                return savedEndpoints.get(endpoint).get("rack");
             return DEFAULT_RACK;
+        }
         return state.getApplicationState(ApplicationState.RACK).value;
     }
 
@@ -103,7 +112,13 @@ public class Ec2Snitch extends AbstractNetworkTopologySnitch
             return ec2region;
         EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
         if (state == null || state.getApplicationState(ApplicationState.DC) == null)
+        {
+            if (savedEndpoints == null)
+                savedEndpoints = SystemKeyspace.loadDcRackInfo();
+            if (savedEndpoints.containsKey(endpoint))
+                return savedEndpoints.get(endpoint).get("data_center");
             return DEFAULT_DC;
+        }
         return state.getApplicationState(ApplicationState.DC).value;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/db4da73e/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
index 86476b1..4efb507 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
@@ -26,6 +26,7 @@ import java.nio.channels.SocketChannel;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.metrics.ConnectionMetrics;
 import org.apache.cassandra.security.SSLFactory;
@@ -45,6 +46,8 @@ public class OutboundTcpConnectionPool
     OutboundTcpConnectionPool(InetAddress remoteEp)
     {
         id = remoteEp;
+        resetedEndpoint = SystemKeyspace.getPreferredIP(remoteEp);
+
         cmdCon = new OutboundTcpConnection(this);
         cmdCon.start();
         ackCon = new OutboundTcpConnection(this);
@@ -87,6 +90,7 @@ public class OutboundTcpConnectionPool
      */
     public void reset(InetAddress remoteEP)
     {
+        SystemKeyspace.updatePreferredIP(id, remoteEP);
         resetedEndpoint = remoteEP;
         for (OutboundTcpConnection conn : new OutboundTcpConnection[] { cmdCon, ackCon })
             conn.softCloseSocket();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/db4da73e/test/data/legacy-sstables/ic/Keyspace1/Keyspace1-Standard1-ic-0-Summary.db
----------------------------------------------------------------------
diff --git a/test/data/legacy-sstables/ic/Keyspace1/Keyspace1-Standard1-ic-0-Summary.db b/test/data/legacy-sstables/ic/Keyspace1/Keyspace1-Standard1-ic-0-Summary.db
index e93acef..c1bc2e2 100644
Binary files a/test/data/legacy-sstables/ic/Keyspace1/Keyspace1-Standard1-ic-0-Summary.db and b/test/data/legacy-sstables/ic/Keyspace1/Keyspace1-Standard1-ic-0-Summary.db differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/db4da73e/test/data/serialization/2.0/db.RowMutation.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/db.RowMutation.bin b/test/data/serialization/2.0/db.RowMutation.bin
index c9fcc67..4b525d3 100644
Binary files a/test/data/serialization/2.0/db.RowMutation.bin and b/test/data/serialization/2.0/db.RowMutation.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/db4da73e/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java b/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java
index db79a73..456d3ac 100644
--- a/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java
+++ b/test/unit/org/apache/cassandra/locator/EC2SnitchTest.java
@@ -25,19 +25,35 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.Map;
 
+import junit.framework.Assert;
+
+import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.OutboundTcpConnectionPool;
 import org.apache.cassandra.service.StorageService;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class EC2SnitchTest
 {
     private static String az;
 
+    @BeforeClass
+    public static void setup() throws Exception
+    {
+        SchemaLoader.mkdirs();
+        SchemaLoader.cleanup();
+        StorageService.instance.initServer(0);
+    }
+
     private class TestEC2Snitch extends Ec2Snitch
     {
         public TestEC2Snitch() throws IOException, ConfigurationException
@@ -81,4 +97,26 @@ public class EC2SnitchTest
         assertEquals("us-east-2", snitch.getDatacenter(local));
         assertEquals("2d", snitch.getRack(local));
     }
+
+    @Test
+    public void testEc2MRSnitch() throws UnknownHostException
+    {
+        InetAddress me = InetAddress.getByName("127.0.0.2");
+        InetAddress com_ip = InetAddress.getByName("127.0.0.3");
+
+        OutboundTcpConnectionPool pool = MessagingService.instance().getConnectionPool(me);
+        Assert.assertEquals(me, pool.endPoint());
+        pool.reset(com_ip);
+        Assert.assertEquals(com_ip, pool.endPoint());
+
+        MessagingService.instance().destroyConnectionPool(me);
+        pool = MessagingService.instance().getConnectionPool(me);
+        Assert.assertEquals(com_ip, pool.endPoint());
+    }
+
+    @AfterClass
+    public static void tearDown()
+    {
+        StorageService.instance.stopClient();
+    }
 }


Mime
View raw message