cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bdeggles...@apache.org
Subject cassandra git commit: Add incremental repair support for --hosts, --force, and subrange repair
Date Tue, 12 Sep 2017 22:53:42 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk c6cd82462 -> 3cec208c4


Add incremental repair support for --hosts, --force, and subrange repair

Patch by Blake Eggleston; reviewed by Marcus Eriksson for CASSANDRA-13818


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3cec208c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3cec208c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3cec208c

Branch: refs/heads/trunk
Commit: 3cec208c40b85e1be0ff8c68fc9d9017945a1ed8
Parents: c6cd824
Author: Blake Eggleston <bdeggleston@gmail.com>
Authored: Mon Aug 28 10:33:34 2017 -0700
Committer: Blake Eggleston <bdeggleston@gmail.com>
Committed: Tue Sep 12 15:51:34 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        |   4 +-
 .../org/apache/cassandra/repair/RepairJob.java  |  10 +-
 .../repair/RepairMessageVerbHandler.java        |   6 +-
 .../apache/cassandra/repair/RepairRunnable.java | 161 ++++++++++++++-----
 .../apache/cassandra/repair/RepairSession.java  |  12 +-
 .../org/apache/cassandra/repair/Validator.java  |  10 +-
 .../repair/consistent/ConsistentSession.java    |   3 +-
 .../cassandra/repair/messages/RepairOption.java |  16 +-
 .../cassandra/service/ActiveRepairService.java  |  33 ++--
 ...pactionStrategyManagerPendingRepairTest.java |   2 +-
 .../cassandra/repair/AbstractRepairTest.java    |   2 +
 .../cassandra/repair/RepairRunnableTest.java    |  65 ++++++++
 .../repair/consistent/LocalSessionTest.java     |   1 -
 .../repair/messages/RepairOptionTest.java       |  13 --
 .../service/ActiveRepairServiceTest.java        |  55 +++++++
 16 files changed, 289 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1f03ec5..55bbfa8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Add incremental repair support for --hosts, --force, and subrange repair (CASSANDRA-13818)
  * Rework CompactionStrategyManager.getScanners synchronization (CASSANDRA-13786)
  * Add additional unit tests for batch behavior, TTLs, Timestamps (CASSANDRA-13846)
  * Add keyspace and table name in schema validation exception (CASSANDRA-13845)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 5619da7..06fbef2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1330,7 +1330,7 @@ public class CompactionManager implements CompactionManagerMBean
             }
             else
             {
-                if (!validator.isConsistent)
+                if (!validator.isIncremental)
                 {
                     // flush first so everyone is validating data that is as similar as possible
                     StorageService.instance.forceKeyspaceFlush(cfs.keyspace.getName(), cfs.name);
@@ -1447,7 +1447,7 @@ public class CompactionManager implements CompactionManagerMBean
             predicate = prs.getPreviewPredicate();
 
         }
-        else if (validator.isConsistent)
+        else if (validator.isIncremental)
         {
             predicate = s -> validator.desc.parentSessionId.equals(s.getSSTableMetadata().pendingRepair);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 0615681..4bc3496 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -43,7 +43,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
     private final RepairJobDesc desc;
     private final RepairParallelism parallelismDegree;
     private final ListeningExecutorService taskExecutor;
-    private final boolean isConsistent;
+    private final boolean isIncremental;
     private final PreviewKind previewKind;
 
     /**
@@ -52,13 +52,13 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
      * @param session RepairSession that this RepairJob belongs
      * @param columnFamily name of the ColumnFamily to repair
      */
-    public RepairJob(RepairSession session, String columnFamily, boolean isConsistent, PreviewKind previewKind)
+    public RepairJob(RepairSession session, String columnFamily, boolean isIncremental, PreviewKind previewKind)
     {
         this.session = session;
         this.desc = new RepairJobDesc(session.parentRepairSession, session.getId(), session.keyspace, columnFamily, session.getRanges());
         this.taskExecutor = session.taskExecutor;
         this.parallelismDegree = session.parallelismDegree;
-        this.isConsistent = isConsistent;
+        this.isIncremental = isIncremental;
         this.previewKind = previewKind;
     }
 
@@ -81,7 +81,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
         if (parallelismDegree != RepairParallelism.PARALLEL)
         {
             ListenableFuture<List<InetAddress>> allSnapshotTasks;
-            if (isConsistent)
+            if (isIncremental)
             {
                 // consistent repair does it's own "snapshotting"
                 allSnapshotTasks = Futures.immediateFuture(allEndpoints);
@@ -135,7 +135,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
                         SyncTask task;
                         if (r1.endpoint.equals(local) || r2.endpoint.equals(local))
                         {
-                            task = new LocalSyncTask(desc, r1, r2, isConsistent ? desc.parentSessionId : null, session.pullRepair, session.previewKind);
+                            task = new LocalSyncTask(desc, r1, r2, isIncremental ? desc.parentSessionId : null, session.pullRepair, session.previewKind);
                         }
                         else
                         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index c38d098..3c7f890 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -46,7 +46,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
 {
     private static final Logger logger = LoggerFactory.getLogger(RepairMessageVerbHandler.class);
 
-    private boolean isConsistent(UUID sessionID)
+    private boolean isIncremental(UUID sessionID)
     {
         return ActiveRepairService.instance.consistent.local.isSessionInProgress(sessionID);
     }
@@ -136,7 +136,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
 
                     ActiveRepairService.instance.consistent.local.maybeSetRepairing(desc.parentSessionId);
                     Validator validator = new Validator(desc, message.from, validationRequest.nowInSec,
-                                                        isConsistent(desc.parentSessionId), previewKind(desc.parentSessionId));
+                                                        isIncremental(desc.parentSessionId), previewKind(desc.parentSessionId));
                     CompactionManager.instance.submitValidation(store, validator);
                     break;
 
@@ -144,7 +144,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                     // forwarded sync request
                     SyncRequest request = (SyncRequest) message.payload;
                     logger.debug("Syncing {}", request);
-                    StreamingRepairTask task = new StreamingRepairTask(desc, request, isConsistent(desc.parentSessionId) ? desc.parentSessionId : null, request.previewKind);
+                    StreamingRepairTask task = new StreamingRepairTask(desc, request, isIncremental(desc.parentSessionId) ? desc.parentSessionId : null, request.previewKind);
                     task.run();
                     break;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index b581ebd..9e37ada 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -27,18 +27,26 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.*;
 import org.apache.commons.lang3.time.DurationFormatUtils;
+import org.junit.internal.runners.statements.Fail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.codahale.metrics.Timer;
 import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.repair.consistent.SyncStatSummary;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.db.Keyspace;
@@ -130,6 +138,47 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
         recordFailure(message, completionMessage);
     }
 
+    @VisibleForTesting
+    static class CommonRange
+    {
+        public final Set<InetAddress> endpoints;
+        public final Collection<Range<Token>> ranges;
+
+        public CommonRange(Set<InetAddress> endpoints, Collection<Range<Token>> ranges)
+        {
+            Preconditions.checkArgument(endpoints != null && !endpoints.isEmpty());
+            Preconditions.checkArgument(ranges != null && !ranges.isEmpty());
+            this.endpoints = endpoints;
+            this.ranges = ranges;
+        }
+
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            CommonRange that = (CommonRange) o;
+
+            if (!endpoints.equals(that.endpoints)) return false;
+            return ranges.equals(that.ranges);
+        }
+
+        public int hashCode()
+        {
+            int result = endpoints.hashCode();
+            result = 31 * result + ranges.hashCode();
+            return result;
+        }
+
+        public String toString()
+        {
+            return "CommonRange{" +
+                   "endpoints=" + endpoints +
+                   ", ranges=" + ranges +
+                   '}';
+        }
+    }
+
     protected void runMayThrow() throws Exception
     {
         ActiveRepairService.instance.recordRepairStatus(cmd, ActiveRepairService.ParentRepairStatus.IN_PROGRESS, ImmutableList.of());
@@ -184,7 +233,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
         }
 
         final Set<InetAddress> allNeighbors = new HashSet<>();
-        List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> commonRanges = new ArrayList<>();
+        List<CommonRange> commonRanges = new ArrayList<>();
 
         //pre-calculate output of getLocalRanges and pass it to getNeighbors to increase performance and prevent
         //calculation multiple times
@@ -235,11 +284,9 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
             SystemDistributedKeyspace.startParentRepair(parentSession, keyspace, cfnames, options);
         }
 
-        long repairedAt;
         try (Timer.Context ctx = Keyspace.open(keyspace).metric.repairPrepareTime.time())
         {
             ActiveRepairService.instance.prepareForRepair(parentSession, FBUtilities.getBroadcastAddress(), allNeighbors, options, columnFamilyStores);
-            repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).getRepairedAt();
             progress.incrementAndGet();
         }
         catch (Throwable t)
