cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [3/8] cassandra git commit: Add DC-aware sequential repair
Date Mon, 24 Nov 2014 21:26:20 GMT
Add DC-aware sequential repair

patch by Jimmy Mårdell; reviewed by yukim for CASSANDRA-8193


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/41469ecf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/41469ecf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/41469ecf

Branch: refs/heads/trunk
Commit: 41469ecf6a27e94441f96ef905ed3b5354c23987
Parents: 17de36f
Author: Jimmy Mårdell <yarin@spotify.com>
Authored: Mon Nov 24 15:07:33 2014 -0600
Committer: Yuki Morishita <yukim@apache.org>
Committed: Mon Nov 24 15:09:41 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../DatacenterAwareRequestCoordinator.java      |  73 +++++++++++
 .../cassandra/repair/IRequestCoordinator.java   |  28 ++++
 .../cassandra/repair/IRequestProcessor.java     |  23 ++++
 .../repair/ParallelRequestCoordinator.java      |  49 +++++++
 .../org/apache/cassandra/repair/RepairJob.java  |  32 ++++-
 .../cassandra/repair/RepairParallelism.java     |  22 ++++
 .../apache/cassandra/repair/RepairSession.java  |  14 +-
 .../cassandra/repair/RequestCoordinator.java    | 128 -------------------
 .../repair/SequentialRequestCoordinator.java    |  58 +++++++++
 .../cassandra/service/ActiveRepairService.java  |   6 +-
 .../cassandra/service/StorageService.java       |  64 ++++++----
 .../cassandra/service/StorageServiceMBean.java  |  19 ++-
 .../org/apache/cassandra/tools/NodeCmd.java     |  21 ++-
 .../org/apache/cassandra/tools/NodeProbe.java   |  30 +++--
 .../apache/cassandra/tools/NodeToolHelp.yaml    |   1 +
 .../repair/RequestCoordinatorTest.java          | 124 ++++++++++++++++++
 17 files changed, 506 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/41469ecf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fe23248..7519653 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -24,6 +24,7 @@
  * Allow concurrent writing of the same table in the same JVM using
    CQLSSTableWriter (CASSANDRA-7463)
  * Fix totalDiskSpaceUsed calculation (CASSANDRA-8205)
+ * Add DC-aware sequential repair (CASSANDRA-8193)
 
 
 2.0.11:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41469ecf/src/java/org/apache/cassandra/repair/DatacenterAwareRequestCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/DatacenterAwareRequestCoordinator.java b/src/java/org/apache/cassandra/repair/DatacenterAwareRequestCoordinator.java
