cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bdeggles...@apache.org
Subject [2/3] cassandra git commit: Add repair streaming preview
Date Mon, 24 Apr 2017 16:24:51 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/messages/RepairOption.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
index 3b13cd8..6c8ff9d 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.repair.RepairParallelism;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -47,6 +48,7 @@ public class RepairOption
     public static final String TRACE_KEY = "trace";
     public static final String SUB_RANGE_REPAIR_KEY = "sub_range_repair";
     public static final String PULL_REPAIR_KEY = "pullRepair";
+    public static final String PREVIEW = "previewKind";
 
     // we don't want to push nodes too much for repair
     public static final int MAX_JOB_THREADS = 4;
@@ -136,6 +138,7 @@ public class RepairOption
         RepairParallelism parallelism = RepairParallelism.fromName(options.get(PARALLELISM_KEY));
         boolean primaryRange = Boolean.parseBoolean(options.get(PRIMARY_RANGE_KEY));
         boolean incremental = Boolean.parseBoolean(options.get(INCREMENTAL_KEY));
+        PreviewKind previewKind = PreviewKind.valueOf(options.getOrDefault(PREVIEW, PreviewKind.NONE.toString()));
         boolean trace = Boolean.parseBoolean(options.get(TRACE_KEY));
         boolean pullRepair = Boolean.parseBoolean(options.get(PULL_REPAIR_KEY));
 
@@ -171,7 +174,7 @@ public class RepairOption
             }
         }
 
-        RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair);
+        RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, previewKind);
 
         // data centers
         String dataCentersStr = options.get(DATACENTERS_KEY);
@@ -252,13 +255,14 @@ public class RepairOption
     private final int jobThreads;
     private final boolean isSubrangeRepair;
     private final boolean pullRepair;
+    private final PreviewKind previewKind;
 
     private final Collection<String> columnFamilies = new HashSet<>();
     private final Collection<String> dataCenters = new HashSet<>();
     private final Collection<String> hosts = new HashSet<>();
     private final Collection<Range<Token>> ranges = new HashSet<>();
 
-    public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair)
+    public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair, PreviewKind previewKind)
     {
         if (FBUtilities.isWindows &&
             (DatabaseDescriptor.getDiskAccessMode() != Config.DiskAccessMode.standard || DatabaseDescriptor.getIndexAccessMode() != Config.DiskAccessMode.standard) &&
@@ -277,6 +281,7 @@ public class RepairOption
         this.ranges.addAll(ranges);
         this.isSubrangeRepair = isSubrangeRepair;
         this.pullRepair = pullRepair;
+        this.previewKind = previewKind;
     }
 
     public RepairParallelism getParallelism()
@@ -339,6 +344,16 @@ public class RepairOption
         return isSubrangeRepair;
     }
 
+    public PreviewKind getPreviewKind()
+    {
+        return previewKind;
+    }
+
+    public boolean isPreview()
+    {
+        return previewKind.isPreview();
+    }
+
     public boolean isInLocalDCOnly() {
         return dataCenters.size() == 1 && dataCenters.contains(DatabaseDescriptor.getLocalDataCenter());
     }
@@ -347,16 +362,17 @@ public class RepairOption
     public String toString()
     {
         return "repair options (" +
-                       "parallelism: " + parallelism +
-                       ", primary range: " + primaryRange +
-                       ", incremental: " + incremental +
-                       ", job threads: " + jobThreads +
-                       ", ColumnFamilies: " + columnFamilies +
-                       ", dataCenters: " + dataCenters +
-                       ", hosts: " + hosts +
-                       ", # of ranges: " + ranges.size() +
-                       ", pull repair: " + pullRepair +
-                       ')';
+               "parallelism: " + parallelism +
+               ", primary range: " + primaryRange +
+               ", incremental: " + incremental +
+               ", job threads: " + jobThreads +
+               ", ColumnFamilies: " + columnFamilies +
+               ", dataCenters: " + dataCenters +
+               ", hosts: " + hosts +
+               ", previewKind: " + previewKind +
+               ", # of ranges: " + ranges.size() +
+               ", pull repair: " + pullRepair +
+               ')';
     }
 
     public Map<String, String> asMap()
@@ -373,6 +389,7 @@ public class RepairOption
         options.put(TRACE_KEY, Boolean.toString(trace));
         options.put(RANGES_KEY, Joiner.on(",").join(ranges));
         options.put(PULL_REPAIR_KEY, Boolean.toString(pullRepair));
+        options.put(PREVIEW, previewKind.toString());
         return options;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
index 178e710..7b68daf 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.repair.messages;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Objects;
 
 import org.apache.cassandra.db.TypeSizes;
@@ -26,6 +28,7 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.repair.NodePair;
 import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.streaming.SessionSummary;
 
 /**
  *
@@ -40,16 +43,20 @@ public class SyncComplete extends RepairMessage
     /** true if sync success, false otherwise */
     public final boolean success;
 
-    public SyncComplete(RepairJobDesc desc, NodePair nodes, boolean success)
+    public final List<SessionSummary> summaries;
+
+    public SyncComplete(RepairJobDesc desc, NodePair nodes, boolean success, List<SessionSummary> summaries)
     {
         super(Type.SYNC_COMPLETE, desc);
         this.nodes = nodes;
         this.success = success;
+        this.summaries = summaries;
     }
 
-    public SyncComplete(RepairJobDesc desc, InetAddress endpoint1, InetAddress endpoint2, boolean success)
+    public SyncComplete(RepairJobDesc desc, InetAddress endpoint1, InetAddress endpoint2, boolean success, List<SessionSummary> summaries)
     {
         super(Type.SYNC_COMPLETE, desc);
+        this.summaries = summaries;
         this.nodes = new NodePair(endpoint1, endpoint2);
         this.success = success;
     }
@@ -63,13 +70,14 @@ public class SyncComplete extends RepairMessage
         return messageType == other.messageType &&
                desc.equals(other.desc) &&
                success == other.success &&
-               nodes.equals(other.nodes);
+               nodes.equals(other.nodes) &&
+               summaries.equals(other.summaries);
     }
 
     @Override
     public int hashCode()
     {
-        return Objects.hash(messageType, desc, success, nodes);
+        return Objects.hash(messageType, desc, success, nodes, summaries);
     }
 
     private static class SyncCompleteSerializer implements MessageSerializer<SyncComplete>
@@ -79,13 +87,28 @@ public class SyncComplete extends RepairMessage
             RepairJobDesc.serializer.serialize(message.desc, out, version);
             NodePair.serializer.serialize(message.nodes, out, version);
             out.writeBoolean(message.success);