@@ -254,23 +301,22 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
 
         if (options.isPreview())
         {
-            previewRepair(parentSession, repairedAt, startTime, traceState, allNeighbors, commonRanges, cfnames);
+            previewRepair(parentSession, startTime, commonRanges, cfnames);
         }
         else if (options.isIncremental())
         {
-            consistentRepair(parentSession, repairedAt, startTime, traceState, allNeighbors, commonRanges, cfnames);
+            incrementalRepair(parentSession, startTime, options.isForcedRepair(), traceState, allNeighbors, commonRanges, cfnames);
         }
         else
         {
-            normalRepair(parentSession, startTime, traceState, allNeighbors, commonRanges, cfnames);
+            normalRepair(parentSession, startTime, traceState, commonRanges, cfnames);
         }
     }
 
     private void normalRepair(UUID parentSession,
                               long startTime,
                               TraceState traceState,
-                              Set<InetAddress> allNeighbors,
-                              List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> commonRanges,
+                              List<CommonRange> commonRanges,
                               String... cfnames)
     {
 
@@ -295,15 +341,11 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
                     logger.debug("Repair result: {}", results);
                     if (sessionResult != null)
                     {
-                        // don't promote sstables for sessions we skipped replicas for
+                        // don't record successful repair if we had to skip ranges
                         if (!sessionResult.skippedReplicas)
                         {
                             successfulRanges.addAll(sessionResult.ranges);
                         }
-                        else
-                        {
-                            logger.debug("Skipping anticompaction for {}", results);
-                        }
                     }
                     else
                     {
@@ -316,26 +358,59 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
         Futures.addCallback(repairResult, new RepairCompleteCallback(parentSession, successfulRanges, startTime, traceState, hasFailure, executor));
     }
 
-    private void consistentRepair(UUID parentSession,
-                                  long repairedAt,
-                                  long startTime,
-                                  TraceState traceState,
-                                  Set<InetAddress> allNeighbors,
-                                  List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> commonRanges,
-                                  String... cfnames)
+    /**
+     * removes dead nodes from common ranges, and exludes ranges left without any participants
+     */
+    @VisibleForTesting
+    static List<CommonRange> filterCommonRanges(List<CommonRange> commonRanges, Set<InetAddress> liveEndpoints, boolean force)
     {
-        // the local node also needs to be included in the set of
-        // participants, since coordinator sessions aren't persisted
-        Set<InetAddress> allParticipants = new HashSet<>(allNeighbors);
-        allParticipants.add(FBUtilities.getBroadcastAddress());
+        if (!force)
+        {
+            return commonRanges;
+        }
+        else
+        {
+            List<CommonRange> filtered = new ArrayList<>(commonRanges.size());
+
+            for (CommonRange commonRange: commonRanges)
+            {
+                Set<InetAddress> endpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, liveEndpoints::contains));
+
+                // this node is implicitly a participant in this repair, so a single endpoint is ok here
+                if (!endpoints.isEmpty())
+                {
+                    filtered.add(new CommonRange(endpoints, commonRange.ranges));
+                }
+            }
+            Preconditions.checkState(!filtered.isEmpty(), "Not enough live endpoints for a repair");
+            return filtered;
+        }
+    }
+
+    private void incrementalRepair(UUID parentSession,
+                                   long startTime,
+                                   boolean forceRepair,
+                                   TraceState traceState,
+                                   Set<InetAddress> allNeighbors,
+                                   List<CommonRange> commonRanges,
+                                   String... cfnames)
+    {
+        // the local node also needs to be included in the set of participants, since coordinator sessions aren't persisted
+        Predicate<InetAddress> isAlive = FailureDetector.instance::isAlive;
+        Set<InetAddress> allParticipants = ImmutableSet.<InetAddress>builder()
+                                           .addAll(forceRepair ? Iterables.filter(allNeighbors, isAlive) : allNeighbors)
+                                           .add(FBUtilities.getBroadcastAddress())
+                                           .build();
+
+        List<CommonRange> allRanges = filterCommonRanges(commonRanges, allParticipants, forceRepair);
 
         CoordinatorSession coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants);
         ListeningExecutorService executor = createExecutor();
         AtomicBoolean hasFailure = new AtomicBoolean(false);
