cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [3/8] git commit: Allow repairing between specific replicas patch by Sankalp Kohli; reviewed by Lyuben Todorov for CASSANDRA-6440
Date Tue, 18 Feb 2014 17:37:00 GMT
Allow repairing between specific replicas
patch by Sankalp Kohli; reviewed by Lyuben Todorov for CASSANDRA-6440


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f30b7720
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f30b7720
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f30b7720

Branch: refs/heads/trunk
Commit: f30b772006e43e0c2905638e1e271854f2a71f69
Parents: 500c62d
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Tue Feb 18 11:35:53 2014 -0600
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Tue Feb 18 11:35:53 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/repair/RepairSession.java  |  8 ++--
 .../cassandra/service/ActiveRepairService.java  | 40 ++++++++++++++++++--
 .../cassandra/service/StorageService.java       | 22 +++++------
 .../cassandra/service/StorageServiceMBean.java  |  4 +-
 .../org/apache/cassandra/tools/NodeCmd.java     | 11 +++++-
 .../org/apache/cassandra/tools/NodeProbe.java   | 16 ++++----
 .../service/AntiEntropyServiceTestAbstract.java | 37 ++++++++++++++++--
 8 files changed, 104 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f30b7720/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f0c116f..fd1062e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.6
+ * Allow repairing between specific replicas (CASSANDRA-6440)
  * Allow per-dc enabling of hints (CASSANDRA-6157)
  * Add compatibility for Hadoop 0.2.x (CASSANDRA-5201)
  * Fix EstimatedHistogram races (CASSANDRA-6682)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f30b7720/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index ebcd3f4..36b7226 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -105,12 +105,12 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
      * @param dataCenters the data centers that should be part of the repair; null for all DCs
      * @param cfnames names of columnfamilies
      */
-    public RepairSession(Range<Token> range, String keyspace, boolean isSequential,
Collection<String> dataCenters, String... cfnames)
+    public RepairSession(Range<Token> range, String keyspace, boolean isSequential,
Collection<String> dataCenters, Collection<String> hosts, String... cfnames)
     {
-        this(UUIDGen.getTimeUUID(), range, keyspace, isSequential, dataCenters, cfnames);
+        this(UUIDGen.getTimeUUID(), range, keyspace, isSequential, dataCenters, hosts, cfnames);
     }
 
-    public RepairSession(UUID id, Range<Token> range, String keyspace, boolean isSequential,
Collection<String> dataCenters, String[] cfnames)
+    public RepairSession(UUID id, Range<Token> range, String keyspace, boolean isSequential,
Collection<String> dataCenters, Collection<String> hosts, String[] cfnames)
     {
         this.id = id;
         this.isSequential = isSequential;
@@ -118,7 +118,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
         this.cfnames = cfnames;
         assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
         this.range = range;
-        this.endpoints = ActiveRepairService.getNeighbors(keyspace, range, dataCenters);
+        this.endpoints = ActiveRepairService.getNeighbors(keyspace, range, dataCenters, hosts);
     }
 
     public UUID getId()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f30b7720/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index b77f216..00e43ea 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.service;
 
 import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.*;
 
@@ -91,9 +92,9 @@ public class ActiveRepairService
      *
      * @return Future for asynchronous call or null if there is no need to repair
      */
