cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject git commit: Add command to rebuild node without merkle tree calculations
Date Mon, 30 Jan 2012 15:24:02 GMT
Updated Branches:
  refs/heads/trunk 8a5436a2b -> 2deee7a4e


Add command to rebuild node without merkle tree calculations

patch by scode; reviewed by jbellis and slebresne for CASSANDRA-3483


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2deee7a4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2deee7a4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2deee7a4

Branch: refs/heads/trunk
Commit: 2deee7a4e70c3e17344f7bbd53e9a72a8ac99dba
Parents: 8a5436a
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Mon Jan 30 16:22:39 2012 +0100
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Mon Jan 30 16:22:39 2012 +0100

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +
 .../org/apache/cassandra/dht/BootStrapper.java     |  105 +------
 .../org/apache/cassandra/dht/RangeStreamer.java    |  245 +++++++++++++++
 .../apache/cassandra/service/StorageService.java   |   19 +-
 .../cassandra/service/StorageServiceMBean.java     |    9 +
 .../apache/cassandra/streaming/OperationType.java  |    3 +-
 .../cassandra/streaming/StreamInSession.java       |    2 +-
 src/java/org/apache/cassandra/tools/NodeCmd.java   |    6 +
 src/java/org/apache/cassandra/tools/NodeProbe.java |    5 +
 .../org/apache/cassandra/dht/BootStrapperTest.java |   43 ++--
 10 files changed, 318 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2deee7a4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0a303f5..eec1583 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -60,6 +60,8 @@
  * Add wide row support for ColumnFamilyInputFormat (CASSANDRA-3264)
  * Allow extending CompositeType comparator (CASSANDRA-3657)
  * Avoids over-paging during get_count (CASSANDRA-3798)