new file mode 100644
index 0000000..ab3e03e
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/DatacenterAwareRequestCoordinator.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+
+/**
+ * Coordinates requests so that at most one request per data center is being processed at a
+ * time. Requests are grouped by the data center the configured endpoint snitch reports for
+ * them; within a data center they are handed to the processor one after another, in the order
+ * they were added, while different data centers proceed in parallel.
+ */
+public class DatacenterAwareRequestCoordinator implements IRequestCoordinator<InetAddress>
+{
+    // Pending requests per data center; the head of each queue is the one being processed.
+    private final Map<String, Queue<InetAddress>> requestsByDatacenter = new HashMap<>();
+    // Requests added but not yet completed, summed over all data centers.
+    private int remaining = 0;
+    private final IRequestProcessor<InetAddress> processor;
+
+    public DatacenterAwareRequestCoordinator(IRequestProcessor<InetAddress> processor)
+    {
+        this.processor = processor;
+    }
+
+    /**
+     * Queues the request behind any earlier requests for the same data center.
+     */
+    @Override
+    public void add(InetAddress request)
+    {
+        String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(request);
+        Queue<InetAddress> queue = requestsByDatacenter.get(dc);
+        if (queue == null)
+        {
+            queue = new LinkedList<>();
+            requestsByDatacenter.put(dc, queue);
+        }
+        queue.add(request);
+        remaining++;
+    }
+
+    /**
+     * Hands the first queued request of every data center to the processor.
+     */
+    @Override
+    public void start()
+    {
+        for (Queue<InetAddress> requests : requestsByDatacenter.values())
+        {
+            if (!requests.isEmpty())
+                processor.process(requests.peek());
+        }
+    }
+
+    /**
+     * Marks the in-flight request of the given endpoint's data center as done and hands the
+     * next request queued for that data center, if any, to the processor.
+     *
+     * @param request the completed request; must be the head of its data center's queue
+     * @return how many requests, across all data centers, have not completed yet
+     */
+    @Override
+    public int completed(InetAddress request)
+    {
+        String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(request);
+        Queue<InetAddress> requests = requestsByDatacenter.get(dc);
+        assert requests != null : "No requests were queued for data center " + dc;
+        assert request.equals(requests.peek()) : request + " is not the in-flight request for data center " + dc;
+        requests.poll();
+        if (!requests.isEmpty())
+            processor.process(requests.peek());
+        return --remaining;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41469ecf/src/java/org/apache/cassandra/repair/IRequestCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/IRequestCoordinator.java b/src/java/org/apache/cassandra/repair/IRequestCoordinator.java
new file mode 100644
index 0000000..db2ffe3
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/IRequestCoordinator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+/**
+ * Controls the order and degree of concurrency with which requests are handed to an
+ * {@link IRequestProcessor}: requests are registered with {@link #add}, processing begins with
+ * {@link #start}, and each finished request is reported back through {@link #completed}.
+ *
+ * @param <R> the request type
+ */
+public interface IRequestCoordinator<R>
+{
+    /**
+     * Registers a request; it is not processed before {@link #start()} is called.
+     */
+    void add(R request);
+
+    /**
+     * Hands the initial request(s) to the processor.
+     */
+    void start();
+
+    /**
+     * Reports that the given request has finished; the coordinator may then hand further
+     * requests to the processor.
+     *
+     * @return how many requests remain to be completed
+     */
+    int completed(R request);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41469ecf/src/java/org/apache/cassandra/repair/IRequestProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/IRequestProcessor.java b/src/java/org/apache/cassandra/repair/IRequestProcessor.java
new file mode 100644
index 0000000..c7b49d7
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/IRequestProcessor.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+/**
+ * Processes (or starts processing) individual requests on behalf of an
+ * {@link IRequestCoordinator}, which decides when each request is handed over.
+ *
+ * @param <R> the request type
+ */
+public interface IRequestProcessor<R>
+{
+    /**
+     * Processes, or initiates processing of, the given request.
+     */
+    void process(R request);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41469ecf/src/java/org/apache/cassandra/repair/ParallelRequestCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/ParallelRequestCoordinator.java b/src/java/org/apache/cassandra/repair/ParallelRequestCoordinator.java
new file mode 100644
index 0000000..839bb43
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/ParallelRequestCoordinator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Hands every registered request to the processor at once when started. Requests are kept in
+ * a set, so adding an equal request twice registers it only once.
+ */
+public class ParallelRequestCoordinator<R> implements IRequestCoordinator<R>
+{
+    // Requests that have not been reported as completed yet.
+    private final Set<R> outstanding = new HashSet<>();
+    private final IRequestProcessor<R> processor;
+
+    public ParallelRequestCoordinator(IRequestProcessor<R> processor)
+    {
+        this.processor = processor;
+    }
+
+    @Override
+    public void add(R request)
+    {
+        outstanding.add(request);
+    }
+
+    @Override
+    public void start()
+    {
+        for (R pending : outstanding)
+            processor.process(pending);
+    }
+
+    /**
+     * @return the number of requests still outstanding once the given one is removed
+     */
+    @Override
+    public int completed(R request)
+    {
+        outstanding.remove(request);
+        return outstanding.size();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41469ecf/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 931f95a..7c791aa 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -43,9 +43,9 @@ public class RepairJob
     private static Logger logger = LoggerFactory.getLogger(RepairJob.class);
 
     public final RepairJobDesc desc;
-    private final boolean isSequential;
+    private final RepairParallelism parallelismDegree;
     // first we send tree requests. this tracks the endpoints remaining to hear from
-    private final RequestCoordinator<InetAddress> treeRequests;
+    private final IRequestCoordinator<InetAddress> treeRequests;
     // tree responses are then tracked here
     private final List<TreeResponse> trees = new ArrayList<>();
     // once all responses are received, each tree is compared with each other, and differencer tasks
@@ -68,21 +68,38 @@ public class RepairJob
                      String keyspace,
                      String columnFamily,
                      Range<Token> range,
-                     boolean isSequential,
+                     RepairParallelism parallelismDegree,
                      ListeningExecutorService taskExecutor)
     {
         this.listener = listener;
         this.desc = new RepairJobDesc(sessionId, keyspace, columnFamily, range);
-        this.isSequential = isSequential;
+        this.parallelismDegree = parallelismDegree;
         this.taskExecutor = taskExecutor;
-        this.treeRequests = new RequestCoordinator<InetAddress>(isSequential)
+
+        IRequestProcessor<InetAddress> processor = new IRequestProcessor<InetAddress>()
         {
-            public void send(InetAddress endpoint)
+            @Override
+            public void process(InetAddress endpoint)
             {
                 ValidationRequest request = new ValidationRequest(desc, gcBefore);
                 MessagingService.instance().sendOneWay(request.createMessage(), endpoint);
             }
         };
+
+        switch (parallelismDegree)
+        {
+            case SEQUENTIAL:
+                this.treeRequests = new SequentialRequestCoordinator<>(processor);
+                break;
+            case PARALLEL:
+                this.treeRequests = new ParallelRequestCoordinator<>(processor);
+                break;
+            case DATACENTER_AWARE:
+                this.treeRequests = new DatacenterAwareRequestCoordinator(processor);
+                break;
+            default:
+                throw new AssertionError("Unknown degree of parallelism specified");
+        }
     }
 
     /**
@@ -102,7 +119,8 @@ public class RepairJob
         List<InetAddress> allEndpoints = new ArrayList<>(endpoints);
         allEndpoints.add(FBUtilities.getBroadcastAddress());
 
-        if (isSequential)
+        // Create a snapshot at all nodes unless we're using pure parallel repairs
+        if (parallelismDegree != RepairParallelism.PARALLEL)
         {
             List<ListenableFuture<InetAddress>> snapshotTasks = new ArrayList<>(allEndpoints.size());
             for (InetAddress endpoint : allEndpoints)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41469ecf/src/java/org/apache/cassandra/repair/RepairParallelism.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairParallelism.java b/src/java/org/apache/cassandra/repair/RepairParallelism.java
new file mode 100644
index 0000000..12c22ca
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/RepairParallelism.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+/**
+ * Specifies the degree of parallelism when calculating the merkle trees in a repair job.
+ */
+public enum RepairParallelism
+{
+    /**
+     * One node at a time
+     */
+    SEQUENTIAL,
+
+    /**
+     * All nodes at the same time
+     */
+    PARALLEL,
+
+    /**
+     * One node per data center at a time
+     */
+    DATACENTER_AWARE
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41469ecf/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index c9a9671..f2b95eb 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -83,7 +83,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
     private final UUID id;
     public final String keyspace;
     private final String[] cfnames;
-    public final boolean isSequential;
+    public final RepairParallelism parallelismDegree;
     /** Range to repair */
     public final Range<Token> range;
     public final Set<InetAddress> endpoints;
@@ -110,19 +110,19 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
      *
      * @param range range to repair
      * @param keyspace name of keyspace
-     * @param isSequential true if performing repair on snapshots sequentially
+     * @param parallelismDegree specifies the degree of parallelism when calculating the merkle trees
      * @param dataCenters the data centers that should be part of the repair; null for all DCs
      * @param cfnames names of columnfamilies
      */
-    public RepairSession(Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, String... cfnames)
+    public RepairSession(Range<Token> range, String keyspace, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, String... cfnames)
     {
-        this(UUIDGen.getTimeUUID(), range, keyspace, isSequential, dataCenters, hosts, cfnames);
+        this(UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, dataCenters, hosts, cfnames); // delegate with a freshly generated session id
     }
 
-    public RepairSession(UUID id, Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, String[] cfnames)
+    public RepairSession(UUID id, Range<Token> range, String keyspace, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, String[] cfnames)
     {
         this.id = id;
-        this.isSequential = isSequential;
+        this.parallelismDegree = parallelismDegree;
         this.keyspace = keyspace;
         this.cfnames = cfnames;
         assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
@@ -270,7 +270,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan
             // Create and queue a RepairJob for each column family
             for (String cfname : cfnames)
             {
-                RepairJob job = new RepairJob(this, id, keyspace, cfname, range, isSequential, taskExecutor);
+                RepairJob job = new RepairJob(this, id, keyspace, cfname, range, parallelismDegree, taskExecutor);
                 jobs.offer(job);
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41469ecf/src/java/org/apache/cassandra/repair/RequestCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RequestCoordinator.java b/src/java/org/apache/cassandra/repair/RequestCoordinator.java
deleted file mode 100644
index ed089ef..0000000
--- a/src/java/org/apache/cassandra/repair/RequestCoordinator.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.repair;
-
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Set;
-
-/**
-*/
-public abstract class RequestCoordinator<R>
-{
-    private final Order<R> orderer;
-
-    public RequestCoordinator(boolean isSequential)
-    {
-        this.orderer = isSequential ? new SequentialOrder(this) : new ParallelOrder(this);
-    }
-
-    public abstract void send(R request);
-
-    public void add(R request)
-    {
-        orderer.add(request);
-    }
-
-    public void start()
-    {
-        orderer.start();
-    }
-
-    // Returns how many request remains
-    public int completed(R request)
-    {
-        return orderer.completed(request);
-    }
-
-    private static abstract class Order<R>
-    {
-        protected final RequestCoordinator<R> coordinator;
-
-        Order(RequestCoordinator<R> coordinator)
-        {
-            this.coordinator = coordinator;
-        }
-
-        public abstract void add(R request);
-        public abstract void start();
-        public abstract int completed(R request);
-    }
-
-    private static class SequentialOrder<R> extends Order<R>
-    {
-        private final Queue<R> requests = new LinkedList<>();
-
-        SequentialOrder(RequestCoordinator<R> coordinator)
-        {
-            super(coordinator);
-        }
-
-        public void add(R request)
-        {
-            requests.add(request);
-        }
-
-        public void start()
-        {
-            if (requests.isEmpty())
-                return;
-
-            coordinator.send(requests.peek());
-        }
-
-        public int completed(R request)
-        {
-            assert request.equals(requests.peek());
-            requests.poll();
-            int remaining = requests.size();
-            if (remaining != 0)
-                coordinator.send(requests.peek());
-            return remaining;
-        }
-    }
-
-    private static class ParallelOrder<R> extends Order<R>
-    {
-        private final Set<R> requests = new HashSet<>();
-
-        ParallelOrder(RequestCoordinator<R> coordinator)
-        {
-            super(coordinator);
-        }
-
-        public void add(R request)
-        {
-            requests.add(request);
-        }
-
-        public void start()
-        {
-            for (R request : requests)
-                coordinator.send(request);
-        }
-
-        public int completed(R request)
-        {
-            requests.remove(request);
-            return requests.size();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41469ecf/src/java/org/apache/cassandra/repair/SequentialRequestCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SequentialRequestCoordinator.java b/src/java/org/apache/cassandra/repair/SequentialRequestCoordinator.java
new file mode 100644
index 0000000..6bb5de9
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/SequentialRequestCoordinator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * Hands requests to the processor strictly one at a time, in the order they were added; the
+ * next request is only handed over once the previous one has been reported completed.
+ */
+public class SequentialRequestCoordinator<R> implements IRequestCoordinator<R>
+{
+    // FIFO of requests not yet completed; its head is the one currently being processed.
+    private final Queue<R> queue = new LinkedList<>();
+    private final IRequestProcessor<R> processor;
+
+    public SequentialRequestCoordinator(IRequestProcessor<R> processor)
+    {
+        this.processor = processor;
+    }
+
+    @Override
+    public void add(R request)
+    {
+        queue.add(request);
+    }
+
+    @Override
+    public void start()
+    {
+        // Nothing to hand over if no request was ever added.
+        if (!queue.isEmpty())
+            processor.process(queue.peek());
+    }
+
+    /**
+     * @return the number of requests still waiting behind the completed one
+     */
+    @Override
+    public int completed(R request)
+    {
+        assert request.equals(queue.peek());
+        queue.poll();
+        int left = queue.size();
+        // Only the head is ever in flight, so its successor can start now.
+        if (left > 0)
+            processor.process(queue.peek());
+        return left;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41469ecf/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index aac9f9a..da81e8f 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -92,9 +92,9 @@ public class ActiveRepairService
      *
      * @return Future for asynchronous call or null if there is no need to repair
      */
-    public RepairFuture submitRepairSession(Range<Token> range, String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, String... cfnames)
+    public RepairFuture submitRepairSession(Range<Token> range, String keyspace, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, String... cfnames)
     {
-        RepairSession session = new RepairSession(range, keyspace, isSequential, dataCenters, hosts, cfnames);
+        RepairSession session = new RepairSession(range, keyspace, parallelismDegree, dataCenters, hosts, cfnames);
         if (session.endpoints.isEmpty())
             return null;
         RepairFuture futureTask = new RepairFuture(session);
@@ -128,7 +128,7 @@ public class ActiveRepairService
     // add it to the sessions (avoid NPE in tests)
     RepairFuture submitArtificialRepairSession(RepairJobDesc desc)
     {
-        RepairSession session = new RepairSession(desc.sessionId, desc.range, desc.keyspace, false, null, null, new String[]{desc.columnFamily});
+        RepairSession session = new RepairSession(desc.sessionId, desc.range, desc.keyspace, RepairParallelism.PARALLEL, null, null, new String[]{desc.columnFamily});
         sessions.put(session.getId(), session);
         RepairFuture futureTask = new RepairFuture(session);
         executor.execute(futureTask);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41469ecf/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 4bc1eee..3d42d1c 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -77,6 +77,7 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.ResponseVerbHandler;
 import org.apache.cassandra.repair.RepairFuture;
 import org.apache.cassandra.repair.RepairMessageVerbHandler;
+import org.apache.cassandra.repair.RepairParallelism;
 import org.apache.cassandra.service.paxos.CommitVerbHandler;
 import org.apache.cassandra.service.paxos.PrepareVerbHandler;
 import org.apache.cassandra.service.paxos.ProposeVerbHandler;
@@ -334,7 +335,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         {
             throw new IllegalStateException("No configured daemon");
         }
-        
+
         try
         {
             daemon.nativeServer.start();
@@ -432,10 +433,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS) == null)
                 throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress() + " to replace");
             Collection<Token> tokens = TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new ByteArrayInputStream(getApplicationStateValue(DatabaseDescriptor.getReplaceAddress(), ApplicationState.TOKENS))));
-            
+
             SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own so we receive hints, etc
             Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
-            return tokens;        
+            return tokens;
         }
         catch (IOException e)
         {
@@ -2408,16 +2409,21 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final Collection<String> hosts, final boolean primaryRange, final String... columnFamilies)
     {
+        return forceRepairAsync(keyspace, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, primaryRange, columnFamilies); // legacy boolean flag: true -> SEQUENTIAL, false -> PARALLEL
+    }
+
+    public int forceRepairAsync(final String keyspace, final RepairParallelism parallelismDegree, final Collection<String> dataCenters, final Collection<String> hosts, final boolean primaryRange, final String... columnFamilies) // accepts any RepairParallelism, including DATACENTER_AWARE
+    {
         // when repairing only primary range, dataCenter nor hosts can be set
         if (primaryRange && (dataCenters != null || hosts != null))
         {
             throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
         }
         final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
-        return forceRepairAsync(keyspace, isSequential, dataCenters, hosts, ranges, columnFamilies);
+        return forceRepairAsync(keyspace, parallelismDegree, dataCenters, hosts, ranges, columnFamilies);
     }
 
-    public int forceRepairAsync(final String keyspace, final boolean isSequential, final Collection<String> dataCenters, final Collection<String> hosts,  final Collection<Range<Token>> ranges, final String... columnFamilies)
+    public int forceRepairAsync(final String keyspace, final RepairParallelism parallelismDegree, final Collection<String> dataCenters, final Collection<String> hosts,  final Collection<Range<Token>> ranges, final String... columnFamilies)
     {
         if (ranges.isEmpty() || Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2)
             return 0;
@@ -2425,7 +2431,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         final int cmd = nextRepairCommand.incrementAndGet();
         if (ranges.size() > 0)
         {
-            new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, hosts, columnFamilies)).start();
+            new Thread(createRepairTask(cmd, keyspace, ranges, parallelismDegree, dataCenters, hosts, columnFamilies)).start();
         }
         return cmd;
     }
@@ -2438,37 +2444,42 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
         }
         final Collection<Range<Token>> ranges = primaryRange ? getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
-        return forceRepairAsync(keyspace, isSequential, isLocal, ranges, columnFamilies);
+        return forceRepairAsync(keyspace, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, isLocal, ranges, columnFamilies);
     }
 
-    public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, Collection<Range<Token>> ranges, String... columnFamilies)
+    public int forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, boolean isLocal, Collection<Range<Token>> ranges, String... columnFamilies)
     {
         if (ranges.isEmpty() || Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2)
             return 0;
 
         final int cmd = nextRepairCommand.incrementAndGet();
-        if (!FBUtilities.isUnix() && isSequential)
+        if (!FBUtilities.isUnix() && parallelismDegree != RepairParallelism.PARALLEL)
         {
             logger.warn("Snapshot-based repair is not yet supported on Windows.  Reverting to parallel repair.");
-            isSequential = false;
+            parallelismDegree = RepairParallelism.PARALLEL;
         }
-        new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, isLocal, columnFamilies)).start();
+        new Thread(createRepairTask(cmd, keyspace, ranges, parallelismDegree, isLocal, columnFamilies)).start();
         return cmd;
     }
 
     public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies)
     {
+        return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, columnFamilies);
+    }
+
+    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies)
+    {
         Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken);
 
         logger.info("starting user-requested repair of range {} for keyspace {} and column families {}",
                     repairingRange, keyspaceName, columnFamilies);
 
