Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0E057E85E for ; Fri, 22 Feb 2013 22:32:08 +0000 (UTC) Received: (qmail 23589 invoked by uid 500); 22 Feb 2013 22:32:07 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 23522 invoked by uid 500); 22 Feb 2013 22:32:07 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 23483 invoked by uid 99); 22 Feb 2013 22:32:07 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Feb 2013 22:32:07 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 1AA3482EF84; Fri, 22 Feb 2013 22:32:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: brandonwilliams@apache.org To: commits@cassandra.apache.org X-Mailer: ASF-Git Admin Mailer Subject: [3/5] git commit: Merge branch 'cassandra-1.1' into cassandra-1.2 Message-Id: <20130222223207.1AA3482EF84@tyr.zones.apache.org> Date: Fri, 22 Feb 2013 22:32:07 +0000 (UTC) Merge branch 'cassandra-1.1' into cassandra-1.2 Conflicts: CHANGES.txt src/java/org/apache/cassandra/service/StorageService.java src/java/org/apache/cassandra/tools/NodeCmd.java src/java/org/apache/cassandra/tools/NodeProbe.java Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3e8bffd0 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3e8bffd0 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3e8bffd0 Branch: refs/heads/trunk Commit: 3e8bffd000926b4f2dc92fd2bd4c34f8570e9cc1 Parents: 068b53d 0074840 Author: Brandon Williams Authored: Fri Feb 22 16:31:26 2013 -0600 Committer: Brandon Williams Committed: Fri Feb 22 16:31:26 2013 -0600 ---------------------------------------------------------------------- CHANGES.txt | 13 +++++- .../cassandra/service/AntiEntropyService.java | 2 +- .../apache/cassandra/service/StorageService.java | 31 +++++++++++- .../cassandra/service/StorageServiceMBean.java | 5 ++ src/java/org/apache/cassandra/tools/NodeCmd.java | 9 +++- src/java/org/apache/cassandra/tools/NodeProbe.java | 38 +++++++++++++++ 6 files changed, 92 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e8bffd0/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 8e77bf7,1fe1160..42a6e6d --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,83 -1,9 +1,94 @@@ -1.1.11 ++1.2.3 ++Merged from 1.1: ++======= ++ * nodetool: ability to repair specific range (CASSANDRA-5280) ++ +1.2.2 + * fix potential for multiple concurrent compactions of the same sstables + (CASSANDRA-5256) + * avoid no-op caching of byte[] on commitlog append (CASSANDRA-5199) + * fix symlinks under data dir not working (CASSANDRA-5185) + * fix bug in compact storage metadata handling (CASSANDRA-5189) + * Validate login for USE queries (CASSANDRA-5207) + * cli: remove default username and password (CASSANDRA-5208) + * configure populate_io_cache_on_flush per-CF (CASSANDRA-4694) + * allow configuration of internode socket buffer (CASSANDRA-3378) + * Make sstable directory picking blacklist-aware again (CASSANDRA-5193) + * Correctly expire gossip states for edge cases (CASSANDRA-5216) + * Improve handling of directory creation failures (CASSANDRA-5196) + * Expose secondary indicies to the rest of nodetool (CASSANDRA-4464) + * Binary protocol: avoid sending notification for 0.0.0.0 (CASSANDRA-5227) + * add UseCondCardMark XX jvm settings on jdk 1.7 (CASSANDRA-4366) + * CQL3 refactor to allow conversion function (CASSANDRA-5226) + * Fix drop of sstables in some circumstance (CASSANDRA-5232) + * Implement caching of authorization results (CASSANDRA-4295) + * Add support for LZ4 compression (CASSANDRA-5038) + * Fix missing columns in wide rows queries (CASSANDRA-5225) + * Simplify auth setup and make system_auth ks alterable (CASSANDRA-5112) + * Stop compactions from hanging during bootstrap (CASSANDRA-5244) + * fix compressed streaming sending extra chunk (CASSANDRA-5105) + * Add CQL3-based implementations of IAuthenticator and IAuthorizer + (CASSANDRA-4898) + * Fix timestamp-based tomstone removal logic (CASSANDRA-5248) * cli: Add JMX authentication support (CASSANDRA-5080) + * Fix forceFlush behavior (CASSANDRA-5241) + * cqlsh: Add username autocompletion (CASSANDRA-5231) + * Fix CQL3 composite partition key error (CASSANDRA-5240) + * Allow IN clause on last clustering key (CASSANDRA-5230) + + +1.2.1 + * stream undelivered hints on decommission (CASSANDRA-5128) + * GossipingPropertyFileSnitch loads saved dc/rack info if needed (CASSANDRA-5133) + * drain should flush system CFs too (CASSANDRA-4446) + * add inter_dc_tcp_nodelay setting (CASSANDRA-5148) + * re-allow wrapping ranges for start_token/end_token range pairing (CASSANDRA-5106) + * fix validation compaction of empty rows (CASSADRA-5136) + * nodetool methods to enable/disable hint storage/delivery (CASSANDRA-4750) + * disallow bloom filter false positive chance of 0 (CASSANDRA-5013) + * add threadpool size adjustment methods to JMXEnabledThreadPoolExecutor and + CompactionManagerMBean (CASSANDRA-5044) + * fix hinting for dropped local writes (CASSANDRA-4753) + * off-heap cache doesn't need mutable column container (CASSANDRA-5057) + * apply disk_failure_policy to bad disks on initial directory creation + (CASSANDRA-4847) + * Optimize name-based queries to use ArrayBackedSortedColumns (CASSANDRA-5043) + * Fall back to old manifest if most recent is unparseable (CASSANDRA-5041) + * pool [Compressed]RandomAccessReader objects on the partitioned read path + (CASSANDRA-4942) + * Add debug logging to list filenames processed by Directories.migrateFile + method (CASSANDRA-4939) + * Expose black-listed directories via JMX (CASSANDRA-4848) + * Log compaction merge counts (CASSANDRA-4894) + * Minimize byte array allocation by AbstractData{Input,Output} (CASSANDRA-5090) + * Add SSL support for the binary protocol (CASSANDRA-5031) + * Allow non-schema system ks modification for shuffle to work (CASSANDRA-5097) + * cqlsh: Add default limit to SELECT statements (CASSANDRA-4972) + * cqlsh: fix DESCRIBE for 1.1 cfs in CQL3 (CASSANDRA-5101) + * Correctly gossip with nodes >= 1.1.7 (CASSANDRA-5102) + * Ensure CL guarantees on digest mismatch (CASSANDRA-5113) + * Validate correctly selects on composite partition key (CASSANDRA-5122) + * Fix exception when adding collection (CASSANDRA-5117) + * Handle states for non-vnode clusters correctly (CASSANDRA-5127) + * Refuse unrecognized replication and compaction strategy options (CASSANDRA-4795) + * Pick the correct value validator in sstable2json for cql3 tables (CASSANDRA-5134) + * Validate login for describe_keyspace, describe_keyspaces and set_keyspace + (CASSANDRA-5144) + * Fix inserting empty maps (CASSANDRA-5141) + * Don't remove tokens from System table for node we know (CASSANDRA-5121) + * fix streaming progress report for compresed files (CASSANDRA-5130) + * Coverage analysis for low-CL queries (CASSANDRA-4858) + * Stop interpreting dates as valid timeUUID value (CASSANDRA-4936) + * Adds E notation for floating point numbers (CASSANDRA-4927) + * Detect (and warn) unintentional use of the cql2 thrift methods when cql3 was + intended (CASSANDRA-5172) - Merged from 1.1: ++ ++ ++1.1.11 + * nodetool: ability to repair specific range (CASSANDRA-5280) + + + 1.1.10 * fix saved key cache not loading at startup (CASSANDRA-5166) * fix ConcurrentModificationException in getBootstrapSource (CASSANDRA-5170) * fix sstable maxtimestamp for row deletes and pre-1.1.1 sstables (CASSANDRA-5153) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e8bffd0/src/java/org/apache/cassandra/service/AntiEntropyService.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e8bffd0/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageService.java index 9ce4bf0,05401e0..11ae98b --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -2306,21 -1851,95 +2306,36 @@@ public class StorageService extends Not jmxNotification.setUserData(userObject); sendNotification(jmxNotification); } -- - public int forceRepairAsync(final String tableName, final boolean isSequential, final boolean primaryRange, final String... columnFamilies) + public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final boolean primaryRange, final String... columnFamilies) { - final Collection> ranges = primaryRange ? Collections.singletonList(getLocalPrimaryRange()) : getLocalRanges(tableName); - return forceRepairAsync(tableName, isSequential, ranges, columnFamilies); ++ final Collection> ranges = primaryRange ? Collections.singletonList(getLocalPrimaryRange()) : getLocalRanges(keyspace); ++ return forceRepairAsync(keyspace, isSequential, isLocal, ranges, columnFamilies); + } + - public int forceRepairAsync(final String tableName, final boolean isSequential, final Collection> ranges, final String... columnFamilies) ++ public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final Collection> ranges, final String... columnFamilies) + { - if (Table.SYSTEM_TABLE.equals(tableName)) + if (Table.SYSTEM_KS.equals(keyspace) || Tracing.TRACE_KS.equals(keyspace)) return 0; final int cmd = nextRepairCommand.incrementAndGet(); - final Collection> ranges = primaryRange ? Collections.singletonList(getLocalPrimaryRange()) : getLocalRanges(keyspace); if (ranges.size() > 0) { - new Thread(new WrappedRunnable() - { - protected void runMayThrow() throws Exception - { - String message = String.format("Starting repair command #%d, repairing %d ranges for keyspace %s", cmd, ranges.size(), tableName); - logger_.info(message); - sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.STARTED.ordinal()}); - - List futures = new ArrayList(ranges.size()); - for (Range range : ranges) - { - AntiEntropyService.RepairFuture future; - try - { - future = forceTableRepair(range, tableName, isSequential, columnFamilies); - } - catch (IllegalArgumentException e) - { - message = String.format("Repair session failed with error: %s", e); - sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()}); - continue; - } - if (future == null) - continue; - futures.add(future); - // wait for a session to be done with its differencing before starting the next one - try - { - future.session.differencingDone.await(); - } - catch (InterruptedException e) - { - message = "Interrupted while waiting for the differencing of repair session " + future.session + " to be done. Repair may be imprecise."; - logger_.error(message, e); - sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()}); - } - } - for (AntiEntropyService.RepairFuture future : futures) - { - try - { - future.get(); - message = String.format("Repair session %s for range %s finished", future.session.getName(), future.session.getRange().toString()); - sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_SUCCESS.ordinal()}); - } - catch (ExecutionException e) - { - message = String.format("Repair session %s for range %s failed with error %s", future.session.getName(), future.session.getRange().toString(), e.getCause().getMessage()); - sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()}); - } - catch (Exception e) - { - message = String.format("Repair session %s for range %s failed with error %s", future.session.getName(), future.session.getRange().toString(), e.getMessage()); - sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()}); - } - } - sendNotification("repair", String.format("Repair command #%d finished", cmd), new int[]{cmd, AntiEntropyService.Status.FINISHED.ordinal()}); - } - }).start(); + new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, isLocal, columnFamilies)).start(); } return cmd; } - public int forceRepairRangeAsync(String beginToken, String endToken, final String tableName, boolean isSequential, final String... columnFamilies) ++ public int forceRepairRangeAsync(String beginToken, String endToken, final String tableName, boolean isSequential, boolean isLocal, final String... columnFamilies) + { + Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken); + Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken); + - logger_.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}", ++ logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}", + new Object[] {parsedBeginToken, parsedEndToken, tableName, columnFamilies}); - return forceRepairAsync(tableName, isSequential, Collections.singleton(new Range(parsedBeginToken, parsedEndToken)), columnFamilies); ++ return forceRepairAsync(tableName, isSequential, isLocal, Collections.singleton(new Range(parsedBeginToken, parsedEndToken)), columnFamilies); + } + + /** * Trigger proactive repair for a table and column families. * @param tableName @@@ -2342,78 -2019,28 +2357,88 @@@ Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken); Token parsedEndToken = getPartitioner().getTokenFactory().fromString(endToken); - logger_.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}", - new Object[] {parsedBeginToken, parsedEndToken, tableName, columnFamilies}); - AntiEntropyService.RepairFuture future = forceTableRepair(new Range(parsedBeginToken, parsedEndToken), tableName, isSequential, columnFamilies); - if (future == null) + logger.info("starting user-requested repair of range ({}, {}] for keyspace {} and column families {}", + parsedBeginToken, parsedEndToken, tableName, columnFamilies); + forceTableRepairRange(tableName, Collections.singleton(new Range(parsedBeginToken, parsedEndToken)), isSequential, isLocal, columnFamilies); + } + + public void forceTableRepairRange(final String tableName, final Collection> ranges, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException + { + if (Table.SYSTEM_KS.equals(tableName) || Tracing.TRACE_KS.equals(tableName)) return; - try - { - future.get(); - } - catch (Exception e) + createRepairTask(nextRepairCommand.incrementAndGet(), tableName, ranges, isSequential, isLocal, columnFamilies).run(); + } + + private FutureTask createRepairTask(final int cmd, final String keyspace, final Collection> ranges, final boolean isSequential, final boolean isLocal, final String... columnFamilies) + { + FutureTask task = new FutureTask(new WrappedRunnable() { - logger_.error("Repair session " + future.session.getName() + " failed.", e); - } + protected void runMayThrow() throws Exception + { + String message = String.format("Starting repair command #%d, repairing %d ranges for keyspace %s", cmd, ranges.size(), keyspace); + logger.info(message); + sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.STARTED.ordinal()}); + + List futures = new ArrayList(ranges.size()); + for (Range range : ranges) + { - AntiEntropyService.RepairFuture future = forceTableRepair(range, keyspace, isSequential, isLocal, columnFamilies); ++ AntiEntropyService.RepairFuture future; ++ try ++ { ++ future = forceTableRepair(range, keyspace, isSequential, isLocal, columnFamilies); ++ } ++ catch (IllegalArgumentException e) ++ { ++ logger.error("Repair session failed:", e); ++ sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()}); ++ continue; ++ } + if (future == null) + continue; + futures.add(future); + // wait for a session to be done with its differencing before starting the next one + try + { + future.session.differencingDone.await(); + } + catch (InterruptedException e) + { + message = "Interrupted while waiting for the differencing of repair session " + future.session + " to be done. Repair may be imprecise."; + logger.error(message, e); + sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()}); + } + } + for (AntiEntropyService.RepairFuture future : futures) + { + try + { + future.get(); + message = String.format("Repair session %s for range %s finished", future.session.getName(), future.session.getRange().toString()); + sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_SUCCESS.ordinal()}); + } + catch (ExecutionException e) + { + message = String.format("Repair session %s for range %s failed with error %s", future.session.getName(), future.session.getRange().toString(), e.getCause().getMessage()); + logger.error(message, e); + sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()}); + } + catch (Exception e) + { + message = String.format("Repair session %s for range %s failed with error %s", future.session.getName(), future.session.getRange().toString(), e.getMessage()); + logger.error(message, e); + sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()}); + } + } + sendNotification("repair", String.format("Repair command #%d finished", cmd), new int[]{cmd, AntiEntropyService.Status.FINISHED.ordinal()}); + } + }, null); + return task; } - public AntiEntropyService.RepairFuture forceTableRepair(final Range range, final String tableName, boolean isSequential, final String... columnFamilies) throws IOException + public AntiEntropyService.RepairFuture forceTableRepair(final Range range, final String tableName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException { ArrayList names = new ArrayList(); - for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies)) + for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, tableName, columnFamilies)) { names.add(cfStore.getColumnFamilyName()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e8bffd0/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java index 067d08a,1261d2a..aa0881b --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@@ -261,11 -251,16 +261,16 @@@ public interface StorageServiceMBean ex * userObject: int array of length 2, [0]=command number, [1]=ordinal of AntiEntropyService.Status * * @return Repair command number, or 0 if nothing to repair - * @see #forceTableRepair(String, boolean, String...) + * @see #forceTableRepair(String, boolean, boolean, String...) */ - public int forceRepairAsync(String tableName, boolean isSequential, boolean primaryRange, String... columnFamilies); + public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, String... columnFamilies); /** + * Same as forceRepairAsync, but handles a specified range + */ - public int forceRepairRangeAsync(String beginToken, String endToken, final String tableName, boolean isSequential, final String... columnFamilies); ++ public int forceRepairRangeAsync(String beginToken, String endToken, final String tableName, boolean isSequential, boolean isLocal, final String... columnFamilies); + + /** * Triggers proactive repair for given column families, or all columnfamilies for the given table * if none are explicitly listed. * @param tableName http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e8bffd0/src/java/org/apache/cassandra/tools/NodeCmd.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tools/NodeCmd.java index b74ffb7,99cbab1..9b844b1 --- a/src/java/org/apache/cassandra/tools/NodeCmd.java +++ b/src/java/org/apache/cassandra/tools/NodeCmd.java @@@ -55,16 -48,16 +55,18 @@@ import org.apache.cassandra.utils.Pair public class NodeCmd { - private static final Pair SNAPSHOT_COLUMNFAMILY_OPT = new Pair("cf", "column-family"); - private static final Pair HOST_OPT = new Pair("h", "host"); - private static final Pair PORT_OPT = new Pair("p", "port"); - private static final Pair USERNAME_OPT = new Pair("u", "username"); - private static final Pair PASSWORD_OPT = new Pair("pw", "password"); - private static final Pair TAG_OPT = new Pair("t", "tag"); - private static final Pair PRIMARY_RANGE_OPT = new Pair("pr", "partitioner-range"); - private static final Pair START_TOKEN_OPT = new Pair("st", "start-token"); - private static final Pair END_TOKEN_OPT = new Pair("et", "end-token"); - private static final Pair SNAPSHOT_REPAIR_OPT = new Pair("snapshot", "with-snapshot"); + private static final Pair SNAPSHOT_COLUMNFAMILY_OPT = Pair.create("cf", "column-family"); + private static final Pair HOST_OPT = Pair.create("h", "host"); + private static final Pair PORT_OPT = Pair.create("p", "port"); + private static final Pair USERNAME_OPT = Pair.create("u", "username"); + private static final Pair PASSWORD_OPT = Pair.create("pw", "password"); + private static final Pair TAG_OPT = Pair.create("t", "tag"); + private static final Pair TOKENS_OPT = Pair.create("T", "tokens"); + private static final Pair PRIMARY_RANGE_OPT = Pair.create("pr", "partitioner-range"); + private static final Pair SNAPSHOT_REPAIR_OPT = Pair.create("snapshot", "with-snapshot"); + private static final Pair LOCAL_DC_REPAIR_OPT = Pair.create("local", "in-local-dc"); ++ private static final Pair START_TOKEN_OPT = Pair.create("st", "start-token"); ++ private static final Pair END_TOKEN_OPT = Pair.create("et", "end-token"); private static final String DEFAULT_HOST = "127.0.0.1"; private static final int DEFAULT_PORT = 7199; @@@ -81,10 -76,10 +83,12 @@@ options.addOption(USERNAME_OPT, true, "remote jmx agent username"); options.addOption(PASSWORD_OPT, true, "remote jmx agent password"); options.addOption(TAG_OPT, true, "optional name to give a snapshot"); + options.addOption(TOKENS_OPT, false, "display all tokens"); options.addOption(PRIMARY_RANGE_OPT, false, "only repair the first range returned by the partitioner for the node"); options.addOption(SNAPSHOT_REPAIR_OPT, false, "repair one node at a time using snapshots"); + options.addOption(LOCAL_DC_REPAIR_OPT, false, "only repair against nodes in the same datacenter"); + options.addOption(START_TOKEN_OPT, true, "token at which repair range starts"); + options.addOption(END_TOKEN_OPT, true, "token at which repair range ends"); } public NodeCmd(NodeProbe probe) @@@ -1345,9 -1044,11 +1349,12 @@@ { case REPAIR : boolean snapshot = cmd.hasOption(SNAPSHOT_REPAIR_OPT.left); + boolean localDC = cmd.hasOption(LOCAL_DC_REPAIR_OPT.left); boolean primaryRange = cmd.hasOption(PRIMARY_RANGE_OPT.left); - probe.forceRepairAsync(System.out, keyspace, snapshot, localDC, primaryRange, columnFamilies); + if (cmd.hasOption(START_TOKEN_OPT.left) || cmd.hasOption(END_TOKEN_OPT.left)) - probe.forceRepairRangeAsync(System.out, keyspace, snapshot, cmd.getOptionValue(START_TOKEN_OPT.left), cmd.getOptionValue(END_TOKEN_OPT.left), columnFamilies); ++ probe.forceRepairRangeAsync(System.out, keyspace, snapshot, localDC, cmd.getOptionValue(START_TOKEN_OPT.left), cmd.getOptionValue(END_TOKEN_OPT.left), columnFamilies); + else - probe.forceRepairAsync(System.out, keyspace, snapshot, primaryRange, columnFamilies); ++ probe.forceRepairAsync(System.out, keyspace, snapshot, localDC, primaryRange, columnFamilies); break; case FLUSH : try { probe.forceTableFlush(keyspace, columnFamilies); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e8bffd0/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tools/NodeProbe.java index ed93359,44e64c4..ce4407f --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@@ -235,14 -231,37 +235,37 @@@ public class NodeProb } } - public void forceRepairRangeAsync(final PrintStream out, final String tableName, boolean isSequential, final String startToken, final String endToken, String... columnFamilies) throws IOException ++ public void forceRepairRangeAsync(final PrintStream out, final String tableName, boolean isSequential, boolean isLocal, final String startToken, final String endToken, String... columnFamilies) throws IOException + { + RepairRunner runner = new RepairRunner(out, tableName, columnFamilies); + try + { + ssProxy.addNotificationListener(runner, null, null); - if (!runner.repairRangeAndWait(ssProxy, isSequential, startToken, endToken)) ++ if (!runner.repairRangeAndWait(ssProxy, isSequential, isLocal, startToken, endToken)) + failed = true; + } + catch (Exception e) + { + throw new IOException(e) ; + } + finally + { + try + { + ssProxy.removeNotificationListener(runner); + } + catch (ListenerNotFoundException ignored) {} + } + } + - public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, String... columnFamilies) throws IOException + public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException { - ssProxy.forceTableRepairPrimaryRange(tableName, isSequential, columnFamilies); + ssProxy.forceTableRepairPrimaryRange(tableName, isSequential, isLocal, columnFamilies); } - public void forceTableRepairRange(String beginToken, String endToken, String tableName, boolean isSequential, String... columnFamilies) throws IOException + public void forceTableRepairRange(String beginToken, String endToken, String tableName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException { - ssProxy.forceTableRepairRange(beginToken, endToken, tableName, isSequential, columnFamilies); + ssProxy.forceTableRepairRange(beginToken, endToken, tableName, isSequential, isLocal, columnFamilies); } public void invalidateKeyCache() throws IOException @@@ -929,6 -853,21 +952,21 @@@ class RepairRunner implements Notificat else { String message = String.format("[%s] Nothing to repair for keyspace '%s'", format.format(System.currentTimeMillis()), keyspace); + out.println(message); + } + return success; + } + - public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, String startToken, String endToken) throws InterruptedException ++ public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, boolean isLocal, String startToken, String endToken) throws InterruptedException + { - cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, columnFamilies); ++ cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, isLocal, columnFamilies); + if (cmd > 0) + { + condition.await(); + } + else + { + String message = String.format("[%s] Nothing to repair for keyspace '%s'", format.format(System.currentTimeMillis()), keyspace); out.println(message); } return success;