-        ListenableFuture repairResult = coordinatorSession.execute(() -> submitRepairSessions(parentSession, true, executor, commonRanges, cfnames),
+        ListenableFuture repairResult = coordinatorSession.execute(() -> submitRepairSessions(parentSession, true, executor, allRanges, cfnames),
                                                                    hasFailure);
         Collection<Range<Token>> ranges = new HashSet<>();
-        for (Collection<Range<Token>> range : Iterables.transform(commonRanges, cr -> cr.right))
+        for (Collection<Range<Token>> range : Iterables.transform(allRanges, cr -> cr.ranges))
         {
             ranges.addAll(range);
         }
@@ -343,11 +418,8 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
     }
 
     private void previewRepair(UUID parentSession,
-                               long repairedAt,
                                long startTime,
-                               TraceState traceState,
-                               Set<InetAddress> allNeighbors,
-                               List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> commonRanges,
+                               List<CommonRange> commonRanges,
                                String... cfnames)
     {
 
@@ -421,22 +493,27 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
     }
 
     private ListenableFuture<List<RepairSessionResult>> submitRepairSessions(UUID parentSession,
-                                                                             boolean isConsistent,
+                                                                             boolean isIncremental,
                                                                              ListeningExecutorService executor,
-                                                                             List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> commonRanges,
+                                                                             List<CommonRange> commonRanges,
                                                                              String... cfnames)
     {
         List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size());
