Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 991AB200B2B for ; Tue, 28 Jun 2016 15:22:20 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 979DA160A06; Tue, 28 Jun 2016 13:22:20 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 63206160A56 for ; Tue, 28 Jun 2016 15:22:19 +0200 (CEST) Received: (qmail 66670 invoked by uid 500); 28 Jun 2016 13:22:17 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 61752 invoked by uid 99); 28 Jun 2016 13:21:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Jun 2016 13:21:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2DC59ED310; Tue, 28 Jun 2016 13:21:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: slebresne@apache.org To: commits@cassandra.apache.org Date: Tue, 28 Jun 2016 13:22:05 -0000 Message-Id: <2d535be5f6634f42a8ced76a7d3466ee@git.apache.org> In-Reply-To: <71beec61030a470e9d4e22cef8edec78@git.apache.org> References: <71beec61030a470e9d4e22cef8edec78@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [15/17] cassandra git commit: Avoid stalling Paxos when the paxos state expires archived-at: Tue, 28 Jun 2016 13:22:20 -0000 Avoid stalling Paxos when the paxos state expires This commit does 2 things: - It ignores MRCs that are old enough to have expired in some nodes paxos 
tables - It ensures the same timestamp is used when reading the paxos state and ignoring old MRC patch by slebresne; reviewed by jasobraown for CASSANDRA-12043 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/53f8f095 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/53f8f095 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/53f8f095 Branch: refs/heads/trunk Commit: 53f8f09575dfbdc166b257fe71a4d4a832470253 Parents: d289778 Author: Sylvain Lebresne Authored: Wed Jun 22 12:12:37 2016 +0200 Committer: Sylvain Lebresne Committed: Tue Jun 28 15:21:05 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/cql3/QueryProcessor.java | 17 ++++++++++++++++- .../cql3/statements/SelectStatement.java | 6 +++++- .../org/apache/cassandra/db/SystemKeyspace.java | 12 ++++++------ .../apache/cassandra/service/StorageProxy.java | 4 +++- .../cassandra/service/paxos/PaxosState.java | 11 +++++++++-- .../cassandra/service/paxos/PrepareCallback.java | 18 +++++++++++++++++- src/java/org/apache/cassandra/utils/UUIDGen.java | 12 ++++++++++++ 8 files changed, 69 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f8f095/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index cd13896..075d44a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 3.9 Merged from 2.1: + * Avoid stalling paxos when the paxos state expires (CASSANDRA-12043) * Remove finished incoming streaming connections from MessagingService (CASSANDRA-11854) http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f8f095/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index d812af4..222204b 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -258,7 +258,7 @@ public class QueryProcessor implements QueryHandler return QueryOptions.forInternalCalls(cl, boundValues); } - private static ParsedStatement.Prepared prepareInternal(String query) throws RequestValidationException + public static ParsedStatement.Prepared prepareInternal(String query) throws RequestValidationException { ParsedStatement.Prepared prepared = internalStatements.get(query); if (prepared != null) @@ -331,6 +331,21 @@ public class QueryProcessor implements QueryHandler return null; } + /** + * A special version of executeInternal that takes, as an argument, the time to use as "now" for the query. + * Note that this only makes sense for SELECTs, so this only accepts SELECT statements and is only useful in rare + * cases. + */ + public static UntypedResultSet executeInternalWithNow(int nowInSec, String query, Object... 
values) + { + ParsedStatement.Prepared prepared = prepareInternal(query); + assert prepared.statement instanceof SelectStatement; + SelectStatement select = (SelectStatement)prepared.statement; + ResultMessage result = select.executeInternal(internalQueryState(), makeInternalOptions(prepared, values), nowInSec); + assert result instanceof ResultMessage.Rows; + return UntypedResultSet.create(((ResultMessage.Rows)result).result); + } + public static UntypedResultSet resultify(String query, RowIterator partition) { return resultify(query, PartitionIterators.singletonIterator(partition)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f8f095/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 5f37e5e..f2b484e 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -414,7 +414,11 @@ public class SelectStatement implements CQLStatement public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException { - int nowInSec = FBUtilities.nowInSeconds(); + return executeInternal(state, options, FBUtilities.nowInSeconds()); + } + + public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options, int nowInSec) throws RequestExecutionException, RequestValidationException + { int userLimit = getLimit(options); int userPerPartitionLimit = getPerPartitionLimit(options); ReadQuery query = getQuery(options, nowInSec, userLimit, userPerPartitionLimit); http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f8f095/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 1203834..584279d 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -1103,10 +1103,10 @@ public final class SystemKeyspace return null; } - public static PaxosState loadPaxosState(DecoratedKey key, CFMetaData metadata) + public static PaxosState loadPaxosState(DecoratedKey key, CFMetaData metadata, int nowInSec) { String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?"; - UntypedResultSet results = executeInternal(String.format(req, PAXOS), key.getKey(), metadata.cfId); + UntypedResultSet results = QueryProcessor.executeInternalWithNow(nowInSec, String.format(req, PAXOS), key.getKey(), metadata.cfId); if (results.isEmpty()) return new PaxosState(key, metadata); UntypedResultSet.Row row = results.one(); @@ -1131,7 +1131,7 @@ public final class SystemKeyspace String req = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET in_progress_ballot = ? WHERE row_key = ? AND cf_id = ?"; executeInternal(String.format(req, PAXOS), UUIDGen.microsTimestamp(promise.ballot), - paxosTtl(promise.update.metadata()), + paxosTtlSec(promise.update.metadata()), promise.ballot, promise.update.partitionKey().getKey(), promise.update.metadata().cfId); @@ -1141,7 +1141,7 @@ public final class SystemKeyspace { executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ?, proposal_version = ? WHERE row_key = ? 
AND cf_id = ?", PAXOS), UUIDGen.microsTimestamp(proposal.ballot), - paxosTtl(proposal.update.metadata()), + paxosTtlSec(proposal.update.metadata()), proposal.ballot, PartitionUpdate.toBytes(proposal.update, MessagingService.current_version), MessagingService.current_version, @@ -1149,7 +1149,7 @@ public final class SystemKeyspace proposal.update.metadata().cfId); } - private static int paxosTtl(CFMetaData metadata) + public static int paxosTtlSec(CFMetaData metadata) { // keep paxos state around for at least 3h return Math.max(3 * 3600, metadata.params.gcGraceSeconds); @@ -1162,7 +1162,7 @@ public final class SystemKeyspace String cql = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = ? WHERE row_key = ? AND cf_id = ?"; executeInternal(String.format(cql, PAXOS), UUIDGen.microsTimestamp(commit.ballot), - paxosTtl(commit.update.metadata()), + paxosTtlSec(commit.update.metadata()), commit.ballot, PartitionUpdate.toBytes(commit.update, MessagingService.current_version), MessagingService.current_version, http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f8f095/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 90c246e..c88c449 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -31,6 +31,7 @@ import javax.management.ObjectName; import com.google.common.base.Predicate; import com.google.common.cache.CacheLoader; import com.google.common.collect.*; +import com.google.common.primitives.Ints; import com.google.common.util.concurrent.Uninterruptibles; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -440,7 +441,8 @@ public class 
StorageProxy implements StorageProxyMBean // https://issues.apache.org/jira/browse/CASSANDRA-5062?focusedCommentId=13619810&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13619810) // Since we waited for quorum nodes, if some of them haven't seen the last commit (which may just be a timing issue, but may also // mean we lost messages), we pro-actively "repair" those nodes, and retry. - Iterable missingMRC = summary.replicasMissingMostRecentCommit(); + int nowInSec = Ints.checkedCast(TimeUnit.MICROSECONDS.toSeconds(ballotMicros)); + Iterable missingMRC = summary.replicasMissingMostRecentCommit(metadata, nowInSec); if (Iterables.size(missingMRC) > 0) { Tracing.trace("Repairing replicas that missed the most recent commit"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f8f095/src/java/org/apache/cassandra/service/paxos/PaxosState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java index 0b3af8f..e01f568 100644 --- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java +++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java @@ -65,7 +65,13 @@ public class PaxosState lock.lock(); try { - PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.update.partitionKey(), toPrepare.update.metadata()); + // When preparing, we need to use the same time as "now" (that's the time we use to decide if something + // is expired or not) across nodes; otherwise we may have a window where a Most Recent Commit shows up + // on some replicas and not others during a new proposal (in StorageProxy.beginAndRepairPaxos()), and no + // amount of re-submitting will fix this (because the node on which the commit has expired will have a + // tombstone that hides any re-submit). See CASSANDRA-12043 for details. 
+ int nowInSec = UUIDGen.unixTimestampInSec(toPrepare.ballot); + PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.update.partitionKey(), toPrepare.update.metadata(), nowInSec); if (toPrepare.isAfter(state.promised)) { Tracing.trace("Promising ballot {}", toPrepare.ballot); @@ -100,7 +106,8 @@ public class PaxosState lock.lock(); try { - PaxosState state = SystemKeyspace.loadPaxosState(proposal.update.partitionKey(), proposal.update.metadata()); + int nowInSec = UUIDGen.unixTimestampInSec(proposal.ballot); + PaxosState state = SystemKeyspace.loadPaxosState(proposal.update.partitionKey(), proposal.update.metadata(), nowInSec); if (proposal.hasBallot(state.promised.ballot) || proposal.isAfter(state.promised)) { Tracing.trace("Accepting proposal {}", proposal); http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f8f095/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java index 9c54b01..ff81803 100644 --- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java +++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java @@ -23,6 +23,7 @@ package org.apache.cassandra.service.paxos; import java.net.InetAddress; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -34,7 +35,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.utils.UUIDGen; public class PrepareCallback extends AbstractPaxosCallback { @@ -87,8 +90,21 @@ public class PrepareCallback extends AbstractPaxosCallback latch.countDown(); } - public Iterable replicasMissingMostRecentCommit() + public Iterable 
replicasMissingMostRecentCommit(CFMetaData metadata, int nowInSec) { + // In general, we need every replica that has answered the prepare (a quorum) to agree on the MRC (see the + // comment in StorageProxy.beginAndRepairPaxos(); basically, we need to make sure at least a quorum of nodes + // have learned a commit before committing a new one, otherwise that previous commit is not guaranteed to have reached a + // quorum and further commits may proceed on incomplete information). + // However, if that commit is too old, it may have expired from some of the replicas' paxos tables (we don't + // keep the paxos state forever or it could grow unchecked), and we could end up in an infinite loop as + // explained in CASSANDRA-12043. To avoid that, we ignore an MRC that is too old, i.e. older than the TTL we set + // on paxos tables. For such old commits, we rely on hints and repair to ensure the commit has indeed been + // propagated to all nodes. + long paxosTtlSec = SystemKeyspace.paxosTtlSec(metadata); + if (UUIDGen.unixTimestampInSec(mostRecentCommit.ballot) + paxosTtlSec < nowInSec) + return Collections.emptySet(); + return Iterables.filter(commitsByReplica.keySet(), new Predicate() { public boolean apply(InetAddress inetAddress) http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f8f095/src/java/org/apache/cassandra/utils/UUIDGen.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/UUIDGen.java b/src/java/org/apache/cassandra/utils/UUIDGen.java index 00efbe3..a8b3093 100644 --- a/src/java/org/apache/cassandra/utils/UUIDGen.java +++ b/src/java/org/apache/cassandra/utils/UUIDGen.java @@ -26,8 +26,11 @@ import java.util.Collection; import java.util.Random; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Charsets; +import
com.google.common.primitives.Ints; /** * The goods are here: www.ietf.org/rfc/rfc4122.txt. @@ -211,6 +214,15 @@ public class UUIDGen /** * @param uuid + * @return seconds since Unix epoch + */ + public static int unixTimestampInSec(UUID uuid) + { + return Ints.checkedCast(TimeUnit.MILLISECONDS.toSeconds(unixTimestamp(uuid))); + } + + /** + * @param uuid * @return microseconds since Unix epoch */ public static long microsTimestamp(UUID uuid)