+
+            out.writeInt(message.summaries.size());
+            for (SessionSummary summary: message.summaries)
+            {
+                SessionSummary.serializer.serialize(summary, out, version);
+            }
         }
 
         public SyncComplete deserialize(DataInputPlus in, int version) throws IOException
         {
             RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
             NodePair nodes = NodePair.serializer.deserialize(in, version);
-            return new SyncComplete(desc, nodes, in.readBoolean());
+            boolean success = in.readBoolean();
+
+            int numSummaries = in.readInt();
+            List<SessionSummary> summaries = new ArrayList<>(numSummaries);
+            for (int i=0; i<numSummaries; i++)
+            {
+                summaries.add(SessionSummary.serializer.deserialize(in, version));
+            }
+
+            return new SyncComplete(desc, nodes, success, summaries);
         }
 
         public long serializedSize(SyncComplete message, int version)
@@ -93,6 +116,13 @@ public class SyncComplete extends RepairMessage
             long size = RepairJobDesc.serializer.serializedSize(message.desc, version);
             size += NodePair.serializer.serializedSize(message.nodes, version);
             size += TypeSizes.sizeof(message.success);
+
+            size += TypeSizes.sizeof(message.summaries.size());
+            for (SessionSummary summary: message.summaries)
+            {
+                size += SessionSummary.serializer.serializedSize(summary, version);
+            }
+
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
index e31cc6c..01601e2 100644
--- a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.RepairJobDesc;
+import org.apache.cassandra.streaming.PreviewKind;
 
 /**
  * Body part of SYNC_REQUEST repair message.
@@ -48,14 +49,16 @@ public class SyncRequest extends RepairMessage
     public final InetAddress src;
     public final InetAddress dst;
     public final Collection<Range<Token>> ranges;
+    public final PreviewKind previewKind;
 
-    public SyncRequest(RepairJobDesc desc, InetAddress initiator, InetAddress src, InetAddress dst, Collection<Range<Token>> ranges)
+    public SyncRequest(RepairJobDesc desc, InetAddress initiator, InetAddress src, InetAddress dst, Collection<Range<Token>> ranges, PreviewKind previewKind)
     {
         super(Type.SYNC_REQUEST, desc);
         this.initiator = initiator;
         this.src = src;
         this.dst = dst;
         this.ranges = ranges;
+        this.previewKind = previewKind;
     }
 
     @Override
@@ -69,13 +72,14 @@ public class SyncRequest extends RepairMessage
                initiator.equals(req.initiator) &&
                src.equals(req.src) &&
                dst.equals(req.dst) &&
-               ranges.equals(req.ranges);
+               ranges.equals(req.ranges) &&
+               previewKind == req.previewKind;
     }
 
     @Override
     public int hashCode()
     {
-        return Objects.hash(messageType, desc, initiator, src, dst, ranges);
+        return Objects.hash(messageType, desc, initiator, src, dst, ranges, previewKind);
     }
 
     public static class SyncRequestSerializer implements MessageSerializer<SyncRequest>
@@ -92,6 +96,7 @@ public class SyncRequest extends RepairMessage
                 MessagingService.validatePartitioner(range);
                 AbstractBounds.tokenSerializer.serialize(range, out, version);
             }
+            out.writeInt(message.previewKind.getSerializationVal());
         }
 
         public SyncRequest deserialize(DataInputPlus in, int version) throws IOException
@@ -104,7 +109,8 @@ public class SyncRequest extends RepairMessage
             List<Range<Token>> ranges = new ArrayList<>(rangesCount);
             for (int i = 0; i < rangesCount; ++i)
                 ranges.add((Range<Token>) AbstractBounds.tokenSerializer.deserialize(in, MessagingService.globalPartitioner(), version));
-            return new SyncRequest(desc, owner, src, dst, ranges);
+            PreviewKind previewKind = PreviewKind.deserialize(in.readInt());
+            return new SyncRequest(desc, owner, src, dst, ranges, previewKind);
         }
 
         public long serializedSize(SyncRequest message, int version)
@@ -114,6 +120,7 @@ public class SyncRequest extends RepairMessage
             size += TypeSizes.sizeof(message.ranges.size());
             for (Range<Token> range : message.ranges)
                 size += AbstractBounds.tokenSerializer.serializedSize(range, version);
+            size += TypeSizes.sizeof(message.previewKind.getSerializationVal());
             return size;
         }
     }
@@ -126,6 +133,7 @@ public class SyncRequest extends RepairMessage
                 ", src=" + src +
                 ", dst=" + dst +
                 ", ranges=" + ranges +
+                ", previewKind=" + previewKind +
                 "} " + super.toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index fd98b37..aadf7c1 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -61,6 +61,7 @@ import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.repair.RepairJobDesc;
 import org.apache.cassandra.repair.RepairParallelism;
 import org.apache.cassandra.repair.RepairSession;
@@ -167,6 +168,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
                                              Set<InetAddress> endpoints,
                                              boolean isConsistent,
                                              boolean pullRepair,
+                                             PreviewKind previewKind,
                                              ListeningExecutorService executor,
                                              String... cfnames)
     {
@@ -176,7 +178,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         if (cfnames.length == 0)
             return null;
 
-        final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, isConsistent, pullRepair, cfnames);
+        final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, isConsistent, pullRepair, previewKind, cfnames);
 
         sessions.put(session.getId(), session);
         // register listeners
@@ -319,7 +321,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
     {
         // we only want repairedAt for incremental repairs, for non incremental repairs, UNREPAIRED_SSTABLE will preserve repairedAt on streamed sstables
         long repairedAt = options.isIncremental() ? Clock.instance.currentTimeMillis() : ActiveRepairService.UNREPAIRED_SSTABLE;
-        registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal());
+        registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal(), options.getPreviewKind());
         final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size());
         final AtomicBoolean status = new AtomicBoolean(true);
         final Set<String> failedNodes = Collections.synchronizedSet(new HashSet<String>());
@@ -351,7 +353,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         {
             if (FailureDetector.instance.isAlive(neighbour))
             {
-                PrepareMessage message = new PrepareMessage(parentRepairSession, tableIds, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal());
+                PrepareMessage message = new PrepareMessage(parentRepairSession, tableIds, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal(), options.getPreviewKind());
                 MessageOut<RepairMessage> msg = message.createMessage();
                 MessagingService.instance().sendRR(msg, neighbour, callback, DatabaseDescriptor.getRpcTimeout(), true);
             }
@@ -386,7 +388,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         throw new RuntimeException(errorMsg);
     }
 
-    public void registerParentRepairSession(UUID parentRepairSession, InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal)
+    public void registerParentRepairSession(UUID parentRepairSession, InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal, PreviewKind previewKind)
     {
         assert isIncremental || repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE;
         if (!registeredForEndpointChanges)
@@ -396,7 +398,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
             registeredForEndpointChanges = true;
         }
 
-        parentRepairSessions.put(parentRepairSession, new ParentRepairSession(coordinator, columnFamilyStores, ranges, isIncremental, repairedAt, isGlobal));
+        parentRepairSessions.put(parentRepairSession, new ParentRepairSession(coordinator, columnFamilyStores, ranges, isIncremental, repairedAt, isGlobal, previewKind));
     }
 
     public ParentRepairSession getParentRepairSession(UUID parentSessionId)
@@ -444,7 +446,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
             case SYNC_COMPLETE:
                 // one of replica is synced.
                 SyncComplete sync = (SyncComplete) message;
-                session.syncComplete(desc, sync.nodes, sync.success);
+                session.syncComplete(desc, sync.nodes, sync.success, sync.summaries);
                 break;
             default:
                 break;
@@ -464,8 +466,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         public final boolean isGlobal;
         public final long repairedAt;
         public final InetAddress coordinator;
+        public final PreviewKind previewKind;
 
-        public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal)
+        public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal, PreviewKind previewKind)
         {
             this.coordinator = coordinator;
             for (ColumnFamilyStore cfs : columnFamilyStores)
@@ -476,6 +479,27 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
             this.repairedAt = repairedAt;
             this.isIncremental = isIncremental;
             this.isGlobal = isGlobal;
+            this.previewKind = previewKind;
+        }
+
+        public boolean isPreview()
+        {
+            return previewKind != PreviewKind.NONE;
+        }
+
+        public Predicate<SSTableReader> getPreviewPredicate()
+        {
+            switch (previewKind)
+            {
+                case ALL:
+                    return (s) -> true;
+                case REPAIRED:
+                    return (s) -> s.isRepaired();
+                case UNREPAIRED:
+                    return (s) -> !s.isRepaired();
+                default:
+                    throw new RuntimeException("Can't get preview predicate for preview kind " + previewKind);
+            }
         }
 
         public synchronized void maybeSnapshot(TableId tableId, UUID parentSessionId)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index 86340a5..5f734c9 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -64,10 +64,12 @@ public class ConnectionHandler
 
     private IncomingMessageHandler incoming;
     private OutgoingMessageHandler outgoing;
+    private final boolean isPreview;
 
-    ConnectionHandler(StreamSession session, int incomingSocketTimeout)
+    ConnectionHandler(StreamSession session, int incomingSocketTimeout, boolean isPreview)
     {
         this.session = session;
+        this.isPreview = isPreview;
         this.incoming = new IncomingMessageHandler(session, incomingSocketTimeout);
         this.outgoing = new OutgoingMessageHandler(session);
     }
@@ -142,6 +144,9 @@ public class ConnectionHandler
         if (outgoing.isClosed())
             throw new RuntimeException("Outgoing stream handler has been closed");
 
+        if (message.type == StreamMessage.Type.FILE && isPreview)
+            throw new RuntimeException("Cannot send file messages for preview streaming sessions");
+
         outgoing.enqueue(message);
     }
 
@@ -191,14 +196,14 @@ public class ConnectionHandler
         @SuppressWarnings("resource")
         private void sendInitMessage() throws IOException
         {
-            StreamInitMessage message = new StreamInitMessage(
-                    FBUtilities.getBroadcastAddress(),
-                    session.sessionIndex(),
-                    session.planId(),
-                    session.streamOperation(),
-                    !isOutgoingHandler,
-                    session.keepSSTableLevel(),
-                    session.getPendingRepair());
+            StreamInitMessage message = new StreamInitMessage(FBUtilities.getBroadcastAddress(),
+                                                              session.sessionIndex(),
+                                                              session.planId(),
+                                                              session.streamOperation(),
+                                                              !isOutgoingHandler,
+                                                              session.keepSSTableLevel(),
+                                                              session.getPendingRepair(),
+                                                              session.getPreviewKind());
             ByteBuffer messageBuf = message.createMessage(false, protocolVersion);
             DataOutputStreamPlus out = getWriteChannel(socket);
             out.write(messageBuf);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/PreviewKind.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/PreviewKind.java b/src/java/org/apache/cassandra/streaming/PreviewKind.java
new file mode 100644
index 0000000..3b4d2a0
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/PreviewKind.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+
+import java.util.UUID;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+
+public enum PreviewKind
+{
+    NONE(0, null),
+    ALL(1, Predicates.alwaysTrue()),
+    UNREPAIRED(2, Predicates.not(SSTableReader::isRepaired)),
+    REPAIRED(3, SSTableReader::isRepaired);
+
+    private final int serializationVal;
+    private final Predicate<SSTableReader> streamingPredicate;
+
+    PreviewKind(int serializationVal, Predicate<SSTableReader> streamingPredicate)
+    {
+        assert ordinal() == serializationVal;
+        this.serializationVal = serializationVal;
+        this.streamingPredicate = streamingPredicate;
+    }
+
+    public int getSerializationVal()
+    {
+        return serializationVal;
+    }
+
+    public static PreviewKind deserialize(int serializationVal)
+    {
+        return values()[serializationVal];
+    }
+
+    public Predicate<SSTableReader> getStreamingPredicate()
+    {
+        return streamingPredicate;
+    }
+
+    public boolean isPreview()
+    {
+        return this != NONE;
+    }
+
+    public String logPrefix()
+    {
+        return isPreview() ? "preview repair" : "repair";
+    }
+
+    public String logPrefix(UUID sessionId)
+    {
+        return '[' + logPrefix() + " #" + sessionId.toString() + ']';
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/SessionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/SessionInfo.java b/src/java/org/apache/cassandra/streaming/SessionInfo.java
index 3bcb20c..1521614 100644
--- a/src/java/org/apache/cassandra/streaming/SessionInfo.java
+++ b/src/java/org/apache/cassandra/streaming/SessionInfo.java
@@ -27,6 +27,8 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
+import org.apache.cassandra.utils.FBUtilities;
+
 /**
  * Stream session info.
  */