-        for (Pair<Set<InetAddress>, ? extends Collection<Range<Token>>> p : commonRanges)
+
+        // we do endpoint filtering at the start of an incremental repair,
+        // so repair sessions shouldn't also be checking liveness
+        boolean force = options.isForcedRepair() && !isIncremental;
+        for (CommonRange cr : commonRanges)
         {
+            logger.info("Starting RepairSession for {}", cr);
             RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession,
-                                                                                     p.right,
+                                                                                     cr.ranges,
                                                                                      keyspace,
                                                                                      options.getParallelism(),
-                                                                                     p.left,
-                                                                                     isConsistent,
+                                                                                     cr.endpoints,
+                                                                                     isIncremental,
                                                                                      options.isPullRepair(),
-                                                                                     options.isForcedRepair(),
+                                                                                     force,
                                                                                      options.getPreviewKind(),
                                                                                      executor,
                                                                                      cfnames);
@@ -595,22 +672,22 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
                                                ImmutableList.of(failureMessage, completionMessage));
     }
 
-    private void addRangeToNeighbors(List<Pair<Set<InetAddress>, ? extends Collection<Range<Token>>>> neighborRangeList, Range<Token> range, Set<InetAddress> neighbors)
+    private void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, Set<InetAddress> neighbors)
     {
         for (int i = 0; i < neighborRangeList.size(); i++)
         {
-            Pair<Set<InetAddress>, ? extends Collection<Range<Token>>> p = neighborRangeList.get(i);
+            CommonRange cr = neighborRangeList.get(i);
 
-            if (p.left.containsAll(neighbors))
+            if (cr.endpoints.containsAll(neighbors))
             {
-                p.right.add(range);
+                cr.ranges.add(range);
                 return;
             }
         }
 
         List<Range<Token>> ranges = new ArrayList<>();
         ranges.add(range);
-        neighborRangeList.add(Pair.create(neighbors, ranges));
+        neighborRangeList.add(new CommonRange(neighbors, ranges));
     }
 
     private Thread createQueryThread(final int cmd, final UUID sessionId)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index d00e1b2..5dbd050 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -36,7 +36,6 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.SessionSummary;