-    public RepairFuture submitRepairSession(Range<Token> range, String keyspace, boolean
isSequential, Collection<String> dataCenters, String... cfnames)
+    public RepairFuture submitRepairSession(Range<Token> range, String keyspace, boolean
isSequential, Collection<String> dataCenters, Collection<String> hosts, String...
cfnames)
     {
-        RepairSession session = new RepairSession(range, keyspace, isSequential, dataCenters,
cfnames);
+        RepairSession session = new RepairSession(range, keyspace, isSequential, dataCenters,
hosts, cfnames);
         if (session.endpoints.isEmpty())
             return null;
         RepairFuture futureTask = new RepairFuture(session);
@@ -127,7 +128,7 @@ public class ActiveRepairService
     // add it to the sessions (avoid NPE in tests)
     RepairFuture submitArtificialRepairSession(RepairJobDesc desc)
     {
-        RepairSession session = new RepairSession(desc.sessionId, desc.range, desc.keyspace,
false, null, new String[]{desc.columnFamily});
+        RepairSession session = new RepairSession(desc.sessionId, desc.range, desc.keyspace,
false, null, null, new String[]{desc.columnFamily});
         sessions.put(session.getId(), session);
         RepairFuture futureTask = new RepairFuture(session);
         executor.execute(futureTask);
@@ -143,7 +144,7 @@ public class ActiveRepairService
      *
      * @return neighbors with whom we share the provided range
      */
-    public static Set<InetAddress> getNeighbors(String keyspaceName, Range<Token>
toRepair, Collection<String> dataCenters)
+    public static Set<InetAddress> getNeighbors(String keyspaceName, Range<Token>
toRepair, Collection<String> dataCenters, Collection<String> hosts)
     {
         if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
             throw new IllegalArgumentException("The local data center must be part of the repair");
@@ -182,6 +183,37 @@ public class ActiveRepairService
             }
             return Sets.intersection(neighbors, dcEndpoints);
         }
+        else if (hosts != null)
+        {
+            Set<InetAddress> specifiedHost = new HashSet<>();
+            for (final String host : hosts)
+            {
+                try
+                {
+                    final InetAddress endpoint = InetAddress.getByName(host.trim());
+                    if (endpoint.equals(FBUtilities.getBroadcastAddress()) || neighbors.contains(endpoint))
+                        specifiedHost.add(endpoint);
+                }
+                catch (UnknownHostException e)
+                {
+                    throw new IllegalArgumentException("Unknown host specified " + host, e);
+                }
+            }
+
+            if (!specifiedHost.contains(FBUtilities.getBroadcastAddress()))
+                throw new IllegalArgumentException("The current host must be part of the repair");
+
+            if (specifiedHost.size() <= 1)
+            {
+                String msg = "Repair requires at least two endpoints that are neighbours before it can continue, the endpoint used for this repair is %s, " +
+                             "other available neighbours are %s but these neighbours were not part of the supplied list of hosts to use during the repair (%s).";
+                throw new IllegalArgumentException(String.format(msg, specifiedHost, neighbors, hosts));
+            }
+
+            specifiedHost.remove(FBUtilities.getBroadcastAddress());
+            return specifiedHost;
+
+        }
 
         return neighbors;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f30b7720/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index c323a19..4be95b2 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2418,13 +2418,13 @@ public class StorageService extends NotificationBroadcasterSupport
implements IE
         sendNotification(jmxNotification);
     }
 
-    public int forceRepairAsync(final String keyspace, final boolean isSequential, final
Collection<String> dataCenters, final boolean primaryRange, final String... columnFamilies)
+    public int forceRepairAsync(final String keyspace, final boolean isSequential, final
Collection<String> dataCenters, final Collection<String> hosts, final boolean
primaryRange, final String... columnFamilies)
     {
         final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace)
: getLocalRanges(keyspace);
-        return forceRepairAsync(keyspace, isSequential, dataCenters, ranges, columnFamilies);
+        return forceRepairAsync(keyspace, isSequential, dataCenters, hosts, ranges, columnFamilies);
     }
 
-    public int forceRepairAsync(final String keyspace, final boolean isSequential, final
Collection<String> dataCenters, final Collection<Range<Token>> ranges, final
String... columnFamilies)
+    public int forceRepairAsync(final String keyspace, final boolean isSequential, final
Collection<String> dataCenters, final Collection<String> hosts,  final Collection<Range<Token>>
ranges, final String... columnFamilies)
     {
         if (Keyspace.SYSTEM_KS.equals(keyspace) || ranges.isEmpty())
             return 0;
@@ -2432,7 +2432,7 @@ public class StorageService extends NotificationBroadcasterSupport implements
IE
         final int cmd = nextRepairCommand.incrementAndGet();
         if (ranges.size() > 0)
         {
-            new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters,
columnFamilies)).start();
+            new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters,
hosts, columnFamilies)).start();
         }
         return cmd;
     }
@@ -2456,14 +2456,14 @@ public class StorageService extends NotificationBroadcasterSupport
implements IE
         return cmd;
     }
 
-    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName,
boolean isSequential, Collection<String> dataCenters, final String... columnFamilies)
+    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName,
boolean isSequential, Collection<String> dataCenters, final Collection<String>
hosts, final String... columnFamilies)
     {
         Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
         Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
 
         logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
                     parsedBeginToken, parsedEndToken, keyspaceName, columnFamilies);