+ * Add new command to rebuild a node without (repair) merkle tree calculations
+   (CASSANDRA-3483)
 
 
 1.0.8

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2deee7a4/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 5b84867..6108214 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -58,9 +58,9 @@ public class BootStrapper
 {
     private static final Logger logger = LoggerFactory.getLogger(BootStrapper.class);
 
-    /* endpoints that need to be bootstrapped */
+    /* endpoint that needs to be bootstrapped */
     protected final InetAddress address;
-    /* tokens of the nodes being bootstrapped. */
+    /* token of the node being bootstrapped. */
     protected final Token<?> token;
     protected final TokenMetadata tokenMetadata;
     private static final long BOOTSTRAP_TIMEOUT = 30000; // default bootstrap timeout of
30s
@@ -80,52 +80,17 @@ public class BootStrapper
         if (logger.isDebugEnabled())
             logger.debug("Beginning bootstrap process");
 
-        final Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>>
rangesToFetch = HashMultimap.create();
+        RangeStreamer streamer = new RangeStreamer(tokenMetadata, address, OperationType.BOOTSTRAP);
+        streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
 
-        int requests = 0;
         for (String table : Schema.instance.getNonSystemTables())
         {
-            Map<InetAddress, Collection<Range<Token>>> workMap = getWorkMap(getRangesWithSources(table)).asMap();
-            for (Map.Entry<InetAddress, Collection<Range<Token>>> entry
: workMap.entrySet())
-            {
-                requests++;
-                rangesToFetch.put(table, entry);
-            }
-        }
-
-        final CountDownLatch latch = new CountDownLatch(requests);
-        for (final String table : rangesToFetch.keySet())
-        {
-            /* Send messages to respective folks to stream data over to me */
-            for (Map.Entry<InetAddress, Collection<Range<Token>>> entry
: rangesToFetch.get(table))
-            {
-                final InetAddress source = entry.getKey();
-                Collection<Range<Token>> ranges = entry.getValue();
-                final Runnable callback = new Runnable()
-                {
-                    public void run()
-                    {
-                        latch.countDown();
-                        if (logger.isDebugEnabled())
-                            logger.debug(String.format("Removed %s/%s as a bootstrap source;
remaining is %s",
-                                                       source, table, latch.getCount()));
-                    }
-                };
-                if (logger.isDebugEnabled())
-                    logger.debug("Bootstrapping from " + source + " ranges " + StringUtils.join(ranges,
", "));
-                StreamIn.requestRanges(source, table, ranges, callback, OperationType.BOOTSTRAP);
-            }
+            AbstractReplicationStrategy strategy = Table.open(table).getReplicationStrategy();
+            streamer.addRanges(table, strategy.getPendingAddressRanges(tokenMetadata, token,
address));
         }
 
-        try
-        {
-            latch.await();
-            StorageService.instance.finishBootstrapping();
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
+        streamer.fetch();
+        StorageService.instance.finishBootstrapping();
     }
 
     /**
@@ -197,31 +162,6 @@ public class BootStrapper
         return maxEndpoint;
     }
 
-    /** get potential sources for each range, ordered by proximity (as determined by EndpointSnitch)
*/
-    Multimap<Range<Token>, InetAddress> getRangesWithSources(String table)
-    {
-        assert tokenMetadata.sortedTokens().size() > 0;
-        final AbstractReplicationStrategy strat = Table.open(table).getReplicationStrategy();
-        Collection<Range<Token>> myRanges = strat.getPendingAddressRanges(tokenMetadata,
token, address);
-
-        Multimap<Range<Token>, InetAddress> myRangeAddresses = ArrayListMultimap.create();
-        Multimap<Range<Token>, InetAddress> rangeAddresses = strat.getRangeAddresses(tokenMetadata);
-        for (Range<Token> myRange : myRanges)
-        {
-            for (Range<Token> range : rangeAddresses.keySet())
-            {
-                if (range.contains(myRange))
-                {
-                    List<InetAddress> preferred = DatabaseDescriptor.getEndpointSnitch().getSortedListByProximity(address,
rangeAddresses.get(range));
-                    myRangeAddresses.putAll(myRange, preferred);
-                    break;
-                }
-            }
-            assert myRangeAddresses.keySet().contains(myRange);
-        }
-        return myRangeAddresses;
-    }
-
     static Token<?> getBootstrapTokenFrom(InetAddress maxEndpoint)
     {
         Message message = new Message(FBUtilities.getBroadcastAddress(),
@@ -244,35 +184,6 @@ public class BootStrapper
         throw new RuntimeException("Bootstrap failed, could not obtain token from: " + maxEndpoint);
     }
 
-    public static Multimap<InetAddress, Range<Token>> getWorkMap(Multimap<Range<Token>,
InetAddress> rangesWithSourceTarget)
-    {
-        return getWorkMap(rangesWithSourceTarget, FailureDetector.instance);
-    }
-
-    static Multimap<InetAddress, Range<Token>> getWorkMap(Multimap<Range<Token>,
InetAddress> rangesWithSourceTarget, IFailureDetector failureDetector)
-    {
-        /*
-         * Map whose key is the source node and the value is a map whose key is the
-         * target and value is the list of ranges to be sent to it.
-        */
-        Multimap<InetAddress, Range<Token>> sources = ArrayListMultimap.create();
-
-        // TODO look for contiguous ranges and map them to the same source
-        for (Range<Token> range : rangesWithSourceTarget.keySet())
-        {
-            for (InetAddress source : rangesWithSourceTarget.get(range))
-            {
-                // ignore the local IP...
-                if (failureDetector.isAlive(source) && !source.equals(FBUtilities.getBroadcastAddress()))
-                {
-                    sources.put(source, range);
-                    break;
-                }
-            }
-        }
-        return sources;
-    }
-
     public static class BootstrapTokenVerbHandler implements IVerbHandler
     {
         public void doVerb(Message message, String id)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2deee7a4/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
new file mode 100644
index 0000000..9b2b41a
--- /dev/null
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.dht;
+
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.IFailureDetector;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.streaming.OperationType;
+import org.apache.cassandra.streaming.StreamIn;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Assists in streaming ranges to a node.
+ */
+public class RangeStreamer
+{
+    private static final Logger logger = LoggerFactory.getLogger(RangeStreamer.class);
+
+    private final TokenMetadata metadata;
+    private final InetAddress address;
+    private final OperationType opType;
+    private final Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>>
toFetch = HashMultimap.create();
+    private final Set<ISourceFilter> sourceFilters = new HashSet<ISourceFilter>();
+
+    /**
+     * A filter applied to sources to stream from when constructing a fetch map.
+     */
+    public static interface ISourceFilter
+    {
+        public boolean shouldInclude(InetAddress endpoint);
+    }
+
+    /**
+     * Source filter which excludes any endpoints that are not alive according to a
+     * failure detector.
+     */
+    public static class FailureDetectorSourceFilter implements ISourceFilter
+    {
+        private final IFailureDetector fd;
+
+        public FailureDetectorSourceFilter(IFailureDetector fd)
+        {
+            this.fd = fd;
+        }
+
+        public boolean shouldInclude(InetAddress endpoint)
+        {
+            return fd.isAlive(endpoint);
+        }
+    }
+
+    /**
+     * Source filter which excludes any endpoints that are not in a specific data center.
+     */
+    public static class SingleDatacenterFilter implements ISourceFilter
+    {
+        private final String sourceDc;
+        private final IEndpointSnitch snitch;
+
+        public SingleDatacenterFilter(IEndpointSnitch snitch, String sourceDc)
+        {
+            this.sourceDc = sourceDc;
+            this.snitch = snitch;
+        }
+
+        public boolean shouldInclude(InetAddress endpoint)
+        {
+            return snitch.getDatacenter(endpoint).equals(sourceDc);
+        }
+    }
+
+    public RangeStreamer(TokenMetadata metadata, InetAddress address, OperationType opType)
+    {
+        this.metadata = metadata;
+        this.address = address;
+        this.opType = opType;
+    }
+
+    public void addSourceFilter(ISourceFilter filter)
+    {
+        sourceFilters.add(filter);
+    }
+
+    public void addRanges(String table, Collection<Range<Token>> ranges)
+    {
+        Multimap<Range<Token>, InetAddress> rangesForTable = getAllRangesWithSourcesFor(table,
ranges);
+
+        if (logger.isDebugEnabled())
+        {
+            for (Map.Entry<Range<Token>, InetAddress> entry: rangesForTable.entries())
+                logger.debug(String.format("%s: range %s exists on %s", opType, entry.getKey(),
entry.getValue()));
+        }
+
+        for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : getRangeFetchMap(rangesForTable,
sourceFilters).asMap().entrySet())
+        {
+            if (logger.isDebugEnabled())
+            {
+                for (Range r : entry.getValue())
+                    logger.debug(String.format("%s: range %s from source %s for table %s",
opType, r, entry.getKey(), table));
+            }
+            toFetch.put(table, entry);
+        }
+    }
+
+    /**
+     * Get a map of all ranges and their respective sources that are candidates for streaming
the given ranges
+     * to us. For each range, the list of sources is sorted by proximity relative to the
given destAddress.
+     */
+    private Multimap<Range<Token>, InetAddress> getAllRangesWithSourcesFor(String
table, Collection<Range<Token>> desiredRanges)
+    {
+        AbstractReplicationStrategy strat = Table.open(table).getReplicationStrategy();
+        Multimap<Range<Token>, InetAddress> rangeAddresses = strat.getRangeAddresses(metadata);
+
+        Multimap<Range<Token>, InetAddress> rangeSources = ArrayListMultimap.create();
+        for (Range<Token> desiredRange : desiredRanges)
+        {
+            for (Range<Token> range : rangeAddresses.keySet())
+            {
+                if (range.contains(desiredRange))
+                {
+                    List<InetAddress> preferred = DatabaseDescriptor.getEndpointSnitch().getSortedListByProximity(address,
rangeAddresses.get(range));
+                    rangeSources.putAll(desiredRange, preferred);
+                    break;
+                }
+            }
+
+            if (!rangeSources.keySet().contains(desiredRange))
+                throw new IllegalStateException("No sources found for " + desiredRange);
+        }
+
+        return rangeSources;
+    }
+
+    /**
+     * @param rangesWithSources The ranges we want to fetch (key) and their potential sources
(value)
+     * @param sourceFilters A (possibly empty) collection of source filters to apply. In
addition to any filters given
+     *                      here, we always exclude ourselves.
+     * @return
+     */
+    private static Multimap<InetAddress, Range<Token>> getRangeFetchMap(Multimap<Range<Token>,
InetAddress> rangesWithSources,
+                                                                        Collection<ISourceFilter>
sourceFilters)
+    {
+        Multimap<InetAddress, Range<Token>> rangeFetchMapMap = HashMultimap.create();
+        for (Range<Token> range : rangesWithSources.keySet())
+        {
+            boolean foundSource = false;
+
+            outer:
+            for (InetAddress address : rangesWithSources.get(range))
+            {
+                if (address.equals(FBUtilities.getBroadcastAddress()))
+                    continue;
+
+                for (ISourceFilter filter : sourceFilters)
+                {
+                    if (!filter.shouldInclude(address))
+                        continue outer;
+                }
+
+                rangeFetchMapMap.put(address, range);
+                foundSource = true;
+            }
+
+            if (!foundSource)
+                throw new IllegalStateException("unable to find sufficient sources for streaming
range " + range);
+        }
+
+        return rangeFetchMapMap;
+    }
+
+    public static Multimap<InetAddress, Range<Token>> getWorkMap(Multimap<Range<Token>,
InetAddress> rangesWithSourceTarget)
+    {
+        return getRangeFetchMap(rangesWithSourceTarget, Collections.<ISourceFilter>singleton(new
FailureDetectorSourceFilter(FailureDetector.instance)));
+    }
+
+    // For testing purposes
+    Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>>
toFetch()
+    {
+        return toFetch;
+    }
+
+    public void fetch()
+    {
+        final CountDownLatch latch = new CountDownLatch(toFetch().entries().size());
+        for (Map.Entry<String, Map.Entry<InetAddress, Collection<Range<Token>>>>
entry : toFetch.entries())
+        {
+            final String table = entry.getKey();
+            final InetAddress source = entry.getValue().getKey();
+            Collection<Range<Token>> ranges = entry.getValue().getValue();
+            /* Send messages to respective folks to stream data over to me */
+            Runnable callback = new Runnable()
+            {
+                public void run()
+                {
+                    latch.countDown();
+                    if (logger.isDebugEnabled())
+                        logger.debug(String.format("Removed %s/%s as a %s source; remaining
is %s",
+                                     source, table, opType, latch.getCount()));
+                }
+            };
+            if (logger.isDebugEnabled())
+                logger.debug("" + opType + "ing from " + source + " ranges " + StringUtils.join(ranges,
", "));
+            StreamIn.requestRanges(source, table, ranges, callback, opType);
+        }
+
+        try
+        {
+            latch.await();
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2deee7a4/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 4ebeddc..441a9bc 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -44,6 +44,8 @@ import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.migration.AddKeyspace;
+import org.apache.cassandra.db.migration.Migration;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.sstable.SSTableDeletingTask;
@@ -667,6 +669,21 @@ public class StorageService implements IEndpointStateChangeSubscriber,
StorageSe
         return joined;
     }
 
+    public void rebuild(String sourceDc)
+    {
+        logger_.info("rebuild from dc: {}", sourceDc == null ? "(any dc)" : sourceDc);
+
+        RangeStreamer streamer = new RangeStreamer(tokenMetadata_, FBUtilities.getBroadcastAddress(),
OperationType.REBUILD);
+        streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
+        if (sourceDc != null)
+            streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(),
sourceDc));
+
+        for (String table : Schema.instance.getNonSystemTables())
+            streamer.addRanges(table, getLocalRanges(table));
+
+        streamer.fetch();
+    }
+
     public void setStreamThroughputMbPerSec(int value)
     {
         DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(value);
@@ -2289,7 +2306,7 @@ public class StorageService implements IEndpointStateChangeSubscriber,
StorageSe
             // associating table with range-to-endpoints map
             rangesToStreamByTable.put(table, rangeWithEndpoints);
 
-            Multimap<InetAddress, Range<Token>> workMap = BootStrapper.getWorkMap(rangesToFetchWithPreferredEndpoints);
+            Multimap<InetAddress, Range<Token>> workMap = RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints);
             rangesToFetch.put(table, workMap);
 
             if (logger_.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2deee7a4/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 9d0ed6e..6903cb2 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -345,6 +345,15 @@ public interface StorageServiceMBean
     public boolean isIncrementalBackupsEnabled();
     public void setIncrementalBackupsEnabled(boolean value);
 
+    /**
+     * Initiate a process of streaming data for which we are responsible from other nodes.
It is similar to bootstrap
+     * except meant to be used on a node which is already in the cluster (typically containing
no data) as an
+     * alternative to running repair.
+     *
+     * @param sourceDc Name of DC from which to select sources for streaming or null to pick
any node
+     */
+    public void rebuild(String sourceDc);
+
     public void bulkLoad(String directory);
 
     public void rescheduleFailedDeletions();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2deee7a4/src/java/org/apache/cassandra/streaming/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/OperationType.java b/src/java/org/apache/cassandra/streaming/OperationType.java
index ddf60ad..beb226b 100644
--- a/src/java/org/apache/cassandra/streaming/OperationType.java
+++ b/src/java/org/apache/cassandra/streaming/OperationType.java
@@ -27,6 +27,7 @@ public enum OperationType
     BOOTSTRAP,
     UNBOOTSTRAP,
     RESTORE_REPLICA_COUNT,
-    BULK_LOAD;
+    BULK_LOAD,
+    REBUILD
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2deee7a4/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java
index 4e552ee..2e25436 100644
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java
@@ -111,7 +111,7 @@ public class StreamInSession
     public void finished(PendingFile remoteFile, SSTableReader reader) throws IOException
     {
         if (logger.isDebugEnabled())
-            logger.debug("Finished {}. Sending ack to {}", remoteFile, this);
+            logger.debug("Finished {} (from {}). Sending ack to {}", new Object[] {remoteFile,
getHost(), this});
 
         assert reader != null;
         readers.add(reader);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2deee7a4/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index 7692cd0..520b78f 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -102,6 +102,7 @@ public class NodeCmd
         JOIN,
         MOVE,
         NETSTATS,
+        REBUILD,
         REFRESH,
         REMOVETOKEN,
         REPAIR,
@@ -155,6 +156,7 @@ public class NodeCmd
         addCmdHelp(header, "setcompactionthroughput <value_in_mb>", "Set the MB/s throughput
cap for compaction in the system, or 0 to disable throttling.");
         addCmdHelp(header, "setstreamthroughput <value_in_mb>", "Set the MB/s throughput
cap for streaming in the system, or 0 to disable throttling.");
         addCmdHelp(header, "describering [keyspace]", "Shows the token ranges info of a given
keyspace.");
+        addCmdHelp(header, "rebuild [src-dc-name]", "Rebuild data by streaming from other
nodes (similarly to bootstrap)");
 
         // Two args
         addCmdHelp(header, "snapshot [keyspaces...] -t [snapshotName]", "Take a snapshot
of the specified keyspaces using optional name snapshotName");
@@ -702,6 +704,10 @@ public class NodeCmd
                 case SETSTREAMTHROUGHPUT :
                     if (arguments.length != 1) { badUse("Missing value argument."); }
                     probe.setStreamThroughput(Integer.valueOf(arguments[0]));
+
+                case REBUILD :
+                    if (arguments.length > 1) { badUse("Too many arguments."); }
+                    probe.rebuild(arguments.length == 1 ? arguments[0] : null);
                     break;
 
                 case REMOVETOKEN :

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2deee7a4/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 621a9f6..22678ae 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -638,6 +638,11 @@ public class NodeProbe
     {
         return ssProxy.describeRingJMX(keyspaceName);
     }
+
+    public void rebuild(String sourceDc)
+    {
+        ssProxy.rebuild(sourceDc);
+    }
 }
 
 class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2deee7a4/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index 1095dd3..72e5bf9 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -22,25 +22,26 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.HashMap;
+import java.util.Set;
 import java.util.Map;
 
-import org.apache.cassandra.CleanupHelper;
-import org.apache.cassandra.config.Schema;
-import org.apache.commons.lang.StringUtils;
-import static org.junit.Assert.assertEquals;
 import org.junit.Test;
 
-import com.google.common.collect.Multimap;
-
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.IFailureDetectionEventListener;
 import org.apache.cassandra.gms.IFailureDetector;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.OperationType;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static org.junit.Assert.assertEquals;
+
 public class BootStrapperTest extends CleanupHelper
 {
     @Test
@@ -164,17 +165,7 @@ public class BootStrapperTest extends CleanupHelper
 
         TokenMetadata tmd = ss.getTokenMetadata();
         assertEquals(numOldNodes, tmd.sortedTokens().size());
-        BootStrapper b = new BootStrapper(myEndpoint, myToken, tmd);
-        Multimap<Range<Token>, InetAddress> res = b.getRangesWithSources(table);
-        
-        int transferCount = 0;
-        for (Map.Entry<Range<Token>, Collection<InetAddress>> e : res.asMap().entrySet())
-        {
-            assert e.getValue() != null && e.getValue().size() > 0 : StringUtils.join(e.getValue(),
", ");
-            transferCount++;
-        }
-
-        assertEquals(replicationFactor, transferCount);
+        RangeStreamer s = new RangeStreamer(tmd, myEndpoint, OperationType.BOOTSTRAP);
         IFailureDetector mockFailureDetector = new IFailureDetector()
         {
             public boolean isAlive(InetAddress ep)
@@ -189,12 +180,22 @@ public class BootStrapperTest extends CleanupHelper
             public void remove(InetAddress ep) { throw new UnsupportedOperationException();
}
             public void clear(InetAddress ep) { throw new UnsupportedOperationException();
}
         };
-        Multimap<InetAddress, Range<Token>> temp = BootStrapper.getWorkMap(res,
mockFailureDetector);
+        s.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(mockFailureDetector));
+        s.addRanges(table, Table.open(table).getReplicationStrategy().getPendingAddressRanges(tmd,
myToken, myEndpoint));
+
+        Collection<Map.Entry<InetAddress, Collection<Range<Token>>>>
toFetch = s.toFetch().get(table);
+
+        // Check we get get RF new ranges in total
+        Set<Range<Token>> ranges = new HashSet<Range<Token>>();
+        for (Map.Entry<InetAddress, Collection<Range<Token>>> e : toFetch)
+            ranges.addAll(e.getValue());
+
+        assertEquals(replicationFactor, ranges.size());
+
         // there isn't any point in testing the size of these collections for any specific
size.  When a random partitioner
         // is used, they will vary.
-        assert temp.keySet().size() > 0;
-        assert temp.asMap().values().iterator().next().size() > 0;
-        assert !temp.keySet().iterator().next().equals(myEndpoint);
+        assert toFetch.iterator().next().getValue().size() > 0;
+        assert !toFetch.iterator().next().getKey().equals(myEndpoint);
     }
 
     private void generateFakeEndpoints(int numOldNodes) throws UnknownHostException


Mime
View raw message