cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject cassandra git commit: Add support to rebuild from targeted replicas
Date Wed, 24 Aug 2016 16:26:38 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 9484783a8 -> 824cb768d


Add support to rebuild from targeted replicas

Patch by Geoffrey Yu; Reviewed by Paulo Motta for CASSANDRA-9875


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/824cb768
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/824cb768
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/824cb768

Branch: refs/heads/trunk
Commit: 824cb768d2aa45e1889653ab2c98cc0bc63e594e
Parents: 9484783
Author: Geoffrey Yu <geoffrey_yu@apple.com>
Authored: Tue Aug 9 21:36:30 2016 -0700
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Wed Aug 24 17:25:47 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/dht/RangeStreamer.java | 18 ++++++++
 .../cassandra/service/StorageService.java       | 47 +++++++++++++++++++-
 .../cassandra/service/StorageServiceMBean.java  |  2 +-
 .../org/apache/cassandra/tools/NodeProbe.java   |  4 +-
 .../cassandra/tools/nodetool/Rebuild.java       |  7 ++-
 6 files changed, 73 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/824cb768/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 12bedfa..af7d0dd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Add support to rebuild from targeted replica (CASSANDRA-9875)
  * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
  * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
  * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/824cb768/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index ee2d792..282ff04 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -122,6 +122,24 @@ public class RangeStreamer
         }
     }
 
+    /**
+     * Source filter which only includes endpoints contained within a provided set.
+     */
+    public static class WhitelistedSourcesFilter implements ISourceFilter
+    {
+        private final Set<InetAddress> whitelistedSources;
+
+        public WhitelistedSourcesFilter(Set<InetAddress> whitelistedSources)
+        {
+            this.whitelistedSources = whitelistedSources;
+        }
+
+        public boolean shouldInclude(InetAddress endpoint)
+        {
+            return whitelistedSources.contains(endpoint);
+        }
+    }
+
     public RangeStreamer(TokenMetadata metadata,
                          Collection<Token> tokens,
                          InetAddress address,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/824cb768/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index bc67ac9..7eb21c0 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1114,10 +1114,10 @@ public class StorageService extends NotificationBroadcasterSupport
implements IE
 
     public void rebuild(String sourceDc)
     {
-        rebuild(sourceDc, null, null);
+        rebuild(sourceDc, null, null, null);
     }
 
-    public void rebuild(String sourceDc, String keyspace, String tokens)
+    public void rebuild(String sourceDc, String keyspace, String tokens, String specificSources)
     {
         // check on going rebuild
         if (!isRebuilding.compareAndSet(false, true))
@@ -1176,6 +1176,49 @@ public class StorageService extends NotificationBroadcasterSupport
implements IE
                     if (tokenScanner.hasNext())
                         throw new IllegalArgumentException("Unexpected string: " + tokenScanner.next());
                 }
+
+                // Ensure all specified ranges are actually ranges owned by this host
+                Collection<Range<Token>> localRanges = getLocalRanges(keyspace);
+                for (Range<Token> specifiedRange : ranges)
+                {
+                    boolean foundParentRange = false;
+                    for (Range<Token> localRange : localRanges)
+                    {
+                        if (localRange.contains(specifiedRange))
+                        {
+                            foundParentRange = true;
+                            break;
+                        }
+                    }
+                    if (!foundParentRange)
+                    {
+                        throw new IllegalArgumentException(String.format("The specified range
%s is not a range that is owned by this node. Please ensure that all token ranges specified
to be rebuilt belong to this node.", specifiedRange.toString()));
+                    }
+                }
+
+                if (specificSources != null)
+                {
+                    String[] stringHosts = specificSources.split(",");
+                    Set<InetAddress> sources = new HashSet<>(stringHosts.length);
+                    for (String stringHost : stringHosts)
+                    {
+                        try
+                        {
+                            InetAddress endpoint = InetAddress.getByName(stringHost);
+                            if (FBUtilities.getBroadcastAddress().equals(endpoint))
+                            {
+                                throw new IllegalArgumentException("This host was specified
as a source for rebuilding. Sources for a rebuild can only be other nodes in the cluster.");
+                            }
+                            sources.add(endpoint);
+                        }
+                        catch (UnknownHostException ex)
+                        {
+                            throw new IllegalArgumentException("Unknown host specified "
+ stringHost, ex);
+                        }
+                    }
+                    streamer.addSourceFilter(new RangeStreamer.WhitelistedSourcesFilter(sources));
+                }
+
                 streamer.addRanges(keyspace, ranges);
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/824cb768/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 0f93177..d6f6bd6 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -580,7 +580,7 @@ public interface StorageServiceMBean extends NotificationEmitter
      * @param tokens Range of tokens to rebuild or null to rebuild all token ranges. In the
format of:
      *               "(start_token_1,end_token_1],(start_token_2,end_token_2],...(start_token_n,end_token_n]"
      */
-    public void rebuild(String sourceDc, String keyspace, String tokens);
+    public void rebuild(String sourceDc, String keyspace, String tokens, String specificSources);
 
     /** Starts a bulk load and blocks until it completes. */
     public void bulkLoad(String directory);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/824cb768/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 89e7bda..5d6dff0 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1132,9 +1132,9 @@ public class NodeProbe implements AutoCloseable
         return ssProxy.describeRingJMX(keyspaceName);
     }
 
-    public void rebuild(String sourceDc, String keyspace, String tokens)
+    public void rebuild(String sourceDc, String keyspace, String tokens, String specificSources)
     {
-        ssProxy.rebuild(sourceDc, keyspace, tokens);
+        ssProxy.rebuild(sourceDc, keyspace, tokens, specificSources);
     }
 
     public List<String> sampleKeyRange()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/824cb768/src/java/org/apache/cassandra/tools/nodetool/Rebuild.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Rebuild.java b/src/java/org/apache/cassandra/tools/nodetool/Rebuild.java
index 865f9fe..b27e674 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Rebuild.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Rebuild.java
@@ -41,6 +41,11 @@ public class Rebuild extends NodeToolCmd
             description = "Use -ts to rebuild specific token ranges, in the format of \"(start_token_1,end_token_1],(start_token_2,end_token_2],...(start_token_n,end_token_n]\".")
     private String tokens = null;
 
+    @Option(title = "specific_sources",
+            name = {"-s", "--sources"},
+            description = "Use -s to specify hosts that this node should stream from when
-ts is used. Multiple hosts should be separated using commas (e.g. 127.0.0.1,127.0.0.2,...)")
+    private String specificSources = null;
+
     @Override
     public void execute(NodeProbe probe)
     {
@@ -50,6 +55,6 @@ public class Rebuild extends NodeToolCmd
             throw new IllegalArgumentException("Cannot specify tokens without keyspace.");
         }
 
-        probe.rebuild(sourceDataCenterName, keyspace, tokens);
+        probe.rebuild(sourceDataCenterName, keyspace, tokens, specificSources);
     }
 }
\ No newline at end of file


Mime
View raw message