@@ -190,4 +192,9 @@ public final class SessionInfo implements Serializable
         });
         return Iterables.size(completed);
     }
+
+    public SessionSummary createSummary()
+    {
+        return new SessionSummary(FBUtilities.getBroadcastAddress(), peer, receivingSummaries, sendingSummaries);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/SessionSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/SessionSummary.java b/src/java/org/apache/cassandra/streaming/SessionSummary.java
new file mode 100644
index 0000000..d52c2ca
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/SessionSummary.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.streaming;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.serializers.InetAddressSerializer;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class SessionSummary
+{
+    public final InetAddress coordinator;
+    public final InetAddress peer;
+    /** Immutable collection of receiving summaries */
+    public final Collection<StreamSummary> receivingSummaries;
+    /** Immutable collection of sending summaries*/
+    public final Collection<StreamSummary> sendingSummaries;
+
+    public SessionSummary(InetAddress coordinator, InetAddress peer,
+                          Collection<StreamSummary> receivingSummaries,
+                          Collection<StreamSummary> sendingSummaries)
+    {
+        assert coordinator != null;
+        assert peer != null;
+        assert receivingSummaries != null;
+        assert sendingSummaries != null;
+
+        this.coordinator = coordinator;
+        this.peer = peer;
+        this.receivingSummaries = receivingSummaries;
+        this.sendingSummaries = sendingSummaries;
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        SessionSummary summary = (SessionSummary) o;
+
+        if (!coordinator.equals(summary.coordinator)) return false;
+        if (!peer.equals(summary.peer)) return false;
+        if (!receivingSummaries.equals(summary.receivingSummaries)) return false;
+        return sendingSummaries.equals(summary.sendingSummaries);
+    }
+
+    public int hashCode()
+    {
+        int result = coordinator.hashCode();
+        result = 31 * result + peer.hashCode();
+        result = 31 * result + receivingSummaries.hashCode();
+        result = 31 * result + sendingSummaries.hashCode();
+        return result;
+    }
+
+    public static IVersionedSerializer<SessionSummary> serializer = new IVersionedSerializer<SessionSummary>()
+    {
+        public void serialize(SessionSummary summary, DataOutputPlus out, int version) throws IOException
+        {
+            ByteBufferUtil.writeWithLength(InetAddressSerializer.instance.serialize(summary.coordinator), out);
+            ByteBufferUtil.writeWithLength(InetAddressSerializer.instance.serialize(summary.peer), out);
+
+            out.writeInt(summary.receivingSummaries.size());
+            for (StreamSummary streamSummary: summary.receivingSummaries)
+            {
+                StreamSummary.serializer.serialize(streamSummary, out, version);
+            }
+
+            out.writeInt(summary.sendingSummaries.size());
+            for (StreamSummary streamSummary: summary.sendingSummaries)
+            {
+                StreamSummary.serializer.serialize(streamSummary, out, version);
+            }
+        }
+
+        public SessionSummary deserialize(DataInputPlus in, int version) throws IOException
+        {
+            InetAddress coordinator = InetAddressSerializer.instance.deserialize(ByteBufferUtil.readWithLength(in));
+            InetAddress peer = InetAddressSerializer.instance.deserialize(ByteBufferUtil.readWithLength(in));
+
+            int numRcvd = in.readInt();
+            List<StreamSummary> receivingSummaries = new ArrayList<>(numRcvd);
+            for (int i=0; i<numRcvd; i++)
+            {
+                receivingSummaries.add(StreamSummary.serializer.deserialize(in, version));
+            }
+
+            int numSent = in.readInt();
+            List<StreamSummary> sendingSummaries = new ArrayList<>(numSent);
+            for (int i=0; i<numSent; i++)
+            {
+                sendingSummaries.add(StreamSummary.serializer.deserialize(in, version));
+            }
+
+            return new SessionSummary(coordinator, peer, receivingSummaries, sendingSummaries);
+        }
+
+        public long serializedSize(SessionSummary summary, int version)
+        {
+            long size = 0;
+            size += ByteBufferUtil.serializedSizeWithLength(InetAddressSerializer.instance.serialize(summary.coordinator));
+            size += ByteBufferUtil.serializedSizeWithLength(InetAddressSerializer.instance.serialize(summary.peer));
+
+            size += TypeSizes.sizeof(summary.receivingSummaries.size());
+            for (StreamSummary streamSummary: summary.receivingSummaries)
+            {
+                size += StreamSummary.serializer.serializedSize(streamSummary, version);
+            }
+            size += TypeSizes.sizeof(summary.sendingSummaries.size());
+            for (StreamSummary streamSummary: summary.sendingSummaries)
+            {
+                size += StreamSummary.serializer.serializedSize(streamSummary, version);
+            }
+            return size;
+        }
+    };
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 6aa34cd..9059f45 100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@ -49,15 +49,17 @@ public class StreamCoordinator
     private final boolean keepSSTableLevel;
     private Iterator<StreamSession> sessionsToConnect = null;
     private final UUID pendingRepair;
+    private final PreviewKind previewKind;
 
     public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, StreamConnectionFactory factory,
-                             boolean connectSequentially, UUID pendingRepair)
+                             boolean connectSequentially, UUID pendingRepair, PreviewKind previewKind)
     {
         this.connectionsPerHost = connectionsPerHost;
         this.factory = factory;
         this.keepSSTableLevel = keepSSTableLevel;
         this.connectSequentially = connectSequentially;
         this.pendingRepair = pendingRepair;
+        this.previewKind = previewKind;
     }
 
     public void setConnectionFactory(StreamConnectionFactory factory)