-import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTrees;
@@ -97,7 +96,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
     /** Range to repair */
     public final Collection<Range<Token>> ranges;
     public final Set<InetAddress> endpoints;
-    public final boolean isConsistent;
+    public final boolean isIncremental;
     public final PreviewKind previewKind;
 
     private final AtomicBoolean isFailed = new AtomicBoolean(false);
@@ -131,7 +130,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
                          String keyspace,
                          RepairParallelism parallelismDegree,
                          Set<InetAddress> endpoints,
-                         boolean isConsistent,
+                         boolean isIncremental,
                          boolean pullRepair,
                          boolean force,
                          PreviewKind previewKind,
@@ -162,7 +161,8 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
             }
             if (!removeCandidates.isEmpty())
             {
-                // we shouldn't be promoting sstables to repaired if any replicas are excluded from the repair
+                // we shouldn't be recording a successful repair if
+                // any replicas are excluded from the repair
                 forceSkippedReplicas = true;
                 endpoints = new HashSet<>(endpoints);
                 endpoints.removeAll(removeCandidates);
@@ -170,7 +170,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         }
 
         this.endpoints = endpoints;
-        this.isConsistent = isConsistent;
+        this.isIncremental = isIncremental;
         this.previewKind = previewKind;
         this.pullRepair = pullRepair;
         this.skippedReplicas = forceSkippedReplicas;
@@ -301,7 +301,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         List<ListenableFuture<RepairResult>> jobs = new ArrayList<>(cfnames.length);
         for (String cfname : cfnames)
         {
-            RepairJob job = new RepairJob(this, cfname, isConsistent, previewKind);
+            RepairJob job = new RepairJob(this, cfname, isIncremental, previewKind);
             executor.execute(job);
             jobs.add(job);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index bdf8cca..f9556d6 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -60,7 +60,7 @@ public class Validator implements Runnable
     public final InetAddress initiator;
     public final int nowInSec;
     private final boolean evenTreeDistribution;
-    public final boolean isConsistent;
+    public final boolean isIncremental;
 
     // null when all rows with the min token have been consumed
     private long validated;
@@ -79,17 +79,17 @@ public class Validator implements Runnable
         this(desc, initiator, nowInSec, false, false, previewKind);
     }
 
-    public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, boolean isConsistent, PreviewKind previewKind)
+    public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, boolean isIncremental, PreviewKind previewKind)
     {
-        this(desc, initiator, nowInSec, false, isConsistent, previewKind);
+        this(desc, initiator, nowInSec, false, isIncremental, previewKind);
     }
 
