cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bdeggles...@apache.org
Subject cassandra git commit: Forced incremental repairs should promote sstables if they can
Date Wed, 14 Mar 2018 22:31:13 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk de12f29dc -> d4dfbb5c6


Forced incremental repairs should promote sstables if they can

Patch by Blake Eggleston; Reviewed by Marcus Eriksson for CASSANDRA-14294


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d4dfbb5c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d4dfbb5c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d4dfbb5c

Branch: refs/heads/trunk
Commit: d4dfbb5c678415e63bb7cf7dfd78518dfa6ea7b9
Parents: de12f29
Author: Blake Eggleston <bdeggleston@gmail.com>
Authored: Wed Mar 7 09:35:02 2018 -0800
Committer: Blake Eggleston <bdeggleston@gmail.com>
Committed: Wed Mar 14 15:28:36 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/repair/RepairRunnable.java | 20 ++++++++++++++------
 .../repair/consistent/CoordinatorSessions.java  |  9 +++++++--
 .../cassandra/service/ActiveRepairService.java  | 17 ++++++++++-------
 .../consistent/CoordinatorSessionsTest.java     | 12 ++++++------
 .../service/ActiveRepairServiceTest.java        | 14 ++++++++------
 6 files changed, 46 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4dfbb5c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c7517d7..f2b9f7c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Forced incremental repairs should promote sstables if they can (CASSANDRA-14294)
  * Use Murmur3 for validation compactions (CASSANDRA-14002)
  * Comma at the end of the seed list is interpretated as localhost (CASSANDRA-14285)
  * Refactor read executor and response resolver, abstract read repair (CASSANDRA-14058)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4dfbb5c/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 89177ee..4097715 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -234,7 +234,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
             traceState = null;
         }
 
-        final Set<InetAddressAndPort> allNeighbors = new HashSet<>();
+        Set<InetAddressAndPort> allNeighbors = new HashSet<>();
         List<CommonRange> commonRanges = new ArrayList<>();
 
        //pre-calculate output of getLocalRanges and pass it to getNeighbors to increase performance and prevent
@@ -286,9 +286,18 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
             SystemDistributedKeyspace.startParentRepair(parentSession, keyspace, cfnames, options);
         }
 
+        boolean force = options.isForcedRepair();
+
+        if (force && options.isIncremental())
+        {
+            Set<InetAddressAndPort> actualNeighbors = Sets.newHashSet(Iterables.filter(allNeighbors, FailureDetector.instance::isAlive));
+            force = !allNeighbors.equals(actualNeighbors);
+            allNeighbors = actualNeighbors;
+        }
+
         try (Timer.Context ctx = Keyspace.open(keyspace).metric.repairPrepareTime.time())
         {
-            ActiveRepairService.instance.prepareForRepair(parentSession, FBUtilities.getBroadcastAddressAndPort(), allNeighbors, options, columnFamilyStores);
+            ActiveRepairService.instance.prepareForRepair(parentSession, FBUtilities.getBroadcastAddressAndPort(), allNeighbors, options, force, columnFamilyStores);
             progress.incrementAndGet();
         }
         catch (Throwable t)
@@ -307,7 +316,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
         }
         else if (options.isIncremental())
         {
-            incrementalRepair(parentSession, startTime, options.isForcedRepair(), traceState, allNeighbors, commonRanges, cfnames);
+            incrementalRepair(parentSession, startTime, force, traceState, allNeighbors, commonRanges, cfnames);
         }
         else
         {
@@ -398,15 +407,14 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
                                    String... cfnames)
     {
         // the local node also needs to be included in the set of participants, since coordinator sessions aren't persisted
-        Predicate<InetAddressAndPort> isAlive = FailureDetector.instance::isAlive;
         Set<InetAddressAndPort> allParticipants = ImmutableSet.<InetAddressAndPort>builder()
-                                           .addAll(forceRepair ? Iterables.filter(allNeighbors, isAlive) : allNeighbors)
+                                           .addAll(allNeighbors)
                                            .add(FBUtilities.getBroadcastAddressAndPort())
                                            .build();
 
         List<CommonRange> allRanges = filterCommonRanges(commonRanges, allParticipants, forceRepair);
 
-        CoordinatorSession coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants);
+        CoordinatorSession coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants, forceRepair);
         ListeningExecutorService executor = createExecutor();
         AtomicBoolean hasFailure = new AtomicBoolean(false);
         ListenableFuture repairResult = coordinatorSession.execute(() -> submitRepairSessions(parentSession, true, executor, allRanges, cfnames),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4dfbb5c/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
index bb84d0a..b87a2c0 100644
--- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java
@@ -43,10 +43,15 @@ public class CoordinatorSessions
         return new CoordinatorSession(builder);
     }
 
