Return-Path:
X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io
Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io
Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183])
    by cust-asf2.ponee.io (Postfix) with ESMTP id 02FC2200B3C
    for ; Tue, 28 Jun 2016 15:22:06 +0200 (CEST)
Received: by cust-asf.ponee.io (Postfix)
    id 0189B160A6C; Tue, 28 Jun 2016 13:22:06 +0000 (UTC)
Delivered-To: archive-asf-public@cust-asf.ponee.io
Received: from mail.apache.org (hermes.apache.org [140.211.11.3])
    by cust-asf.ponee.io (Postfix) with SMTP id EC3BA160A56
    for ; Tue, 28 Jun 2016 15:22:04 +0200 (CEST)
Received: (qmail 63712 invoked by uid 500); 28 Jun 2016 13:22:03 -0000
Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm
Precedence: bulk
List-Help:
List-Unsubscribe:
List-Post:
List-Id:
Reply-To: dev@cassandra.apache.org
Delivered-To: mailing list commits@cassandra.apache.org
Received: (qmail 61708 invoked by uid 99); 28 Jun 2016 13:21:52 -0000
Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23)
    by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Jun 2016 13:21:52 +0000
Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33)
    id EF63FDFB02; Tue, 28 Jun 2016 13:21:51 +0000 (UTC)
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
From: slebresne@apache.org
To: commits@cassandra.apache.org
Date: Tue, 28 Jun 2016 13:21:53 -0000
Message-Id: <61ff4ba2b74a4faea9bfc1139f76b303@git.apache.org>
In-Reply-To: <71beec61030a470e9d4e22cef8edec78@git.apache.org>
References: <71beec61030a470e9d4e22cef8edec78@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [03/17] cassandra git commit: Avoid stalling Paxos when the paxos state expires
archived-at: Tue, 28 Jun 2016 13:22:06 -0000

Avoid stalling Paxos when the paxos state expires

This commit does 2 things:
- It ignores MRCs that are old enough to have expired in some nodes paxos
  tables
- It ensures the same timestamp is used when reading the paxos state and ignoring old MRC

patch by slebresne; reviewed by jasobraown for CASSANDRA-12043


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/017ec3e9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/017ec3e9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/017ec3e9

Branch: refs/heads/cassandra-3.0
Commit: 017ec3e99e704db5e1a36ad153af08d6e7eca523
Parents: 2811f15
Author: Sylvain Lebresne
Authored: Wed Jun 22 12:12:37 2016 +0200
Committer: Sylvain Lebresne
Committed: Tue Jun 28 15:16:00 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/cql3/QueryProcessor.java   | 28 +++++++++++++++++++-
 .../cql3/statements/SelectStatement.java        |  6 ++++-
 .../org/apache/cassandra/db/SystemKeyspace.java |  6 ++---
 .../apache/cassandra/service/StorageProxy.java  |  2 +-
 .../cassandra/service/paxos/PaxosState.java     | 11 ++++++--
 .../service/paxos/PrepareCallback.java          | 18 ++++++++++++-
 7 files changed, 63 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5741241..feeaded 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.15
+ * Avoid stalling paxos when the paxos state expires (CASSANDRA-12043)
  * Remove finished incoming streaming connections from MessagingService (CASSANDRA-11854)
  * Don't try to get sstables for non-repairing column families (CASSANDRA-12077)
  * Prevent select statements with clustering key > 64k (CASSANDRA-11882)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index d4ca76f..4340d42 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -296,7 +296,7 @@ public class QueryProcessor implements QueryHandler
         return QueryOptions.forInternalCalls(boundValues);
     }
 
-    private static ParsedStatement.Prepared prepareInternal(String query) throws RequestValidationException
+    public static ParsedStatement.Prepared prepareInternal(String query) throws RequestValidationException
     {
         ParsedStatement.Prepared prepared = internalStatements.get(query);
         if (prepared != null)
@@ -374,6 +374,32 @@ public class QueryProcessor implements QueryHandler
         }
     }
 
+    /**
+     * A special version of executeInternal that takes the time used as "now" for the query in argument.
+     * Note that this only make sense for Selects so this only accept SELECT statements and is only useful in rare
+     * cases.
+     */
+    public static UntypedResultSet executeInternalWithNow(long now, String query, Object... values)
+    {
+        try
+        {
+            ParsedStatement.Prepared prepared = prepareInternal(query);
+            assert prepared.statement instanceof SelectStatement;
+            SelectStatement select = (SelectStatement)prepared.statement;
+            ResultMessage result = select.executeInternal(internalQueryState(), makeInternalOptions(prepared, values), now);
+            assert result instanceof ResultMessage.Rows;
+            return UntypedResultSet.create(((ResultMessage.Rows)result).result);
+        }
+        catch (RequestExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (RequestValidationException e)
+        {
+            throw new RuntimeException("Error validating query " + query, e);
+        }
+    }
+
     public static UntypedResultSet resultify(String query, Row row)
     {
         return resultify(query, Collections.singletonList(row));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 1e142e0..6351bb5 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -312,8 +312,12 @@ public class SelectStatement implements CQLStatement
 
     public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
     {
+        return executeInternal(state, options, System.currentTimeMillis());
+    }
+
+    public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options, long now) throws RequestExecutionException, RequestValidationException
+    {
         int limit = getLimit(options);
-        long now = System.currentTimeMillis();
         Pageable command = getPageableCommand(options, limit, now);
 
         int pageSize = options.getPageSize();
 
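
The two changes described in the commit message can be sketched in isolation. What follows is a minimal Java illustration, not Cassandra code: the class PaxosExpirySketch, the StoredCommit type and every method name are hypothetical, and all timestamps are assumed to be microseconds since the epoch (the diff itself passes a millisecond value on the read path and ballotMicros in the coordinator check). It shows why evaluating expiry against a ballot-derived "now" gives every replica the same answer, and how a coordinator could skip a most recent commit that is older than the TTL. The TTL is widened to long before conversion, because an int expression such as 10800 * 1000 * 1000 wraps around in Java before it is assigned to a long.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration only; none of these names exist in Cassandra. */
final class PaxosExpirySketch
{
    /** Hypothetical stand-in for a stored Paxos commit: its write time and its TTL. */
    static final class StoredCommit
    {
        final long writtenAtMicros;
        final int ttlSeconds;

        StoredCommit(long writtenAtMicros, int ttlSeconds)
        {
            this.writtenAtMicros = writtenAtMicros;
            this.ttlSeconds = ttlSeconds;
        }

        /** The commit is visible only while "now" is before write time + TTL. */
        boolean isLiveAt(long nowMicros)
        {
            return nowMicros < writtenAtMicros + ttlMicros(ttlSeconds);
        }
    }

    /** Converts seconds to microseconds in long arithmetic, avoiding int overflow. */
    static long ttlMicros(int ttlSeconds)
    {
        return TimeUnit.SECONDS.toMicros(ttlSeconds);
    }

    /** Coordinator-side rule: a commit older than the TTL is not worth repairing. */
    static boolean shouldIgnoreMostRecentCommit(long commitMicros, int ttlSeconds, long ballotMicros)
    {
        return commitMicros + ttlMicros(ttlSeconds) < ballotMicros;
    }

    public static void main(String[] args)
    {
        int ttlSeconds = 3 * 3600;                      // the "at least 3h" floor from the diff
        StoredCommit commit = new StoredCommit(0L, ttlSeconds);
        long ballotMicros = ttlMicros(ttlSeconds) - 1;  // ballot issued just before expiry

        // Replicas whose local clocks straddle the expiry instant can disagree.
        long slowClock = ballotMicros - 1_000_000L;
        long fastClock = ballotMicros + 1_000_000L;
        System.out.println("live on slow replica (local clock): " + commit.isLiveAt(slowClock));
        System.out.println("live on fast replica (local clock): " + commit.isLiveAt(fastClock));

        // Using the ballot timestamp as "now" yields one answer for every replica.
        System.out.println("live at ballot time (any replica):  " + commit.isLiveAt(ballotMicros));
        System.out.println("coordinator ignores this commit:    "
                           + shouldIgnoreMostRecentCommit(0L, ttlSeconds, ballotMicros));
    }
}
```

Pinning "now" to the ballot trades wall-clock accuracy for agreement between replicas, which is the property the prepare phase depends on. The coordinator-side rule then covers the remaining case where the commit is genuinely past its TTL on every clock.
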
http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 1f66b1b..f8cf1ab 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -896,10 +896,10 @@ public class SystemKeyspace
         return new Row(key, cf);
     }
 
-    public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata)
+    public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata, long now)
     {
         String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?";
-        UntypedResultSet results = executeInternal(String.format(req, PAXOS_CF), key, metadata.cfId);
+        UntypedResultSet results = QueryProcessor.executeInternalWithNow(now, String.format(req, PAXOS_CF), key, metadata.cfId);
         if (results.isEmpty())
             return new PaxosState(key, metadata);
         UntypedResultSet.Row row = results.one();
@@ -939,7 +939,7 @@ public class SystemKeyspace
                         proposal.update.id());
     }
 
-    private static int paxosTtl(CFMetaData metadata)
+    public static int paxosTtl(CFMetaData metadata)
     {
         // keep paxos state around for at least 3h
         return Math.max(3 * 3600, metadata.getGcGraceSeconds());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index af0693b..cddc7e9 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -421,7 +421,7 @@ public class StorageProxy implements StorageProxyMBean
             // https://issues.apache.org/jira/browse/CASSANDRA-5062?focusedCommentId=13619810&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13619810)
             // Since we waited for quorum nodes, if some of them haven't seen the last commit (which may just be a timing issue, but may also
             // mean we lost messages), we pro-actively "repair" those nodes, and retry.
-            Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit();
+            Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit(metadata, ballotMicros);
             if (Iterables.size(missingMRC) > 0)
             {
                 Tracing.trace("Repairing replicas that missed the most recent commit");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index 01e03f4..fde881b 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -63,7 +63,13 @@ public class PaxosState
         lock.lock();
         try
         {
-            PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata());
+            // When preparing, we need to use the same time as "now" (that's the time we use to decide if something
+            // is expired or not) accross nodes otherwise we may have a window where a Most Recent Commit shows up
+            // on some replica and not others during a new proposal (in StorageProxy.beginAndRepairPaxos()), and no
+            // amount of re-submit will fix this (because the node on which the commit has expired will have a
+            // tombstone that hides any re-submit). See CASSANDRA-12043 for details.
+            long now = UUIDGen.unixTimestamp(toPrepare.ballot);
+            PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata(), now);
             if (toPrepare.isAfter(state.promised))
             {
                 Tracing.trace("Promising ballot {}", toPrepare.ballot);
@@ -98,7 +104,8 @@ public class PaxosState
         lock.lock();
         try
         {
-            PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata());
+            long now = UUIDGen.unixTimestamp(proposal.ballot);
+            PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata(), now);
             if (proposal.hasBallot(state.promised.ballot) || proposal.isAfter(state.promised))
             {
                 Tracing.trace("Accepting proposal {}", proposal);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index a446b0b..2859a69 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@ -23,6 +23,7 @@ package org.apache.cassandra.service.paxos;
 
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -33,7 +34,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.utils.UUIDGen;
 
 public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
 {
@@ -86,8 +89,21 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
         latch.countDown();
     }
 
-    public Iterable<InetAddress> replicasMissingMostRecentCommit()
+    public Iterable<InetAddress> replicasMissingMostRecentCommit(CFMetaData metadata, long now)
     {
+        // In general, we need every replicas that have answered to the prepare (a quorum) to agree on the MRC (see
+        // coment in StorageProxy.beginAndRepairPaxos(), but basically we need to make sure at least a quorum of nodes
+        // have learn a commit before commit a new one otherwise that previous commit is not guaranteed to have reach a
+        // quorum and further commit may proceed on incomplete information).
+        // However, if that commit is too hold, it may have been expired from some of the replicas paxos table (we don't
+        // keep the paxos state forever or that could grow unchecked), and we could end up in some infinite loop as
+        // explained on CASSANDRA-12043. To avoid that, we ignore a MRC that is too old, i.e. older than the TTL we set
+        // on paxos tables. For such old commit, we rely on hints and repair to ensure the commit has indeed be
+        // propagated to all nodes.
+        long paxosTtlMicros = SystemKeyspace.paxosTtl(metadata) * 1000 * 1000;
+        if (UUIDGen.microsTimestamp(mostRecentCommit.ballot) + paxosTtlMicros < now)
+            return Collections.emptySet();
+
         return Iterables.filter(commitsByReplica.keySet(), new Predicate<InetAddress>()
         {
             public boolean apply(InetAddress inetAddress)