-        if (!FBUtilities.isUnix() && isSequential)
+        if (!FBUtilities.isUnix() && parallelismDegree != RepairParallelism.PARALLEL)
         {
             logger.warn("Snapshot-based repair is not yet supported on Windows.  Reverting to parallel repair.");
-            isSequential = false;
+            parallelismDegree = RepairParallelism.PARALLEL;
         }
-        return forceRepairAsync(keyspaceName, isSequential, dataCenters, hosts, repairingRange, columnFamilies);
+        return forceRepairAsync(keyspaceName, parallelismDegree, dataCenters, hosts, repairingRange, columnFamilies);
     }
 
     public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies)
@@ -2486,7 +2497,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      */
     public void forceKeyspaceRepair(final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
     {
-        forceKeyspaceRepairRange(keyspaceName, getLocalRanges(keyspaceName), isSequential, isLocal, columnFamilies);
+        forceKeyspaceRepairRange(keyspaceName, getLocalRanges(keyspaceName), isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, isLocal, columnFamilies);
     }
 
     public void forceKeyspaceRepairPrimaryRange(final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
@@ -2498,7 +2509,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
         }
 
-        forceKeyspaceRepairRange(keyspaceName, getLocalPrimaryRanges(keyspaceName), isSequential, false, columnFamilies);
+        forceKeyspaceRepairRange(keyspaceName, getLocalPrimaryRanges(keyspaceName), isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, false, columnFamilies);
     }
 
     public void forceKeyspaceRepairRange(String beginToken, String endToken, final String keyspaceName, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
@@ -2507,14 +2518,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         logger.info("starting user-requested repair of range {} for keyspace {} and column families {}",
                            repairingRange, keyspaceName, columnFamilies);
-        forceKeyspaceRepairRange(keyspaceName, repairingRange, isSequential, isLocal, columnFamilies);
+        forceKeyspaceRepairRange(keyspaceName, repairingRange, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, isLocal, columnFamilies);
     }
 
