cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [4/5] cassandra git commit: Repair common subranges of a set of nodes in one session reviewed by Stefania Alborghetti for CASSANDRA-5220
Date Thu, 06 Aug 2015 13:29:18 GMT
Repair common subranges of a set of nodes in one session
reviewed by Stefania Alborghetti for CASSANDRA-5220


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0dd50a6c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0dd50a6c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0dd50a6c

Branch: refs/heads/trunk
Commit: 0dd50a6cdc81ec9ff1367238876d476affcf60e2
Parents: bf47408
Author: Marcus Olsson <marcus.olsson@ericsson.com>
Authored: Thu Aug 6 08:23:10 2015 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Thu Aug 6 08:23:20 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        |  19 +-
 .../compaction/CompactionStrategyManager.java   |  24 +-
 .../apache/cassandra/net/MessagingService.java  |   6 +
 .../org/apache/cassandra/repair/RepairJob.java  |  17 +-
 .../apache/cassandra/repair/RepairJobDesc.java  |  41 +-
 .../repair/RepairMessageVerbHandler.java        |   4 +-
 .../apache/cassandra/repair/RepairRunnable.java |  45 +-
 .../apache/cassandra/repair/RepairSession.java  |  48 +-
 .../cassandra/repair/RepairSessionResult.java   |   6 +-
 .../org/apache/cassandra/repair/SyncTask.java   |   6 +-
 .../repair/SystemDistributedKeyspace.java       |  27 +-
 .../apache/cassandra/repair/TreeResponse.java   |   8 +-
 .../apache/cassandra/repair/ValidationTask.java |  14 +-
 .../org/apache/cassandra/repair/Validator.java  |  83 +--
 .../repair/messages/ValidationComplete.java     |  47 +-
 .../cassandra/service/ActiveRepairService.java  |   4 +-
 .../org/apache/cassandra/utils/MerkleTrees.java | 434 +++++++++++++++
 .../serialization/3.0/gms.EndpointState.bin     | Bin 0 -> 73 bytes
 test/data/serialization/3.0/gms.Gossip.bin      | Bin 0 -> 158 bytes
 .../serialization/3.0/service.SyncComplete.bin  | Bin 0 -> 362 bytes
 .../serialization/3.0/service.SyncRequest.bin   | Bin 0 -> 219 bytes
 .../3.0/service.ValidationComplete.bin          | Bin 0 -> 1251 bytes
 .../3.0/service.ValidationRequest.bin           | Bin 0 -> 167 bytes
 .../cassandra/AbstractSerializationsTester.java |   3 +-
 .../LeveledCompactionStrategyTest.java          |   2 +-
 .../cassandra/repair/LocalSyncTaskTest.java     |  23 +-
 .../cassandra/repair/RepairSessionTest.java     |   3 +-
 .../apache/cassandra/repair/ValidatorTest.java  |  19 +-
 .../cassandra/service/SerializationsTest.java   |  28 +-
 .../apache/cassandra/utils/MerkleTreeTest.java  |   1 -
 .../apache/cassandra/utils/MerkleTreesTest.java | 538 +++++++++++++++++++
 32 files changed, 1256 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1d1ad0f..80e0e50 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-beta1
+ * Repair improvements when using vnodes (CASSANDRA-5220)
  * Disable scripted UDFs by default (CASSANDRA-9889)
  * Add transparent data encryption core classes (CASSANDRA-9945)
  * Bytecode inspection for Java-UDFs (CASSANDRA-9890)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 92cc249..8aa16d5 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -69,6 +69,8 @@ import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.SSTableRewriter;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.CompactionMetrics;
 import org.apache.cassandra.repair.Validator;
@@ -76,7 +78,7 @@ import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.apache.cassandra.utils.UUIDGen;
@@ -1045,7 +1047,7 @@ public class CompactionManager implements CompactionManagerMBean
 
                 for (SSTableReader sstable : sstableCandidates.sstables)
                 {
-                    if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singletonList(validator.desc.range)))
+                    if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(validator.desc.ranges))
                     {
                         sstablesToValidate.add(sstable);
                     }
@@ -1074,19 +1076,20 @@ public class CompactionManager implements CompactionManagerMBean
                     gcBefore = getDefaultGcBefore(cfs, nowInSec);
             }
 
-            // Create Merkle tree suitable to hold estimated partitions for given range.
-            // We blindly assume that partition is evenly distributed on all sstables for now.
+            // Create Merkle trees suitable to hold estimated partitions for the given ranges.
+            // We blindly assume that a partition is evenly distributed on all sstables for now.
             long numPartitions = 0;
             for (SSTableReader sstable : sstables)
             {
-                numPartitions += sstable.estimatedKeysForRanges(singleton(validator.desc.range));
+                numPartitions += sstable.estimatedKeysForRanges(validator.desc.ranges);
             }
             // determine tree depth from number of partitions, but cap at 20 to prevent large tree.
             int depth = numPartitions > 0 ? (int) Math.min(Math.floor(Math.log(numPartitions)), 20) : 0;
-            MerkleTree tree = new MerkleTree(cfs.getPartitioner(), validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, depth));
+            MerkleTrees tree = new MerkleTrees(cfs.getPartitioner());
+            tree.addMerkleTrees((int) Math.pow(2, depth), validator.desc.ranges);
 
             long start = System.nanoTime();
-            try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.range);
+            try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(sstables, validator.desc.ranges);
                  ValidationCompactionController controller = new ValidationCompactionController(cfs, gcBefore);
                  CompactionIterator ci = new ValidationCompactionIterator(scanners.scanners, controller, nowInSec, metrics))
             {
@@ -1119,7 +1122,7 @@ public class CompactionManager implements CompactionManagerMBean
                              duration,
                              depth,
                              numPartitions,
-                             MerkleTree.serializer.serializedSize(tree, 0),
+                             MerkleTrees.serializer.serializedSize(tree, 0),
                              validator.desc);
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index e5aff5d..4f6dfa2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.db.compaction;
 import java.util.*;
 import java.util.concurrent.Callable;
 
+import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -340,7 +341,7 @@ public class CompactionStrategyManager implements INotificationConsumer
      * @return
      */
     @SuppressWarnings("resource")
-    public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
+    public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables,  Collection<Range<Token>> ranges)
     {
         List<SSTableReader> repairedSSTables = new ArrayList<>();
         List<SSTableReader> unrepairedSSTables = new ArrayList<>();
@@ -352,19 +353,26 @@ public class CompactionStrategyManager implements INotificationConsumer
                 unrepairedSSTables.add(sstable);
         }
 
+        Set<ISSTableScanner> scanners = new HashSet<>(sstables.size());
 
-        AbstractCompactionStrategy.ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range);
-        AbstractCompactionStrategy.ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range);
+        for (Range<Token> range : ranges)
+        {
+            AbstractCompactionStrategy.ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range);
+            AbstractCompactionStrategy.ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range);
+
+            for (ISSTableScanner scanner : Iterables.concat(repairedScanners.scanners, unrepairedScanners.scanners))
+            {
+                if (!scanners.add(scanner))
+                    scanner.close();
+            }
+        }
 
-        List<ISSTableScanner> scanners = new ArrayList<>(repairedScanners.scanners.size() + unrepairedScanners.scanners.size());
-        scanners.addAll(repairedScanners.scanners);
-        scanners.addAll(unrepairedScanners.scanners);
-        return new AbstractCompactionStrategy.ScannerList(scanners);
+        return new AbstractCompactionStrategy.ScannerList(new ArrayList<>(scanners));
     }
 
     public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables)
     {
-        return getScanners(sstables, null);
+        return getScanners(sstables, Collections.singleton(null));
     }
 
     public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 422fdb3..e10b4cb 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -1111,6 +1111,12 @@ public final class MessagingService implements MessagingServiceMBean
         return StorageService.instance.getTokenMetadata().partitioner;
     }
 