@@ -293,7 +295,7 @@ public class StreamCoordinator
             // create
             if (streamSessions.size() < connectionsPerHost)
             {
-                StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel, pendingRepair);
+                StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel, pendingRepair, previewKind);
                 streamSessions.put(++lastReturned, session);
                 return session;
             }
@@ -325,7 +327,7 @@ public class StreamCoordinator
             StreamSession session = streamSessions.get(id);
             if (session == null)
             {
-                session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel, pendingRepair);
+                session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel, pendingRepair, previewKind);
                 streamSessions.put(id, session);
             }
             return session;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index b5a6214..05a8d30 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -22,9 +22,10 @@ import java.util.*;
 
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.UUIDGen;
 
+import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
+
 /**
  * {@link StreamPlan} is a helper class that builds StreamOperation of given configuration.
  *
@@ -47,20 +48,20 @@ public class StreamPlan
      */
     public StreamPlan(StreamOperation streamOperation)
     {
-        this(streamOperation, 1, false, false, null);
+        this(streamOperation, 1, false, false, NO_PENDING_REPAIR, PreviewKind.NONE);
     }
 
     public StreamPlan(StreamOperation streamOperation, boolean keepSSTableLevels, boolean connectSequentially)
     {
-        this(streamOperation, 1, keepSSTableLevels, connectSequentially, null);
+        this(streamOperation, 1, keepSSTableLevels, connectSequentially, NO_PENDING_REPAIR, PreviewKind.NONE);
     }
 
     public StreamPlan(StreamOperation streamOperation, int connectionsPerHost, boolean keepSSTableLevels,
-                      boolean connectSequentially, UUID pendingRepair)
+                      boolean connectSequentially, UUID pendingRepair, PreviewKind previewKind)
     {
         this.streamOperation = streamOperation;
         this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, new DefaultConnectionFactory(),
-                                                 connectSequentially, pendingRepair);
+                                                 connectSequentially, pendingRepair, previewKind);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index b7e475a..34e7cc8 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,6 +93,8 @@ public class StreamReceiveTask extends StreamTask
      */
     public synchronized void received(SSTableMultiWriter sstable)
     {
+        Preconditions.checkState(!session.isPreview(), "we should never receive sstables when previewing");
+
         if (done)
         {
             logger.warn("[{}] Received sstable {} on already finished stream received task. Aborting sstable.", session.planId(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 7845986..67d7d0d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -71,9 +71,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
             set(getCurrentState());
     }
 
-    private StreamResultFuture(UUID planId, StreamOperation streamOperation, boolean keepSSTableLevels, UUID pendingRepair)
+    private StreamResultFuture(UUID planId, StreamOperation streamOperation, boolean keepSSTableLevels, UUID pendingRepair, PreviewKind previewKind)
     {
-        this(planId, streamOperation, new StreamCoordinator(0, keepSSTableLevels, new DefaultConnectionFactory(), false, pendingRepair));
+        this(planId, streamOperation, new StreamCoordinator(0, keepSSTableLevels, new DefaultConnectionFactory(), false, pendingRepair, previewKind));
     }
 
     static StreamResultFuture init(UUID planId, StreamOperation streamOperation, Collection<StreamEventHandler> listeners,
@@ -107,7 +107,8 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
                                                                     boolean isForOutgoing,
                                                                     int version,
                                                                     boolean keepSSTableLevel,
-                                                                    UUID pendingRepair) throws IOException
+                                                                    UUID pendingRepair,
+                                                                    PreviewKind previewKind) throws IOException
     {
         StreamResultFuture future = StreamManager.instance.getReceivingStream(planId);
         if (future == null)
@@ -115,7 +116,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState>
             logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, streamOperation.getDescription());
 
             // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure.
-            future = new StreamResultFuture(planId, streamOperation, keepSSTableLevel, pendingRepair);
+            future = new StreamResultFuture(planId, streamOperation, keepSSTableLevel, pendingRepair, previewKind);
             StreamManager.instance.registerReceiving(future);
         }
         future.attachConnection(from, sessionIndex, connection, isForOutgoing, version);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index adb8e79..5ca9938 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -165,6 +165,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     private final boolean keepSSTableLevel;
     private ScheduledFuture<?> keepAliveFuture = null;
     private final UUID pendingRepair;
+    private final PreviewKind previewKind;
 
     public static enum State
     {
@@ -181,12 +182,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber
 
     /**
      * Create new streaming session with the peer.
-     *
-     * @param peer Address of streaming peer
+     *  @param peer Address of streaming peer
      * @param connecting Actual connecting address
      * @param factory is used for establishing connection
      */
-    public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, UUID pendingRepair)
+    public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind)
     {
         this.peer = peer;
         this.connecting = connecting;
@@ -194,10 +194,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         this.factory = factory;
         this.handler = new ConnectionHandler(this, isKeepAliveSupported()?
                                                    (int)TimeUnit.SECONDS.toMillis(2 * DatabaseDescriptor.getStreamingKeepAlivePeriod()) :
-                                                   DatabaseDescriptor.getStreamingSocketTimeout());
+                                                   DatabaseDescriptor.getStreamingSocketTimeout(), previewKind.isPreview());
         this.metrics = StreamingMetrics.get(connecting);
         this.keepSSTableLevel = keepSSTableLevel;
         this.pendingRepair = pendingRepair;