-    public void forceKeyspaceRepairRange(final String keyspaceName, final Collection<Range<Token>> ranges, boolean isSequential, boolean isLocal, final String... columnFamilies) throws IOException
+    public void forceKeyspaceRepairRange(final String keyspaceName, final Collection<Range<Token>> ranges, RepairParallelism parallelismDegree, boolean isLocal, final String... columnFamilies) throws IOException
     {
         if (ranges.isEmpty() || Keyspace.open(keyspaceName).getReplicationStrategy().getReplicationFactor() < 2)
             return;
-        createRepairTask(nextRepairCommand.incrementAndGet(), keyspaceName, ranges, isSequential, isLocal, columnFamilies).run();
+        createRepairTask(nextRepairCommand.incrementAndGet(), keyspaceName, ranges, parallelismDegree, isLocal, columnFamilies).run();
     }
 
     /**
@@ -2556,17 +2567,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return repairingRange;
     }
 
-    private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final boolean isLocal, final String... columnFamilies)
+    private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final RepairParallelism parallelismDegree, final boolean isLocal, final String... columnFamilies)
     {
         Set<String> dataCenters = null;
         if (isLocal)
         {
             dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
         }
-        return createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, null, columnFamilies);
+        return createRepairTask(cmd, keyspace, ranges, parallelismDegree, dataCenters, null, columnFamilies);
     }
 
-    private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final boolean isSequential, final Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies)
+    private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final Collection<Range<Token>> ranges, final RepairParallelism parallelismDegree, final Collection<String> dataCenters, final Collection<String> hosts, final String... columnFamilies)
     {
         if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
         {
@@ -2587,7 +2598,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                     RepairFuture future;
                     try
                     {
-                        future = forceKeyspaceRepair(range, keyspace, isSequential, dataCenters, hosts, columnFamilies);
+                        future = forceKeyspaceRepair(range, keyspace, parallelismDegree, dataCenters, hosts, columnFamilies);
                     }
                     catch (IllegalArgumentException e)
                     {
@@ -2639,6 +2650,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public RepairFuture forceKeyspaceRepair(final Range<Token> range, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, final String... columnFamilies) throws IOException
     {
+        return forceKeyspaceRepair(range, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, columnFamilies);
+    }
+
+    public RepairFuture forceKeyspaceRepair(final Range<Token> range, final String keyspaceName, RepairParallelism parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, final String... columnFamilies) throws IOException
+    {
         ArrayList<String> names = new ArrayList<String>();
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies))
         {
@@ -2651,7 +2667,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             return null;
         }
 
-        return ActiveRepairService.instance.submitRepairSession(range, keyspaceName, isSequential, dataCenters, hosts, names.toArray(new String[names.size()]));
+        return ActiveRepairService.instance.submitRepairSession(range, keyspaceName, parallelismDegree, dataCenters, hosts, names.toArray(new String[names.size()]));
     }
 
     public void forceTerminateAllRepairSessions() {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41469ecf/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 0e2f55d..2386fc8 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.service;
 
+import org.apache.cassandra.repair.RepairParallelism;
+
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -260,10 +262,25 @@ public interface StorageServiceMBean extends NotificationEmitter
     public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts, boolean primaryRange, String... columnFamilies);
 
     /**
+     * Invoke repair asynchronously.
+     * You can track repair progress by subscribing JMX notification sent from this StorageServiceMBean.
+     * Notification format is:
+     *   type: "repair"
+     *   userObject: int array of length 2, [0]=command number, [1]=ordinal of the repair Status enum (ActiveRepairService.Status, formerly AntiEntropyService.Status)
+     * @param parallelismDegree SEQUENTIAL, PARALLEL or DATACENTER_AWARE; NOTE: RepairParallelism is not a JMX open type, so remote clients need Cassandra classes on their classpath
+     * @return Repair command number, or 0 if nothing to repair
+     */
+    public int forceRepairAsync(String keyspace, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, boolean primaryRange, String... columnFamilies);
+
+    /**
      * Same as forceRepairAsync, but handles a specified range
      */
     public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts,  final String... columnFamilies);
 