-        return forceRepairAsync(keyspaceName, isSequential, dataCenters, Collections.singleton(new
Range<Token>(parsedBeginToken, parsedEndToken)), columnFamilies);
+        return forceRepairAsync(keyspaceName, isSequential, dataCenters, hosts, Collections.singleton(new
Range<Token>(parsedBeginToken, parsedEndToken)), columnFamilies);
     }
 
     public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName,
boolean isSequential, boolean isLocal, final String... columnFamilies)
@@ -2517,10 +2517,10 @@ public class StorageService extends NotificationBroadcasterSupport
implements IE
         {
             dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
         }
-        return createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, columnFamilies);
+        return createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, null, columnFamilies);
     }
 
-    private FutureTask<Object> createRepairTask(final int cmd, final String keyspace,
final Collection<Range<Token>> ranges, final boolean isSequential, final Collection<String>
dataCenters, final String... columnFamilies)
+    private FutureTask<Object> createRepairTask(final int cmd, final String keyspace,
final Collection<Range<Token>> ranges, final boolean isSequential, final Collection<String>
dataCenters, final Collection<String> hosts, final String... columnFamilies)
     {
         return new FutureTask<Object>(new WrappedRunnable()
         {
@@ -2544,7 +2544,7 @@ public class StorageService extends NotificationBroadcasterSupport implements
IE
                     RepairFuture future;
                     try
                     {
-                        future = forceKeyspaceRepair(range, keyspace, isSequential, dataCenters,
columnFamilies);
+                        future = forceKeyspaceRepair(range, keyspace, isSequential, dataCenters,
hosts, columnFamilies);
                     }
                     catch (IllegalArgumentException e)
                     {
@@ -2594,7 +2594,7 @@ public class StorageService extends NotificationBroadcasterSupport implements
IE
         }, null);
     }
 
-    public RepairFuture forceKeyspaceRepair(final Range<Token> range, final String
keyspaceName, boolean isSequential, Collection<String> dataCenters, final String...
columnFamilies) throws IOException
+    public RepairFuture forceKeyspaceRepair(final Range<Token> range, final String
keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String>
hosts, final String... columnFamilies) throws IOException
     {
         ArrayList<String> names = new ArrayList<String>();
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName,
columnFamilies))
@@ -2608,7 +2608,7 @@ public class StorageService extends NotificationBroadcasterSupport implements
IE
             return null;
         }
 
-        return ActiveRepairService.instance.submitRepairSession(range, keyspaceName, isSequential,
dataCenters, names.toArray(new String[names.size()]));
+        return ActiveRepairService.instance.submitRepairSession(range, keyspaceName, isSequential,
dataCenters, hosts, names.toArray(new String[names.size()]));
     }
 
     public void forceTerminateAllRepairSessions() {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f30b7720/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index f949dcc..ed260b8 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -257,12 +257,12 @@ public interface StorageServiceMBean extends NotificationEmitter
      *
      * @return Repair command number, or 0 if nothing to repair
      */
-    public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String>
dataCenters, boolean primaryRange, String... columnFamilies);
+    public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String>
dataCenters, final Collection<String> hosts, boolean primaryRange, String... columnFamilies);
 
     /**
      * Same as forceRepairAsync, but handles a specified range
      */
-    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName,
boolean isSequential, Collection<String> dataCenters, final String... columnFamilies);
+    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName,
boolean isSequential, Collection<String> dataCenters, final Collection<String>
hosts,  final String... columnFamilies);
 
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f30b7720/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index fb29342..bc81615 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -67,6 +67,7 @@ public class NodeCmd
     private static final Pair<String, String> PRIMARY_RANGE_OPT = Pair.create("pr",
"partitioner-range");
     private static final Pair<String, String> PARALLEL_REPAIR_OPT = Pair.create("par",
"parallel");
     private static final Pair<String, String> LOCAL_DC_REPAIR_OPT = Pair.create("local",
"in-local-dc");
+    private static final Pair<String, String> HOST_REPAIR_OPT = Pair.create("hosts",
"in-host");
     private static final Pair<String, String> DC_REPAIR_OPT = Pair.create("dc", "in-dc");
     private static final Pair<String, String> START_TOKEN_OPT = Pair.create("st", "start-token");
     private static final Pair<String, String> END_TOKEN_OPT = Pair.create("et", "end-token");