+    public static void validatePartitioner(Collection<? extends AbstractBounds<?>> allBounds)
+    {
+        for (AbstractBounds<?> bounds : allBounds)
+            validatePartitioner(bounds);
+    }
+
     public static void validatePartitioner(AbstractBounds<?> bounds)
     {
         if (globalPartitioner() != bounds.left.getPartitioner())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index ac20428..1e54f88 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -48,21 +48,14 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
      *
      * @param session RepairSession that this RepairJob belongs
      * @param columnFamily name of the ColumnFamily to repair
-     * @param parallelismDegree how to run repair job in parallel
-     * @param repairedAt when the repair occurred (millis)
-     * @param taskExecutor Executor to run various repair tasks
      */
-    public RepairJob(RepairSession session,
-                     String columnFamily,
-                     RepairParallelism parallelismDegree,
-                     long repairedAt,
-                     ListeningExecutorService taskExecutor)
+    public RepairJob(RepairSession session, String columnFamily)
     {
         this.session = session;
-        this.desc = new RepairJobDesc(session.parentRepairSession, session.getId(), session.keyspace, columnFamily, session.getRange());
-        this.repairedAt = repairedAt;
-        this.taskExecutor = taskExecutor;
-        this.parallelismDegree = parallelismDegree;
+        this.desc = new RepairJobDesc(session.parentRepairSession, session.getId(), session.keyspace, columnFamily, session.getRanges());
+        this.repairedAt = session.repairedAt;
+        this.taskExecutor = session.taskExecutor;
+        this.parallelismDegree = session.parallelismDegree;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/RepairJobDesc.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJobDesc.java b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
index 1dd67c7..05adbf9 100644
--- a/src/java/org/apache/cassandra/repair/RepairJobDesc.java
+++ b/src/java/org/apache/cassandra/repair/RepairJobDesc.java
@@ -18,6 +18,8 @@
 package org.apache.cassandra.repair;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.UUID;
 
 import com.google.common.base.Objects;
@@ -47,21 +49,21 @@ public class RepairJobDesc
     public final String keyspace;
     public final String columnFamily;
     /** repairing range  */
-    public final Range<Token> range;
+    public final Collection<Range<Token>> ranges;
 
-    public RepairJobDesc(UUID parentSessionId, UUID sessionId, String keyspace, String columnFamily, Range<Token> range)
+    public RepairJobDesc(UUID parentSessionId, UUID sessionId, String keyspace, String columnFamily, Collection<Range<Token>> ranges)
     {
         this.parentSessionId = parentSessionId;
         this.sessionId = sessionId;
         this.keyspace = keyspace;
         this.columnFamily = columnFamily;
-        this.range = range;
+        this.ranges = ranges;
     }
 
     @Override
     public String toString()
     {
-        return "[repair #" + sessionId + " on " + keyspace + "/" + columnFamily + ", " + range + "]";
+        return "[repair #" + sessionId + " on " + keyspace + "/" + columnFamily + ", " + ranges + "]";
     }
 
     @Override
@@ -74,7 +76,7 @@ public class RepairJobDesc
 
         if (!columnFamily.equals(that.columnFamily)) return false;
         if (!keyspace.equals(that.keyspace)) return false;
-        if (range != null ? !range.equals(that.range) : that.range != null) return false;
+        if (ranges != null ? that.ranges == null || (ranges.size() != that.ranges.size()) || (ranges.size() == that.ranges.size() && !ranges.containsAll(that.ranges)) : that.ranges != null) return false;
         if (!sessionId.equals(that.sessionId)) return false;
         if (parentSessionId != null ? !parentSessionId.equals(that.parentSessionId) : that.parentSessionId != null) return false;
 
@@ -84,7 +86,7 @@ public class RepairJobDesc
     @Override
     public int hashCode()
     {
-        return Objects.hashCode(sessionId, keyspace, columnFamily, range);
+        return Objects.hashCode(sessionId, keyspace, columnFamily, ranges);
     }
 
     private static class RepairJobDescSerializer implements IVersionedSerializer<RepairJobDesc>
@@ -100,8 +102,10 @@ public class RepairJobDesc
             UUIDSerializer.serializer.serialize(desc.sessionId, out, version);
             out.writeUTF(desc.keyspace);
             out.writeUTF(desc.columnFamily);
-            MessagingService.validatePartitioner(desc.range);
-            AbstractBounds.tokenSerializer.serialize(desc.range, out, version);
+            MessagingService.validatePartitioner(desc.ranges);
+            out.writeInt(desc.ranges.size());
+            for (Range<Token> rt : desc.ranges)
+                AbstractBounds.tokenSerializer.serialize(rt, out, version);
         }
 
         public RepairJobDesc deserialize(DataInputPlus in, int version) throws IOException
@@ -115,8 +119,19 @@ public class RepairJobDesc
             UUID sessionId = UUIDSerializer.serializer.deserialize(in, version);
             String keyspace = in.readUTF();
             String columnFamily = in.readUTF();
-            Range<Token> range = (Range<Token>)AbstractBounds.tokenSerializer.deserialize(in, MessagingService.globalPartitioner(), version);
-            return new RepairJobDesc(parentSessionId, sessionId, keyspace, columnFamily, range);
+
+            int nRanges = in.readInt();
+            Collection<Range<Token>> ranges = new ArrayList<>();
+            Range<Token> range;
+
+            for (int i = 0; i < nRanges; i++)
+            {
+                range = (Range<Token>) AbstractBounds.tokenSerializer.deserialize(in,
+                        MessagingService.globalPartitioner(), version);
+                ranges.add(range);
+            }
+
+            return new RepairJobDesc(parentSessionId, sessionId, keyspace, columnFamily, ranges);
         }
 
         public long serializedSize(RepairJobDesc desc, int version)
@@ -131,7 +146,11 @@ public class RepairJobDesc
             size += UUIDSerializer.serializer.serializedSize(desc.sessionId, version);
             size += TypeSizes.sizeof(desc.keyspace);
             size += TypeSizes.sizeof(desc.columnFamily);
-            size += AbstractBounds.tokenSerializer.serializedSize(desc.range, version);
+            size += TypeSizes.sizeof(desc.ranges.size());
+            for (Range<Token> rt : desc.ranges)
+            {
+                size += AbstractBounds.tokenSerializer.serializedSize(rt, version);
+            }
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index d765ae6..28a3bf5 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -79,14 +79,14 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                 case SNAPSHOT:
                     logger.debug("Snapshotting {}", desc);
                     ColumnFamilyStore cfs = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily);
-                    final Range<Token> repairingRange = desc.range;
+                    final Collection<Range<Token>> repairingRange = desc.ranges;
                     Set<SSTableReader> snapshottedSSSTables = cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>()
                     {
                         public boolean apply(SSTableReader sstable)
                         {
                             return sstable != null &&
                                    !sstable.metadata.isIndex() && // exclude SSTables from 2i
-                                   new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singleton(repairingRange));
+                                   new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(repairingRange);
                         }
                     }, true); //ephemeral snapshot, if repair fails, it will be cleaned next startup
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 28511db..9401c03 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -53,6 +53,7 @@ import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.apache.cassandra.utils.progress.ProgressEvent;
@@ -146,17 +147,19 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
         }
 
         final Set<InetAddress> allNeighbors = new HashSet<>();
-        Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>();
+        List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> commonRanges = new ArrayList<>();
         try
         {
             for (Range<Token> range : options.getRanges())
             {
-                    Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range,
-                                                                                  options.getDataCenters(),
-                                                                                  options.getHosts());
-                    rangeToNeighbors.put(range, neighbors);
-                    allNeighbors.addAll(neighbors);
+                Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range,
+                                                                              options.getDataCenters(),
+                                                                              options.getHosts());
+
+                addRangeToNeighbors(commonRanges, range, neighbors);
+                allNeighbors.addAll(neighbors);
             }
+
             progress.incrementAndGet();
         }
         catch (IllegalArgumentException e)
@@ -210,13 +213,13 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
                                                                                                                          "internal"));
 
         List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size());
-        for (Range<Token> range : options.getRanges())
+        for (Pair<Set<InetAddress>, ? extends Collection<Range<Token>>> p : commonRanges)
         {
             final RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession,
-                                                              range,
+                                                              p.right,
                                                               keyspace,
                                                               options.getParallelism(),
-                                                              rangeToNeighbors.get(range),
+                                                              p.left,
                                                               repairedAt,
                                                               executor,
                                                               cfnames);
@@ -228,7 +231,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
                 public void onSuccess(RepairSessionResult result)
                 {
                     String message = String.format("Repair session %s for range %s finished", session.getId(),
-                                                   session.getRange().toString());
+                                                   session.getRanges().toString());
                     logger.info(message);
                     fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS,
                                                              progress.incrementAndGet(),