-    public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, boolean evenTreeDistribution, boolean isConsistent, PreviewKind previewKind)
+    public Validator(RepairJobDesc desc, InetAddress initiator, int nowInSec, boolean evenTreeDistribution, boolean isIncremental, PreviewKind previewKind)
     {
         this.desc = desc;
         this.initiator = initiator;
         this.nowInSec = nowInSec;
-        this.isConsistent = isConsistent;
+        this.isIncremental = isIncremental;
         this.previewKind = previewKind;
         validated = 0;
         range = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
index 803a1f8..c137346 100644
--- a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
@@ -313,7 +313,8 @@ public abstract class ConsistentSession
             Preconditions.checkArgument(coordinator != null);
             Preconditions.checkArgument(ids != null);
             Preconditions.checkArgument(!ids.isEmpty());
-            Preconditions.checkArgument(repairedAt > 0);
+            Preconditions.checkArgument(repairedAt > 0
+                                        || repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE);
             Preconditions.checkArgument(ranges != null);
             Preconditions.checkArgument(!ranges.isEmpty());
             Preconditions.checkArgument(participants != null);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/repair/messages/RepairOption.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
index a95ee19..971bf5d 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@ -163,10 +163,6 @@ public class RepairOption
         Set<Range<Token>> ranges = new HashSet<>();
         if (rangesStr != null)
         {
-            if (incremental)
-                logger.warn("Incremental repair can't be requested with subrange repair " +
-                            "because each subrange repair would generate an anti-compacted table. " +
-                            "The repair will occur but without anti-compaction.");
             StringTokenizer tokenizer = new StringTokenizer(rangesStr, ",");
             while (tokenizer.hasMoreTokens())
             {
@@ -251,16 +247,6 @@ public class RepairOption
             }
         }
 
-        if (option.isIncremental() && !option.isPreview() && !option.isGlobal())
-        {
-            throw new IllegalArgumentException("Incremental repairs cannot be run against a subset of tokens or ranges");
-        }
-
-        if (option.isIncremental() && option.isForcedRepair())
-        {
-            throw new IllegalArgumentException("Cannot force incremental repair");
-        }
-
         return option;
     }
 
@@ -359,7 +345,7 @@ public class RepairOption
 
     public boolean isGlobal()
     {
-        return dataCenters.isEmpty() && hosts.isEmpty() && !isSubrangeRepair();
+        return dataCenters.isEmpty() && hosts.isEmpty();
     }
 
     public boolean isSubrangeRepair()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 2e02f0c..ab92822 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -209,7 +209,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
                                              String keyspace,
                                              RepairParallelism parallelismDegree,
                                              Set<InetAddress> endpoints,
-                                             boolean isConsistent,
+                                             boolean isIncremental,
                                              boolean pullRepair,
                                              boolean force,
                                              PreviewKind previewKind,
@@ -222,7 +222,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         if (cfnames.length == 0)
             return null;
 
-        final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, isConsistent, pullRepair, force, previewKind, cfnames);
+        final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, isIncremental, pullRepair, force, previewKind, cfnames);
 
         sessions.put(session.getId(), session);
         // register listeners
@@ -372,10 +372,28 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         return neighbors;
     }
 
+    /**
+     * we only want to set repairedAt for incremental repairs including all replicas for a token range. For non-global
+     * incremental repairs, forced incremental repairs, and full repairs, the UNREPAIRED_SSTABLE value will prevent
+     * sstables from being promoted to repaired or preserve the repairedAt/pendingRepair values, respectively.
+     */
+    static long getRepairedAt(RepairOption options)
+    {
+        // we only want to set repairedAt for incremental repairs including all replicas for a token range. For non-global incremental repairs, forced incremental repairs, and
+        // full repairs, the UNREPAIRED_SSTABLE value will prevent sstables from being promoted to repaired or preserve the repairedAt/pendingRepair values, respectively.
+        if (options.isIncremental() && options.isGlobal() && !options.isForcedRepair())
+        {
+            return Clock.instance.currentTimeMillis();
+        }
+        else
+        {
+            return  ActiveRepairService.UNREPAIRED_SSTABLE;
+        }
+    }
+
     public UUID prepareForRepair(UUID parentRepairSession, InetAddress coordinator, Set<InetAddress> endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores)
     {
-        // we only want repairedAt for incremental repairs, for non incremental repairs, UNREPAIRED_SSTABLE will preserve repairedAt on streamed sstables
-        long repairedAt = options.isIncremental() ? Clock.instance.currentTimeMillis() : ActiveRepairService.UNREPAIRED_SSTABLE;
+        long repairedAt = getRepairedAt(options);
         registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal(), options.getPreviewKind());
         final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size());
         final AtomicBoolean status = new AtomicBoolean(true);
