cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [01/17] cassandra git commit: Avoid stalling Paxos when the paxos state expires
Date Tue, 28 Jun 2016 13:21:51 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 2811f15bc -> 017ec3e99
  refs/heads/cassandra-2.2 3b448b337 -> 6555a87bd
  refs/heads/cassandra-3.0 cb4540ec2 -> 70059726f
  refs/heads/cassandra-3.9 8b60fb779 -> 53f8f0957
  refs/heads/trunk 9ff299e40 -> ad5b30eff


Avoid stalling Paxos when the paxos state expires

This commit does 2 things:
- It ignores MRCs (most recent commits) that are old enough to have expired in some nodes' paxos tables
- It ensures the same timestamp is used when reading the paxos state and when ignoring old MRCs

patch by slebresne; reviewed by jasobrown for CASSANDRA-12043


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/017ec3e9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/017ec3e9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/017ec3e9

Branch: refs/heads/cassandra-2.1
Commit: 017ec3e99e704db5e1a36ad153af08d6e7eca523
Parents: 2811f15
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Wed Jun 22 12:12:37 2016 +0200
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Tue Jun 28 15:16:00 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/cql3/QueryProcessor.java   | 28 +++++++++++++++++++-
 .../cql3/statements/SelectStatement.java        |  6 ++++-
 .../org/apache/cassandra/db/SystemKeyspace.java |  6 ++---
 .../apache/cassandra/service/StorageProxy.java  |  2 +-
 .../cassandra/service/paxos/PaxosState.java     | 11 ++++++--
 .../service/paxos/PrepareCallback.java          | 18 ++++++++++++-
 7 files changed, 63 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5741241..feeaded 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.15
+ * Avoid stalling paxos when the paxos state expires (CASSANDRA-12043)
  * Remove finished incoming streaming connections from MessagingService (CASSANDRA-11854)
  * Don't try to get sstables for non-repairing column families (CASSANDRA-12077)
  * Prevent select statements with clustering key > 64k (CASSANDRA-11882)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index d4ca76f..4340d42 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -296,7 +296,7 @@ public class QueryProcessor implements QueryHandler
         return QueryOptions.forInternalCalls(boundValues);
     }
 
-    private static ParsedStatement.Prepared prepareInternal(String query) throws RequestValidationException
+    public static ParsedStatement.Prepared prepareInternal(String query) throws RequestValidationException
     {
         ParsedStatement.Prepared prepared = internalStatements.get(query);
         if (prepared != null)
@@ -374,6 +374,32 @@ public class QueryProcessor implements QueryHandler
         }
     }
 
+    /**
+     * A special version of executeInternal that takes the time used as "now" for the query in argument.
+     * Note that this only makes sense for Selects so this only accepts SELECT statements and is only useful in rare
+     * cases.
+     */
+    public static UntypedResultSet executeInternalWithNow(long now, String query, Object... values)
+    {
+        try
+        {
+            ParsedStatement.Prepared prepared = prepareInternal(query);
+            assert prepared.statement instanceof SelectStatement;
+            SelectStatement select = (SelectStatement)prepared.statement;
+            ResultMessage result = select.executeInternal(internalQueryState(), makeInternalOptions(prepared, values), now);
+            assert result instanceof ResultMessage.Rows;
+            return UntypedResultSet.create(((ResultMessage.Rows)result).result);
+        }
+        catch (RequestExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (RequestValidationException e)
+        {
+            throw new RuntimeException("Error validating query " + query, e);
+        }
+    }
+
     public static UntypedResultSet resultify(String query, Row row)
     {
         return resultify(query, Collections.singletonList(row));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 1e142e0..6351bb5 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -312,8 +312,12 @@ public class SelectStatement implements CQLStatement
 
     public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
     {
+        return executeInternal(state, options, System.currentTimeMillis());
+    }
+
+    public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options, long now) throws RequestExecutionException, RequestValidationException
+    {
         int limit = getLimit(options);
-        long now = System.currentTimeMillis();
         Pageable command = getPageableCommand(options, limit, now);
 
         int pageSize = options.getPageSize();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 1f66b1b..f8cf1ab 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -896,10 +896,10 @@ public class SystemKeyspace
         return new Row(key, cf);
     }
 
-    public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata)
+    public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata, long now)
     {
         String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?";
-        UntypedResultSet results = executeInternal(String.format(req, PAXOS_CF), key, metadata.cfId);
+        UntypedResultSet results = QueryProcessor.executeInternalWithNow(now, String.format(req, PAXOS_CF), key, metadata.cfId);
         if (results.isEmpty())
             return new PaxosState(key, metadata);
         UntypedResultSet.Row row = results.one();
@@ -939,7 +939,7 @@ public class SystemKeyspace
                         proposal.update.id());
     }
 
