cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brandonwilli...@apache.org
Subject [3/5] git commit: Merge branch 'cassandra-1.1' into cassandra-1.2
Date Fri, 22 Feb 2013 22:32:07 GMT
Merge branch 'cassandra-1.1' into cassandra-1.2

Conflicts:
	CHANGES.txt
	src/java/org/apache/cassandra/service/StorageService.java
	src/java/org/apache/cassandra/tools/NodeCmd.java
	src/java/org/apache/cassandra/tools/NodeProbe.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3e8bffd0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3e8bffd0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3e8bffd0

Branch: refs/heads/trunk
Commit: 3e8bffd000926b4f2dc92fd2bd4c34f8570e9cc1
Parents: 068b53d 0074840
Author: Brandon Williams <brandonwilliams@apache.org>
Authored: Fri Feb 22 16:31:26 2013 -0600
Committer: Brandon Williams <brandonwilliams@apache.org>
Committed: Fri Feb 22 16:31:26 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                        |   13 +++++-
 .../cassandra/service/AntiEntropyService.java      |    2 +-
 .../apache/cassandra/service/StorageService.java   |   31 +++++++++++-
 .../cassandra/service/StorageServiceMBean.java     |    5 ++
 src/java/org/apache/cassandra/tools/NodeCmd.java   |    9 +++-
 src/java/org/apache/cassandra/tools/NodeProbe.java |   38 +++++++++++++++
 6 files changed, 92 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e8bffd0/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 8e77bf7,1fe1160..42a6e6d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,83 -1,9 +1,94 @@@
 -1.1.11