@@ -583,13 +601,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
             }
         }
 
-        public long getRepairedAt()
-        {
-            if (isGlobal)
-                return repairedAt;
-            return ActiveRepairService.UNREPAIRED_SSTABLE;
-        }
-
         public Collection<ColumnFamilyStore> getColumnFamilyStores()
         {
             return ImmutableSet.<ColumnFamilyStore>builder().addAll(columnFamilyStores.values()).build();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
index af629e5..c7f1ae8 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionStrategyManagerPendingRepairTest.java
@@ -248,7 +248,7 @@ public class CompactionStrategyManagerPendingRepairTest extends AbstractPendingR
         csm.getForPendingRepair(repairID).forEach(Assert::assertNull);
 
         // sstable should have pendingRepair cleared, and repairedAt set correctly
-        long expectedRepairedAt = ActiveRepairService.instance.getParentRepairSession(repairID).getRepairedAt();
+        long expectedRepairedAt = ActiveRepairService.instance.getParentRepairSession(repairID).repairedAt;
         Assert.assertFalse(sstable.isPendingRepair());
         Assert.assertTrue(sstable.isRepaired());
         Assert.assertEquals(expectedRepairedAt, sstable.getSSTableMetadata().repairedAt);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
index d61d859..21c51c6 100644
--- a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
+++ b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java
@@ -75,6 +75,8 @@ public abstract class AbstractRepairTest
     protected static final Range<Token> RANGE2 = new Range<>(t(2), t(3));
     protected static final Range<Token> RANGE3 = new Range<>(t(4), t(5));
 
+    protected static final Set<Range<Token>> ALL_RANGES = ImmutableSet.of(RANGE1, RANGE2, RANGE3);
+
     protected static UUID registerSession(ColumnFamilyStore cfs, boolean isIncremental, boolean isGlobal)
     {
         UUID sessionId = UUIDGen.getTimeUUID();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java b/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
new file mode 100644
index 0000000..db76f73
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.repair.RepairRunnable.CommonRange;
+
+import static org.apache.cassandra.repair.RepairRunnable.filterCommonRanges;
+
+public class RepairRunnableTest extends AbstractRepairTest
+{
+    /**
+     * For non-forced repairs, common ranges should be passed through as-is
+     */
+    @Test
+    public void filterCommonIncrementalRangesNotForced() throws Exception
+    {
+        CommonRange cr = new CommonRange(PARTICIPANTS, ALL_RANGES);
+
+        List<CommonRange> expected = Lists.newArrayList(cr);
+        List<CommonRange> actual = filterCommonRanges(expected, Collections.emptySet(), false);
+
+        Assert.assertEquals(expected, actual);
+    }
+
+    @Test
+    public void forceFilterCommonIncrementalRanges() throws Exception
+    {
+        CommonRange cr1 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2), Sets.newHashSet(RANGE1, RANGE2));
+        CommonRange cr2 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3), Sets.newHashSet(RANGE3));
+        Set<InetAddress> liveEndpoints = Sets.newHashSet(PARTICIPANT2, PARTICIPANT3); // PARTICIPANT1 is excluded
+
+        List<CommonRange> initial = Lists.newArrayList(cr1, cr2);
+        List<CommonRange> expected = Lists.newArrayList(new CommonRange(Sets.newHashSet(PARTICIPANT2), Sets.newHashSet(RANGE1, RANGE2)),
+                                                        new CommonRange(Sets.newHashSet(PARTICIPANT2, PARTICIPANT3), Sets.newHashSet(RANGE3)));
+        List<CommonRange> actual = filterCommonRanges(initial, liveEndpoints, true);
+
+        Assert.assertEquals(expected, actual);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
index be048fb..6e6d222 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
@@ -223,7 +223,6 @@ public class LocalSessionTest extends AbstractRepairTest
         assertValidationFailure(b -> b.withCoordinator(null));
         assertValidationFailure(b -> b.withTableIds(null));
         assertValidationFailure(b -> b.withTableIds(new HashSet<>()));
