From commits-return-207589-archive-asf-public=cust-asf.ponee.io@cassandra.apache.org Wed Mar 14 23:31:15 2018 Return-Path: <commits-return-207589-archive-asf-public=cust-asf.ponee.io@cassandra.apache.org> X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id DF495180654 for <archive-asf-public@cust-asf.ponee.io>; Wed, 14 Mar 2018 23:31:14 +0100 (CET) Received: (qmail 31795 invoked by uid 500); 14 Mar 2018 22:31:13 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: <mailto:commits-help@cassandra.apache.org> List-Unsubscribe: <mailto:commits-unsubscribe@cassandra.apache.org> List-Post: <mailto:commits@cassandra.apache.org> List-Id: <commits.cassandra.apache.org> Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 31784 invoked by uid 99); 14 Mar 2018 22:31:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 14 Mar 2018 22:31:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B3DB2F6600; Wed, 14 Mar 2018 22:31:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bdeggleston@apache.org To: commits@cassandra.apache.org Message-Id: <3f6631350bd5440bbfb844ea48a303be@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: cassandra git commit: Forced incremental repairs should promote sstables if they can Date: Wed, 14 Mar 2018 22:31:13 +0000 (UTC) Repository: cassandra Updated Branches: refs/heads/trunk de12f29dc -> d4dfbb5c6 Forced incremental repairs should promote sstables if they can Patch by Blake Eggleston; Reviewed by Marcus Eriksson for CASSANDRA-14294 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d4dfbb5c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d4dfbb5c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d4dfbb5c Branch: refs/heads/trunk Commit: 
d4dfbb5c678415e63bb7cf7dfd78518dfa6ea7b9 Parents: de12f29 Author: Blake Eggleston Authored: Wed Mar 7 09:35:02 2018 -0800 Committer: Blake Eggleston Committed: Wed Mar 14 15:28:36 2018 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/repair/RepairRunnable.java | 20 ++++++++++++++------ .../repair/consistent/CoordinatorSessions.java | 9 +++++++-- .../cassandra/service/ActiveRepairService.java | 17 ++++++++++------- .../consistent/CoordinatorSessionsTest.java | 12 ++++++------ .../service/ActiveRepairServiceTest.java | 14 ++++++++------ 6 files changed, 46 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4dfbb5c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c7517d7..f2b9f7c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Forced incremental repairs should promote sstables if they can (CASSANDRA-14294) * Use Murmur3 for validation compactions (CASSANDRA-14002) * Comma at the end of the seed list is interpretated as localhost (CASSANDRA-14285) * Refactor read executor and response resolver, abstract read repair (CASSANDRA-14058) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4dfbb5c/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index 89177ee..4097715 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -234,7 +234,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti traceState = null; } - final Set allNeighbors = new HashSet<>(); + Set allNeighbors = new HashSet<>(); 
List commonRanges = new ArrayList<>(); //pre-calculate output of getLocalRanges and pass it to getNeighbors to increase performance and prevent @@ -286,9 +286,18 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti SystemDistributedKeyspace.startParentRepair(parentSession, keyspace, cfnames, options); } + boolean force = options.isForcedRepair(); + + if (force && options.isIncremental()) + { + Set actualNeighbors = Sets.newHashSet(Iterables.filter(allNeighbors, FailureDetector.instance::isAlive)); + force = !allNeighbors.equals(actualNeighbors); + allNeighbors = actualNeighbors; + } + try (Timer.Context ctx = Keyspace.open(keyspace).metric.repairPrepareTime.time()) { - ActiveRepairService.instance.prepareForRepair(parentSession, FBUtilities.getBroadcastAddressAndPort(), allNeighbors, options, columnFamilyStores); + ActiveRepairService.instance.prepareForRepair(parentSession, FBUtilities.getBroadcastAddressAndPort(), allNeighbors, options, force, columnFamilyStores); progress.incrementAndGet(); } catch (Throwable t) @@ -307,7 +316,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti } else if (options.isIncremental()) { - incrementalRepair(parentSession, startTime, options.isForcedRepair(), traceState, allNeighbors, commonRanges, cfnames); + incrementalRepair(parentSession, startTime, force, traceState, allNeighbors, commonRanges, cfnames); } else { @@ -398,15 +407,14 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti String... cfnames) { // the local node also needs to be included in the set of participants, since coordinator sessions aren't persisted - Predicate isAlive = FailureDetector.instance::isAlive; Set allParticipants = ImmutableSet.builder() - .addAll(forceRepair ? 
Iterables.filter(allNeighbors, isAlive) : allNeighbors) + .addAll(allNeighbors) .add(FBUtilities.getBroadcastAddressAndPort()) .build(); List allRanges = filterCommonRanges(commonRanges, allParticipants, forceRepair); - CoordinatorSession coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants); + CoordinatorSession coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants, forceRepair); ListeningExecutorService executor = createExecutor(); AtomicBoolean hasFailure = new AtomicBoolean(false); ListenableFuture repairResult = coordinatorSession.execute(() -> submitRepairSessions(parentSession, true, executor, allRanges, cfnames), http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4dfbb5c/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java index bb84d0a..b87a2c0 100644 --- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java +++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSessions.java @@ -43,10 +43,15 @@ public class CoordinatorSessions return new CoordinatorSession(builder); } - public synchronized CoordinatorSession registerSession(UUID sessionId, Set participants) + public synchronized CoordinatorSession registerSession(UUID sessionId, Set participants, boolean isForced) { - Preconditions.checkArgument(!sessions.containsKey(sessionId), "A coordinator already exists for session %s", sessionId); ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionId); + + Preconditions.checkArgument(!sessions.containsKey(sessionId), + "A coordinator already exists for session %s", sessionId); + 
Preconditions.checkArgument(!isForced || prs.repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE, + "cannot promote data for forced incremental repairs"); + CoordinatorSession.Builder builder = CoordinatorSession.builder(); builder.withState(ConsistentSession.State.PREPARING); builder.withSessionID(sessionId); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4dfbb5c/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index c600789..950966f 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -379,11 +379,12 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai * incremental repairs, forced incremental repairs, and full repairs, the UNREPAIRED_SSTABLE value will prevent * sstables from being promoted to repaired or preserve the repairedAt/pendingRepair values, respectively. */ - static long getRepairedAt(RepairOption options) + static long getRepairedAt(RepairOption options, boolean force) { - // we only want to set repairedAt for incremental repairs including all replicas for a token range. For non-global incremental repairs, forced incremental repairs, and - // full repairs, the UNREPAIRED_SSTABLE value will prevent sstables from being promoted to repaired or preserve the repairedAt/pendingRepair values, respectively. - if (options.isIncremental() && options.isGlobal() && !options.isForcedRepair()) + // we only want to set repairedAt for incremental repairs including all replicas for a token range. For non-global incremental repairs, full repairs, the UNREPAIRED_SSTABLE value will prevent + // sstables from being promoted to repaired or preserve the repairedAt/pendingRepair values, respectively. 
For forced repairs, repairedAt time is only set to UNREPAIRED_SSTABLE if we actually + // end up skipping replicas + if (options.isIncremental() && options.isGlobal() && ! force) { return Clock.instance.currentTimeMillis(); } @@ -393,9 +394,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai } } - public UUID prepareForRepair(UUID parentRepairSession, InetAddressAndPort coordinator, Set endpoints, RepairOption options, List columnFamilyStores) + public UUID prepareForRepair(UUID parentRepairSession, InetAddressAndPort coordinator, Set endpoints, RepairOption options, boolean isForcedRepair, List columnFamilyStores) { - long repairedAt = getRepairedAt(options); + long repairedAt = getRepairedAt(options, isForcedRepair); registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal(), options.getPreviewKind()); final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size()); final AtomicBoolean status = new AtomicBoolean(true); @@ -434,7 +435,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai } else { - if (options.isForcedRepair()) + // we pre-filter the endpoints we want to repair for forced incremental repairs. 
So if any of the + // remaining ones go down, we still want to fail so we don't create repair sessions that can't complete + if (isForcedRepair && !options.isIncremental()) { prepareLatch.countDown(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4dfbb5c/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java index 9bf4270..bc90e9b 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java +++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java @@ -93,9 +93,9 @@ public class CoordinatorSessionsTest extends AbstractRepairTest return (InstrumentedCoordinatorSession) super.getSession(sessionId); } - public InstrumentedCoordinatorSession registerSession(UUID sessionId, Set peers) + public InstrumentedCoordinatorSession registerSession(UUID sessionId, Set peers, boolean isForced) { - return (InstrumentedCoordinatorSession) super.registerSession(sessionId, peers); + return (InstrumentedCoordinatorSession) super.registerSession(sessionId, peers, isForced); } } @@ -118,7 +118,7 @@ public class CoordinatorSessionsTest extends AbstractRepairTest { CoordinatorSessions sessions = new CoordinatorSessions(); UUID sessionID = registerSession(); - CoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS); + CoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS, false); Assert.assertEquals(ConsistentSession.State.PREPARING, session.getState()); Assert.assertEquals(sessionID, session.sessionID); @@ -139,7 +139,7 @@ public class CoordinatorSessionsTest extends AbstractRepairTest InstrumentedCoordinatorSessions sessions = new InstrumentedCoordinatorSessions(); UUID sessionID = registerSession(); - 
InstrumentedCoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS); + InstrumentedCoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS, false); Assert.assertEquals(0, session.prepareResponseCalls); sessions.handlePrepareResponse(new PrepareConsistentResponse(sessionID, PARTICIPANT1, true)); @@ -164,7 +164,7 @@ public class CoordinatorSessionsTest extends AbstractRepairTest InstrumentedCoordinatorSessions sessions = new InstrumentedCoordinatorSessions(); UUID sessionID = registerSession(); - InstrumentedCoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS); + InstrumentedCoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS, false); Assert.assertEquals(0, session.finalizePromiseCalls); sessions.handleFinalizePromiseMessage(new FinalizePromise(sessionID, PARTICIPANT1, true)); @@ -189,7 +189,7 @@ public class CoordinatorSessionsTest extends AbstractRepairTest InstrumentedCoordinatorSessions sessions = new InstrumentedCoordinatorSessions(); UUID sessionID = registerSession(); - InstrumentedCoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS); + InstrumentedCoordinatorSession session = sessions.registerSession(sessionID, PARTICIPANTS, false); Assert.assertEquals(0, session.failCalls); sessions.handleFailSessionMessage(new FailSession(sessionID)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4dfbb5c/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java index 76f915e..c4b0a9c 100644 --- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java +++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java @@ -336,22 +336,24 @@ public class ActiveRepairServiceTest public void 
repairedAt() throws Exception { // regular incremental repair - Assert.assertNotEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(true)))); + Assert.assertNotEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(true)), false)); // subrange incremental repair Assert.assertNotEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(true), - RANGES_KEY, "1:2"))); + RANGES_KEY, "1:2"), false)); // hosts incremental repair Assert.assertEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(true), - HOSTS_KEY, "127.0.0.1"))); + HOSTS_KEY, "127.0.0.1"), false)); // dc incremental repair Assert.assertEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(true), - DATACENTERS_KEY, "DC2"))); + DATACENTERS_KEY, "DC2"), false)); // forced incremental repair + Assert.assertNotEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(true), + FORCE_REPAIR_KEY, b2s(true)), false)); Assert.assertEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(true), - FORCE_REPAIR_KEY, b2s(true)))); + FORCE_REPAIR_KEY, b2s(true)), true)); // full repair - Assert.assertEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(false)))); + Assert.assertEquals(UNREPAIRED_SSTABLE, getRepairedAt(opts(INCREMENTAL_KEY, b2s(false)), false)); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org