@@ -239,7 +242,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
                 public void onFailure(Throwable t)
                 {
                     String message = String.format("Repair session %s for range %s failed with error %s",
-                                                   session.getId(), session.getRange().toString(), t.getMessage());
+                                                   session.getId(), session.getRanges().toString(), t.getMessage());
                     logger.error(message, t);
                     fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS,
                                                              progress.incrementAndGet(),
@@ -265,7 +268,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
                 {
                     if (sessionResult != null)
                     {
-                        successfulRanges.add(sessionResult.range);
+                        successfulRanges.addAll(sessionResult.ranges);
                     }
                     else
                     {
@@ -325,6 +328,24 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
         });
     }
 
+    private void addRangeToNeighbors(List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> neighborRangeList, Range<Token> range, Set<InetAddress> neighbors)
+    {
+        for (int i = 0; i < neighborRangeList.size(); i++)
+        {
+            Pair<Set<InetAddress>, ? extends Collection<Range<Token>>> p = neighborRangeList.get(i);
+
+            if (p.left.containsAll(neighbors))
+            {
+                p.right.add(range);
+                return;
+            }
+        }
+
+        List<Range<Token>> ranges = new ArrayList<>();
+        ranges.add(range);
+        neighborRangeList.add(Pair.create(neighbors, ranges));
+    }
+
     private Thread createQueryThread(final int cmd, final UUID sessionId)
     {
         return new Thread(new WrappedRunnable()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index a2dcdd1..a52b352 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -37,13 +37,13 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
 import org.apache.cassandra.utils.Pair;
 
 /**
- * Coordinates the (active) repair of a token range.
+ * Coordinates the (active) repair of a list of non-overlapping token ranges.
  *
- * A given RepairSession repairs a set of replicas for a given range on a list
+ * A given RepairSession repairs a set of replicas for a given set of ranges on a list
  * of column families. For each of the column family to repair, RepairSession
  * creates a {@link RepairJob} that handles the repair of that CF.
  *
@@ -64,11 +64,11 @@ import org.apache.cassandra.utils.Pair;
  * A given session will execute the first phase (validation phase) of each of it's job
  * sequentially. In other words, it will start the first job and only start the next one
  * once that first job validation phase is complete. This is done so that the replica only
- * create one merkle tree at a time, which is our way to ensure that such creation starts
+ * creates one merkle tree per range at a time, which is our way to ensure that such creation starts
  * roughly at the same time on every node (see CASSANDRA-2816). However the synchronization
  * phases are allowed to run concurrently (with each other and with validation phases).
  *
- * A given RepairJob has 2 modes: either sequential or not (isSequential flag). If sequential,
+ * A given RepairJob has 2 modes: either sequential or not (RepairParallelism). If sequential,
  * it will requests merkle tree creation from each replica in sequence (though in that case
  * we still first send a message to each node to flush and snapshot data so each merkle tree
  * creation is still done on similar data, even if the actual creation is not
@@ -88,9 +88,9 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
     private final String[] cfnames;
     public final RepairParallelism parallelismDegree;
     /** Range to repair */
-    public final Range<Token> range;
+    public final Collection<Range<Token>> ranges;
     public final Set<InetAddress> endpoints;
-    private final long repairedAt;
+    public final long repairedAt;
 
     // number of validations left to be performed
     private final AtomicInteger validationRemaining;
@@ -103,7 +103,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
     private final ConcurrentMap<Pair<RepairJobDesc, NodePair>, RemoteSyncTask> syncingTasks = new ConcurrentHashMap<>();
 
     // Tasks(snapshot, validate request, differencing, ...) are run on taskExecutor
-    private final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask"));
+    public final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask"));
 
     private volatile boolean terminated = false;
 
@@ -112,7 +112,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
      *
      * @param parentRepairSession the parent sessions id
      * @param id this sessions id
-     * @param range range to repair
+     * @param ranges ranges to repair
      * @param keyspace name of keyspace
      * @param parallelismDegree specifies the degree of parallelism when calculating the merkle trees
      * @param endpoints the data centers that should be part of the repair; null for all DCs
@@ -121,7 +121,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
      */
     public RepairSession(UUID parentRepairSession,
                          UUID id,
-                         Range<Token> range,
+                         Collection<Range<Token>> ranges,
                          String keyspace,
                          RepairParallelism parallelismDegree,
                          Set<InetAddress> endpoints,
@@ -135,7 +135,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         this.parallelismDegree = parallelismDegree;
         this.keyspace = keyspace;
         this.cfnames = cfnames;
-        this.range = range;
+        this.ranges = ranges;
         this.endpoints = endpoints;
         this.repairedAt = repairedAt;
         this.validationRemaining = new AtomicInteger(cfnames.length);
@@ -146,9 +146,9 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         return id;
     }
 
-    public Range<Token> getRange()
+    public Collection<Range<Token>> getRanges()
     {
-        return range;
+        return ranges;
     }
 
     public void waitForValidation(Pair<RepairJobDesc, InetAddress> key, ValidationTask task)
@@ -166,9 +166,9 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
      *
      * @param desc repair job description
      * @param endpoint endpoint that sent merkle tree
-     * @param tree calculated merkle tree, or null if validation failed
+     * @param trees calculated merkle trees, or null if validation failed
      */
-    public void validationComplete(RepairJobDesc desc, InetAddress endpoint, MerkleTree tree)
+    public void validationComplete(RepairJobDesc desc, InetAddress endpoint, MerkleTrees trees)
     {
         ValidationTask task = validating.remove(Pair.create(desc, endpoint));
         if (task == null)
@@ -180,7 +180,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         String message = String.format("Received merkle tree for %s from %s", desc.columnFamily, endpoint);
         logger.info("[repair #{}] {}", getId(), message);
         Tracing.traceRepair(message);
-        task.treeReceived(tree);
+        task.treesReceived(trees);
 
         // Unregister from FailureDetector once we've completed synchronizing Merkle trees.
         // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down.
@@ -234,15 +234,15 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         if (terminated)
             return;
 
-        logger.info(String.format("[repair #%s] new session: will sync %s on range %s for %s.%s", getId(), repairedNodes(), range, keyspace, Arrays.toString(cfnames)));
-        Tracing.traceRepair("Syncing range {}", range);
-        SystemDistributedKeyspace.startRepairs(getId(), parentRepairSession, keyspace, cfnames, range, endpoints);
+        logger.info(String.format("[repair #%s] new session: will sync %s on range %s for %s.%s", getId(), repairedNodes(), ranges, keyspace, Arrays.toString(cfnames)));
+        Tracing.traceRepair("Syncing range {}", ranges);
+        SystemDistributedKeyspace.startRepairs(getId(), parentRepairSession, keyspace, cfnames, ranges, endpoints);
 
         if (endpoints.isEmpty())
         {
-            logger.info("[repair #{}] {}", getId(), message = String.format("No neighbors to repair with on range %s: session completed", range));
+            logger.info("[repair #{}] {}", getId(), message = String.format("No neighbors to repair with on range %s: session completed", ranges));
             Tracing.traceRepair(message);
-            set(new RepairSessionResult(id, keyspace, range, Lists.<RepairResult>newArrayList()));
+            set(new RepairSessionResult(id, keyspace, ranges, Lists.<RepairResult>newArrayList()));
             SystemDistributedKeyspace.failRepairs(getId(), keyspace, cfnames, new RuntimeException(message));
             return;
         }
@@ -265,7 +265,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         List<ListenableFuture<RepairResult>> jobs = new ArrayList<>(cfnames.length);
         for (String cfname : cfnames)
         {
-            RepairJob job = new RepairJob(this, cfname, parallelismDegree, repairedAt, taskExecutor);
+            RepairJob job = new RepairJob(this, cfname);
             executor.execute(job);
             jobs.add(job);
         }
@@ -277,8 +277,8 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
             {
                 // this repair session is completed
                 logger.info("[repair #{}] {}", getId(), "Session completed successfully");
-                Tracing.traceRepair("Completed sync of range {}", range);
-                set(new RepairSessionResult(id, keyspace, range, results));
+                Tracing.traceRepair("Completed sync of range {}", ranges);
+                set(new RepairSessionResult(id, keyspace, ranges, results));
 
                 taskExecutor.shutdown();
                 // mark this session as terminated

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/RepairSessionResult.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSessionResult.java b/src/java/org/apache/cassandra/repair/RepairSessionResult.java
index 4551608..d4fff37 100644
--- a/src/java/org/apache/cassandra/repair/RepairSessionResult.java
+++ b/src/java/org/apache/cassandra/repair/RepairSessionResult.java
@@ -30,14 +30,14 @@ public class RepairSessionResult
 {
     public final UUID sessionId;
     public final String keyspace;
-    public final Range<Token> range;
+    public final Collection<Range<Token>> ranges;
     public final Collection<RepairResult> repairJobResults;
 
-    public RepairSessionResult(UUID sessionId, String keyspace, Range<Token> range, Collection<RepairResult> repairJobResults)
+    public RepairSessionResult(UUID sessionId, String keyspace, Collection<Range<Token>> ranges, Collection<RepairResult> repairJobResults)
     {
         this.sessionId = sessionId;
         this.keyspace = keyspace;
-        this.range = range;
+        this.ranges = ranges;
         this.repairJobResults = repairJobResults;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/SyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SyncTask.java b/src/java/org/apache/cassandra/repair/SyncTask.java
index 7350a66..8adec6f 100644
--- a/src/java/org/apache/cassandra/repair/SyncTask.java
+++ b/src/java/org/apache/cassandra/repair/SyncTask.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.repair;
 
-import java.util.ArrayList;
 import java.util.List;
 
 import com.google.common.util.concurrent.AbstractFuture;
@@ -27,7 +26,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
 
 /**
  * SyncTask will calculate the difference of MerkleTree between two nodes
@@ -56,8 +55,7 @@ public abstract class SyncTask extends AbstractFuture<SyncStat> implements Runna
     public void run()
     {
         // compare trees, and collect differences
-        List<Range<Token>> differences = new ArrayList<>();
-        differences.addAll(MerkleTree.difference(r1.tree, r2.tree));
+        List<Range<Token>> differences = MerkleTrees.difference(r1.trees, r2.trees);
 
         stat = new SyncStat(new NodePair(r1.endpoint, r2.endpoint), differences.size());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
index 70e74db..9cf6c3e 100644
--- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
@@ -130,7 +130,7 @@ public final class SystemDistributedKeyspace
         processSilent(fmtQuery);
     }
 
-    public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, Range<Token> range, Iterable<InetAddress> endpoints)
+    public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[] cfnames, Collection<Range<Token>> ranges, Iterable<InetAddress> endpoints)
     {
         String coordinator = FBUtilities.getBroadcastAddress().getHostAddress();
         Set<String> participants = Sets.newHashSet(coordinator);
@@ -144,17 +144,20 @@ public final class SystemDistributedKeyspace
 
         for (String cfname : cfnames)
         {
-            String fmtQry = String.format(query, NAME, REPAIR_HISTORY,
-                                          keyspaceName,
-                                          cfname,
-                                          id.toString(),
-                                          parent_id.toString(),
-                                          range.left.toString(),
-                                          range.right.toString(),
-                                          coordinator,
-                                          Joiner.on("', '").join(participants),
-                    RepairState.STARTED.toString());
-            processSilent(fmtQry);
+            for (Range<Token> range : ranges)
+            {
+                String fmtQry = String.format(query, NAME, REPAIR_HISTORY,
+                                              keyspaceName,
+                                              cfname,
+                                              id.toString(),
+                                              parent_id.toString(),
+                                              range.left.toString(),
+                                              range.right.toString(),
+                                              coordinator,
+                                              Joiner.on("', '").join(participants),
+                                              RepairState.STARTED.toString());
+                processSilent(fmtQry);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/TreeResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/TreeResponse.java b/src/java/org/apache/cassandra/repair/TreeResponse.java
index eede4ee..c898b36 100644
--- a/src/java/org/apache/cassandra/repair/TreeResponse.java
+++ b/src/java/org/apache/cassandra/repair/TreeResponse.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.repair;
 
 import java.net.InetAddress;
 
-import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
 
 /**
  * Merkle tree response sent from given endpoint.
@@ -27,11 +27,11 @@ import org.apache.cassandra.utils.MerkleTree;
 public class TreeResponse
 {
     public final InetAddress endpoint;
-    public final MerkleTree tree;
+    public final MerkleTrees trees;
 
-    public TreeResponse(InetAddress endpoint, MerkleTree tree)
+    public TreeResponse(InetAddress endpoint, MerkleTrees trees)
     {
         this.endpoint = endpoint;
-        this.tree = tree;
+        this.trees = trees;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/ValidationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/ValidationTask.java b/src/java/org/apache/cassandra/repair/ValidationTask.java
index a52ec4f..bd866d2 100644
--- a/src/java/org/apache/cassandra/repair/ValidationTask.java
+++ b/src/java/org/apache/cassandra/repair/ValidationTask.java
@@ -18,13 +18,17 @@
 package org.apache.cassandra.repair;
 
 import java.net.InetAddress;
+import java.util.Map;
 
 import com.google.common.util.concurrent.AbstractFuture;
 
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.RepairException;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.ValidationRequest;
 import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
 
 /**
  * ValidationTask sends {@link ValidationRequest} to a replica.
@@ -53,19 +57,19 @@ public class ValidationTask extends AbstractFuture<TreeResponse> implements Runn
     }
 
     /**
-     * Receive MerkleTree from replica node.
+     * Receive MerkleTrees from replica node.
      *
-     * @param tree MerkleTree that is sent from replica. Null if validation failed on replica node.
+     * @param trees MerkleTrees sent from the replica. Null if validation failed on the replica node.
      */
-    public void treeReceived(MerkleTree tree)
+    public void treesReceived(MerkleTrees trees)
     {
-        if (tree == null)
+        if (trees == null)
         {
             setException(new RepairException(desc, "Validation failed in " + endpoint));
         }
         else
         {
-            set(new TreeResponse(endpoint, tree));
+            set(new TreeResponse(endpoint, trees));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index 87d186c..7d6c787 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -33,12 +33,15 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.ValidationComplete;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTree;
 import org.apache.cassandra.utils.MerkleTree.RowHash;
+import org.apache.cassandra.utils.MerkleTrees;
 
 /**
  * Handles the building of a merkle tree for a column family.
@@ -58,11 +61,11 @@ public class Validator implements Runnable
 
     // null when all rows with the min token have been consumed
     private long validated;
-    private MerkleTree tree;
+    private MerkleTrees trees;
     // current range being updated
     private MerkleTree.TreeRange range;
     // iterator for iterating sub ranges (MT's leaves)
-    private MerkleTree.TreeRangeIterator ranges;
+    private MerkleTrees.TreeRangeIterator ranges;
     // last key seen
     private DecoratedKey lastKey;
 
@@ -76,9 +79,9 @@ public class Validator implements Runnable
         ranges = null;
     }
 
-    public void prepare(ColumnFamilyStore cfs, MerkleTree tree)
+    public void prepare(ColumnFamilyStore cfs, MerkleTrees tree)
     {
-        this.tree = tree;
+        this.trees = tree;
 
         if (!tree.partitioner().preservesOrder())
         {
@@ -87,32 +90,35 @@ public class Validator implements Runnable
         }
         else
         {
-            List<DecoratedKey> keys = new ArrayList<>();
-            for (DecoratedKey sample : cfs.keySamples(desc.range))
+            for (Range<Token> range : tree.ranges())
             {
-                assert desc.range.contains(sample.getToken()): "Token " + sample.getToken() + " is not within range " + desc.range;
-                keys.add(sample);
-            }
+                List<DecoratedKey> keys = new ArrayList<>();
+                for (DecoratedKey sample : cfs.keySamples(range))
+                {
+                    assert range.contains(sample.getToken()) : "Token " + sample.getToken() + " is not within range " + desc.ranges;
+                    keys.add(sample);
+                }
 
-            if (keys.isEmpty())
-            {
-                // use an even tree distribution
-                tree.init();
-            }
-            else
-            {
-                int numkeys = keys.size();
-                Random random = new Random();
-                // sample the column family using random keys from the index
-                while (true)
+                if (keys.isEmpty())
+                {
+                    // use an even tree distribution
+                    tree.init(range);
+                }
+                else
                 {
-                    DecoratedKey dk = keys.get(random.nextInt(numkeys));
-                    if (!tree.split(dk.getToken()))
-                        break;
+                    int numKeys = keys.size();
+                    Random random = new Random();
+                    // sample the column family using random keys from the index
+                    while (true)
+                    {
+                        DecoratedKey dk = keys.get(random.nextInt(numKeys));
+                        if (!tree.split(dk.getToken()))
+                            break;
+                    }
                 }
             }
         }
-        logger.debug("Prepared AEService tree of size {} for {}", tree.size(), desc);
+        logger.debug("Prepared AEService trees of size {} for {}", trees.size(), desc);
         ranges = tree.invalids();
     }
 
@@ -124,7 +130,7 @@ public class Validator implements Runnable
      */
     public void add(UnfilteredRowIterator partition)
     {
-        assert desc.range.contains(partition.partitionKey().getToken()) : partition.partitionKey().getToken() + " is not contained in " + desc.range;
+        assert Range.isInRanges(partition.partitionKey().getToken(), desc.ranges) : partition.partitionKey().getToken() + " is not contained in " + desc.ranges;
         assert lastKey == null || lastKey.compareTo(partition.partitionKey()) < 0
                : "partition " + partition.partitionKey() + " received out of order wrt " + lastKey;
         lastKey = partition.partitionKey();
@@ -133,13 +139,14 @@ public class Validator implements Runnable
             range = ranges.next();
 
         // generate new ranges as long as case 1 is true
-        while (!range.contains(lastKey.getToken()))
+        if (!findCorrectRange(lastKey.getToken()))
         {
             // add the empty hash, and move to the next range
-            range.ensureHashInitialised();
-            range = ranges.next();
+            ranges = trees.invalids();
+            findCorrectRange(lastKey.getToken());
         }
 
+        assert range.contains(lastKey.getToken()) : "Token not in MerkleTree: " + lastKey.getToken();
         // case 3 must be true: mix in the hashed row
         RowHash rowHash = rowHash(partition);
         if (rowHash != null)
@@ -148,6 +155,16 @@ public class Validator implements Runnable
         }
     }
 
+    public boolean findCorrectRange(Token t)
+    {
+        while (!range.contains(t) && ranges.hasNext())
+        {
+            range = ranges.next();
+        }
+
+        return range.contains(t);
+    }
+
     static class CountingDigest extends MessageDigest
     {
         private long count;
@@ -212,9 +229,9 @@ public class Validator implements Runnable
         {
             // log distribution of rows in tree
             logger.debug("Validated {} partitions for {}.  Partitions per leaf are:", validated, desc.sessionId);
-            tree.histogramOfRowCountPerLeaf().log(logger);
+            trees.logRowCountPerLeaf(logger);
             logger.debug("Validated {} partitions for {}.  Partition sizes are:", validated, desc.sessionId);
-            tree.histogramOfRowSizePerLeaf().log(logger);
+            trees.logRowSizePerLeaf(logger);
         }
     }
 
@@ -223,8 +240,8 @@ public class Validator implements Runnable
     {
         assert ranges != null : "Validator was not prepared()";
 
-        if (range != null)
-            range.ensureHashInitialised();
+        ranges = trees.invalids();
+
         while (ranges.hasNext())
         {
             range = ranges.next();
@@ -255,6 +272,6 @@ public class Validator implements Runnable
             logger.info(String.format("[repair #%s] Sending completed merkle tree to %s for %s.%s", desc.sessionId, initiator, desc.keyspace, desc.columnFamily));
             Tracing.traceRepair("Sending completed merkle tree to {} for {}.{}", initiator, desc.keyspace, desc.columnFamily);
         }
-        MessagingService.instance().sendOneWay(new ValidationComplete(desc, tree).createMessage(), initiator);
+        MessagingService.instance().sendOneWay(new ValidationComplete(desc, trees).createMessage(), initiator);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java b/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
index ef0c4ec..90be8e5 100644
--- a/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
+++ b/src/java/org/apache/cassandra/repair/messages/ValidationComplete.java
@@ -23,7 +23,7 @@ import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.repair.RepairJobDesc;
-import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
 
 /**
  * ValidationComplete message is sent when validation compaction completed successfully.
@@ -34,24 +34,25 @@ public class ValidationComplete extends RepairMessage
 {
     public static MessageSerializer serializer = new ValidationCompleteSerializer();
 
-    /** true if validation success, false otherwise */
-    public final boolean success;
     /** Merkle hash tree response. Null if validation failed. */
-    public final MerkleTree tree;
+    public final MerkleTrees trees;
 
     public ValidationComplete(RepairJobDesc desc)
     {
         super(Type.VALIDATION_COMPLETE, desc);
-        this.success = false;
-        this.tree = null;
+        trees = null;
     }
 
-    public ValidationComplete(RepairJobDesc desc, MerkleTree tree)
+    public ValidationComplete(RepairJobDesc desc, MerkleTrees trees)
     {
         super(Type.VALIDATION_COMPLETE, desc);
-        assert tree != null;
-        this.success = true;
-        this.tree = tree;
+        assert trees != null;
+        this.trees = trees;
+    }
+
+    public boolean success()
+    {
+        return trees != null;
     }
 
     private static class ValidationCompleteSerializer implements MessageSerializer<ValidationComplete>
@@ -59,31 +60,31 @@ public class ValidationComplete extends RepairMessage
         public void serialize(ValidationComplete message, DataOutputPlus out, int version) throws IOException
         {
             RepairJobDesc.serializer.serialize(message.desc, out, version);
-            out.writeBoolean(message.success);
-            if (message.success)
-                MerkleTree.serializer.serialize(message.tree, out, version);
+            out.writeBoolean(message.success());
+            if (message.trees != null)
+                MerkleTrees.serializer.serialize(message.trees, out, version);
         }
 
         public ValidationComplete deserialize(DataInputPlus in, int version) throws IOException
         {
             RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
-            if (in.readBoolean())
-            {
-                MerkleTree tree = MerkleTree.serializer.deserialize(in, version);
-                return new ValidationComplete(desc, tree);
-            }
-            else
+            boolean success = in.readBoolean();
+
+            if (success)
             {
-                return new ValidationComplete(desc);
+                MerkleTrees trees = MerkleTrees.serializer.deserialize(in, version);
+                return new ValidationComplete(desc, trees);
             }
+
+            return new ValidationComplete(desc);
         }
 
         public long serializedSize(ValidationComplete message, int version)
         {
             long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
-            size += TypeSizes.sizeof(message.success);
-            if (message.success)
-                size += MerkleTree.serializer.serializedSize(message.tree, version);
+            size += TypeSizes.sizeof(message.success());
+            if (message.trees != null)
+                size += MerkleTrees.serializer.serializedSize(message.trees, version);
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 213edeb..e75d13e 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -103,7 +103,7 @@ public class ActiveRepairService
      * @return Future for asynchronous call or null if there is no need to repair
      */
     public RepairSession submitRepairSession(UUID parentRepairSession,
-                                             Range<Token> range,
+                                             Collection<Range<Token>> range,
                                              String keyspace,
                                              RepairParallelism parallelismDegree,
                                              Set<InetAddress> endpoints,
@@ -383,7 +383,7 @@ public class ActiveRepairService
         {
             case VALIDATION_COMPLETE:
                 ValidationComplete validation = (ValidationComplete) message;
-                session.validationComplete(desc, endpoint, validation.tree);
+                session.validationComplete(desc, endpoint, validation.trees);
                 break;
             case SYNC_COMPLETE:
                 // one of replica is synced.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/src/java/org/apache/cassandra/utils/MerkleTrees.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MerkleTrees.java b/src/java/org/apache/cassandra/utils/MerkleTrees.java
new file mode 100644
index 0000000..43c023e
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/MerkleTrees.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.*;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.PeekingIterator;
+import org.slf4j.Logger;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+
+/**
+ * Wrapper class for handling of multiple MerkleTrees at once.
+ * 
+ * Each contained MerkleTree covers one Range of tokens; the Ranges must not overlap.
+ */
+public class MerkleTrees implements Iterable<Map.Entry<Range<Token>, MerkleTree>>
+{
+    public static final MerkleTreesSerializer serializer = new MerkleTreesSerializer();
+
+    private Map<Range<Token>, MerkleTree> merkleTrees = new TreeMap<>(new TokenRangeComparator());
+
+    private IPartitioner partitioner;
+
+    /**
+     * Creates empty MerkleTrees object.
+     * 
+     * @param partitioner The partitioner to use
+     */
+    public MerkleTrees(IPartitioner partitioner)
+    {
+        this(partitioner, new ArrayList<>());
+    }
+
+    private MerkleTrees(IPartitioner partitioner, Collection<MerkleTree> merkleTrees)
+    {
+        this.partitioner = partitioner;
+        addTrees(merkleTrees);
+    }
+
+    /**
+     * Get the ranges that these merkle trees cover.
+     * 
+     * @return
+     */
+    public Collection<Range<Token>> ranges()
+    {
+        return merkleTrees.keySet();
+    }
+
+    /**
+     * Get the partitioner in use.
+     * 
+     * @return
+     */
+    public IPartitioner partitioner()
+    {
+        return partitioner;
+    }
+
+    /**
+     * Add one merkle tree per given range, each with the defined maxsize.
+     * 
+     * @param maxsize
+     * @param ranges
+     */
+    public void addMerkleTrees(int maxsize, Collection<Range<Token>> ranges)
+    {
+        for (Range<Token> range : ranges)
+        {
+            addMerkleTree(maxsize, range);
+        }
+    }
+
+    /**
+     * Add a MerkleTree with the defined size and range.
+     * 
+     * @param maxsize
+     * @param range
+     * @return The created merkle tree.
+     */
+    public MerkleTree addMerkleTree(int maxsize, Range<Token> range)
+    {
+        return addMerkleTree(maxsize, MerkleTree.RECOMMENDED_DEPTH, range);
+    }
+
+    @VisibleForTesting
+    public MerkleTree addMerkleTree(int maxsize, byte hashdepth, Range<Token> range)
+    {
+        MerkleTree tree = new MerkleTree(partitioner, range, hashdepth, maxsize);
+        addTree(tree);
+
+        return tree;
+    }
+
+    /**
+     * Get the MerkleTree.TreeRange responsible for the given token.
+     * 
+     * @param t
+     * @return
+     */
+    public MerkleTree.TreeRange get(Token t)
+    {
+        return getMerkleTree(t).get(t);
+    }
+
+    /**
+     * Init every contained MerkleTree with an even tree distribution.
+     */
+    public void init()
+    {
+        for (Range<Token> range : merkleTrees.keySet())
+        {
+            init(range);
+        }
+    }
+
+    /**
+     * Init a selected MerkleTree with an even tree distribution.
+     * 
+     * @param range
+     */
+    public void init(Range<Token> range)
+    {
+        merkleTrees.get(range).init();
+    }
+
+    /**
+     * Split the MerkleTree responsible for the given token.
+     * 
+     * @param t
+     * @return
+     */
+    public boolean split(Token t)
+    {
+        return getMerkleTree(t).split(t);
+    }
+
+    /**
+     * Invalidate the MerkleTree responsible for the given token.
+     * 
+     * @param t
+     */
+    public void invalidate(Token t)
+    {
+        getMerkleTree(t).invalidate(t);
+    }
+
+    /**
+     * Get the MerkleTree responsible for the given token range.
+     * 
+     * @param range
+     * @return
+     */
+    public MerkleTree getMerkleTree(Range<Token> range)
+    {
+        return merkleTrees.get(range);
+    }
+
+    public long size() // sum of MerkleTree.size() over every tree in this collection; 0 when there are no trees
+    {
+        long size = 0;
+
+        for (MerkleTree tree : merkleTrees.values())
+        {
+            size += tree.size();
+        }
+
+        return size;
+    }
+
+    @VisibleForTesting // forwards maxsize to maxsize(int) of the tree registered for range; the lookup result is not null-checked
+    public void maxsize(Range<Token> range, int maxsize)
+    {
+        getMerkleTree(range).maxsize(maxsize);
+    }
+
+    /**
+     * Get the MerkleTree responsible for the given token, found by scanning merkleTrees.keySet() in iteration order.
+     * 
+     * @param t the token to locate
+     * @return The MerkleTree of the first range that contains t, or null if none exist.
+     */
+    private MerkleTree getMerkleTree(Token t)
+    {
+        for (Range<Token> range : merkleTrees.keySet())
+        {
+            if (range.contains(t))
+                return merkleTrees.get(range);
+        }
+
+        return null;
+    }
+
+    private void addTrees(Collection<MerkleTree> trees) // registers each tree via addTree(MerkleTree), in the collection's iteration order
+    {
+        for (MerkleTree tree : trees)
+        {
+            addTree(tree);
+        }
+    }
+
+    private void addTree(MerkleTree tree) // stores the tree under tree.fullRange; the overlap check is an assert, so it only runs when assertions are enabled
+    {
+        assert validateNonOverlapping(tree) : "Range [" + tree.fullRange + "] is intersecting an existing range";
+
+        merkleTrees.put(tree.fullRange, tree);
+    }
+
+    private boolean validateNonOverlapping(MerkleTree tree) // true when tree.fullRange intersects none of the ranges already registered
+    {
+        for (Range<Token> range : merkleTrees.keySet())
+        {
+            if (tree.fullRange.intersects(range))
+                return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Get an iterator for all the invalids generated by the MerkleTrees.
+     * 
+     * @return a new TreeRangeIterator over the trees of this collection
+     */
+    public TreeRangeIterator invalids()
+    {
+        return new TreeRangeIterator();
+    }
+
+    /**
+     * Log the row count per leaf for all MerkleTrees.
+     * 
+     * @param logger the logger handed to log(logger) of each tree's histogramOfRowCountPerLeaf()
+     */
+    public void logRowCountPerLeaf(Logger logger)
+    {
+        for (MerkleTree tree : merkleTrees.values())
+        {
+            tree.histogramOfRowCountPerLeaf().log(logger);
+        }
+    }
+
+    /**
+     * Log the row size per leaf for all MerkleTrees.
+     * 
+     * @param logger the logger handed to log(logger) of each tree's histogramOfRowSizePerLeaf()
+     */
+    public void logRowSizePerLeaf(Logger logger)
+    {
+        for (MerkleTree tree : merkleTrees.values())
+        {
+            tree.histogramOfRowSizePerLeaf().log(logger);
+        }
+    }
+
+    @VisibleForTesting // concatenates, in merkleTrees.keySet() order, hash(range) of every tree whose range intersects the given range
+    public byte[] hash(Range<Token> range)
+    {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        boolean hashed = false; // stays false when no intersecting tree returned a hash
+
+        try
+        {
+            for (Range<Token> rt : merkleTrees.keySet())
+            {
+                if (rt.intersects(range))
+                {
+                    byte[] bytes = merkleTrees.get(rt).hash(range);
+                    if (bytes != null) // a tree that returns no hash for this range contributes nothing
+                    {
+                        baos.write(bytes);
+                        hashed = true;
+                    }
+                }
+            }
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("Unable to append merkle tree hash to result", e); // chain the cause so the original failure is not lost
+        }
+        
+        return hashed ? baos.toByteArray() : null; // null, not an empty array, when nothing was hashed
+    }
+
+    /**
+     * Get an iterator of all ranges and their MerkleTrees (the entrySet() iterator of the backing merkleTrees map).
+     */
+    public Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator()
+    {
+        return merkleTrees.entrySet().iterator();
+    }
+
+    public class TreeRangeIterator extends AbstractIterator<MerkleTree.TreeRange> implements // chains the invalids() iterator of each tree, one tree after another
+            Iterable<MerkleTree.TreeRange>,
+            PeekingIterator<MerkleTree.TreeRange>
+    {
+        private final Iterator<MerkleTree> it; // walks merkleTrees.values()
+
+        private MerkleTree.TreeRangeIterator current = null; // iterator of the tree currently being drained; null until the first tree is reached
+
+        private TreeRangeIterator()
+        {
+            it = merkleTrees.values().iterator();
+        }
+
+        public MerkleTree.TreeRange computeNext()
+        {
+            if (current == null || !current.hasNext()) // nothing left in the current tree: move on to the next one
+                return nextIterator();
+
+            return current.next();
+        }
+
+        private MerkleTree.TreeRange nextIterator()
+        {
+            if (it.hasNext())
+            {
+                current = it.next().invalids();
+
+                return current.next(); // NOTE(review): no hasNext() check here; assumes every tree's invalids() yields at least one range
+            }
+
+            return endOfData();
+        }
+
+        public Iterator<MerkleTree.TreeRange> iterator()
+        {
+            return this; // returns itself, so iterating the Iterable consumes this iterator
+        }
+    }
+
+    /**
+     * Get the differences between the two sets of MerkleTrees; only ranges registered in ltree are visited.
+     * 
+     * @param ltree the trees that drive the comparison, visited in ltree.merkleTrees.values() order
+     * @param rtree looked up by each ltree tree's fullRange; the lookup result is passed to MerkleTree.difference without a null check
+     * @return a new list holding the ranges returned by MerkleTree.difference for every pair, in visiting order
+     */
+    public static List<Range<Token>> difference(MerkleTrees ltree, MerkleTrees rtree)
+    {
+        List<Range<Token>> differences = new ArrayList<>();
+        for (MerkleTree tree : ltree.merkleTrees.values())
+        {
+            differences.addAll(MerkleTree.difference(tree, rtree.getMerkleTree(tree.fullRange)));
+        }
+        return differences;
+    }
+
+    public static class MerkleTreesSerializer implements IVersionedSerializer<MerkleTrees> // format: int tree count, then each tree via MerkleTree.serializer
+    {
+        public void serialize(MerkleTrees trees, DataOutputPlus out, int version) throws IOException
+        {
+            out.writeInt(trees.merkleTrees.size()); // tree count first, so deserialize knows how many trees to read
+            for (MerkleTree tree : trees.merkleTrees.values())
+            {
+                MerkleTree.serializer.serialize(tree, out, version);
+            }
+        }
+
+        public MerkleTrees deserialize(DataInputPlus in, int version) throws IOException
+        {
+            IPartitioner partitioner = null; // taken from the first tree read; stays null when nTrees is 0
+            int nTrees = in.readInt();
+            Collection<MerkleTree> trees = new ArrayList<>(nTrees);
+            if (nTrees > 0)
+            {
+                for (int i = 0; i < nTrees; i++)
+                {
+                    MerkleTree tree = MerkleTree.serializer.deserialize(in, version);
+                    trees.add(tree);
+
+                    if (partitioner == null)
+                        partitioner = tree.partitioner();
+                    else
+                        assert tree.partitioner() == partitioner; // identity comparison, only enforced when assertions are enabled
+                }
+            }
+
+            return new MerkleTrees(partitioner, trees);
+        }
+
+        public long serializedSize(MerkleTrees trees, int version)
+        {
+            assert trees != null;
+
+            long size = TypeSizes.sizeof(trees.merkleTrees.size()); // accounts for the int count written by serialize
+            for (MerkleTree tree : trees.merkleTrees.values())
+            {
+                size += MerkleTree.serializer.serializedSize(tree, version);
+            }
+            return size;
+        }
+
+    }
+
+    private static class TokenRangeComparator implements Comparator<Range<Token>> // treats ranges that share a left token as equal
+    {
+        @Override
+        public int compare(Range<Token> rt1, Range<Token> rt2)
+        {
+            if (rt1.left.compareTo(rt2.left) == 0)
+                return 0; // the right tokens are ignored when the left tokens match
+
+            return rt1.compareTo(rt2); // otherwise defer to Range's own compareTo
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/data/serialization/3.0/gms.EndpointState.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/3.0/gms.EndpointState.bin b/test/data/serialization/3.0/gms.EndpointState.bin
new file mode 100644
index 0000000..a230ae1
Binary files /dev/null and b/test/data/serialization/3.0/gms.EndpointState.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/data/serialization/3.0/gms.Gossip.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/3.0/gms.Gossip.bin b/test/data/serialization/3.0/gms.Gossip.bin
new file mode 100644
index 0000000..af5ac57
Binary files /dev/null and b/test/data/serialization/3.0/gms.Gossip.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/data/serialization/3.0/service.SyncComplete.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/3.0/service.SyncComplete.bin b/test/data/serialization/3.0/service.SyncComplete.bin
new file mode 100644
index 0000000..73ea4b4
Binary files /dev/null and b/test/data/serialization/3.0/service.SyncComplete.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/data/serialization/3.0/service.SyncRequest.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/3.0/service.SyncRequest.bin b/test/data/serialization/3.0/service.SyncRequest.bin
new file mode 100644
index 0000000..7e09777
Binary files /dev/null and b/test/data/serialization/3.0/service.SyncRequest.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/data/serialization/3.0/service.ValidationComplete.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/3.0/service.ValidationComplete.bin b/test/data/serialization/3.0/service.ValidationComplete.bin
new file mode 100644
index 0000000..b8f0fb9
Binary files /dev/null and b/test/data/serialization/3.0/service.ValidationComplete.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/data/serialization/3.0/service.ValidationRequest.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/3.0/service.ValidationRequest.bin b/test/data/serialization/3.0/service.ValidationRequest.bin
new file mode 100644
index 0000000..a00763b
Binary files /dev/null and b/test/data/serialization/3.0/service.ValidationRequest.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
index 4ee5ce4..501f4ae 100644
--- a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
+++ b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
@@ -37,7 +37,7 @@ import java.util.Map;
 
 public class AbstractSerializationsTester
 {
-    protected static final String CUR_VER = System.getProperty("cassandra.version", "2.1");
+    protected static final String CUR_VER = System.getProperty("cassandra.version", "3.0");
     protected static final Map<String, Integer> VERSION_MAP = new HashMap<String, Integer> ()
     {{
         put("0.7", 1);
@@ -46,6 +46,7 @@ public class AbstractSerializationsTester
         put("2.0", MessagingService.VERSION_20);
         put("2.1", MessagingService.VERSION_21);
         put("2.2", MessagingService.VERSION_22);
+        put("3.0", MessagingService.VERSION_30);
     }};
 
     protected static final boolean EXECUTE_WRITES = Boolean.getBoolean("cassandra.test-serialization-writes");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index c3be08a..46c7068 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -173,7 +173,7 @@ public class LeveledCompactionStrategyTest
         int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(FBUtilities.nowInSeconds());
         UUID parentRepSession = UUID.randomUUID();
         ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range), false);
-        RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, range);
+        RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, Arrays.asList(range));
         Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore);
         CompactionManager.instance.submitValidation(cfs, validator).get();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index 77a6ac4..db3f683 100644
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@ -20,13 +20,14 @@ package org.apache.cassandra.repair;
 
 import java.net.InetAddress;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
-
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
@@ -37,6 +38,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
 
 import static org.junit.Assert.assertEquals;
 
@@ -65,10 +67,11 @@ public class LocalSyncTaskTest extends SchemaLoader
         final InetAddress ep2 = InetAddress.getByName("127.0.0.1");
 
         Range<Token> range = new Range<>(partirioner.getMinimumToken(), partirioner.getRandomToken());
-        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE1, "Standard1", range);
+        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
+
+        MerkleTrees tree1 = createInitialTree(desc);
 
-        MerkleTree tree1 = createInitialTree(desc);
-        MerkleTree tree2 = createInitialTree(desc);
+        MerkleTrees tree2 = createInitialTree(desc);
 
         // difference the trees
         // note: we reuse the same endpoint which is bogus in theory but fine here
@@ -90,10 +93,11 @@ public class LocalSyncTaskTest extends SchemaLoader
 
         ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range), false);
 
-        RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", range);
+        RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
+
+        MerkleTrees tree1 = createInitialTree(desc);
 
-        MerkleTree tree1 = createInitialTree(desc);
-        MerkleTree tree2 = createInitialTree(desc);
+        MerkleTrees tree2 = createInitialTree(desc);
 
         // change a range in one of the trees
         Token token = partirioner.midpoint(range.left, range.right);
@@ -115,9 +119,10 @@ public class LocalSyncTaskTest extends SchemaLoader
         assertEquals("Wrong differing ranges", interesting.size(), task.getCurrentStat().numberOfDifferences);
     }
 
-    private MerkleTree createInitialTree(RepairJobDesc desc)
+    private MerkleTrees createInitialTree(RepairJobDesc desc)
     {
-        MerkleTree tree = new MerkleTree(partirioner, desc.range, MerkleTree.RECOMMENDED_DEPTH, (int)Math.pow(2, 15));
+        MerkleTrees tree = new MerkleTrees(partirioner);
+        tree.addMerkleTrees((int) Math.pow(2, 15), desc.ranges);
         tree.init();
         for (MerkleTree.TreeRange r : tree.invalids())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
index 0af94b2..d40982c 100644
--- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.repair;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.util.Arrays;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
@@ -53,7 +54,7 @@ public class RepairSessionTest
         IPartitioner p = Murmur3Partitioner.instance;
         Range<Token> repairRange = new Range<>(p.getToken(ByteBufferUtil.bytes(0)), p.getToken(ByteBufferUtil.bytes(100)));
         Set<InetAddress> endpoints = Sets.newHashSet(remote);
-        RepairSession session = new RepairSession(parentSessionId, sessionId, repairRange, "Keyspace1", RepairParallelism.SEQUENTIAL, endpoints, ActiveRepairService.UNREPAIRED_SSTABLE, "Standard1");
+        RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange), "Keyspace1", RepairParallelism.SEQUENTIAL, endpoints, ActiveRepairService.UNREPAIRED_SSTABLE, "Standard1");
 
         // perform convict
         session.convict(remote, Double.MAX_VALUE);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index d77daf0..8fe76c3 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -19,6 +19,9 @@
 package org.apache.cassandra.repair;
 
 import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 
 import org.junit.After;
@@ -43,6 +46,7 @@ import org.apache.cassandra.repair.messages.ValidationComplete;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
 import static org.junit.Assert.assertEquals;
@@ -77,7 +81,7 @@ public class ValidatorTest
     public void testValidatorComplete() throws Throwable
     {
         Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
-        final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
+        final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, Arrays.asList(range));
 
         final SimpleCondition lock = new SimpleCondition();
         MessagingService.instance().addMessageSink(new IMessageSink()
@@ -91,8 +95,8 @@ public class ValidatorTest
                         RepairMessage m = (RepairMessage) message.payload;
                         assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
                         assertEquals(desc, m.desc);
-                        assertTrue(((ValidationComplete) m).success);
-                        assertNotNull(((ValidationComplete) m).tree);
+                        assertTrue(((ValidationComplete) m).success());
+                        assertNotNull(((ValidationComplete) m).trees);
                     }
                 }
                 finally
@@ -113,7 +117,8 @@ public class ValidatorTest
         ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);
 
         Validator validator = new Validator(desc, remote, 0);
-        MerkleTree tree = new MerkleTree(partitioner, validator.desc.range, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, 15));
+        MerkleTrees tree = new MerkleTrees(partitioner);
+        tree.addMerkleTrees((int) Math.pow(2, 15), validator.desc.ranges);
         validator.prepare(cfs, tree);
 
         // and confirm that the tree was split
@@ -137,7 +142,7 @@ public class ValidatorTest
     public void testValidatorFailed() throws Throwable
     {
         Range<Token> range = new Range<>(partitioner.getMinimumToken(), partitioner.getRandomToken());
-        final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, range);
+        final RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), keyspace, columnFamily, Arrays.asList(range));
 
         final SimpleCondition lock = new SimpleCondition();
         MessagingService.instance().addMessageSink(new IMessageSink()
@@ -151,8 +156,8 @@ public class ValidatorTest
                         RepairMessage m = (RepairMessage) message.payload;
                         assertEquals(RepairMessage.Type.VALIDATION_COMPLETE, m.messageType);
                         assertEquals(desc, m.desc);
-                        assertFalse(((ValidationComplete) m).success);
-                        assertNull(((ValidationComplete) m).tree);
+                        assertFalse(((ValidationComplete) m).success());
+                        assertNull(((ValidationComplete) m).trees);
                     }
                 }
                 finally

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0dd50a6c/test/unit/org/apache/cassandra/service/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java
index 80bb452..847bcea 100644
--- a/test/unit/org/apache/cassandra/service/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.service;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.UUID;
 
@@ -43,7 +44,7 @@ import org.apache.cassandra.repair.RepairJobDesc;
 import org.apache.cassandra.repair.Validator;
 import org.apache.cassandra.repair.messages.*;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
 
 public class SerializationsTest extends AbstractSerializationsTester
 {
@@ -58,7 +59,7 @@ public class SerializationsTest extends AbstractSerializationsTester
         partitionerSwitcher = Util.switchPartitioner(RandomPartitioner.instance);
         RANDOM_UUID = UUID.fromString("b5c3d033-75aa-4c2f-a819-947aac7a0c54");
         FULL_RANGE = new Range<>(Util.testPartitioner().getMinimumToken(), Util.testPartitioner().getMinimumToken());
-        DESC = new RepairJobDesc(getVersion() < MessagingService.VERSION_21 ? null : RANDOM_UUID, RANDOM_UUID, "Keyspace1", "Standard1", FULL_RANGE);
+        DESC = new RepairJobDesc(getVersion() < MessagingService.VERSION_21 ? null : RANDOM_UUID, RANDOM_UUID, "Keyspace1", "Standard1", Arrays.asList(FULL_RANGE));
     }
 
     @AfterClass
@@ -66,8 +67,7 @@ public class SerializationsTest extends AbstractSerializationsTester
     {
         partitionerSwitcher.close();
     }
-
-
+    
     private void testRepairMessageWrite(String fileName, RepairMessage... messages) throws IOException
     {
         try (DataOutputStreamPlus out = getOutput(fileName))
@@ -109,13 +109,17 @@ public class SerializationsTest extends AbstractSerializationsTester
     private void testValidationCompleteWrite() throws IOException
     {
         IPartitioner p = RandomPartitioner.instance;
+
+        MerkleTrees mt = new MerkleTrees(p);
+
         // empty validation
-        MerkleTree mt = new MerkleTree(p, FULL_RANGE, MerkleTree.RECOMMENDED_DEPTH, (int) Math.pow(2, 15));
+        mt.addMerkleTree((int) Math.pow(2, 15), FULL_RANGE);
         Validator v0 = new Validator(DESC, FBUtilities.getBroadcastAddress(),  -1);
         ValidationComplete c0 = new ValidationComplete(DESC, mt);
 
         // validation with a tree
-        mt = new MerkleTree(p, FULL_RANGE, MerkleTree.RECOMMENDED_DEPTH, Integer.MAX_VALUE);
+        mt = new MerkleTrees(p);
+        mt.addMerkleTree(Integer.MAX_VALUE, FULL_RANGE);
         for (int i = 0; i < 10; i++)
             mt.split(p.getRandomToken());
         Validator v1 = new Validator(DESC, FBUtilities.getBroadcastAddress(), -1);
@@ -140,24 +144,24 @@ public class SerializationsTest extends AbstractSerializationsTester
             assert message.messageType == RepairMessage.Type.VALIDATION_COMPLETE;
             assert DESC.equals(message.desc);
 
-            assert ((ValidationComplete) message).success;
-            assert ((ValidationComplete) message).tree != null;
+            assert ((ValidationComplete) message).success();
+            assert ((ValidationComplete) message).trees != null;
 
             // validation with a tree
             message = RepairMessage.serializer.deserialize(in, getVersion());
             assert message.messageType == RepairMessage.Type.VALIDATION_COMPLETE;
             assert DESC.equals(message.desc);
 
-            assert ((ValidationComplete) message).success;
-            assert ((ValidationComplete) message).tree != null;
+            assert ((ValidationComplete) message).success();
+            assert ((ValidationComplete) message).trees != null;
 
             // failed validation
             message = RepairMessage.serializer.deserialize(in, getVersion());
             assert message.messageType == RepairMessage.Type.VALIDATION_COMPLETE;
             assert DESC.equals(message.desc);
 
-            assert !((ValidationComplete) message).success;
-            assert ((ValidationComplete) message).tree == null;
+            assert !((ValidationComplete) message).success();
+            assert ((ValidationComplete) message).trees == null;
 
             // MessageOuts
             for (int i = 0; i < 3; i++)


Mime
View raw message