cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brandonwilli...@apache.org
Subject [1/5] git commit: Add nodetool options to repair specific ranges. Patch by brandonwilliams, reviewed by yukim for CASSANDRA-5280
Date Fri, 22 Feb 2013 22:32:07 GMT
Add nodetool options to repair specific ranges.
Patch by brandonwilliams, reviewed by yukim for CASSANDRA-5280


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/00748405
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/00748405
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/00748405

Branch: refs/heads/cassandra-1.2
Commit: 00748405bc374ee8f5ade4891a6a1445df072344
Parents: 4c98854
Author: Brandon Williams <brandonwilliams@apache.org>
Authored: Fri Feb 22 15:43:42 2013 -0600
Committer: Brandon Williams <brandonwilliams@apache.org>
Committed: Fri Feb 22 15:48:18 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/service/AntiEntropyService.java      |    2 +-
 .../apache/cassandra/service/StorageService.java   |   30 +++++++++++-
 .../cassandra/service/StorageServiceMBean.java     |    5 ++
 src/java/org/apache/cassandra/tools/NodeCmd.java   |    9 +++-
 src/java/org/apache/cassandra/tools/NodeProbe.java |   38 +++++++++++++++
 6 files changed, 81 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/00748405/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4e1db62..1fe1160 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.1.11
  * cli: Add JMX authentication support (CASSANDRA-5080)
+ * nodetool: ability to repair specific range (CASSANDRA-5280)
 
 
 1.1.10

http://git-wip-us.apache.org/repos/asf/cassandra/blob/00748405/src/java/org/apache/cassandra/service/AntiEntropyService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AntiEntropyService.java b/src/java/org/apache/cassandra/service/AntiEntropyService.java
index f3ca1c2..5c59954 100644
--- a/src/java/org/apache/cassandra/service/AntiEntropyService.java
+++ b/src/java/org/apache/cassandra/service/AntiEntropyService.java
@@ -169,7 +169,7 @@ public class AntiEntropyService
                 throw new IllegalArgumentException("Requested range intersects a local range but is not fully contained in one; this would lead to imprecise repair");
             }
         }
