Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 87FB22009F4 for ; Thu, 26 May 2016 08:33:55 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 86F7B160A2B; Thu, 26 May 2016 06:33:55 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 65FFA160A2C for ; Thu, 26 May 2016 08:33:54 +0200 (CEST) Received: (qmail 17857 invoked by uid 500); 26 May 2016 06:33:53 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 17698 invoked by uid 99); 26 May 2016 06:33:53 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 May 2016 06:33:53 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8B0EFDFDD0; Thu, 26 May 2016 06:33:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: marcuse@apache.org To: commits@cassandra.apache.org Date: Thu, 26 May 2016 06:33:54 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [03/15] cassandra git commit: Fail parent repair session if repair coordinator dies archived-at: Thu, 26 May 2016 06:33:55 -0000 Fail parent repair session if repair coordinator dies Patch by marcuse; reviewed by Paulo Motta for CASSANDRA-11824 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/03180469 Tree: 
http://git-wip-us.apache.org/repos/asf/cassandra/tree/03180469 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/03180469 Branch: refs/heads/trunk Commit: 03180469d62930ef73946458719223c8b9bec245 Parents: d27f9b0 Author: Marcus Eriksson Authored: Wed May 18 10:44:49 2016 +0200 Committer: Marcus Eriksson Committed: Thu May 26 07:47:50 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../repair/RepairMessageVerbHandler.java | 5 +- .../cassandra/service/ActiveRepairService.java | 137 +++++++++++++++++-- .../cassandra/service/StorageService.java | 2 +- .../LeveledCompactionStrategyTest.java | 2 +- .../cassandra/repair/DifferencerTest.java | 3 +- .../service/ActiveRepairServiceTest.java | 3 +- 7 files changed, 139 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/03180469/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8dfa02a..f73db6e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.15 + * Clear out parent repair session if repair coordinator dies (CASSANDRA-11824) * Set default streaming_socket_timeout_in_ms to 24 hours (CASSANDRA-11840) * Do not consider local node a valid source during replace (CASSANDRA-11848) * Avoid holding SSTableReaders for duration of incremental repair (CASSANDRA-11739) http://git-wip-us.apache.org/repos/asf/cassandra/blob/03180469/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index fd4ac28..7debc93 100644 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java 
@@ -71,8 +71,9 @@ public class RepairMessageVerbHandler implements IVerbHandler columnFamilyStores.add(columnFamilyStore); } ActiveRepairService.instance.registerParentRepairSession(prepareMessage.parentRepairSession, - columnFamilyStores, - prepareMessage.ranges); + message.from, + columnFamilyStores, + prepareMessage.ranges); MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); break; http://git-wip-us.apache.org/repos/asf/cassandra/blob/03180469/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 5297ce3..f8975f9 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -35,12 +35,18 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; +import org.apache.cassandra.gms.IFailureDetectionEventListener; +import org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.net.IAsyncCallbackWithFailure; @@ -72,7 +78,7 @@ import org.apache.cassandra.utils.concurrent.Refs; * The 
creation of a repair session is done through the submitRepairSession that * returns a future on the completion of that session. */ -public class ActiveRepairService +public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener { private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class); // singleton enforcement @@ -81,6 +87,8 @@ public class ActiveRepairService public static final long UNREPAIRED_SSTABLE = 0; private static final ThreadPoolExecutor executor; + private boolean registeredForEndpointChanges = false; + static { executor = new JMXConfigurableThreadPoolExecutor(4, @@ -244,10 +252,10 @@ public class ActiveRepairService return neighbors; } - public synchronized UUID prepareForRepair(Set endpoints, Collection> ranges, List columnFamilyStores) + public synchronized UUID prepareForRepair(InetAddress coordinator, Set endpoints, Collection> ranges, List columnFamilyStores) { UUID parentRepairSession = UUIDGen.getTimeUUID(); - registerParentRepairSession(parentRepairSession, columnFamilyStores, ranges); + registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, ranges); final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size()); final AtomicBoolean status = new AtomicBoolean(true); final Set failedNodes = Collections.synchronizedSet(new HashSet()); @@ -309,9 +317,36 @@ public class ActiveRepairService return parentRepairSession; } - public synchronized void registerParentRepairSession(UUID parentRepairSession, List columnFamilyStores, Collection> ranges) + public synchronized void registerParentRepairSession(UUID parentRepairSession, InetAddress coordinator, List columnFamilyStores, Collection> ranges) { - parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, System.currentTimeMillis())); + if (!registeredForEndpointChanges) + { + Gossiper.instance.register(this); + 
FailureDetector.instance.registerFailureDetectionEventListener(this); + registeredForEndpointChanges = true; + } + + cleanupOldParentRepairSessions(); + + parentRepairSessions.put(parentRepairSession, new ParentRepairSession(coordinator, columnFamilyStores, ranges, System.currentTimeMillis())); + } + + /** + * Cleans up old failed parent repair sessions - if it is 24h old, we remove it from the map + */ + private void cleanupOldParentRepairSessions() + { + long currentTime = System.currentTimeMillis(); + + Set expired = new HashSet<>(); + for (Map.Entry entry : parentRepairSessions.entrySet()) + { + ParentRepairSession session = entry.getValue(); + if (session.failed && currentTime - session.repairedAt > TimeUnit.HOURS.toMillis(24)) + expired.add(entry.getKey()); + } + for (UUID remove : expired) + parentRepairSessions.remove(remove); } public Set currentlyRepairing(UUID cfId, UUID parentRepairSession) @@ -359,7 +394,13 @@ public class ActiveRepairService public ParentRepairSession getParentRepairSession(UUID parentSessionId) { - return parentRepairSessions.get(parentSessionId); + ParentRepairSession session = parentRepairSessions.get(parentSessionId); + // this can happen if a node thinks that the coordinator was down, but that coordinator got back before noticing + // that it was down itself. 
+ if (session != null && session.failed) + throw new RuntimeException("Parent repair session with id = " + parentSessionId + " has failed."); + + return session; } public synchronized ParentRepairSession removeParentRepairSession(UUID parentSessionId) @@ -427,17 +468,34 @@ public class ActiveRepairService public final Map columnFamilyStores = new HashMap<>(); public final Collection> ranges; public final Map> sstableMap = new HashMap<>(); + /** + * used as fail time if failed is true + */ public final long repairedAt; - - public ParentRepairSession(List columnFamilyStores, Collection> ranges, long repairedAt) + public final InetAddress coordinator; + /** + * Used to mark a repair as failed - if the coordinator thinks that the repair is still ongoing and sends a + * request, we need to fail the coordinator as well. + */ + public final boolean failed; + + public ParentRepairSession(InetAddress coordinator, List columnFamilyStores, Collection> ranges, long repairedAt, boolean failed) { + this.coordinator = coordinator; for (ColumnFamilyStore cfs : columnFamilyStores) { + this.columnFamilyStores.put(cfs.metadata.cfId, cfs); sstableMap.put(cfs.metadata.cfId, new HashSet()); } this.ranges = ranges; this.repairedAt = repairedAt; + this.failed = failed; + } + + public ParentRepairSession(InetAddress coordinator, List columnFamilyStores, Collection> ranges, long repairedAt) + { + this(coordinator, columnFamilyStores, ranges, repairedAt, false); } @SuppressWarnings("resource") @@ -457,6 +515,8 @@ public class ActiveRepairService private Set getActiveSSTables(UUID cfId) { + if (failed) + return Collections.emptySet(); Set repairedSSTables = sstableMap.get(cfId); Set activeSSTables = new HashSet<>(); Set activeSSTableNames = new HashSet<>(); @@ -480,6 +540,10 @@ public class ActiveRepairService } } + public ParentRepairSession asFailed() + { + return new ParentRepairSession(coordinator, Collections.emptyList(), Collections.>emptyList(), System.currentTimeMillis(), true); + } 
@Override public String toString() { @@ -491,4 +555,61 @@ public class ActiveRepairService '}'; } } + + /* + If the coordinator node dies we should remove the parent repair session from the other nodes. + This uses the same notifications as we get in RepairSession + */ + public void onJoin(InetAddress endpoint, EndpointState epState) {} + public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {} + public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {} + public void onAlive(InetAddress endpoint, EndpointState state) {} + public void onDead(InetAddress endpoint, EndpointState state) {} + + public void onRemove(InetAddress endpoint) + { + convict(endpoint, Double.MAX_VALUE); + } + + public void onRestart(InetAddress endpoint, EndpointState state) + { + convict(endpoint, Double.MAX_VALUE); + } + + /** + * Something has happened to a remote node - if that node is a coordinator, we mark the parent repair session id as failed. + * + * The fail marker is kept in the map for 24h to make sure that if the coordinator does not agree + * that the repair failed, we need to fail the entire repair session + * + * @param ep endpoint to be convicted + * @param phi the value of phi with which ep was convicted + */ + public void convict(InetAddress ep, double phi) + { + // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost. 
+ if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold() || parentRepairSessions.isEmpty()) + return; + + Set toRemove = new HashSet<>(); + + for (Map.Entry repairSessionEntry : parentRepairSessions.entrySet()) + { + if (repairSessionEntry.getValue().coordinator.equals(ep)) + { + toRemove.add(repairSessionEntry.getKey()); + } + } + + if (!toRemove.isEmpty()) + { + logger.debug("Failing {} in parent repair sessions", toRemove); + for (UUID id : toRemove) + { + ParentRepairSession failed = parentRepairSessions.get(id); + parentRepairSessions.replace(id, failed, failed.asFailed()); + } + } + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/03180469/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 507aedb..eea4556 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -3013,7 +3013,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { try { - parentSession = ActiveRepairService.instance.prepareForRepair(allNeighbors, ranges, columnFamilyStores); + parentSession = ActiveRepairService.instance.prepareForRepair(FBUtilities.getBroadcastAddress(), allNeighbors, ranges, columnFamilyStores); } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/03180469/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index 749056c..6ec4c7b 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ 
b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -108,7 +108,7 @@ public class LeveledCompactionStrategyTest extends SchemaLoader Range range = new Range<>(Util.token(""), Util.token("")); int gcBefore = keyspace.getColumnFamilyStore(cfname).gcBefore(System.currentTimeMillis()); UUID parentRepSession = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range)); + ActiveRepairService.instance.registerParentRepairSession(parentRepSession, FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range)); RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), ksname, cfname, range); Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore); CompactionManager.instance.submitValidation(cfs, validator).get(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/03180469/test/unit/org/apache/cassandra/repair/DifferencerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/DifferencerTest.java b/test/unit/org/apache/cassandra/repair/DifferencerTest.java index bc0f0de..3229c58 100644 --- a/test/unit/org/apache/cassandra/repair/DifferencerTest.java +++ b/test/unit/org/apache/cassandra/repair/DifferencerTest.java @@ -41,6 +41,7 @@ import org.apache.cassandra.sink.IMessageSink; import org.apache.cassandra.sink.SinkManager; import org.apache.cassandra.repair.messages.RepairMessage; import org.apache.cassandra.repair.messages.SyncComplete; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MerkleTree; import static org.junit.Assert.assertEquals; @@ -109,7 +110,7 @@ public class DifferencerTest extends SchemaLoader Keyspace keyspace = Keyspace.open("Keyspace1"); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1"); - 
ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range)); + ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range)); RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), "Keyspace1", "Standard1", range); http://git-wip-us.apache.org/repos/asf/cassandra/blob/03180469/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java index 419ea1a..26e5126 100644 --- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java +++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java @@ -37,6 +37,7 @@ import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.Refs; import static org.junit.Assert.assertEquals; @@ -55,7 +56,7 @@ public class ActiveRepairServiceTest extends SchemaLoader Set original = store.getUnrepairedSSTables(); UUID prsId = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(prsId, Collections.singletonList(store), null); + ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null); ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId); //add all sstables to parent repair session