cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [1/5] git commit: (Hadoop) allow ACFRW to limit nodes to local DC
Date Sun, 17 Aug 2014 17:50:49 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk fe0572778 -> 4e334ab82


(Hadoop) allow ACFRW to limit nodes to local DC

patch by Robbie Strickland; reviewed by Aleksey Yeschenko for
CASSANDRA-7252


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b87741c0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b87741c0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b87741c0

Branch: refs/heads/trunk
Commit: b87741c077e74b2ae3fda3da2417dc1965c0c4ed
Parents: 115bbe4
Author: Robbie Strickland <rostrickland@gmail.com>
Authored: Sun Aug 17 20:40:39 2014 +0300
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Sun Aug 17 20:40:39 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/client/RingCache.java  | 63 ++++++++++----------
 .../apache/cassandra/hadoop/ConfigHelper.java   | 11 ++++
 3 files changed, 45 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b87741c0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 987c227..94169c1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.10
+ * (Hadoop) allow ACFRW to limit nodes to local DC (CASSANDRA-7252)
  * (cqlsh) Wait up to 10 sec for a tracing session (CASSANDRA-7222)
  * Fix NPE in FileCacheService.sizeInBytes (CASSANDRA-7756)
  * (cqlsh) cqlsh should automatically disable tracing when selecting

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b87741c0/src/java/org/apache/cassandra/client/RingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/client/RingCache.java b/src/java/org/apache/cassandra/client/RingCache.java
index 3308471..cc9b1b2 100644
--- a/src/java/org/apache/cassandra/client/RingCache.java
+++ b/src/java/org/apache/cassandra/client/RingCache.java
@@ -61,44 +61,47 @@ public class RingCache
 
     public void refreshEndpointMap()
     {
-            try {
+        try
+        {
+            Cassandra.Client client = ConfigHelper.getClientFromOutputAddressList(conf);
 
-                Cassandra.Client client = ConfigHelper.getClientFromOutputAddressList(conf);
+            String keyspace = ConfigHelper.getOutputKeyspace(conf);
+            List<TokenRange> ring = ConfigHelper.getOutputLocalDCOnly(conf)
+                                  ? client.describe_local_ring(keyspace)
+                                  : client.describe_ring(keyspace);
+            rangeMap = ArrayListMultimap.create();
 
-                List<TokenRange> ring = client.describe_ring(ConfigHelper.getOutputKeyspace(conf));
-                rangeMap = ArrayListMultimap.create();
-
-                for (TokenRange range : ring)
+            for (TokenRange range : ring)
+            {
+                Token<?> left = partitioner.getTokenFactory().fromString(range.start_token);
+                Token<?> right = partitioner.getTokenFactory().fromString(range.end_token);
+                Range<Token> r = new Range<Token>(left, right, partitioner);
+                for (String host : range.endpoints)
                 {
-                    Token<?> left = partitioner.getTokenFactory().fromString(range.start_token);
-                    Token<?> right = partitioner.getTokenFactory().fromString(range.end_token);
-                    Range<Token> r = new Range<Token>(left, right, partitioner);
-                    for (String host : range.endpoints)
+                    try
+                    {
+                        rangeMap.put(r, InetAddress.getByName(host));
+                    }
+                    catch (UnknownHostException e)
                     {
-                        try
-                        {
-                            rangeMap.put(r, InetAddress.getByName(host));
-                        }
-                        catch (UnknownHostException e)
-                        {
-                            throw new AssertionError(e); // host strings are IPs
-                        }
+                        throw new AssertionError(e); // host strings are IPs
                     }
                 }
             }
-            catch (InvalidRequestException e)
-            {
-                throw new RuntimeException(e);
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-            catch (TException e)
-            {
-                logger.debug("Error contacting seed list" + ConfigHelper.getOutputInitialAddress(conf) + " " + e.getMessage());
-            }
         }
+        catch (InvalidRequestException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (TException e)
+        {
+            logger.debug("Error contacting seed list" + ConfigHelper.getOutputInitialAddress(conf) + " " + e.getMessage());
+        }
+    }
 
     /** ListMultimap promises to return a List for get(K) */
     public List<InetAddress> getEndpoint(Range<Token> range)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b87741c0/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
index f062bfc..10cfe8e 100644
--- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
@@ -67,6 +67,7 @@ public class ConfigHelper
     private static final String WRITE_CONSISTENCY_LEVEL = "cassandra.consistencylevel.write";
     private static final String OUTPUT_COMPRESSION_CLASS = "cassandra.output.compression.class";
     private static final String OUTPUT_COMPRESSION_CHUNK_LENGTH = "cassandra.output.compression.length";
+    private static final String OUTPUT_LOCAL_DC_ONLY = "cassandra.output.local.dc.only";
     private static final String THRIFT_FRAMED_TRANSPORT_SIZE_IN_MB = "cassandra.thrift.framed.size_mb";
 
     private static final Logger logger = LoggerFactory.getLogger(ConfigHelper.class);
@@ -513,6 +514,16 @@ public class ConfigHelper
         }
     }
 
+    public static boolean getOutputLocalDCOnly(Configuration conf)
+    {
+        return Boolean.parseBoolean(conf.get(OUTPUT_LOCAL_DC_ONLY, "false"));
+    }
+
+    public static void setOutputLocalDCOnly(Configuration conf, boolean localDCOnly)
+    {
+        conf.set(OUTPUT_LOCAL_DC_ONLY, Boolean.toString(localDCOnly));
+    }
+
     public static Cassandra.Client getClientFromInputAddressList(Configuration conf) throws IOException
     {
         return getClientFromAddressList(conf, ConfigHelper.getInputInitialAddress(conf).split(","), ConfigHelper.getInputRpcPort(conf));


Mime
View raw message