-    private static int paxosTtl(CFMetaData metadata)
+    public static int paxosTtl(CFMetaData metadata)
     {
         // keep paxos state around for at least 3h
         return Math.max(3 * 3600, metadata.getGcGraceSeconds());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index af0693b..cddc7e9 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -421,7 +421,7 @@ public class StorageProxy implements StorageProxyMBean
             // https://issues.apache.org/jira/browse/CASSANDRA-5062?focusedCommentId=13619810&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13619810)
             // Since we waited for quorum nodes, if some of them haven't seen the last commit (which may just be a timing issue, but may also
             // mean we lost messages), we pro-actively "repair" those nodes, and retry.
-            Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit();
+            Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit(metadata, ballotMicros);
             if (Iterables.size(missingMRC) > 0)
             {
                 Tracing.trace("Repairing replicas that missed the most recent commit");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index 01e03f4..fde881b 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -63,7 +63,13 @@ public class PaxosState
             lock.lock();
             try
             {
-                PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata());
+                // When preparing, we need to use the same time as "now" (that's the time we use to decide if something
+                // is expired or not) across nodes, otherwise we may have a window where a Most Recent Commit shows up
+                // on some replica and not others during a new proposal (in StorageProxy.beginAndRepairPaxos()), and no
+                // amount of re-submit will fix this (because the node on which the commit has expired will have a
+                // tombstone that hides any re-submit). See CASSANDRA-12043 for details.
+                long now = UUIDGen.unixTimestamp(toPrepare.ballot);
+                PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata(), now);
                 if (toPrepare.isAfter(state.promised))
                 {
                     Tracing.trace("Promising ballot {}", toPrepare.ballot);
@@ -98,7 +104,8 @@ public class PaxosState
             lock.lock();
             try
             {
-                PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata());
+                long now = UUIDGen.unixTimestamp(proposal.ballot);
+                PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata(), now);
                 if (proposal.hasBallot(state.promised.ballot) || proposal.isAfter(state.promised))
                 {
                     Tracing.trace("Accepting proposal {}", proposal);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index a446b0b..2859a69 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@ -23,6 +23,7 @@ package org.apache.cassandra.service.paxos;
 
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -33,7 +34,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.utils.UUIDGen;
 
 public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
 {
@@ -86,8 +89,21 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
         latch.countDown();
     }
 
-    public Iterable<InetAddress> replicasMissingMostRecentCommit()
+    public Iterable<InetAddress> replicasMissingMostRecentCommit(CFMetaData metadata, long now)
     {
+        // In general, we need every replica that has answered the prepare (a quorum) to agree on the MRC (see
+        // comment in StorageProxy.beginAndRepairPaxos(), but basically we need to make sure at least a quorum of nodes
+        // have learned a commit before committing a new one, otherwise that previous commit is not guaranteed to have reached a
+        // quorum and further commits may proceed on incomplete information).
+        // However, if that commit is too old, it may have expired from some of the replicas' paxos tables (we don't
+        // keep the paxos state forever or that could grow unchecked), and we could end up in some infinite loop as
+        // explained on CASSANDRA-12043. To avoid that, we ignore an MRC that is too old, i.e. older than the TTL we set
+        // on paxos tables. For such old commits, we rely on hints and repair to ensure the commit has indeed been
+        // propagated to all nodes.
+        long paxosTtlMicros = SystemKeyspace.paxosTtl(metadata) * 1000 * 1000;
+        if (UUIDGen.microsTimestamp(mostRecentCommit.ballot) + paxosTtlMicros < now)
+            return Collections.emptySet();
+
         return Iterables.filter(commitsByReplica.keySet(), new Predicate<InetAddress>()
         {
             public boolean apply(InetAddress inetAddress)


Mime
View raw message