-        if (rangeSuperSet == null || !replicaSets.containsKey(toRepair))
+        if (rangeSuperSet == null || !replicaSets.containsKey(rangeSuperSet))
             return Collections.emptySet();
 
         Set<InetAddress> neighbors = new HashSet<InetAddress>(replicaSets.get(rangeSuperSet));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/00748405/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 54d1c0b..05401e0 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1854,11 +1854,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public int forceRepairAsync(final String tableName, final boolean isSequential, final boolean primaryRange, final String... columnFamilies)
     {
+        final Collection<Range<Token>> ranges = primaryRange ? Collections.singletonList(getLocalPrimaryRange()) : getLocalRanges(tableName);
+        return forceRepairAsync(tableName, isSequential, ranges, columnFamilies);
+    }
+
+    public int forceRepairAsync(final String tableName, final boolean isSequential, final Collection<Range<Token>> ranges, final String... columnFamilies)
+    {
         if (Table.SYSTEM_TABLE.equals(tableName))
             return 0;
 
         final int cmd = nextRepairCommand.incrementAndGet();
-        final Collection<Range<Token>> ranges = primaryRange ? Collections.singletonList(getLocalPrimaryRange()) : getLocalRanges(tableName);
         if (ranges.size() > 0)
         {
             new Thread(new WrappedRunnable()
@@ -1872,7 +1877,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                     List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>(ranges.size());
                     for (Range<Token> range : ranges)
                     {
-                        AntiEntropyService.RepairFuture future = forceTableRepair(range, tableName, isSequential, columnFamilies);
+                        AntiEntropyService.RepairFuture future;
+                        try
+                        {
+                            future = forceTableRepair(range, tableName, isSequential, columnFamilies);
+                        }
+                        catch (IllegalArgumentException e)
+                        {
+                            message = String.format("Repair session failed with error: %s", e);
+                            sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
+                            continue;
+                        }
                         if (future == null)
                             continue;
                         futures.add(future);
@@ -1914,6 +1929,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return cmd;
     }
 
+    public int forceRepairRangeAsync(String beginToken, String endToken, final String tableName, boolean isSequential, final String... columnFamilies)
+    {
+        Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
+        Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
+
+        logger_.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}",
+                new Object[] {parsedBeginToken, parsedEndToken, tableName, columnFamilies});
+        return forceRepairAsync(tableName, isSequential, Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)), columnFamilies);
+    }
+
+
     /**
      * Trigger proactive repair for a table and column families.
      * @param tableName

http://git-wip-us.apache.org/repos/asf/cassandra/blob/00748405/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index c34faf3..1261d2a 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -256,6 +256,11 @@ public interface StorageServiceMBean extends NotificationEmitter
     public int forceRepairAsync(String tableName, boolean isSequential, boolean primaryRange, String... columnFamilies);
 
     /**
+     * Same as forceRepairAsync, but handles a specified range
+     */
+    public int forceRepairRangeAsync(String beginToken, String endToken, final String tableName, boolean isSequential, final String... columnFamilies);
+
+    /**
      * Triggers proactive repair for given column families, or all columnfamilies for the given table
      * if none are explicitly listed.
      * @param tableName

http://git-wip-us.apache.org/repos/asf/cassandra/blob/00748405/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index 723bdf8..99cbab1 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -55,6 +55,8 @@ public class NodeCmd
     private static final Pair<String, String> PASSWORD_OPT = new Pair<String, String>("pw", "password");
     private static final Pair<String, String> TAG_OPT = new Pair<String, String>("t", "tag");
     private static final Pair<String, String> PRIMARY_RANGE_OPT = new Pair<String, String>("pr", "partitioner-range");
+    private static final Pair<String, String> START_TOKEN_OPT = new Pair<String, String>("st", "start-token");
+    private static final Pair<String, String> END_TOKEN_OPT = new Pair<String, String>("et", "end-token");
     private static final Pair<String, String> SNAPSHOT_REPAIR_OPT = new Pair<String, String>("snapshot", "with-snapshot");
 
     private static final String DEFAULT_HOST = "127.0.0.1";
@@ -76,6 +78,8 @@ public class NodeCmd
         options.addOption(TAG_OPT,      true, "optional name to give a snapshot");
         options.addOption(PRIMARY_RANGE_OPT, false, "only repair the first range returned by the partitioner for the node");
         options.addOption(SNAPSHOT_REPAIR_OPT, false, "repair one node at a time using snapshots");
+        options.addOption(START_TOKEN_OPT, true, "token at which repair range starts");
+        options.addOption(END_TOKEN_OPT, true, "token at which repair range ends");
     }
 
     public NodeCmd(NodeProbe probe)
@@ -1041,7 +1045,10 @@ public class NodeCmd
                 case REPAIR  :
                     boolean snapshot = cmd.hasOption(SNAPSHOT_REPAIR_OPT.left);
                     boolean primaryRange = cmd.hasOption(PRIMARY_RANGE_OPT.left);
-                    probe.forceRepairAsync(System.out, keyspace, snapshot, primaryRange, columnFamilies);
+                    if (cmd.hasOption(START_TOKEN_OPT.left) || cmd.hasOption(END_TOKEN_OPT.left))
+                        probe.forceRepairRangeAsync(System.out, keyspace, snapshot, cmd.getOptionValue(START_TOKEN_OPT.left), cmd.getOptionValue(END_TOKEN_OPT.left), columnFamilies);
+                    else
+                        probe.forceRepairAsync(System.out, keyspace, snapshot, primaryRange, columnFamilies);
                     break;
                 case FLUSH   :
                     try { probe.forceTableFlush(keyspace, columnFamilies); }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/00748405/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 036d653..44e64c4 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -231,6 +231,29 @@ public class NodeProbe
         }
     }
 
+    public void forceRepairRangeAsync(final PrintStream out, final String tableName, boolean isSequential, final String startToken, final String endToken, String... columnFamilies) throws IOException
+    {
+        RepairRunner runner = new RepairRunner(out, tableName, columnFamilies);
+        try
+        {
+            ssProxy.addNotificationListener(runner, null, null);
+            if (!runner.repairRangeAndWait(ssProxy,  isSequential, startToken, endToken))
+                failed = true;
+        }
+        catch (Exception e)
+        {
+            throw new IOException(e) ;
+        }
+        finally
+        {
+            try
+            {
+                ssProxy.removeNotificationListener(runner);
+            }
+            catch (ListenerNotFoundException ignored) {}
+        }
+    }
+
     public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, String... columnFamilies) throws IOException
     {
         ssProxy.forceTableRepairPrimaryRange(tableName, isSequential, columnFamilies);
@@ -835,6 +858,21 @@ class RepairRunner implements NotificationListener
         return success;
     }
 
+    public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, String startToken, String endToken) throws InterruptedException
+    {
+        cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, columnFamilies);
+        if (cmd > 0)
+        {
+            condition.await();
+        }
+        else
+        {
+            String message = String.format("[%s] Nothing to repair for keyspace '%s'", format.format(System.currentTimeMillis()), keyspace);
+            out.println(message);
+        }
+        return success;
+    }
+
     public void handleNotification(Notification notification, Object handback)
     {
         if ("repair".equals(notification.getType()))


Mime
View raw message