+        this.previewKind = previewKind;
     }
 
     public UUID planId()
@@ -225,6 +226,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         return pendingRepair;
     }
 
+    public boolean isPreview()
+    {
+        return previewKind.isPreview(); // answered by the PreviewKind this session was constructed with
+    }
+
+    public PreviewKind getPreviewKind()
+    {
+        return previewKind; // final field, assigned once in the constructor
+    }
+
     public LifecycleTransaction getTransaction(TableId tableId)
     {
         assert receivers.containsKey(tableId);
@@ -314,7 +325,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
             flushSSTables(stores);
 
         List<Range<Token>> normalizedRanges = Range.normalize(ranges);
-        List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, pendingRepair);
+        List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, pendingRepair, previewKind);
         try
         {
             addTransferFiles(sections);
@@ -356,7 +367,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     }
 
     @VisibleForTesting
-    public static List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, UUID pendingRepair)
+    public static List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, UUID pendingRepair, PreviewKind previewKind)
     {
         Refs<SSTableReader> refs = new Refs<>();
         try
@@ -370,7 +381,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber
                     Set<SSTableReader> sstables = Sets.newHashSet();
                     SSTableIntervalTree intervalTree = SSTableIntervalTree.build(view.select(SSTableSet.CANONICAL));
                     Predicate<SSTableReader> predicate;
-                    if (pendingRepair == ActiveRepairService.NO_PENDING_REPAIR)
+                    if (previewKind.isPreview())
+                    {
+                        predicate = previewKind.getStreamingPredicate();
+                    }
+                    else if (pendingRepair == ActiveRepairService.NO_PENDING_REPAIR)
                     {
                         predicate = Predicates.alwaysTrue();
                     }
@@ -620,6 +635,12 @@ public class StreamSession implements IEndpointStateChangeSubscriber
             handler.sendMessage(prepare);
         }
 
+        if (isPreview())
+        {
+            completePreview();
+            return;
+        }
+
         // if there are files to stream
         if (!maybeCompleted())
             startStreamingFiles();
@@ -650,6 +671,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber
      */
     public void receive(IncomingFileMessage message)
     {
+        if (isPreview())
+        {
+            throw new RuntimeException("Cannot receive files for preview session");
+        }
+
         long headerSize = message.header.size();
         StreamingMetrics.totalIncomingBytes.inc(headerSize);
         metrics.incomingBytes.inc(headerSize);
@@ -753,6 +779,22 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         closeSession(State.FAILED);
     }
 