@@ -97,6 +98,7 @@ public class NodeCmd
         options.addOption(PARALLEL_REPAIR_OPT, false, "repair nodes in parallel.");
         options.addOption(LOCAL_DC_REPAIR_OPT, false, "only repair against nodes in the same datacenter");
         options.addOption(DC_REPAIR_OPT, true, "only repair against nodes in the specified datacenters (comma separated)");
+        options.addOption(HOST_REPAIR_OPT, true, "only repair against specified nodes (comma separated)");
         options.addOption(START_TOKEN_OPT, true, "token at which repair range starts");
         options.addOption(END_TOKEN_OPT, true, "token at which repair range ends");
         options.addOption(UPGRADE_ALL_SSTABLE_OPT, false, "includes sstables that are already
on the most recent version during upgradesstables");
@@ -1626,16 +1628,21 @@ public class NodeCmd
                     boolean sequential = !cmd.hasOption(PARALLEL_REPAIR_OPT.left);
                     boolean localDC = cmd.hasOption(LOCAL_DC_REPAIR_OPT.left);
                     boolean specificDC = cmd.hasOption(DC_REPAIR_OPT.left);
+                    boolean specificHosts = cmd.hasOption(HOST_REPAIR_OPT.left);
                     boolean primaryRange = cmd.hasOption(PRIMARY_RANGE_OPT.left);
                     Collection<String> dataCenters = null;
+                    Collection<String> hosts = null;
+
                     if (specificDC)
                         dataCenters = Arrays.asList(cmd.getOptionValue(DC_REPAIR_OPT.left).split(","));
                     else if (localDC)
                         dataCenters = Arrays.asList(probe.getDataCenter());
+                    else if(specificHosts)
+                        hosts  = Arrays.asList(cmd.getOptionValue(HOST_REPAIR_OPT.left).split(","));
                     if (cmd.hasOption(START_TOKEN_OPT.left) || cmd.hasOption(END_TOKEN_OPT.left))
-                        probe.forceRepairRangeAsync(System.out, keyspace, sequential, dataCenters,
cmd.getOptionValue(START_TOKEN_OPT.left), cmd.getOptionValue(END_TOKEN_OPT.left), columnFamilies);
+                        probe.forceRepairRangeAsync(System.out, keyspace, sequential, dataCenters,
hosts, cmd.getOptionValue(START_TOKEN_OPT.left), cmd.getOptionValue(END_TOKEN_OPT.left), columnFamilies);
                     else
-                        probe.forceRepairAsync(System.out, keyspace, sequential, dataCenters,
primaryRange, columnFamilies);
+                        probe.forceRepairAsync(System.out, keyspace, sequential, dataCenters,
hosts, primaryRange, columnFamilies);
                     break;
                 case FLUSH   :
                     try { probe.forceKeyspaceFlush(keyspace, columnFamilies); }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f30b7720/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 28cafb7..594b41b 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -215,14 +215,14 @@ public class NodeProbe
         ssProxy.forceKeyspaceRepair(keyspaceName, isSequential, isLocal, columnFamilies);
     }
 
-    public void forceRepairAsync(final PrintStream out, final String keyspaceName, boolean
isSequential, Collection<String> dataCenters, boolean primaryRange, String... columnFamilies)
throws IOException
+    public void forceRepairAsync(final PrintStream out, final String keyspaceName, boolean
isSequential, Collection<String> dataCenters, final Collection<String> hosts,
 boolean primaryRange, String... columnFamilies) throws IOException
     {
         RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies);
         try
         {
             jmxc.addConnectionNotificationListener(runner, null, null);
             ssProxy.addNotificationListener(runner, null, null);
-            if (!runner.repairAndWait(ssProxy, isSequential, dataCenters, primaryRange))
+            if (!runner.repairAndWait(ssProxy, isSequential, dataCenters, hosts, primaryRange))
                 failed = true;
         }
         catch (Exception e)
@@ -240,14 +240,14 @@ public class NodeProbe
         }
     }
 