-    public synchronized CoordinatorSession registerSession(UUID sessionId, Set<InetAddressAndPort> participants)
+    public synchronized CoordinatorSession registerSession(UUID sessionId, Set<InetAddressAndPort> participants, boolean isForced)
     {
-        Preconditions.checkArgument(!sessions.containsKey(sessionId), "A coordinator already exists for session %s", sessionId);
         ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionId);
+
+        Preconditions.checkArgument(!sessions.containsKey(sessionId),
+                                    "A coordinator already exists for session %s", sessionId);
+        Preconditions.checkArgument(!isForced || prs.repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE,
+                                    "cannot promote data for forced incremental repairs");
+
         CoordinatorSession.Builder builder = CoordinatorSession.builder();
         builder.withState(ConsistentSession.State.PREPARING);
         builder.withSessionID(sessionId);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4dfbb5c/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index c600789..950966f 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -379,11 +379,12 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
      * incremental repairs, forced incremental repairs, and full repairs, the UNREPAIRED_SSTABLE value will prevent
      * sstables from being promoted to repaired or preserve the repairedAt/pendingRepair values, respectively.
      */
-    static long getRepairedAt(RepairOption options)
+    static long getRepairedAt(RepairOption options, boolean force)
     {
-        // we only want to set repairedAt for incremental repairs including all replicas for a token range. For non-global incremental repairs, forced incremental repairs, and
-        // full repairs, the UNREPAIRED_SSTABLE value will prevent sstables from being promoted to repaired or preserve the repairedAt/pendingRepair values, respectively.
-        if (options.isIncremental() && options.isGlobal() && !options.isForcedRepair())
+        // we only want to set repairedAt for incremental repairs including all replicas for a token range. For non-global incremental repairs, full repairs, the UNREPAIRED_SSTABLE value will prevent
+        // sstables from being promoted to repaired or preserve the repairedAt/pendingRepair values, respectively. For forced repairs, repairedAt time is only set to UNREPAIRED_SSTABLE if we actually
+        // end up skipping replicas
+        if (options.isIncremental() && options.isGlobal() && ! force)
         {
             return Clock.instance.currentTimeMillis();
         }
@@ -393,9 +394,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         }
     }
 
-    public UUID prepareForRepair(UUID parentRepairSession, InetAddressAndPort coordinator, Set<InetAddressAndPort> endpoints, RepairOption options, List<ColumnFamilyStore> columnFamilyStores)
+    public UUID prepareForRepair(UUID parentRepairSession, InetAddressAndPort coordinator, Set<InetAddressAndPort> endpoints, RepairOption options, boolean isForcedRepair, List<ColumnFamilyStore> columnFamilyStores)
     {
-        long repairedAt = getRepairedAt(options);
+        long repairedAt = getRepairedAt(options, isForcedRepair);
         registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal(), options.getPreviewKind());
         final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size());
         final AtomicBoolean status = new AtomicBoolean(true);
