From: yukim@apache.org
To: commits@cassandra.apache.org
Date: Mon, 24 Nov 2014 21:26:24 -0000
Message-Id: <933940f737714442810e869964354dcb@git.apache.org>
In-Reply-To: <420a6dcc390b4774bc94033c72a128e9@git.apache.org>
References: <420a6dcc390b4774bc94033c72a128e9@git.apache.org>
Subject: [7/8] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	src/java/org/apache/cassandra/repair/RepairJob.java
	src/java/org/apache/cassandra/repair/RepairSession.java
	src/java/org/apache/cassandra/service/ActiveRepairService.java
	src/java/org/apache/cassandra/service/StorageService.java
	src/java/org/apache/cassandra/service/StorageServiceMBean.java
	src/java/org/apache/cassandra/tools/NodeCmd.java
	src/java/org/apache/cassandra/tools/NodeProbe.java
	src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/326a9ff2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/326a9ff2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/326a9ff2

Branch: refs/heads/cassandra-2.1
Commit: 326a9ff2f831eeafedbc37b7a4b8f8f4a709e399
Parents: eac7781 41469ec
Author: Yuki Morishita
Authored: Mon Nov 24 15:21:34 2014 -0600
Committer: Yuki Morishita
Committed: Mon Nov 24 15:21:34 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                    |   1 +
 .../DatacenterAwareRequestCoordinator.java     |  73 +++++++++++
 .../cassandra/repair/IRequestCoordinator.java  |  28 ++++
 .../cassandra/repair/IRequestProcessor.java    |  23 ++++
 .../repair/ParallelRequestCoordinator.java     |  49 +++++++
 .../org/apache/cassandra/repair/RepairJob.java |  32 ++++-
 .../cassandra/repair/RepairParallelism.java    |  22 ++++
 .../apache/cassandra/repair/RepairSession.java |  14 +-
 .../cassandra/repair/RequestCoordinator.java   | 128 -------------------
 .../repair/SequentialRequestCoordinator.java   |  58 +++++++++
 .../cassandra/service/ActiveRepairService.java |   6 +-
 .../cassandra/service/StorageService.java      |  49 +++++--
 .../cassandra/service/StorageServiceMBean.java |  20 ++-
 .../org/apache/cassandra/tools/NodeProbe.java  |  29 +++--
 .../org/apache/cassandra/tools/NodeTool.java   |  14 +-
 .../repair/RequestCoordinatorTest.java         | 124 ++++++++++++++++++
 16 files changed, 499 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/326a9ff2/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c9e35d5,7519653..fa3ce8a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -26,34 -12,7 +26,35 @@@ Merged from 2.0
   * Avoid overlap in L1 when L0 contains many nonoverlapping sstables (CASSANDRA-8211)
   * Improve PropertyFileSnitch logging (CASSANDRA-8183)
 - * Abort liveRatio calculation if the memtable is flushed (CASSANDRA-8164)
++ * Add DC-aware sequential repair (CASSANDRA-8193)
 +
 +
 +2.1.2
 + * (cqlsh) parse_for_table_meta errors out on queries with undefined
 +   grammars (CASSANDRA-8262)
 + * (cqlsh) Fix SELECT ... TOKEN() function broken in C* 2.1.1 (CASSANDRA-8258)
 + * Fix Cassandra crash when running on JDK8 update 40 (CASSANDRA-8209)
 + * Optimize partitioner tokens (CASSANDRA-8230)
 + * Improve compaction of repaired/unrepaired sstables (CASSANDRA-8004)
 + * Make cache serializers pluggable (CASSANDRA-8096)
 + * Fix issues with CONTAINS (KEY) queries on secondary indexes
 +   (CASSANDRA-8147)
 + * Fix read-rate tracking of sstables for some queries (CASSANDRA-8239)
 + * Fix default timestamp in QueryOptions (CASSANDRA-8246)
 + * Set socket timeout when reading remote version (CASSANDRA-8188)
 + * Refactor how we track live size (CASSANDRA-7852)
 + * Make sure unfinished compaction files are removed (CASSANDRA-8124)
 + * Fix shutdown when run as Windows service (CASSANDRA-8136)
 + * Fix DESCRIBE TABLE with custom indexes (CASSANDRA-8031)
 + * Fix race in RecoveryManagerTest (CASSANDRA-8176)
 + * Avoid IllegalArgumentException while sorting sstables in
 +   IndexSummaryManager (CASSANDRA-8182)
 + * Shutdown JVM on file descriptor exhaustion (CASSANDRA-7579)
 + * Add 'die' policy for commit log and disk failure (CASSANDRA-7927)
 + * Fix installing as service on Windows (CASSANDRA-8115)
 + * Fix CREATE TABLE for CQL2 (CASSANDRA-8144)
 + * Avoid boxing in ColumnStats min/max trackers (CASSANDRA-8109)
 +Merged from 2.0:
   * Correctly handle non-text column names in cql3 (CASSANDRA-8178)
   * Fix deletion for indexes on primary key columns (CASSANDRA-8206)
   * Add 'nodetool statusgossip' (CASSANDRA-8125)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/326a9ff2/src/java/org/apache/cassandra/repair/RepairJob.java
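The changelog entry "Add DC-aware sequential repair (CASSANDRA-8193)" appears to be implemented by the new coordinator classes named in the diffstat (SequentialRequestCoordinator, ParallelRequestCoordinator, DatacenterAwareRequestCoordinator), whose bodies are not included in this message. As a rough illustration only, the sketch below models three ways of dispatching validation (Merkle tree) requests. Everything in it is an assumption: `RepairCoordinationSketch`, `Coordinator`, `laneOf` and the `DATACENTER_AWARE` constant are hypothetical names, endpoints are plain strings instead of `InetAddress`, a `Consumer<String>` stands in for `IRequestProcessor.process(InetAddress)` from the RepairJob hunk, and "DC-aware" is taken to mean one request in flight per datacenter, with datacenters handled concurrently.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Hypothetical sketch, not Cassandra code: dispatches validation requests
 * to replicas according to a parallelism mode.
 */
public class RepairCoordinationSketch {

    // SEQUENTIAL and PARALLEL appear in the diff; DATACENTER_AWARE is an assumed name.
    enum Parallelism { SEQUENTIAL, PARALLEL, DATACENTER_AWARE }

    static final class Coordinator {
        private final Parallelism mode;
        private final Map<String, String> dcOf;    // endpoint -> datacenter (assumed input)
        private final Consumer<String> processor;  // stands in for IRequestProcessor.process(endpoint)
        // One FIFO queue per lane; the head of each non-empty lane is the request in flight.
        private final Map<String, Deque<String>> lanes = new LinkedHashMap<>();

        Coordinator(Parallelism mode, Map<String, String> dcOf, Consumer<String> processor) {
            this.mode = mode;
            this.dcOf = dcOf;
            this.processor = processor;
        }

        // SEQUENTIAL: one global lane; DATACENTER_AWARE: one lane per DC; PARALLEL: one lane per endpoint.
        private String laneOf(String endpoint) {
            if (mode == Parallelism.SEQUENTIAL) return "*";
            if (mode == Parallelism.DATACENTER_AWARE) return dcOf.get(endpoint);
            return endpoint;
        }

        void add(String endpoint) {
            lanes.computeIfAbsent(laneOf(endpoint), k -> new ArrayDeque<>()).addLast(endpoint);
        }

        // Sends the first request of every lane.
        void start() {
            for (Deque<String> lane : lanes.values()) {
                if (!lane.isEmpty()) processor.accept(lane.peekFirst());
            }
        }

        // Called when an endpoint's response arrives: releases the next request in the
        // same lane and returns how many requests are still in flight or queued.
        int completed(String endpoint) {
            Deque<String> lane = lanes.get(laneOf(endpoint));
            if (lane != null && endpoint.equals(lane.peekFirst())) {
                lane.pollFirst();
                if (!lane.isEmpty()) processor.accept(lane.peekFirst());
            }
            int remaining = 0;
            for (Deque<String> l : lanes.values()) remaining += l.size();
            return remaining;
        }
    }

    public static void main(String[] args) {
        Map<String, String> dcOf = new LinkedHashMap<>();
        dcOf.put("10.0.1.1", "DC1");
        dcOf.put("10.0.1.2", "DC1");
        dcOf.put("10.0.2.1", "DC2");

        for (Parallelism mode : Parallelism.values()) {
            List<String> sent = new ArrayList<>();
            Coordinator coordinator = new Coordinator(mode, dcOf, sent::add);
            for (String endpoint : dcOf.keySet()) coordinator.add(endpoint);
            coordinator.start();
            System.out.println(mode + " first sends to " + sent);
        }
    }
}
```

With the three sample endpoints, `main` reports that SEQUENTIAL first contacts only 10.0.1.1, PARALLEL contacts all three at once, and DATACENTER_AWARE contacts 10.0.1.1 and 10.0.2.1, one per datacenter. Keeping one FIFO queue per lane lets a single class cover all three modes; only the endpoint-to-lane mapping differs.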
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairJob.java
index 8057ed5,7c791aa..20d5d97
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@@ -73,12 -72,14 +73,14 @@@ public class RepairJo
                        ListeningExecutorService taskExecutor)
      {
          this.listener = listener;
 -        this.desc = new RepairJobDesc(sessionId, keyspace, columnFamily, range);
 +        this.desc = new RepairJobDesc(parentSessionId, sessionId, keyspace, columnFamily, range);
-         this.isSequential = isSequential;
+         this.parallelismDegree = parallelismDegree;
          this.taskExecutor = taskExecutor;
-         this.treeRequests = new RequestCoordinator(isSequential)
+
+         IRequestProcessor processor = new IRequestProcessor()
          {
-             public void send(InetAddress endpoint)
+             @Override
+             public void process(InetAddress endpoint)
              {
                  ValidationRequest request = new ValidationRequest(desc, gcBefore);
                  MessagingService.instance().sendOneWay(request.createMessage(), endpoint);


http://git-wip-us.apache.org/repos/asf/cassandra/blob/326a9ff2/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairSession.java
index 346f3f4,f2b95eb..0580ebb
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@@ -114,20 -110,19 +114,20 @@@ public class RepairSession extends Wrap
       *
       * @param range range to repair
       * @param keyspace name of keyspace
-      * @param isSequential true if performing repair on snapshots sequentially
+      * @param parallelismDegree specifies the degree of parallelism when calculating the merkle trees
 -     * @param dataCenters the data centers that should be part of the repair; null for all DCs
 +     * @param endpoints the data centers that should be part of the repair; null for all DCs
       * @param cfnames names of columnfamilies
       */
-     public RepairSession(UUID parentRepairSession, Range range, String keyspace, boolean isSequential, Set endpoints, String... cfnames)
 -    public RepairSession(Range range, String keyspace, RepairParallelism parallelismDegree, Collection dataCenters, Collection hosts, String... cfnames)
++    public RepairSession(UUID parentRepairSession, Range range, String keyspace, RepairParallelism parallelismDegree, Set endpoints, String... cfnames)
      {
-         this(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, isSequential, endpoints, cfnames);
 -        this(UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, dataCenters, hosts, cfnames);
++        this(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, cfnames);
      }
  
-     public RepairSession(UUID parentRepairSession, UUID id, Range range, String keyspace, boolean isSequential, Set endpoints, String[] cfnames)
 -    public RepairSession(UUID id, Range range, String keyspace, RepairParallelism parallelismDegree, Collection dataCenters, Collection hosts, String[] cfnames)
++    public RepairSession(UUID parentRepairSession, UUID id, Range range, String keyspace, RepairParallelism parallelismDegree, Set endpoints, String[] cfnames)
      {
 +        this.parentRepairSession = parentRepairSession;
          this.id = id;
-         this.isSequential = isSequential;
+         this.parallelismDegree = parallelismDegree;
          this.keyspace = keyspace;
          this.cfnames = cfnames;
          assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
@@@ -284,10 -270,10 +284,10 @@@
          // Create and queue a RepairJob for each column family
          for (String cfname : cfnames)
          {
-             RepairJob job = new RepairJob(this, parentRepairSession, id, keyspace, cfname, range, isSequential, taskExecutor);
 -            RepairJob job = new RepairJob(this, id, keyspace, cfname, range, parallelismDegree, taskExecutor);
++            RepairJob job = new RepairJob(this, parentRepairSession, id, keyspace, cfname, range, parallelismDegree, taskExecutor);
              jobs.offer(job);
          }
 -
 +
          logger.debug("Sending tree requests to endpoints {}", endpoints);
          jobs.peek().sendTreeRequests(endpoints);
          // block whatever thread started this session until all requests have been returned:


http://git-wip-us.apache.org/repos/asf/cassandra/blob/326a9ff2/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index 68c2fae,da81e8f..d43143e
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -116,9 -92,9 +116,9 @@@ public class ActiveRepairServic
       *
       * @return Future for asynchronous call or null if there is no need to repair
       */
-     public RepairFuture submitRepairSession(UUID parentRepairSession, Range range, String keyspace, boolean isSequential, Set endpoints, String... cfnames)
 -    public RepairFuture submitRepairSession(Range range, String keyspace, RepairParallelism parallelismDegree, Collection dataCenters, Collection hosts, String... cfnames)
++    public RepairFuture submitRepairSession(UUID parentRepairSession, Range range, String keyspace, RepairParallelism parallelismDegree, Set endpoints, String... cfnames)
      {
-         RepairSession session = new RepairSession(parentRepairSession, range, keyspace, isSequential, endpoints, cfnames);
 -        RepairSession session = new RepairSession(range, keyspace, parallelismDegree, dataCenters, hosts, cfnames);
++        RepairSession session = new RepairSession(parentRepairSession, range, keyspace, parallelismDegree, endpoints, cfnames);
          if (session.endpoints.isEmpty())
              return null;
          RepairFuture futureTask = new RepairFuture(session);
@@@ -152,9 -128,7 +152,9 @@@
      // add it to the sessions (avoid NPE in tests)
      RepairFuture submitArtificialRepairSession(RepairJobDesc desc)
      {
 -        RepairSession session = new RepairSession(desc.sessionId, desc.range, desc.keyspace, RepairParallelism.PARALLEL, null, null, new String[]{desc.columnFamily});
 +        Set neighbours = new HashSet<>();
 +        neighbours.addAll(ActiveRepairService.getNeighbors(desc.keyspace, desc.range, null, null));
-         RepairSession session = new RepairSession(desc.parentSessionId, desc.sessionId, desc.range, desc.keyspace, false, neighbours, new String[]{desc.columnFamily});
++        RepairSession session = new RepairSession(desc.parentSessionId, desc.sessionId, desc.range, desc.keyspace, RepairParallelism.PARALLEL, neighbours, new String[]{desc.columnFamily});
          sessions.put(session.getId(), session);
          RepairFuture futureTask = new RepairFuture(session);
          executor.execute(futureTask);


http://git-wip-us.apache.org/repos/asf/cassandra/blob/326a9ff2/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 79cea8e,3d42d1c..38cca10
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -2489,86 -2407,112 +2490,106 @@@ public class StorageService extends Not
          sendNotification(jmxNotification);
      }
  
 -    public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection dataCenters, final Collection hosts, final boolean primaryRange, final String... columnFamilies)
 +    public int forceRepairAsync(String keyspace, boolean isSequential, Collection dataCenters, Collection hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies) throws IOException
      {
 -        return forceRepairAsync(keyspace, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, primaryRange, columnFamilies);
++        return forceRepairAsync(keyspace, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, primaryRange, fullRepair, columnFamilies);
+     }
+
 -    public int forceRepairAsync(final String keyspace, final RepairParallelism parallelismDegree, final Collection dataCenters, final Collection hosts, final boolean primaryRange, final String... columnFamilies)
++    public int forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, Collection dataCenters, Collection hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies)
+     {
 -        // when repairing only primary range, dataCenter nor hosts can be set
 -        if (primaryRange && (dataCenters != null || hosts != null))
 +        Collection> ranges;
 +        if (primaryRange)
          {
 -            throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
 +            // when repairing only primary range, neither dataCenters nor hosts can be set
 +            if (dataCenters == null && hosts == null)
 +                ranges = getPrimaryRanges(keyspace);
 +            // except dataCenters only contain local DC (i.e. -local)
 +            else if (dataCenters != null && dataCenters.size() == 1 && dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
 +                ranges = getPrimaryRangesWithinDC(keyspace);
 +            else
 +                throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
          }
 -        final Collection> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
 -        return forceRepairAsync(keyspace, parallelismDegree, dataCenters, hosts, ranges, columnFamilies);
 +        else
 +        {
 +            ranges = getLocalRanges(keyspace);
 +        }
 +
-         return forceRepairAsync(keyspace, isSequential, dataCenters, hosts, ranges, fullRepair, columnFamilies);
++        return forceRepairAsync(keyspace, parallelismDegree, dataCenters, hosts, ranges, fullRepair, columnFamilies);
      }
  
 -    public int forceRepairAsync(final String keyspace, final RepairParallelism parallelismDegree, final Collection dataCenters, final Collection hosts, final Collection> ranges, final String... columnFamilies)
 +    public int forceRepairAsync(String keyspace, boolean isSequential, Collection dataCenters, Collection hosts, Collection> ranges, boolean fullRepair, String... columnFamilies)
 +    {
++        return forceRepairAsync(keyspace, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, ranges, fullRepair, columnFamilies);
++    }
++
++    public int forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, Collection dataCenters, Collection hosts, Collection> ranges, boolean fullRepair, String... columnFamilies)
+     {
          if (ranges.isEmpty() || Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2)
              return 0;
 -        final int cmd = nextRepairCommand.incrementAndGet();
 +        int cmd = nextRepairCommand.incrementAndGet();
          if (ranges.size() > 0)
          {
-             if (!FBUtilities.isUnix() && isSequential)
 -            new Thread(createRepairTask(cmd, keyspace, ranges, parallelismDegree, dataCenters, hosts, columnFamilies)).start();
++            if (!FBUtilities.isUnix() && parallelismDegree != RepairParallelism.PARALLEL)
 +            {
 +                logger.warn("Snapshot-based repair is not yet supported on Windows. Reverting to parallel repair.");
-                 isSequential = false;
++                parallelismDegree = RepairParallelism.PARALLEL;
 +            }
-             new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, hosts, fullRepair, columnFamilies)).start();
++            new Thread(createRepairTask(cmd, keyspace, ranges, parallelismDegree, dataCenters, hosts, fullRepair, columnFamilies)).start();
          }
          return cmd;
      }
  
 -    public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final boolean primaryRange, final String... columnFamilies)
 +    public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies)
      {
 -        // when repairing only primary range, you cannot repair only on local DC
 -        if (primaryRange && isLocal)
 +        Collection> ranges;
 +        if (primaryRange)
          {
 -            throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
 +            ranges = isLocal ? getPrimaryRangesWithinDC(keyspace) : getPrimaryRanges(keyspace);
          }
 -        final Collection> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
 -        return forceRepairAsync(keyspace, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, isLocal, ranges, columnFamilies);
 -    }
 -
 -    public int forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, boolean isLocal, Collection> ranges, String... columnFamilies)
 -    {
 -        if (ranges.isEmpty() || Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2)
 -            return 0;
 -
 -        final int cmd = nextRepairCommand.incrementAndGet();
 -        if (!FBUtilities.isUnix() && parallelismDegree != RepairParallelism.PARALLEL)
 +        else
          {
 -            logger.warn("Snapshot-based repair is not yet supported on Windows. Reverting to parallel repair.");
 -            parallelismDegree = RepairParallelism.PARALLEL;
 +            ranges = getLocalRanges(keyspace);
          }
 -        new Thread(createRepairTask(cmd, keyspace, ranges, parallelismDegree, isLocal, columnFamilies)).start();
 -        return cmd;
 +
 +        return forceRepairAsync(keyspace, isSequential, isLocal, ranges, fullRepair, columnFamilies);
      }
  
 -    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection dataCenters, final Collection hosts, final String... columnFamilies)
 +    public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, Collection> ranges, boolean fullRepair, String... columnFamilies)
      {
 -        return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, columnFamilies);
++        return forceRepairAsync(keyspace, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, isLocal, ranges, fullRepair, columnFamilies);
+     }
+
 -    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, RepairParallelism parallelismDegree, Collection dataCenters, final Collection hosts, final String... columnFamilies)
++    public int forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, boolean isLocal, Collection> ranges, boolean fullRepair, String... columnFamilies)
+     {
 -        Collection> repairingRange = createRepairRangeFrom(beginToken, endToken);
 -
 -        logger.info("starting user-requested repair of range {} for keyspace {} and column families {}",
 -                    repairingRange, keyspaceName, columnFamilies);
 +        if (ranges.isEmpty() || Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2)
 +            return 0;
 +        int cmd = nextRepairCommand.incrementAndGet();
-         if (!FBUtilities.isUnix() && isSequential)
+         if (!FBUtilities.isUnix() && parallelismDegree != RepairParallelism.PARALLEL)
          {
              logger.warn("Snapshot-based repair is not yet supported on Windows. Reverting to parallel repair.");
-             isSequential = false;
+             parallelismDegree = RepairParallelism.PARALLEL;
          }
-         new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, isLocal, fullRepair, columnFamilies)).start();
 -        return forceRepairAsync(keyspaceName, parallelismDegree, dataCenters, hosts, repairingRange, columnFamilies);
 -    }
 -
 -    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies)
 -    {
 -        Set dataCenters = null;
 -        if (isLocal)
 -        {
 -            dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
 -        }
 -        return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential, dataCenters, null, columnFamilies);
++        new Thread(createRepairTask(cmd, keyspace, ranges, parallelismDegree, isLocal, fullRepair, columnFamilies)).start();
 +        return cmd;
      }
  
 -    /**
 -     * Trigger proactive repair for a keyspace and column families.
 -     */
 -    public void forceKeyspaceRepair(final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
 +    public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection dataCenters, Collection hosts, boolean fullRepair, String... columnFamilies) throws IOException
      {
 -        forceKeyspaceRepairRange(keyspaceName, getLocalRanges(keyspaceName), isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, isLocal, columnFamilies);
++        return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, fullRepair, columnFamilies);
+     }
+
 -    public void forceKeyspaceRepairPrimaryRange(final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
++    public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, RepairParallelism parallelismDegree, Collection dataCenters, Collection hosts, boolean fullRepair, String... columnFamilies)
+     {
 -        // primary range repair can only be performed for whole cluster.
 -        // NOTE: we should omit the param but keep API as is for now.
 -        if (isLocal)
 -        {
 -            throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
 -        }
 +        Collection> repairingRange = createRepairRangeFrom(beginToken, endToken);
 -        forceKeyspaceRepairRange(keyspaceName, getLocalPrimaryRanges(keyspaceName), isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, false, columnFamilies);
 +        logger.info("starting user-requested repair of range {} for keyspace {} and column families {}",
 +                    repairingRange, keyspaceName, columnFamilies);
-         return forceRepairAsync(keyspaceName, isSequential, dataCenters, hosts, repairingRange, fullRepair, columnFamilies);
++        return forceRepairAsync(keyspaceName, parallelismDegree, dataCenters, hosts, repairingRange, fullRepair, columnFamilies);
      }
  
 -    public void forceKeyspaceRepairRange(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
 +    public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... columnFamilies)
      {
          Collection> repairingRange = createRepairRangeFrom(beginToken, endToken);
@@@ -2616,30 -2567,17 +2637,30 @@@
          return repairingRange;
      }
  
 -    private FutureTask createRepairTask(final int cmd, final String keyspace, final Collection> ranges, final RepairParallelism parallelismDegree, final boolean isLocal, final String... columnFamilies)
 +    private FutureTask createRepairTask(int cmd,
 +                                        String keyspace,
 +                                        Collection> ranges,
-                                         boolean isSequential,
++                                        RepairParallelism parallelismDegree,
 +                                        boolean isLocal,
 +                                        boolean fullRepair,
 +                                        String... columnFamilies)
      {
          Set dataCenters = null;
          if (isLocal)
          {
              dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
          }
-         return createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, null, fullRepair, columnFamilies);
 -        return createRepairTask(cmd, keyspace, ranges, parallelismDegree, dataCenters, null, columnFamilies);
++        return createRepairTask(cmd, keyspace, ranges, parallelismDegree, dataCenters, null, fullRepair, columnFamilies);
      }
  
 -    private FutureTask createRepairTask(final int cmd, final String keyspace, final Collection> ranges, final RepairParallelism parallelismDegree, final Collection dataCenters, final Collection hosts, final String... columnFamilies)
 +    private FutureTask createRepairTask(final int cmd,
 +                                        final String keyspace,
 +                                        final Collection> ranges,
-                                         final boolean isSequential,
++                                        final RepairParallelism parallelismDegree,
 +                                        final Collection dataCenters,
 +                                        final Collection hosts,
 +                                        final boolean fullRepair,
 +                                        final String... columnFamilies)
      {
          if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
          {
@@@ -2650,71 -2588,24 +2671,71 @@@
          {
              protected void runMayThrow() throws Exception
              {
-                 String message = String.format("Starting repair command #%d, repairing %d ranges for keyspace %s (seq=%b, full=%b)", cmd, ranges.size(), keyspace, isSequential, fullRepair);
 -                String message = String.format("Starting repair command #%d, repairing %d ranges for keyspace %s", cmd, ranges.size(), keyspace);
++                String message = String.format("Starting repair command #%d, repairing %d ranges for keyspace %s (parallelism=%s, full=%b)", cmd, ranges.size(), keyspace, parallelismDegree, fullRepair);
                  logger.info(message);
                  sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.STARTED.ordinal()});
  
-                 if (isSequential && !fullRepair)
 -                List futures = new ArrayList<>(ranges.size());
++                if (parallelismDegree != RepairParallelism.PARALLEL && !fullRepair)
 +                {
 +                    message = "It is not possible to mix sequential repair and incremental repairs.";
 +                    logger.error(message);
 +                    sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
 +                    return;
 +                }
 +
 +                Set allNeighbors = new HashSet<>();
 +                Map> rangeToNeighbors = new HashMap<>();
                  for (Range range : ranges)
                  {
 -                    RepairFuture future;
                      try
                      {
 -                        future = forceKeyspaceRepair(range, keyspace, parallelismDegree, dataCenters, hosts, columnFamilies);
 +                        Set neighbors = ActiveRepairService.getNeighbors(keyspace, range, dataCenters, hosts);
 +                        rangeToNeighbors.put(range, neighbors);
 +                        allNeighbors.addAll(neighbors);
                      }
                      catch (IllegalArgumentException e)
                      {
 -                        logger.error("Repair session failed:", e);
 -                        sendNotification("repair", e.getMessage(), new int[]{cmd, ActiveRepairService.Status.SESSION_FAILED.ordinal()});
 -                        continue;
 +                        logger.error("Repair failed:", e);
 +                        sendNotification("repair", e.getMessage(), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
 +                        return;
                      }
 +                }
 +
 +                // Validate columnfamilies
 +                List columnFamilyStores = new ArrayList<>();
 +                try
 +                {
 +                    Iterables.addAll(columnFamilyStores, getValidColumnFamilies(false, false, keyspace, columnFamilies));
 +                }
 +                catch (IllegalArgumentException e)
 +                {
 +                    sendNotification("repair", e.getMessage(), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
 +                    return;
 +                }
 +
 +                UUID parentSession = null;
 +                if (!fullRepair)
 +                {
 +                    try
 +                    {
 +                        parentSession = ActiveRepairService.instance.prepareForRepair(allNeighbors, ranges, columnFamilyStores);
 +                    }
 +                    catch (Throwable t)
 +                    {
 +                        sendNotification("repair", String.format("Repair failed with error %s", t.getMessage()), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
 +                        return;
 +                    }
 +                }
 +
 +                List futures = new ArrayList<>(ranges.size());
 +                String[] cfnames = new String[columnFamilyStores.size()];
 +                for (int i = 0; i < columnFamilyStores.size(); i++)
 +                {
 +                    cfnames[i] = columnFamilyStores.get(i).name;
 +                }
 +                for (Range range : ranges)
 +                {
-                     RepairFuture future = ActiveRepairService.instance.submitRepairSession(parentSession, range, keyspace, isSequential, rangeToNeighbors.get(range), cfnames);
++                    RepairFuture future = ActiveRepairService.instance.submitRepairSession(parentSession, range, keyspace, parallelismDegree, rangeToNeighbors.get(range), cfnames);
                      if (future == null)
                          continue;
                      futures.add(future);


http://git-wip-us.apache.org/repos/asf/cassandra/blob/326a9ff2/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 8ae44ff,2386fc8..e7d6f14
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@@ -272,14 -259,30 +274,30 @@@ public interface StorageServiceMBean ex
       *
       * @return Repair command number, or 0 if nothing to repair
       */
 -    public int forceRepairAsync(String keyspace, boolean isSequential, Collection dataCenters, final Collection hosts, boolean primaryRange, String... columnFamilies);
 +    public int forceRepairAsync(String keyspace, boolean isSequential, Collection dataCenters, Collection hosts, boolean primaryRange, boolean repairedAt, String... columnFamilies) throws IOException;
  
      /**
+      * Invoke repair asynchronously.
+      * You can track repair progress by subscribing JMX notification sent from this StorageServiceMBean.
+      * Notification format is:
+      * type: "repair"
+      * userObject: int array of length 2, [0]=command number, [1]=ordinal of AntiEntropyService.Status
+      *
+      * @return Repair command number, or 0 if nothing to repair
+      */
 -    public int forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, Collection dataCenters, final Collection hosts, boolean primaryRange, String... columnFamilies);
++    public int forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, Collection dataCenters, Collection hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies);
+
+     /**
       * Same as forceRepairAsync, but handles a specified range
       */
 -    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection dataCenters, final Collection hosts, final String... columnFamilies);
 +    public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection dataCenters, Collection hosts, boolean repairedAt, String... columnFamilies) throws IOException;
  
      /**
+      * Same as forceRepairAsync, but handles a specified range
+      */
 -    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, RepairParallelism parallelismDegree, Collection dataCenters, final Collection hosts, final String... columnFamilies);
++    public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, RepairParallelism parallelismDegree, Collection dataCenters, Collection hosts, boolean fullRepair, String... columnFamilies);
+
+     /**
       * Invoke repair asynchronously.
       * You can track repair progress by subscribing JMX notification sent from this StorageServiceMBean.
       * Notification format is:


http://git-wip-us.apache.org/repos/asf/cassandra/blob/326a9ff2/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/NodeProbe.java
index d495786,261d416..1d05887
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@@ -244,14 -211,24 +245,19 @@@ public class NodeProbe implements AutoC
          ssProxy.forceKeyspaceFlush(keyspaceName, columnFamilies);
      }
  
 -    public void forceKeyspaceRepair(String keyspaceName, boolean isSequential, boolean isLocal, String... columnFamilies) throws IOException
 -    {
 -        ssProxy.forceKeyspaceRepair(keyspaceName, isSequential, isLocal, columnFamilies);
 -    }
 -
 -    public void forceRepairAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection dataCenters, final Collection hosts, boolean primaryRange, String... columnFamilies) throws IOException
 +    public void forceRepairAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection dataCenters, Collection hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies) throws IOException
      {
 -        forceRepairAsync(out, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, primaryRange, columnFamilies);
++        forceRepairAsync(out, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, primaryRange, fullRepair, columnFamilies);
+     }
+
 -    public void forceRepairAsync(final PrintStream out, final String keyspaceName, RepairParallelism parallelismDegree, Collection dataCenters, final Collection hosts, boolean primaryRange, String... columnFamilies) throws IOException
++    public void forceRepairAsync(final PrintStream out, final String keyspaceName, RepairParallelism parallelismDegree, Collection dataCenters, final Collection hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies) throws IOException
+     {
          RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies);
          try
          {
              jmxc.addConnectionNotificationListener(runner, null, null);
              ssProxy.addNotificationListener(runner, null, null);
-             if (!runner.repairAndWait(ssProxy, isSequential, dataCenters, hosts, primaryRange, fullRepair))
 -            if (!runner.repairAndWait(ssProxy, parallelismDegree, dataCenters, hosts, primaryRange))
++            if (!runner.repairAndWait(ssProxy, parallelismDegree, dataCenters, hosts, primaryRange, fullRepair))
                  failed = true;
          }
          catch (Exception e)
@@@ -265,22 -242,22 +271,27 @@@
                  ssProxy.removeNotificationListener(runner);
                  jmxc.removeConnectionNotificationListener(runner);
              }
 -            catch (Throwable ignored) {}
 +            catch (Throwable t)
 +            {
 +                JVMStabilityInspector.inspectThrowable(t);
 +                out.println("Exception occurred during clean-up. " + t);
 +            }
          }
      }
 -    public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection dataCenters, final Collection hosts, final String startToken, final String endToken, String... columnFamilies) throws IOException
 +
 +    public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection dataCenters, Collection hosts, final String startToken, final String endToken, boolean fullRepair, String... columnFamilies) throws IOException
      {
 -        forceRepairRangeAsync(out, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, startToken, endToken, columnFamilies);
++        forceRepairRangeAsync(out, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, startToken, endToken, fullRepair, columnFamilies);
+     }
+
 -    public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, RepairParallelism parallelismDegree, Collection dataCenters, final Collection hosts, final String startToken, final String endToken, String... columnFamilies) throws IOException
++    public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, RepairParallelism parallelismDegree, Collection dataCenters, final Collection hosts, final String startToken, final String endToken, boolean fullRepair, String... columnFamilies) throws IOException
+     {
          RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies);
          try
          {
              jmxc.addConnectionNotificationListener(runner, null, null);
              ssProxy.addNotificationListener(runner, null, null);
-             if (!runner.repairRangeAndWait(ssProxy, isSequential, dataCenters, hosts, startToken, endToken, fullRepair))
 -            if (!runner.repairRangeAndWait(ssProxy, parallelismDegree, dataCenters, hosts, startToken, endToken))
++            if (!runner.repairRangeAndWait(ssProxy, parallelismDegree, dataCenters, hosts, startToken, endToken, fullRepair))
                  failed = true;
          }
          catch (Exception e)
@@@ -1287,16 -1070,16 +1298,16 @@@ class RepairRunner implements Notificat
          this.columnFamilies = columnFamilies;
      }
  
-     public boolean repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection dataCenters, Collection hosts, boolean primaryRangeOnly, boolean fullRepair) throws Exception
 -    public boolean repairAndWait(StorageServiceMBean ssProxy, RepairParallelism parallelismDegree, Collection dataCenters, final Collection hosts, boolean primaryRangeOnly) throws Exception
++    public boolean repairAndWait(StorageServiceMBean ssProxy, RepairParallelism parallelismDegree, Collection dataCenters, Collection hosts, boolean primaryRangeOnly, boolean fullRepair) throws Exception
      {
-         cmd = ssProxy.forceRepairAsync(keyspace,
isSequential, dataCenters, hosts, primaryRangeOnly, fullRepair, columnFamilies); - cmd = ssProxy.forceRepairAsync(keyspace, parallelismDegree, dataCenters, hosts, primaryRangeOnly, columnFamilies); ++ cmd = ssProxy.forceRepairAsync(keyspace, parallelismDegree, dataCenters, hosts, primaryRangeOnly, fullRepair, columnFamilies); waitForRepair(); return success; } - public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection dataCenters, Collection hosts, String startToken, String endToken, boolean fullRepair) throws Exception - public boolean repairRangeAndWait(StorageServiceMBean ssProxy, RepairParallelism parallelismDegree, Collection dataCenters, final Collection hosts, String startToken, String endToken) throws Exception ++ public boolean repairRangeAndWait(StorageServiceMBean ssProxy, RepairParallelism parallelismDegree, Collection dataCenters, Collection hosts, String startToken, String endToken, boolean fullRepair) throws Exception { - cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, dataCenters, hosts, fullRepair, columnFamilies); - cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, parallelismDegree, dataCenters, hosts, columnFamilies); ++ cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, parallelismDegree, dataCenters, hosts, fullRepair, columnFamilies); waitForRepair(); return success; }
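The StorageServiceMBean javadoc in this diff describes repair progress notifications only in prose: type "repair", with userObject an int array of length 2 holding the command number and a status ordinal. The NodeProbe overloads also map the legacy `isSequential` flag onto `RepairParallelism`. The sketch below shows both pieces from the client side. It is illustrative only. `RepairNotificationSketch`, `RepairStatus`, `Parallelism` and `repairNotification` are hypothetical local stand-ins, not Cassandra classes. The order of the `RepairStatus` constants is an assumption; check it against the server's status enum in your Cassandra version. Only standard `javax.management` types are used.

```java
import java.util.ArrayList;
import java.util.List;
import javax.management.Notification;
import javax.management.NotificationListener;

public class RepairNotificationSketch
{
    // Hypothetical stand-in for the server-side repair status enum; constant order is ASSUMED.
    enum RepairStatus { STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED }

    // Hypothetical stand-in for org.apache.cassandra.repair.RepairParallelism (subset only).
    enum Parallelism { SEQUENTIAL, PARALLEL }

    /** Mirrors the boolean-to-enum delegation done by the isSequential overloads in NodeProbe. */
    static Parallelism fromIsSequential(boolean isSequential)
    {
        return isSequential ? Parallelism.SEQUENTIAL : Parallelism.PARALLEL;
    }

    /** Records status updates for one repair command and ignores everything else. */
    static class RepairListener implements NotificationListener
    {
        final int command;
        final List<RepairStatus> seen = new ArrayList<>();

        RepairListener(int command) { this.command = command; }

        @Override
        public void handleNotification(Notification notification, Object handback)
        {
            if (!"repair".equals(notification.getType()))
                return;
            Object data = notification.getUserData();
            if (!(data instanceof int[]))
                return;
            int[] payload = (int[]) data; // [0] = command number, [1] = status ordinal
            if (payload.length != 2 || payload[0] != command)
                return;
            if (payload[1] < 0 || payload[1] >= RepairStatus.values().length)
                return; // unknown status ordinal: ignore rather than throw
            seen.add(RepairStatus.values()[payload[1]]);
        }

        boolean finished() { return seen.contains(RepairStatus.FINISHED); }
    }

    /** Builds a notification in the documented shape; for local testing only. */
    static Notification repairNotification(long seq, int command, RepairStatus status)
    {
        Notification n = new Notification("repair", "StorageService", seq, "repair progress");
        n.setUserData(new int[]{ command, status.ordinal() });
        return n;
    }

    public static void main(String[] args)
    {
        RepairListener listener = new RepairListener(7);
        listener.handleNotification(repairNotification(1, 7, RepairStatus.STARTED), null);
        listener.handleNotification(repairNotification(2, 8, RepairStatus.FINISHED), null); // different command
        listener.handleNotification(repairNotification(3, 7, RepairStatus.FINISHED), null);
        System.out.println(listener.seen + " finished=" + listener.finished());
        // prints "[STARTED, FINISHED] finished=true"
    }
}
```

Filtering on the command number matters because every client subscribed to the same MBean receives notifications for all repairs. This is presumably why `RepairRunner` keeps the `cmd` value returned by `forceRepairAsync` before calling `waitForRepair()`.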