-    public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, boolean
isSequential, Collection<String> dataCenters, final String startToken, final String
endToken, String... columnFamilies) throws IOException
+    public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, boolean
isSequential, Collection<String> dataCenters, final Collection<String> hosts,
final String startToken, final String endToken, String... columnFamilies) throws IOException
     {
         RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies);
         try
         {
             jmxc.addConnectionNotificationListener(runner, null, null);
             ssProxy.addNotificationListener(runner, null, null);
-            if (!runner.repairRangeAndWait(ssProxy,  isSequential, dataCenters, startToken,
endToken))
+            if (!runner.repairRangeAndWait(ssProxy,  isSequential, dataCenters, hosts, startToken,
endToken))
                 failed = true;
         }
         catch (Exception e)
@@ -1045,16 +1045,16 @@ class RepairRunner implements NotificationListener
         this.columnFamilies = columnFamilies;
     }
 
-    public boolean repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String>
dataCenters, boolean primaryRangeOnly) throws Exception
+    public boolean repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String>
dataCenters, final Collection<String> hosts, boolean primaryRangeOnly) throws Exception
     {
-        cmd = ssProxy.forceRepairAsync(keyspace, isSequential, dataCenters, primaryRangeOnly,
columnFamilies);
+        cmd = ssProxy.forceRepairAsync(keyspace, isSequential, dataCenters, hosts, primaryRangeOnly,
columnFamilies);
         waitForRepair();
         return success;
     }
 
-    public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential,
Collection<String> dataCenters, String startToken, String endToken) throws Exception
+    public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential,
Collection<String> dataCenters, final Collection<String> hosts, String startToken,
String endToken) throws Exception
     {
-        cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential,
dataCenters, columnFamilies);
+        cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential,
dataCenters, hosts, columnFamilies);
         waitForRepair();
         return success;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f30b7720/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index 1123fc0..eeb297a 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -124,7 +124,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
         Set<InetAddress> neighbors = new HashSet<InetAddress>();
         for (Range<Token> range : ranges)
         {
-            neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null));
+            neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null,
null));
         }
         assertEquals(expected, neighbors);
     }
@@ -147,7 +147,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
         Set<InetAddress> neighbors = new HashSet<InetAddress>();
         for (Range<Token> range : ranges)
         {
-            neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null));
+            neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, null,
null));
         }
         assertEquals(expected, neighbors);
     }
@@ -169,7 +169,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
         Set<InetAddress> neighbors = new HashSet<InetAddress>();
         for (Range<Token> range : ranges)
         {
-            neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter())));
+            neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()),
null));
         }
         assertEquals(expected, neighbors);
     }
@@ -197,11 +197,40 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
         Set<InetAddress> neighbors = new HashSet<InetAddress>();
         for (Range<Token> range : ranges)
         {
-            neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter())));
+            neighbors.addAll(ActiveRepairService.getNeighbors(keyspaceName, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()),
null));
         }
         assertEquals(expected, neighbors);
     }
 
+    @Test
+    public void testGetNeighborsTimesTwoInSpecifiedHosts() throws Throwable
+    {
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+        // generate rf*2 nodes, and ensure that only neighbors specified by the hosts are returned
+        addTokens(2 * Keyspace.open(keyspaceName).getReplicationStrategy().getReplicationFactor());
+        AbstractReplicationStrategy ars = Keyspace.open(keyspaceName).getReplicationStrategy();
+        List<InetAddress> expected = new ArrayList<>();
+        for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
+        {
+            expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
+        }
+
+        expected.remove(FBUtilities.getBroadcastAddress());
+        Collection<String> hosts = Arrays.asList(FBUtilities.getBroadcastAddress().getCanonicalHostName(),expected.get(0).getCanonicalHostName());
+
+       assertEquals(expected.get(0), ActiveRepairService.getNeighbors(keyspaceName, StorageService.instance.getLocalRanges(keyspaceName).iterator().next(),
null, hosts).iterator().next());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testGetNeighborsSpecifiedHostsWithNoLocalHost() throws Throwable
+    {
+        addTokens(2 * Keyspace.open(keyspaceName).getReplicationStrategy().getReplicationFactor());
+        //Dont give local endpoint
+        Collection<String> hosts = Arrays.asList("127.0.0.3");
+        ActiveRepairService.getNeighbors(keyspaceName, StorageService.instance.getLocalRanges(keyspaceName).iterator().next(),
null, hosts);
+    }
+
     Set<InetAddress> addTokens(int max) throws Throwable
     {
         TokenMetadata tmd = StorageService.instance.getTokenMetadata();


Mime
View raw message