+    private void completePreview()
+    {
+        try
+        {
+            state(State.WAIT_COMPLETE); // a preview goes straight to WAIT_COMPLETE; the STREAMING state is never entered
+            closeSession(State.COMPLETE);
+        }
+        finally
+        {
+            // aborting the tasks here needs to be the last thing we do so that we
+            // accurately report expected streaming, but don't leak any sstable refs
+            for (StreamTask task : Iterables.concat(receivers.values(), transfers.values())) // runs even if closing the session threw
+                task.abort();
+        }
+    }
+
     private boolean maybeCompleted()
     {
         boolean completed = receivers.isEmpty() && transfers.isEmpty();
@@ -803,6 +845,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         streamResult.handleSessionPrepared(this);
 
         state(State.STREAMING);
+
         for (StreamTransferTask task : transfers.values())
         {
             Collection<OutgoingFileMessage> messages = task.getFileMessages();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/StreamState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamState.java b/src/java/org/apache/cassandra/streaming/StreamState.java
index 4ee3c8d..be37677 100644
--- a/src/java/org/apache/cassandra/streaming/StreamState.java
+++ b/src/java/org/apache/cassandra/streaming/StreamState.java
@@ -18,11 +18,13 @@
 package org.apache.cassandra.streaming;
 
 import java.io.Serializable;
+import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 
 /**
  * Current snapshot of streaming progress.
@@ -50,4 +52,9 @@ public class StreamState implements Serializable
             }
         });
     }
+
+    public List<SessionSummary> createSummaries()
+    {
+        return Lists.newArrayList(Iterables.transform(sessions, SessionInfo::createSummary)); // one summary per session, copied into a new list
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
index 4619561..ceaa4d1 100644
--- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
+++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.utils.UUIDSerializer;
 
 /**
@@ -50,8 +51,9 @@ public class StreamInitMessage
     public final boolean isForOutgoing;
     public final boolean keepSSTableLevel;
     public final UUID pendingRepair;
+    public final PreviewKind previewKind;
 
-    public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, StreamOperation streamOperation, boolean isForOutgoing, boolean keepSSTableLevel, UUID pendingRepair)
+    public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, StreamOperation streamOperation, boolean isForOutgoing, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind)
     {
         this.from = from;
         this.sessionIndex = sessionIndex;
@@ -60,6 +62,7 @@ public class StreamInitMessage
         this.isForOutgoing = isForOutgoing;
         this.keepSSTableLevel = keepSSTableLevel;
         this.pendingRepair = pendingRepair;
+        this.previewKind = previewKind;
     }
 
     /**
@@ -120,6 +123,7 @@ public class StreamInitMessage
             {
                 UUIDSerializer.serializer.serialize(message.pendingRepair, out, MessagingService.current_version);
             }
+            out.writeInt(message.previewKind.getSerializationVal());
         }
 
         public StreamInitMessage deserialize(DataInputPlus in, int version) throws IOException
@@ -132,7 +136,8 @@ public class StreamInitMessage
             boolean keepSSTableLevel = in.readBoolean();
 
             UUID pendingRepair = in.readBoolean() ? UUIDSerializer.serializer.deserialize(in, version) : null;
-            return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description), sentByInitiator, keepSSTableLevel, pendingRepair);
+            PreviewKind previewKind = PreviewKind.deserialize(in.readInt());
+            return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description), sentByInitiator, keepSSTableLevel, pendingRepair, previewKind);
         }
 
         public long serializedSize(StreamInitMessage message, int version)
@@ -148,6 +153,7 @@ public class StreamInitMessage
             {
                 size += UUIDSerializer.serializer.serializedSize(message.pendingRepair, MessagingService.current_version);
             }
+            size += TypeSizes.sizeof(message.previewKind.getSerializationVal());
             return size;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/tools/nodetool/Repair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Repair.java b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
index 48f929f..317a677 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Repair.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import com.google.common.collect.Sets;
 
 import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.repair.RepairParallelism;
 import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.tools.NodeProbe;
@@ -73,6 +74,12 @@ public class Repair extends NodeToolCmd
     @Option(title = "full", name = {"-full", "--full"}, description = "Use -full to issue a full repair.")
     private boolean fullRepair = false;
 
+    @Option(title = "preview", name = {"-prv", "--preview"}, description = "Determine ranges and amount of data to be streamed, but don't actually perform repair")
+    private boolean preview = false;
+
+    @Option(title = "validate", name = {"-vd", "--validate"}, description = "Checks that repaired data is in sync between nodes. Out of sync repaired data indicates a full repair should be run.")
+    private boolean validate = false;
+
     @Option(title = "job_threads", name = {"-j", "--job-threads"}, description = "Number of threads to run repair jobs. " +
                                                                                  "Usually this means number of CFs to repair concurrently. " +
                                                                                  "WARNING: increasing this puts more load on repairing nodes, so be careful. (default: 1, max: 4)")
@@ -84,6 +91,26 @@ public class Repair extends NodeToolCmd
     @Option(title = "pull_repair", name = {"-pl", "--pull"}, description = "Use --pull to perform a one way repair where data is only streamed from a remote node to this node.")
     private boolean pullRepair = false;
 
+    /**
+     * Derives the {@link PreviewKind} to request from the command line flags.
+     * {@code --validate} wins over {@code --preview}; a preview of a full repair maps to
+     * {@code ALL}, any other preview to {@code UNREPAIRED}, and no flag at all to {@code NONE}.
+     */
+    private PreviewKind getPreviewKind()
+    {
+        if (validate)
+        {
+            return PreviewKind.REPAIRED;
+        }
+        if (!preview)
+        {
+            return PreviewKind.NONE;
+        }
+
+        // only previews reach this point; --full decides between ALL and UNREPAIRED
+        return fullRepair ? PreviewKind.ALL : PreviewKind.UNREPAIRED;
+    }
+
     @Override
     public void execute(NodeProbe probe)
     {
@@ -112,6 +139,8 @@ public class Repair extends NodeToolCmd
             options.put(RepairOption.TRACE_KEY, Boolean.toString(trace));
             options.put(RepairOption.COLUMNFAMILIES_KEY, StringUtils.join(cfnames, ","));
             options.put(RepairOption.PULL_REPAIR_KEY, Boolean.toString(pullRepair));
+            options.put(RepairOption.PREVIEW, getPreviewKind().toString());
+
             if (!startToken.isEmpty() || !endToken.isEmpty())
             {
                 options.put(RepairOption.RANGES_KEY, startToken + ":" + endToken);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/data/serialization/4.0/gms.EndpointState.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/gms.EndpointState.bin b/test/data/serialization/4.0/gms.EndpointState.bin
new file mode 100644
index 0000000..fb7d168
Binary files /dev/null and b/test/data/serialization/4.0/gms.EndpointState.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/data/serialization/4.0/gms.Gossip.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/gms.Gossip.bin b/test/data/serialization/4.0/gms.Gossip.bin
new file mode 100644
index 0000000..af5ac57
Binary files /dev/null and b/test/data/serialization/4.0/gms.Gossip.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/data/serialization/4.0/service.SyncComplete.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/service.SyncComplete.bin b/test/data/serialization/4.0/service.SyncComplete.bin
new file mode 100644
index 0000000..ba84349
Binary files /dev/null and b/test/data/serialization/4.0/service.SyncComplete.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/data/serialization/4.0/service.SyncRequest.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/service.SyncRequest.bin b/test/data/serialization/4.0/service.SyncRequest.bin
new file mode 100644
index 0000000..6d688a4
Binary files /dev/null and b/test/data/serialization/4.0/service.SyncRequest.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/data/serialization/4.0/service.ValidationComplete.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/service.ValidationComplete.bin b/test/data/serialization/4.0/service.ValidationComplete.bin
new file mode 100644
index 0000000..7433d64
Binary files /dev/null and b/test/data/serialization/4.0/service.ValidationComplete.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/data/serialization/4.0/service.ValidationRequest.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/service.ValidationRequest.bin b/test/data/serialization/4.0/service.ValidationRequest.bin
new file mode 100644
index 0000000..a00763b
Binary files /dev/null and b/test/data/serialization/4.0/service.ValidationRequest.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/data/serialization/4.0/utils.EstimatedHistogram.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/4.0/utils.EstimatedHistogram.bin b/test/data/serialization/4.0/utils.EstimatedHistogram.bin
new file mode 100644
index 0000000..e878eda
Binary files /dev/null and b/test/data/serialization/4.0/utils.EstimatedHistogram.bin differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
index 04cb083..3611f0e 100644
--- a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
+++ b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java
@@ -36,10 +36,11 @@ import java.util.Map;
 
 public class AbstractSerializationsTester
 {
-    protected static final String CUR_VER = System.getProperty("cassandra.version", "3.0");
+    protected static final String CUR_VER = System.getProperty("cassandra.version", "4.0");
     protected static final Map<String, Integer> VERSION_MAP = new HashMap<String, Integer> ()
     {{
         put("3.0", MessagingService.VERSION_30);
+        put("4.0", MessagingService.VERSION_40);
     }};
 
     protected static final boolean EXECUTE_WRITES = Boolean.getBoolean("cassandra.test-serialization-writes");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
index 0ee85c6..b9e3c17 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.repair.RepairJobDesc;
 import org.apache.cassandra.repair.Validator;
 import org.apache.cassandra.schema.KeyspaceParams;
@@ -106,7 +107,8 @@ public class CompactionManagerGetSSTablesForValidationTest
                                                                  Sets.newHashSet(range),
                                                                  incremental,
                                                                  incremental ? System.currentTimeMillis() : ActiveRepairService.UNREPAIRED_SSTABLE,
-                                                                 true);
+                                                                 true,
+                                                                 PreviewKind.NONE);
         desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), ks, tbl, Collections.singleton(range));
     }
 