++1.2.3
++Merged from 1.1:
++=======
++ * nodetool: ability to repair specific range (CASSANDRA-5280)
++
 +1.2.2
 + * fix potential for multiple concurrent compactions of the same sstables
 +   (CASSANDRA-5256)
 + * avoid no-op caching of byte[] on commitlog append (CASSANDRA-5199)
 + * fix symlinks under data dir not working (CASSANDRA-5185)
 + * fix bug in compact storage metadata handling (CASSANDRA-5189)
 + * Validate login for USE queries (CASSANDRA-5207)
 + * cli: remove default username and password (CASSANDRA-5208)
 + * configure populate_io_cache_on_flush per-CF (CASSANDRA-4694)
 + * allow configuration of internode socket buffer (CASSANDRA-3378)
 + * Make sstable directory picking blacklist-aware again (CASSANDRA-5193)
 + * Correctly expire gossip states for edge cases (CASSANDRA-5216)
 + * Improve handling of directory creation failures (CASSANDRA-5196)
 + * Expose secondary indices to the rest of nodetool (CASSANDRA-4464)
 + * Binary protocol: avoid sending notification for 0.0.0.0 (CASSANDRA-5227)
 + * add UseCondCardMark XX jvm settings on jdk 1.7 (CASSANDRA-4366)
 + * CQL3 refactor to allow conversion function (CASSANDRA-5226)
 + * Fix drop of sstables in some circumstance (CASSANDRA-5232)
 + * Implement caching of authorization results (CASSANDRA-4295)
 + * Add support for LZ4 compression (CASSANDRA-5038)
 + * Fix missing columns in wide rows queries (CASSANDRA-5225)
 + * Simplify auth setup and make system_auth ks alterable (CASSANDRA-5112)
 + * Stop compactions from hanging during bootstrap (CASSANDRA-5244)
 + * fix compressed streaming sending extra chunk (CASSANDRA-5105)
 + * Add CQL3-based implementations of IAuthenticator and IAuthorizer
 +   (CASSANDRA-4898)
 + * Fix timestamp-based tombstone removal logic (CASSANDRA-5248)
   * cli: Add JMX authentication support (CASSANDRA-5080)
 + * Fix forceFlush behavior (CASSANDRA-5241)
 + * cqlsh: Add username autocompletion (CASSANDRA-5231)
 + * Fix CQL3 composite partition key error (CASSANDRA-5240)
 + * Allow IN clause on last clustering key (CASSANDRA-5230)
 +
 +
 +1.2.1
 + * stream undelivered hints on decommission (CASSANDRA-5128)
 + * GossipingPropertyFileSnitch loads saved dc/rack info if needed (CASSANDRA-5133)
 + * drain should flush system CFs too (CASSANDRA-4446)
 + * add inter_dc_tcp_nodelay setting (CASSANDRA-5148)
 + * re-allow wrapping ranges for start_token/end_token range pairing (CASSANDRA-5106)
 + * fix validation compaction of empty rows (CASSANDRA-5136)
 + * nodetool methods to enable/disable hint storage/delivery (CASSANDRA-4750)
 + * disallow bloom filter false positive chance of 0 (CASSANDRA-5013)
 + * add threadpool size adjustment methods to JMXEnabledThreadPoolExecutor and 
 +   CompactionManagerMBean (CASSANDRA-5044)
 + * fix hinting for dropped local writes (CASSANDRA-4753)
 + * off-heap cache doesn't need mutable column container (CASSANDRA-5057)
 + * apply disk_failure_policy to bad disks on initial directory creation 
 +   (CASSANDRA-4847)
 + * Optimize name-based queries to use ArrayBackedSortedColumns (CASSANDRA-5043)
 + * Fall back to old manifest if most recent is unparseable (CASSANDRA-5041)
 + * pool [Compressed]RandomAccessReader objects on the partitioned read path
 +   (CASSANDRA-4942)
 + * Add debug logging to list filenames processed by Directories.migrateFile 
 +   method (CASSANDRA-4939)
 + * Expose black-listed directories via JMX (CASSANDRA-4848)
 + * Log compaction merge counts (CASSANDRA-4894)
 + * Minimize byte array allocation by AbstractData{Input,Output} (CASSANDRA-5090)
 + * Add SSL support for the binary protocol (CASSANDRA-5031)
 + * Allow non-schema system ks modification for shuffle to work (CASSANDRA-5097)
 + * cqlsh: Add default limit to SELECT statements (CASSANDRA-4972)
 + * cqlsh: fix DESCRIBE for 1.1 cfs in CQL3 (CASSANDRA-5101)
 + * Correctly gossip with nodes >= 1.1.7 (CASSANDRA-5102)
 + * Ensure CL guarantees on digest mismatch (CASSANDRA-5113)
 + * Validate correctly selects on composite partition key (CASSANDRA-5122)
 + * Fix exception when adding collection (CASSANDRA-5117)
 + * Handle states for non-vnode clusters correctly (CASSANDRA-5127)
 + * Refuse unrecognized replication and compaction strategy options (CASSANDRA-4795)
 + * Pick the correct value validator in sstable2json for cql3 tables (CASSANDRA-5134)
 + * Validate login for describe_keyspace, describe_keyspaces and set_keyspace
 +   (CASSANDRA-5144)
 + * Fix inserting empty maps (CASSANDRA-5141)
 + * Don't remove tokens from System table for node we know (CASSANDRA-5121)
 + * fix streaming progress report for compressed files (CASSANDRA-5130)
 + * Coverage analysis for low-CL queries (CASSANDRA-4858)
 + * Stop interpreting dates as valid timeUUID value (CASSANDRA-4936)
 + * Adds E notation for floating point numbers (CASSANDRA-4927)
 + * Detect (and warn) unintentional use of the cql2 thrift methods when cql3 was
 +   intended (CASSANDRA-5172)