-        assertValidationFailure(b -> b.withRepairedAt(0));
         assertValidationFailure(b -> b.withRepairedAt(-1));
         assertValidationFailure(b -> b.withRanges(null));
         assertValidationFailure(b -> b.withRanges(new HashSet<>()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
index 13d7575..484d7a8 100644
--- a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
+++ b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
@@ -150,19 +150,6 @@ public class RepairOptionTest
     }
 
     @Test
-    public void testNonGlobalIncrementalRepairParse() throws Exception
-    {
-        Map<String, String> options = new HashMap<>();
-        options.put(RepairOption.PARALLELISM_KEY, "parallel");
-        options.put(RepairOption.PRIMARY_RANGE_KEY, "false");
-        options.put(RepairOption.INCREMENTAL_KEY, "true");
-        options.put(RepairOption.COLUMNFAMILIES_KEY, "cf1,cf2,cf3");
-        options.put(RepairOption.HOSTS_KEY, "127.0.0.1, 127.0.0.2");
-        assertParseThrowsIllegalArgumentExceptionWithMessage(options, "Incremental repairs cannot be run against a subset of tokens or ranges");
-
-    }
-
-    @Test
     public void testForceOption() throws Exception
     {
         RepairOption option;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cec208c/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index 57ffa7d..cbacaec 100644
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@ -21,8 +21,11 @@ package org.apache.cassandra.service;
 import java.net.InetAddress;
 import java.util.*;
 
+import javax.xml.crypto.Data;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -34,17 +37,26 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.Refs;
 
+import static org.apache.cassandra.repair.messages.RepairOption.DATACENTERS_KEY;
+import static org.apache.cassandra.repair.messages.RepairOption.FORCE_REPAIR_KEY;
+import static org.apache.cassandra.repair.messages.RepairOption.HOSTS_KEY;
+import static org.apache.cassandra.repair.messages.RepairOption.INCREMENTAL_KEY;
+import static org.apache.cassandra.repair.messages.RepairOption.RANGES_KEY;
+import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
+import static org.apache.cassandra.service.ActiveRepairService.getRepairedAt;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -299,4 +311,47 @@ public class ActiveRepairServiceTest
             cfs.forceBlockingFlush();
         }
     }
+
+    private static RepairOption opts(String... params)
+    {
+        assert params.length % 2 == 0 : "unbalanced key value pairs";
+        Map<String, String> opt = new HashMap<>();
+        for (int i=0; i<(params.length >> 1); i++)
+        {
+            int idx = i << 1;
+            opt.put(params[idx], params[idx+1]);
+        }
+        return RepairOption.parse(opt, DatabaseDescriptor.getPartitioner());
+    }
+
+    private static String b2s(boolean b)
+    {
+        return Boolean.toString(b);
+    }
+
+    /**
+     * Tests the expected repairedAt value is returned, based on different RepairOption
+     */
+    @Test
+    public void repairedAt() throws Exception
+    {
+        // regular incremental repair
+        Assert.assertNotEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(true))));
+        // subrange incremental repair
+        Assert.assertNotEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(true),
+                                                                      RANGES_KEY, "1:2")));
+
+        // hosts incremental repair
+        Assert.assertEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(true),
+                                                                   HOSTS_KEY, "127.0.0.1")));
+        // dc incremental repair
+        Assert.assertEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(true),
+                                                                   DATACENTERS_KEY, "DC2")));
+        // forced incremental repair
+        Assert.assertEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(true),
+                                                                   FORCE_REPAIR_KEY, b2s(true))));
+
+        // full repair
+        Assert.assertEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(false))));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message