@@ -135,7 +137,7 @@ public class CompactionManagerGetSSTablesForValidationTest
         modifySSTables();
 
         // get sstables for repair
-        Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), true);
+        Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), true, PreviewKind.NONE);
         Set<SSTableReader> sstables = Sets.newHashSet(CompactionManager.instance.getSSTablesToValidate(cfs, validator));
         Assert.assertNotNull(sstables);
         Assert.assertEquals(1, sstables.size());
@@ -150,7 +152,7 @@ public class CompactionManagerGetSSTablesForValidationTest
         modifySSTables();
 
         // get sstables for repair
-        Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), false);
+        Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), false, PreviewKind.NONE);
         Set<SSTableReader> sstables = Sets.newHashSet(CompactionManager.instance.getSSTablesToValidate(cfs, validator));
         Assert.assertNotNull(sstables);
         Assert.assertEquals(2, sstables.size());
@@ -166,7 +168,7 @@ public class CompactionManagerGetSSTablesForValidationTest
         modifySSTables();
 
         // get sstables for repair
-        Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), false);
+        Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), false, PreviewKind.NONE);
         Set<SSTableReader> sstables = Sets.newHashSet(CompactionManager.instance.getSSTablesToValidate(cfs, validator));
         Assert.assertNotNull(sstables);
         Assert.assertEquals(3, sstables.size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 0a38cd9..360a2cd 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -51,6 +51,7 @@ import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.notifications.SSTableAddedNotification;
 import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.repair.RepairJobDesc;
 import org.apache.cassandra.repair.Validator;
 import org.apache.cassandra.schema.CompactionParams;
@@ -193,9 +194,16 @@ public class LeveledCompactionStrategyTest
         Range<Token> range = new Range<>(Util.token(""), Util.token(""));
         int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(FBUtilities.nowInSeconds());
         UUID parentRepSession = UUID.randomUUID();
-        ActiveRepairService.instance.registerParentRepairSession(parentRepSession, FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range), false, ActiveRepairService.UNREPAIRED_SSTABLE, true);
+        ActiveRepairService.instance.registerParentRepairSession(parentRepSession,
+                                                                 FBUtilities.getBroadcastAddress(),
+                                                                 Arrays.asList(cfs),
+                                                                 Arrays.asList(range),
+                                                                 false,
+                                                                 ActiveRepairService.UNREPAIRED_SSTABLE,
+                                                                 true,
+                                                                 PreviewKind.NONE);
         RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, Arrays.asList(range));
-        Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore);
+        Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore, PreviewKind.NONE);
         CompactionManager.instance.submitValidation(cfs, validator).get();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
index f11362f..b5f8036 100644
--- a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
+++ b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java
@@ -25,6 +25,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.streaming.DefaultConnectionFactory;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.StreamEvent;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.utils.FBUtilities;
@@ -50,7 +51,7 @@ public class StreamStateStoreTest
         Range<Token> range = new Range<>(factory.fromString("0"), factory.fromString("100"));
 
         InetAddress local = FBUtilities.getBroadcastAddress();
-        StreamSession session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, null);
+        StreamSession session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, null, PreviewKind.NONE);
         session.addStreamRequest("keyspace1", Collections.singleton(range), Collections.singleton("cf"));
 
         StreamStateStore store = new StreamStateStore();
@@ -71,7 +72,7 @@ public class StreamStateStoreTest
 
         // add different range within the same keyspace
         Range<Token> range2 = new Range<>(factory.fromString("100"), factory.fromString("200"));
-        session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, null);
+        session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, null, PreviewKind.NONE);
         session.addStreamRequest("keyspace1", Collections.singleton(range2), Collections.singleton("cf"));
         session.state(StreamSession.State.COMPLETE);
         store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 97bd321..53f5ab3 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -56,6 +56,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -789,7 +790,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
 
         List<StreamSession.SSTableStreamingSections> sectionsBeforeRewrite = StreamSession.getSSTableSectionsForRanges(
             Collections.singleton(new Range<Token>(firstToken, firstToken)),
-            Collections.singleton(cfs), null);
+            Collections.singleton(cfs), null, PreviewKind.NONE);
         assertEquals(1, sectionsBeforeRewrite.size());
         for (StreamSession.SSTableStreamingSections section : sectionsBeforeRewrite)
             section.ref.release();
@@ -804,7 +805,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
                 while (!done.get())
                 {
                     Set<Range<Token>> range = Collections.singleton(new Range<Token>(firstToken, firstToken));
-                    List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), null);
+                    List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), null, PreviewKind.NONE);
                     if (sections.size() != 1)
                         failed.set(true);
                     for (StreamSession.SSTableStreamingSections section : sections)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
index 1c508a0..d61d859 100644
--- a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
+++ b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -85,7 +86,8 @@ public abstract class AbstractRepairTest
                                                                  Sets.newHashSet(RANGE1, RANGE2, RANGE3),
                                                                  isIncremental,
                                                                  repairedAt,
-                                                                 isGlobal);
+                                                                 isGlobal,
+                                                                 PreviewKind.NONE);
         return sessionId;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index 75742dc..f5e9d6b 100644
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTree;
 import org.apache.cassandra.utils.MerkleTrees;