- Merged from 1.1:
++
++
++1.1.11
+  * nodetool: ability to repair specific range (CASSANDRA-5280)
+ 
+ 
+ 1.1.10
   * fix saved key cache not loading at startup (CASSANDRA-5166)
   * fix ConcurrentModificationException in getBootstrapSource (CASSANDRA-5170)
   * fix sstable maxtimestamp for row deletes and pre-1.1.1 sstables (CASSANDRA-5153)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e8bffd0/src/java/org/apache/cassandra/service/AntiEntropyService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e8bffd0/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 9ce4bf0,05401e0..11ae98b
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -2306,21 -1851,95 +2306,36 @@@ public class StorageService extends Not
          jmxNotification.setUserData(userObject);
          sendNotification(jmxNotification);
      }
--
 -    public int forceRepairAsync(final String tableName, final boolean isSequential, final
boolean primaryRange, final String... columnFamilies)
 +    public int forceRepairAsync(final String keyspace, final boolean isSequential, final
boolean isLocal, final boolean primaryRange, final String... columnFamilies)
      {
 -        final Collection<Range<Token>> ranges = primaryRange ? Collections.singletonList(getLocalPrimaryRange())
: getLocalRanges(tableName);
 -        return forceRepairAsync(tableName, isSequential, ranges, columnFamilies);
++        final Collection<Range<Token>> ranges = primaryRange ? Collections.singletonList(getLocalPrimaryRange())
: getLocalRanges(keyspace);
++        return forceRepairAsync(keyspace, isSequential, isLocal, ranges, columnFamilies);
+     }
+ 
 -    public int forceRepairAsync(final String tableName, final boolean isSequential, final
Collection<Range<Token>> ranges, final String... columnFamilies)
++    public int forceRepairAsync(final String keyspace, final boolean isSequential, final
boolean isLocal, final Collection<Range<Token>> ranges, final String... columnFamilies)
+     {
 -        if (Table.SYSTEM_TABLE.equals(tableName))
 +        if (Table.SYSTEM_KS.equals(keyspace) || Tracing.TRACE_KS.equals(keyspace))
              return 0;
  
          final int cmd = nextRepairCommand.incrementAndGet();
-         final Collection<Range<Token>> ranges = primaryRange ? Collections.singletonList(getLocalPrimaryRange())
: getLocalRanges(keyspace);
          if (ranges.size() > 0)
          {
 -            new Thread(new WrappedRunnable()
 -            {
 -                protected void runMayThrow() throws Exception
 -                {
 -                    String message = String.format("Starting repair command #%d, repairing
%d ranges for keyspace %s", cmd, ranges.size(), tableName);
 -                    logger_.info(message);
 -                    sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.STARTED.ordinal()});
 -
 -                    List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>(ranges.size());
 -                    for (Range<Token> range : ranges)
 -                    {
 -                        AntiEntropyService.RepairFuture future;
 -                        try
 -                        {
 -                            future = forceTableRepair(range, tableName, isSequential, columnFamilies);
 -                        }
 -                        catch (IllegalArgumentException e)
 -                        {
 -                            message = String.format("Repair session failed with error: %s",
e);
 -                            sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
 -                            continue;
 -                        }
 -                        if (future == null)
 -                            continue;
 -                        futures.add(future);
 -                        // wait for a session to be done with its differencing before starting
the next one
 -                        try
 -                        {
 -                            future.session.differencingDone.await();
 -                        }
 -                        catch (InterruptedException e)
 -                        {
 -                            message = "Interrupted while waiting for the differencing of
repair session " + future.session + " to be done. Repair may be imprecise.";
 -                            logger_.error(message, e);
 -                            sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
 -                        }
 -                    }
 -                    for (AntiEntropyService.RepairFuture future : futures)
 -                    {
 -                        try
 -                        {
 -                            future.get();
 -                            message = String.format("Repair session %s for range %s finished",
future.session.getName(), future.session.getRange().toString());
 -                            sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_SUCCESS.ordinal()});
 -                        }
 -                        catch (ExecutionException e)
 -                        {
 -                            message = String.format("Repair session %s for range %s failed
with error %s", future.session.getName(), future.session.getRange().toString(), e.getCause().getMessage());
 -                            sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
 -                        }
 -                        catch (Exception e)
 -                        {
 -                            message = String.format("Repair session %s for range %s failed
with error %s", future.session.getName(), future.session.getRange().toString(), e.getMessage());
 -                            sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
 -                        }
 -                    }
 -                    sendNotification("repair", String.format("Repair command #%d finished",
cmd), new int[]{cmd, AntiEntropyService.Status.FINISHED.ordinal()});
 -                }
 -            }).start();
 +            new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, isLocal, columnFamilies)).start();
          }
          return cmd;
      }
  
 -    public int forceRepairRangeAsync(String beginToken, String endToken, final String tableName,
boolean isSequential, final String... columnFamilies)
++    public int forceRepairRangeAsync(String beginToken, String endToken, final String tableName,
boolean isSequential, boolean isLocal, final String... columnFamilies)
+     {
+         Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
+         Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
+ 
 -        logger_.info("starting user-requested repair of range ({}, {}] for keyspace {} and
column families {}",
++        logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and
column families {}",
+                 new Object[] {parsedBeginToken, parsedEndToken, tableName, columnFamilies});
 -        return forceRepairAsync(tableName, isSequential, Collections.singleton(new Range<Token>(parsedBeginToken,
parsedEndToken)), columnFamilies);
++        return forceRepairAsync(tableName, isSequential, isLocal, Collections.singleton(new
Range<Token>(parsedBeginToken, parsedEndToken)), columnFamilies);
+     }
+ 
+ 
      /**
       * Trigger proactive repair for a table and column families.
       * @param tableName
@@@ -2342,78 -2019,28 +2357,88 @@@
          Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
          Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken);
  
 -        logger_.info("starting user-requested repair of range ({}, {}] for keyspace {} and
column families {}",
 -                     new Object[] {parsedBeginToken, parsedEndToken, tableName, columnFamilies});
 -        AntiEntropyService.RepairFuture future = forceTableRepair(new Range<Token>(parsedBeginToken,
parsedEndToken), tableName, isSequential, columnFamilies);
 -        if (future == null)
 +        logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and
column families {}",
 +                    parsedBeginToken, parsedEndToken, tableName, columnFamilies);
 +        forceTableRepairRange(tableName, Collections.singleton(new Range<Token>(parsedBeginToken,
parsedEndToken)), isSequential, isLocal, columnFamilies);
 +    }
 +
 +    public void forceTableRepairRange(final String tableName, final Collection<Range<Token>>
ranges, boolean isSequential, boolean  isLocal, final String... columnFamilies) throws IOException
 +    {
 +        if (Table.SYSTEM_KS.equals(tableName) || Tracing.TRACE_KS.equals(tableName))
              return;
 -        try
 -        {
 -            future.get();
 -        }
 -        catch (Exception e)
 +        createRepairTask(nextRepairCommand.incrementAndGet(), tableName, ranges, isSequential,
isLocal, columnFamilies).run();
 +    }
 +
 +    private FutureTask<Object> createRepairTask(final int cmd, final String keyspace,
final Collection<Range<Token>> ranges, final boolean isSequential, final boolean
isLocal, final String... columnFamilies)
 +    {
 +        FutureTask<Object> task = new FutureTask<Object>(new WrappedRunnable()
          {
 -            logger_.error("Repair session " + future.session.getName() + " failed.", e);
 -        }
 +            protected void runMayThrow() throws Exception
 +            {
 +                String message = String.format("Starting repair command #%d, repairing %d
ranges for keyspace %s", cmd, ranges.size(), keyspace);
 +                logger.info(message);
 +                sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.STARTED.ordinal()});
 +
 +                List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>(ranges.size());
 +                for (Range<Token> range : ranges)
 +                {
-                     AntiEntropyService.RepairFuture future = forceTableRepair(range, keyspace,
isSequential, isLocal, columnFamilies);
++                    AntiEntropyService.RepairFuture future;
++                    try
++                    {
++                        future = forceTableRepair(range, keyspace, isSequential, isLocal,
columnFamilies);
++                    }
++                    catch (IllegalArgumentException e)
++                    {
++                        logger.error("Repair session failed:", e);
++                        sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
++                        continue;
++                    }
 +                    if (future == null)
 +                        continue;
 +                    futures.add(future);
 +                    // wait for a session to be done with its differencing before starting
the next one
 +                    try
 +                    {
 +                        future.session.differencingDone.await();
 +                    }
 +                    catch (InterruptedException e)
 +                    {
 +                        message = "Interrupted while waiting for the differencing of repair
session " + future.session + " to be done. Repair may be imprecise.";
 +                        logger.error(message, e);
 +                        sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
 +                    }
 +                }
 +                for (AntiEntropyService.RepairFuture future : futures)
 +                {
 +                    try
 +                    {
 +                        future.get();
 +                        message = String.format("Repair session %s for range %s finished",
future.session.getName(), future.session.getRange().toString());
 +                        sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_SUCCESS.ordinal()});
 +                    }
 +                    catch (ExecutionException e)
 +                    {
 +                        message = String.format("Repair session %s for range %s failed with
error %s", future.session.getName(), future.session.getRange().toString(), e.getCause().getMessage());
 +                        logger.error(message, e);
 +                        sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
 +                    }
 +                    catch (Exception e)
 +                    {
 +                        message = String.format("Repair session %s for range %s failed with
error %s", future.session.getName(), future.session.getRange().toString(), e.getMessage());
 +                        logger.error(message, e);
 +                        sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
 +                    }
 +                }
 +                sendNotification("repair", String.format("Repair command #%d finished",
cmd), new int[]{cmd, AntiEntropyService.Status.FINISHED.ordinal()});
 +            }
 +        }, null);
 +        return task;
      }
  
 -    public AntiEntropyService.RepairFuture forceTableRepair(final Range<Token> range,
final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
 +    public AntiEntropyService.RepairFuture forceTableRepair(final Range<Token> range,
final String tableName, boolean isSequential, boolean  isLocal, final String... columnFamilies)
throws IOException
      {
          ArrayList<String> names = new ArrayList<String>();
 -        for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
 +        for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, tableName,
columnFamilies))
          {
              names.add(cfStore.getColumnFamilyName());
          }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e8bffd0/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 067d08a,1261d2a..aa0881b
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@@ -261,11 -251,16 +261,16 @@@ public interface StorageServiceMBean ex
       *   userObject: int array of length 2, [0]=command number, [1]=ordinal of AntiEntropyService.Status
       *
       * @return Repair command number, or 0 if nothing to repair
 -     * @see #forceTableRepair(String, boolean, String...)
 +     * @see #forceTableRepair(String, boolean, boolean, String...)
       */
 -    public int forceRepairAsync(String tableName, boolean isSequential, boolean primaryRange,
String... columnFamilies);
 +    public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal,