@@ -434,7 +435,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
             }
             else
             {
-                if (options.isForcedRepair())
+                // we pre-filter the endpoints we want to repair for forced incremental repairs. So if any of the
+                // remaining ones go down, we still want to fail so we don't create repair sessions that can't complete
+                if (isForcedRepair && !options.isIncremental())
                 {
                     prepareLatch.countDown();
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4dfbb5c/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
index 9bf4270..bc90e9b 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
@@ -93,9 +93,9 @@ public class CoordinatorSessionsTest extends AbstractRepairTest
             return (InstrumentedCoordinatorSession) super.getSession(sessionId);
         }
 
-        public InstrumentedCoordinatorSession registerSession(UUID sessionId, Set<InetAddressAndPort> peers)
+        public InstrumentedCoordinatorSession registerSession(UUID sessionId, Set<InetAddressAndPort> peers, boolean isForced)
         {
-            return (InstrumentedCoordinatorSession) super.registerSession(sessionId, peers);
+            return (InstrumentedCoordinatorSession) super.registerSession(sessionId, peers, isForced);
         }
     }
 
@@ -118,7 +118,7 @@ public class CoordinatorSessionsTest extends AbstractRepairTest
     {
         CoordinatorSessions sessions = new CoordinatorSessions();
         UUID sessionID = registerSession();
-        CoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS);
+        CoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS, false);
 
         Assert.assertEquals(ConsistentSession.State.PREPARING, session.getState());
         Assert.assertEquals(sessionID, session.sessionID);
@@ -139,7 +139,7 @@ public class CoordinatorSessionsTest extends AbstractRepairTest
         InstrumentedCoordinatorSessions sessions = new InstrumentedCoordinatorSessions();
         UUID sessionID = registerSession();
 
-        InstrumentedCoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS);
+        InstrumentedCoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS, false);
         Assert.assertEquals(0, session.prepareResponseCalls);
 
         sessions.handlePrepareResponse(new PrepareConsistentResponse(sessionID, PARTICIPANT1, true));
@@ -164,7 +164,7 @@ public class CoordinatorSessionsTest extends AbstractRepairTest
         InstrumentedCoordinatorSessions sessions = new InstrumentedCoordinatorSessions();
         UUID sessionID = registerSession();
 
-        InstrumentedCoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS);
+        InstrumentedCoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS, false);
         Assert.assertEquals(0, session.finalizePromiseCalls);
 
         sessions.handleFinalizePromiseMessage(new FinalizePromise(sessionID, PARTICIPANT1, true));
@@ -189,7 +189,7 @@ public class CoordinatorSessionsTest extends AbstractRepairTest
         InstrumentedCoordinatorSessions sessions = new InstrumentedCoordinatorSessions();
         UUID sessionID = registerSession();
 
-        InstrumentedCoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS);
+        InstrumentedCoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS, false);
         Assert.assertEquals(0, session.failCalls);
 
         sessions.handleFailSessionMessage(new FailSession(sessionID));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4dfbb5c/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index 76f915e..c4b0a9c 100644
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@ -336,22 +336,24 @@ public class ActiveRepairServiceTest
     public void repairedAt() throws Exception
     {
         // regular incremental repair
-        Assert.assertNotEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(true))));
+        Assert.assertNotEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(true)), false));
         // subrange incremental repair
         Assert.assertNotEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(true),
-                                                                      RANGES_KEY, "1:2")));
+                                                                      RANGES_KEY, "1:2"), false));
 
         // hosts incremental repair
         Assert.assertEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(true),
-                                                                   HOSTS_KEY, "127.0.0.1")));
+                                                                   HOSTS_KEY, "127.0.0.1"), false));
         // dc incremental repair
         Assert.assertEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(true),
-                                                                   DATACENTERS_KEY, "DC2")));
+                                                                   DATACENTERS_KEY, "DC2"), false));
         // forced incremental repair
+        Assert.assertNotEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(true),
+                                                                      FORCE_REPAIR_KEY, b2s(true)), false));
         Assert.assertEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(true),
-                                                                   FORCE_REPAIR_KEY, b2s(true))));
+                                                                      FORCE_REPAIR_KEY, b2s(true)), true));
 
         // full repair
-        Assert.assertEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(false))));
+        Assert.assertEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(false))), false));
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message