@@ -91,7 +92,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest
         // note: we reuse the same endpoint which is bogus in theory but fine here
         TreeResponse r1 = new TreeResponse(ep1, tree1);
         TreeResponse r2 = new TreeResponse(ep2, tree2);
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, null, false);
+        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false, PreviewKind.NONE);
         task.run();
 
         assertEquals(0, task.get().numberOfDifferences);
@@ -105,9 +106,10 @@ public class LocalSyncTaskTest extends AbstractRepairTest
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
 
-        ActiveRepairService.instance.registerParentRepairSession(parentRepairSession,  FBUtilities.getBroadcastAddress(),
+        ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddress(),
                                                                  Arrays.asList(cfs), Arrays.asList(range), false,
-                                                                 ActiveRepairService.UNREPAIRED_SSTABLE, false);
+                                                                 ActiveRepairService.UNREPAIRED_SSTABLE, false,
+                                                                 PreviewKind.NONE);
 
         RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range));
 
@@ -128,7 +130,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest
         // note: we reuse the same endpoint which is bogus in theory but fine here
         TreeResponse r1 = new TreeResponse(InetAddress.getByName("127.0.0.1"), tree1);
         TreeResponse r2 = new TreeResponse(InetAddress.getByName("127.0.0.2"), tree2);
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, null, false);
+        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false, PreviewKind.NONE);
         task.run();
 
         // ensure that the changed range was recorded
@@ -145,7 +147,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest
         TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
         TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
 
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false);
+        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false, PreviewKind.NONE);
         StreamPlan plan = task.createStreamPlan(PARTICIPANT1, PARTICIPANT2, Lists.newArrayList(RANGE1));
 
         assertEquals(NO_PENDING_REPAIR, plan.getPendingRepair());
@@ -162,7 +164,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest
         TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
         TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner()));
 
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, desc.parentSessionId, false);
+        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, desc.parentSessionId, false, PreviewKind.NONE);
         StreamPlan plan = task.createStreamPlan(PARTICIPANT1, PARTICIPANT2, Lists.newArrayList(RANGE1));
 
         assertEquals(desc.parentSessionId, plan.getPendingRepair());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
index 0260cd0..5a4e5b1 100644
--- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -62,7 +63,10 @@ public class RepairSessionTest
         IPartitioner p = Murmur3Partitioner.instance;
         Range<Token> repairRange = new Range<>(p.getToken(ByteBufferUtil.bytes(0)), p.getToken(ByteBufferUtil.bytes(100)));
         Set<InetAddress> endpoints = Sets.newHashSet(remote);
-        RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange), "Keyspace1", RepairParallelism.SEQUENTIAL, endpoints, false, false, "Standard1");
+        RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange),
+                                                  "Keyspace1", RepairParallelism.SEQUENTIAL,
+                                                  endpoints, false, false,
+                                                  PreviewKind.NONE, "Standard1");
 
         // perform convict
         session.convict(remote, Double.MAX_VALUE);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java b/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java
index 5f13e3d..f433f2e 100644
--- a/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.StreamPlan;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -64,8 +65,8 @@ public class StreamingRepairTaskTest extends AbstractRepairTest
         UUID sessionID = registerSession(cfs, true, true);
         ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
         RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), ks, tbl, prs.getRanges());
-        SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges());
-        StreamingRepairTask task = new StreamingRepairTask(desc, request, desc.sessionId);
+        SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges(), PreviewKind.NONE);
+        StreamingRepairTask task = new StreamingRepairTask(desc, request, desc.sessionId, PreviewKind.NONE);
 
         StreamPlan plan = task.createStreamPlan(request.src, request.dst);
         Assert.assertFalse(plan.getFlushBeforeTransfer());
@@ -77,8 +78,8 @@ public class StreamingRepairTaskTest extends AbstractRepairTest
         UUID sessionID = registerSession(cfs, false, true);
         ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID);
         RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), ks, tbl, prs.getRanges());
-        SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges());
-        StreamingRepairTask task = new StreamingRepairTask(desc, request, null);
+        SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges(), PreviewKind.NONE);
+        StreamingRepairTask task = new StreamingRepairTask(desc, request, null, PreviewKind.NONE);
 
         StreamPlan plan = task.createStreamPlan(request.src, request.dst);
         Assert.assertTrue(plan.getFlushBeforeTransfer());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/repair/ValidatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
index bbcdbb8..b45edc1 100644
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.repair.messages.ValidationComplete;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.MerkleTree;
 import org.apache.cassandra.utils.MerkleTrees;
@@ -98,7 +99,7 @@ public class ValidatorTest
 
         ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);
 
-        Validator validator = new Validator(desc, remote, 0);
+        Validator validator = new Validator(desc, remote, 0, PreviewKind.NONE);
         MerkleTrees tree = new MerkleTrees(partitioner);
         tree.addMerkleTrees((int) Math.pow(2, 15), validator.desc.ranges);
         validator.prepare(cfs, tree);
@@ -135,7 +136,7 @@ public class ValidatorTest
 
         InetAddress remote = InetAddress.getByName("127.0.0.2");
 
-        Validator validator = new Validator(desc, remote, 0);
+        Validator validator = new Validator(desc, remote, 0, PreviewKind.NONE);
         validator.fail();
 
         MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);
@@ -190,10 +191,10 @@ public class ValidatorTest
 
         ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(),
                                                                  Collections.singletonList(cfs), desc.ranges, false, ActiveRepairService.UNREPAIRED_SSTABLE,
-                                                                 false);
+                                                                 false, PreviewKind.NONE);
 
         final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink();
-        Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true, false);
+        Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true, false, PreviewKind.NONE);
         CompactionManager.instance.submitValidation(cfs, validator);
 
         MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java
index 26168ad..367fea9 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.UUIDGen;
@@ -85,7 +86,8 @@ public abstract class AbstractConsistentSessionTest
                                                                  Sets.newHashSet(RANGE1, RANGE2, RANGE3),
                                                                  true,
                                                                  System.currentTimeMillis(),
-                                                                 true);
+                                                                 true,
+                                                                 PreviewKind.NONE);
         return sessionId;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
index 2cb6326..2126835 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.dht.ByteOrderedPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -146,7 +147,7 @@ public class PendingAntiCompactionTest
 
         // create a session so the anti compaction can fine it
         UUID sessionID = UUIDGen.getTimeUUID();
-        ActiveRepairService.instance.registerParentRepairSession(sessionID, InetAddress.getLocalHost(), Lists.newArrayList(cfs), ranges, true, 1, true);
+        ActiveRepairService.instance.registerParentRepairSession(sessionID, InetAddress.getLocalHost(), Lists.newArrayList(cfs), ranges, true, 1, true, PreviewKind.NONE);
 
         PendingAntiCompaction pac;
         ExecutorService executor = Executors.newSingleThreadExecutor();


Mime
View raw message