+    /**
+     * Same as forceRepairAsync, but handles a specified range
+     */
+    public int forceRepairRangeAsync(String beginToken, String endToken, final String keyspaceName, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts,  final String... columnFamilies);
 
     /**
      * Invoke repair asynchronously.
@@ -477,7 +494,7 @@ public interface StorageServiceMBean extends NotificationEmitter
 
     /**
      * Enables/Disables tracing for the whole system. Only thrift requests can start tracing currently.
-     * 
+     *
      * @param probability
      *            ]0,1[ will enable tracing on a partial number of requests with the provided probability. 0 will
      *            disable tracing and 1 will enable tracing for all requests (which mich severely cripple the system)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41469ecf/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index a397244..2d7809a 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
 import org.apache.cassandra.net.MessagingServiceMBean;
+import org.apache.cassandra.repair.RepairParallelism;
 import org.apache.cassandra.service.CacheServiceMBean;
 import org.apache.cassandra.service.StorageProxyMBean;
 import org.apache.cassandra.streaming.StreamState;
@@ -70,6 +71,7 @@ public class NodeCmd
     private static final Pair<String, String> TOKENS_OPT = Pair.create("T", "tokens");
     private static final Pair<String, String> PRIMARY_RANGE_OPT = Pair.create("pr", "partitioner-range");
     private static final Pair<String, String> PARALLEL_REPAIR_OPT = Pair.create("par", "parallel");
+    private static final Pair<String, String> DCPARALLEL_REPAIR_OPT = Pair.create("dcpar", "dcparallel");
     private static final Pair<String, String> LOCAL_DC_REPAIR_OPT = Pair.create("local", "in-local-dc");
     private static final Pair<String, String> HOST_REPAIR_OPT = Pair.create("hosts", "in-host");
     private static final Pair<String, String> DC_REPAIR_OPT = Pair.create("dc", "in-dc");
@@ -100,6 +102,7 @@ public class NodeCmd
         options.addOption(TOKENS_OPT,   false, "display all tokens");
         options.addOption(PRIMARY_RANGE_OPT, false, "only repair the first range returned by the partitioner for the node");
         options.addOption(PARALLEL_REPAIR_OPT, false, "repair nodes in parallel.");
+        options.addOption(DCPARALLEL_REPAIR_OPT, false, "repair data centers in parallel.");
         options.addOption(LOCAL_DC_REPAIR_OPT, false, "only repair against nodes in the same datacenter");
         options.addOption(DC_REPAIR_OPT, true, "only repair against nodes in the specified datacenters (comma separated)");
         options.addOption(HOST_REPAIR_OPT, true, "only repair against specified nodes (comma separated)");
@@ -203,10 +206,10 @@ public class NodeCmd
         StringBuilder header = new StringBuilder(512);
         header.append("\nAvailable commands\n");
         final NodeToolHelp ntHelp = loadHelp();
-        Collections.sort(ntHelp.commands, new Comparator<NodeToolHelp.NodeToolCommand>() 
+        Collections.sort(ntHelp.commands, new Comparator<NodeToolHelp.NodeToolCommand>()
         {
             @Override
-            public int compare(NodeToolHelp.NodeToolCommand o1, NodeToolHelp.NodeToolCommand o2) 
+            public int compare(NodeToolHelp.NodeToolCommand o1, NodeToolHelp.NodeToolCommand o2)
             {
                 return o1.name.compareTo(o2.name);
             }
@@ -525,7 +528,7 @@ public class NodeCmd
         }
     }
 
-    private Map<String, SetHostStat> getOwnershipByDc(boolean resolveIp, Map<String, String> tokenToEndpoint, 
+    private Map<String, SetHostStat> getOwnershipByDc(boolean resolveIp, Map<String, String> tokenToEndpoint,
                                                       Map<InetAddress, Float> ownerships) throws UnknownHostException
     {
         Map<String, SetHostStat> ownershipByDc = Maps.newLinkedHashMap();
@@ -574,7 +577,7 @@ public class NodeCmd
         public final Float owns;
         public final String token;
 
-        public HostStat(String token, InetAddress endpoint, boolean resolveIp, Float owns) 
+        public HostStat(String token, InetAddress endpoint, boolean resolveIp, Float owns)
         {
             this.token = token;
             this.endpoint = endpoint;
@@ -1668,7 +1671,11 @@ public class NodeCmd
             switch (nc)
             {
                 case REPAIR  :
-                    boolean sequential = !cmd.hasOption(PARALLEL_REPAIR_OPT.left);
+                    RepairParallelism parallelismDegree = RepairParallelism.SEQUENTIAL;
+                    if (cmd.hasOption(PARALLEL_REPAIR_OPT.left))
+                        parallelismDegree = RepairParallelism.PARALLEL;
+                    else if (cmd.hasOption(DCPARALLEL_REPAIR_OPT.left))
+                        parallelismDegree = RepairParallelism.DATACENTER_AWARE;
                     boolean localDC = cmd.hasOption(LOCAL_DC_REPAIR_OPT.left);
                     boolean specificDC = cmd.hasOption(DC_REPAIR_OPT.left);
                     boolean specificHosts = cmd.hasOption(HOST_REPAIR_OPT.left);
@@ -1686,9 +1693,9 @@ public class NodeCmd
                     else if(specificHosts)
                         hosts  = Arrays.asList(cmd.getOptionValue(HOST_REPAIR_OPT.left).split(","));
                     if (cmd.hasOption(START_TOKEN_OPT.left) || cmd.hasOption(END_TOKEN_OPT.left))
-                        probe.forceRepairRangeAsync(System.out, keyspace, sequential, dataCenters, hosts, cmd.getOptionValue(START_TOKEN_OPT.left), cmd.getOptionValue(END_TOKEN_OPT.left), columnFamilies);
+                        probe.forceRepairRangeAsync(System.out, keyspace, parallelismDegree, dataCenters, hosts, cmd.getOptionValue(START_TOKEN_OPT.left), cmd.getOptionValue(END_TOKEN_OPT.left), columnFamilies);
                     else
-                        probe.forceRepairAsync(System.out, keyspace, sequential, dataCenters, hosts, primaryRange, columnFamilies);
+                        probe.forceRepairAsync(System.out, keyspace, parallelismDegree, dataCenters, hosts, primaryRange, columnFamilies);
                     break;
                 case FLUSH   :
                     try { probe.forceKeyspaceFlush(keyspace, columnFamilies); }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41469ecf/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 849e368..261d416 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -54,6 +54,7 @@ import org.apache.cassandra.gms.FailureDetectorMBean;
 import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.MessagingServiceMBean;
+import org.apache.cassandra.repair.RepairParallelism;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.streaming.StreamState;
 import org.apache.cassandra.streaming.StreamManagerMBean;
@@ -217,12 +218,17 @@ public class NodeProbe
 
     public void forceRepairAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts,  boolean primaryRange, String... columnFamilies) throws IOException
     {
+        forceRepairAsync(out, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, primaryRange, columnFamilies);
+    }
+
+    public void forceRepairAsync(final PrintStream out, final String keyspaceName, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts,  boolean primaryRange, String... columnFamilies) throws IOException
+    {
         RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies);
         try
         {
             jmxc.addConnectionNotificationListener(runner, null, null);
             ssProxy.addNotificationListener(runner, null, null);
-            if (!runner.repairAndWait(ssProxy, isSequential, dataCenters, hosts, primaryRange))
+            if (!runner.repairAndWait(ssProxy, parallelismDegree, dataCenters, hosts, primaryRange))
                 failed = true;
         }
         catch (Exception e)
@@ -239,15 +245,19 @@ public class NodeProbe
             catch (Throwable ignored) {}
         }
     }
-
     public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts, final String startToken, final String endToken, String... columnFamilies) throws IOException
     {
+        forceRepairRangeAsync(out, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts, startToken, endToken, columnFamilies);
+    }
+
+    public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, final String startToken, final String endToken, String... columnFamilies) throws IOException
+    {
         RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies);
         try
         {
             jmxc.addConnectionNotificationListener(runner, null, null);
             ssProxy.addNotificationListener(runner, null, null);
-            if (!runner.repairRangeAndWait(ssProxy,  isSequential, dataCenters, hosts, startToken, endToken))
+            if (!runner.repairRangeAndWait(ssProxy, parallelismDegree, dataCenters, hosts, startToken, endToken))
                 failed = true;
         }
         catch (Exception e)
@@ -896,17 +906,17 @@ public class NodeProbe
     {
         return failed;
     }
-    
+
     public long getReadRepairAttempted()
     {
         return spProxy.getReadRepairAttempted();
     }
-    
+
     public long getReadRepairRepairedBlocking()
     {
         return spProxy.getReadRepairRepairedBlocking();
     }
-    
+
     public long getReadRepairRepairedBackground()
     {
         return spProxy.getReadRepairRepairedBackground();
@@ -1060,16 +1070,16 @@ class RepairRunner implements NotificationListener
         this.columnFamilies = columnFamilies;
     }
 
-    public boolean repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts, boolean primaryRangeOnly) throws Exception
+    public boolean repairAndWait(StorageServiceMBean ssProxy, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, boolean primaryRangeOnly) throws Exception
     {
-        cmd = ssProxy.forceRepairAsync(keyspace, isSequential, dataCenters, hosts, primaryRangeOnly, columnFamilies);
+        cmd = ssProxy.forceRepairAsync(keyspace, parallelismDegree, dataCenters, hosts, primaryRangeOnly, columnFamilies);
         waitForRepair();
         return success;
     }
 
-    public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, final Collection<String> hosts, String startToken, String endToken) throws Exception
+    public boolean repairRangeAndWait(StorageServiceMBean ssProxy, RepairParallelism parallelismDegree, Collection<String> dataCenters, final Collection<String> hosts, String startToken, String endToken) throws Exception
     {
-        cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, dataCenters, hosts, columnFamilies);
+        cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, parallelismDegree, dataCenters, hosts, columnFamilies);
         waitForRepair();
         return success;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41469ecf/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
----------------------------------------------------------------------
diff --git a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml b/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
index 523335e..b254bac 100644
--- a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
+++ b/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
@@ -155,6 +155,7 @@ commands:
     help: |
       Repair one or more column families
          Use -dc to repair specific datacenters (csv list).
+         Use -dcpar to repair datacenters in parallel (nodes within each datacenter are still repaired sequentially).
          Use -et to specify a token at which repair range ends.
          Use -local to only repair against nodes in the same datacenter.
          Use -pr to repair only the first range returned by the partitioner.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/41469ecf/test/unit/org/apache/cassandra/repair/RequestCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/RequestCoordinatorTest.java b/test/unit/org/apache/cassandra/repair/RequestCoordinatorTest.java
new file mode 100644
index 0000000..ad6eabe
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/RequestCoordinatorTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.AbstractEndpointSnitch;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+public class RequestCoordinatorTest implements IRequestProcessor<InetAddress> // this test is the processor: process() just records the request as active
+{
+    private InetAddress[] endpoints;
+    private List<InetAddress> activeRequests;
+    private final Random random = new Random(0); // per instance (JUnit builds one per test), so each test replays the same seeded completion order
+
+    @Before
+    public void setup() throws UnknownHostException
+    {
+        endpoints = new InetAddress[12];
+        for (int i = 0; i < 12; i++)
+            endpoints[i] = InetAddress.getByName("127.0.0." + (i + 1));
+        activeRequests = new ArrayList<>();
+        DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch() // NOTE(review): static/global snitch, never restored after these tests
+        {
+            @Override
+            public String getRack(InetAddress endpoint)
+            {
+                return "rack1";
+            }
+
+            @Override
+            public String getDatacenter(InetAddress endpoint)
+            {
+                // 127.0.0.1, 127.0.0.2, 127.0.0.3 -> datacenter1
+                // 127.0.0.4, 127.0.0.5, 127.0.0.6 -> datacenter2 etc
+                int no = endpoint.getAddress()[3] - 1;
+                return "datacenter" + (no / 3 + 1);
+            }
+
+            @Override
+            public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+            {
+                return 0;
+            }
+        });
+    }
+
+    @Override
+    public void process(InetAddress request)
+    {
+        activeRequests.add(request);
+    }
+
+    @Test
+    public void testSequentialRequestCoordinator()
+    {
+        SequentialRequestCoordinator<InetAddress> coordinator = new SequentialRequestCoordinator<>(this);
+        for (InetAddress endpoint : endpoints)
+            coordinator.add(endpoint);
+        coordinator.start();
+        int max = finishRequests(coordinator);
+        assertEquals(1, max); // never more than one outstanding request
+    }
+
+    @Test
+    public void testParallelRequestCoordinator()
+    {
+        ParallelRequestCoordinator<InetAddress> coordinator = new ParallelRequestCoordinator<>(this);
+        for (InetAddress endpoint : endpoints)
+            coordinator.add(endpoint);
+        coordinator.start();
+        int max = finishRequests(coordinator);
+        assertEquals(endpoints.length, max); // every request is dispatched at once
+    }
+
+    @Test
+    public void testDatacenterAwareRequestCoordinator()
+    {
+        DatacenterAwareRequestCoordinator coordinator = new DatacenterAwareRequestCoordinator(this);
+        for (InetAddress endpoint : endpoints)
+            coordinator.add(endpoint);
+        coordinator.start();
+        int max = finishRequests(coordinator);
+        assertEquals(4, max); // one outstanding request per datacenter: the snitch maps 12 endpoints to 4 DCs
+    }
+
+    private int finishRequests(IRequestCoordinator<InetAddress> requestCoordinator) // completes active requests in random order; returns the peak number active
+    {
+        int max = 0;
+        while (activeRequests.size() > 0)
+        {
+            max = Math.max(max, activeRequests.size());
+            // Finish a request
+            int ix = random.nextInt(activeRequests.size());
+            InetAddress finished = activeRequests.get(ix);
+            activeRequests.remove(ix);
+            requestCoordinator.completed(finished);
+        }
+        return max;
+    }
+}


Mime
View raw message