boolean primaryRange, String... columnFamilies);
  
      /**
+      * Same as forceRepairAsync, but handles a specified range
+      */
 -    public int forceRepairRangeAsync(String beginToken, String endToken, final String tableName,
boolean isSequential, final String... columnFamilies);
++    public int forceRepairRangeAsync(String beginToken, String endToken, final String tableName,
boolean isSequential, boolean isLocal, final String... columnFamilies);
+ 
+     /**
       * Triggers proactive repair for given column families, or all columnfamilies for the
given table
       * if none are explicitly listed.
       * @param tableName

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e8bffd0/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/NodeCmd.java
index b74ffb7,99cbab1..9b844b1
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@@ -55,16 -48,16 +55,18 @@@ import org.apache.cassandra.utils.Pair
  
  public class NodeCmd
  {
 -    private static final Pair<String, String> SNAPSHOT_COLUMNFAMILY_OPT = new Pair<String,
String>("cf", "column-family");
 -    private static final Pair<String, String> HOST_OPT = new Pair<String, String>("h",
"host");
 -    private static final Pair<String, String> PORT_OPT = new Pair<String, String>("p",
"port");
 -    private static final Pair<String, String> USERNAME_OPT = new Pair<String, String>("u",
 "username");
 -    private static final Pair<String, String> PASSWORD_OPT = new Pair<String, String>("pw",
"password");
 -    private static final Pair<String, String> TAG_OPT = new Pair<String, String>("t",
"tag");
 -    private static final Pair<String, String> PRIMARY_RANGE_OPT = new Pair<String,
String>("pr", "partitioner-range");
 -    private static final Pair<String, String> START_TOKEN_OPT = new Pair<String,
String>("st", "start-token");
 -    private static final Pair<String, String> END_TOKEN_OPT = new Pair<String,
String>("et", "end-token");
 -    private static final Pair<String, String> SNAPSHOT_REPAIR_OPT = new Pair<String,
String>("snapshot", "with-snapshot");
 +    private static final Pair<String, String> SNAPSHOT_COLUMNFAMILY_OPT = Pair.create("cf",
"column-family");
 +    private static final Pair<String, String> HOST_OPT = Pair.create("h", "host");
 +    private static final Pair<String, String> PORT_OPT = Pair.create("p", "port");
 +    private static final Pair<String, String> USERNAME_OPT = Pair.create("u", "username");
 +    private static final Pair<String, String> PASSWORD_OPT = Pair.create("pw", "password");
 +    private static final Pair<String, String> TAG_OPT = Pair.create("t", "tag");
 +    private static final Pair<String, String> TOKENS_OPT = Pair.create("T", "tokens");
 +    private static final Pair<String, String> PRIMARY_RANGE_OPT = Pair.create("pr",
"partitioner-range");
 +    private static final Pair<String, String> SNAPSHOT_REPAIR_OPT = Pair.create("snapshot",
"with-snapshot");
 +    private static final Pair<String, String> LOCAL_DC_REPAIR_OPT = Pair.create("local",
"in-local-dc");
++    private static final Pair<String, String> START_TOKEN_OPT = Pair.create("st",
"start-token");
++    private static final Pair<String, String> END_TOKEN_OPT = Pair.create("et", "end-token");
  
      private static final String DEFAULT_HOST = "127.0.0.1";
      private static final int DEFAULT_PORT = 7199;
@@@ -81,10 -76,10 +83,12 @@@
          options.addOption(USERNAME_OPT, true, "remote jmx agent username");
          options.addOption(PASSWORD_OPT, true, "remote jmx agent password");
          options.addOption(TAG_OPT,      true, "optional name to give a snapshot");
 +        options.addOption(TOKENS_OPT,   false, "display all tokens");
          options.addOption(PRIMARY_RANGE_OPT, false, "only repair the first range returned
by the partitioner for the node");
          options.addOption(SNAPSHOT_REPAIR_OPT, false, "repair one node at a time using snapshots");
 +        options.addOption(LOCAL_DC_REPAIR_OPT, false, "only repair against nodes in the
same datacenter");
+         options.addOption(START_TOKEN_OPT, true, "token at which repair range starts");
+         options.addOption(END_TOKEN_OPT, true, "token at which repair range ends");
      }
  
      public NodeCmd(NodeProbe probe)
@@@ -1345,9 -1044,11 +1349,12 @@@
              {
                  case REPAIR  :
                      boolean snapshot = cmd.hasOption(SNAPSHOT_REPAIR_OPT.left);
 +                    boolean localDC = cmd.hasOption(LOCAL_DC_REPAIR_OPT.left);
                      boolean primaryRange = cmd.hasOption(PRIMARY_RANGE_OPT.left);
-                     probe.forceRepairAsync(System.out, keyspace, snapshot, localDC, primaryRange,
columnFamilies);
+                     if (cmd.hasOption(START_TOKEN_OPT.left) || cmd.hasOption(END_TOKEN_OPT.left))
 -                        probe.forceRepairRangeAsync(System.out, keyspace, snapshot, cmd.getOptionValue(START_TOKEN_OPT.left),
cmd.getOptionValue(END_TOKEN_OPT.left), columnFamilies);
++                        probe.forceRepairRangeAsync(System.out, keyspace, snapshot, localDC,
cmd.getOptionValue(START_TOKEN_OPT.left), cmd.getOptionValue(END_TOKEN_OPT.left), columnFamilies);
+                     else
 -                        probe.forceRepairAsync(System.out, keyspace, snapshot, primaryRange,
columnFamilies);
++                        probe.forceRepairAsync(System.out, keyspace, snapshot, localDC,
primaryRange, columnFamilies);
                      break;
                  case FLUSH   :
                      try { probe.forceTableFlush(keyspace, columnFamilies); }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e8bffd0/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/NodeProbe.java
index ed93359,44e64c4..ce4407f
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@@ -235,14 -231,37 +235,37 @@@ public class NodeProb
          }
      }
  
 -    public void forceRepairRangeAsync(final PrintStream out, final String tableName, boolean
isSequential, final String startToken, final String endToken, String... columnFamilies) throws
IOException
++    public void forceRepairRangeAsync(final PrintStream out, final String tableName, boolean
isSequential, boolean isLocal, final String startToken, final String endToken, String... columnFamilies)
throws IOException
+     {
+         RepairRunner runner = new RepairRunner(out, tableName, columnFamilies);
+         try
+         {
+             ssProxy.addNotificationListener(runner, null, null);
 -            if (!runner.repairRangeAndWait(ssProxy,  isSequential, startToken, endToken))
++            if (!runner.repairRangeAndWait(ssProxy,  isSequential, isLocal, startToken,
endToken))
+                 failed = true;
+         }
+         catch (Exception e)
+         {
+             throw new IOException(e) ;
+         }
+         finally
+         {
+             try
+             {
+                 ssProxy.removeNotificationListener(runner);
+             }
+             catch (ListenerNotFoundException ignored) {}
+         }
+     }
+ 
 -    public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, String...
columnFamilies) throws IOException
 +    public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, boolean
isLocal, String... columnFamilies) throws IOException
      {
 -        ssProxy.forceTableRepairPrimaryRange(tableName, isSequential, columnFamilies);
 +        ssProxy.forceTableRepairPrimaryRange(tableName, isSequential, isLocal, columnFamilies);
      }
  
 -    public void forceTableRepairRange(String beginToken, String endToken, String tableName,
boolean isSequential, String... columnFamilies) throws IOException
 +    public void forceTableRepairRange(String beginToken, String endToken, String tableName,
boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException
      {
 -        ssProxy.forceTableRepairRange(beginToken, endToken, tableName, isSequential, columnFamilies);
 +        ssProxy.forceTableRepairRange(beginToken, endToken, tableName, isSequential, isLocal,
columnFamilies);
      }
  
      public void invalidateKeyCache() throws IOException
@@@ -929,6 -853,21 +952,21 @@@ class RepairRunner implements Notificat
          else
          {
              String message = String.format("[%s] Nothing to repair for keyspace '%s'", format.format(System.currentTimeMillis()),
keyspace);
+             out.println(message);
+         }
+         return success;
+     }
+ 
 -    public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential,
String startToken, String endToken) throws InterruptedException
++    public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential,
boolean isLocal, String startToken, String endToken) throws InterruptedException
+     {
 -        cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential,
columnFamilies);
++        cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential,
isLocal, columnFamilies);
+         if (cmd > 0)
+         {
+             condition.await();
+         }
+         else
+         {
+             String message = String.format("[%s] Nothing to repair for keyspace '%s'", format.format(System.currentTimeMillis()),
keyspace);
              out.println(message);
          }
          return success;


Mime
View raw message