cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [2/7] git commit: Include correct consistencyLevel in LWT timeout patch by Sankalp Kohli; reviewed by jbellis for CASSANDRA-6884
Date Wed, 19 Mar 2014 06:03:26 GMT
Include correct consistencyLevel in LWT timeout
patch by Sankalp Kohli; reviewed by jbellis for CASSANDRA-6884


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f5e1cbca
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f5e1cbca
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f5e1cbca

Branch: refs/heads/trunk
Commit: f5e1cbca871e6e4cc7007177b2b0e9f367ae60ba
Parents: 9269cb8
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Wed Mar 19 00:15:19 2014 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Wed Mar 19 00:15:19 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                         |  1 +
 .../org/apache/cassandra/service/StorageProxy.java  | 16 ++++++++--------
 .../service/paxos/AbstractPaxosCallback.java        |  6 ++++--
 .../cassandra/service/paxos/PrepareCallback.java    |  5 +++--
 .../cassandra/service/paxos/ProposeCallback.java    |  5 +++--
 5 files changed, 19 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5e1cbca/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4741475..7eebd5b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.7
+ * Include correct consistencyLevel in LWT timeout (CASSANDRA-6884)
  * Lower chances for losing new SSTables during nodetool refresh and
    ColumnFamilyStore.loadNewSSTables (CASSANDRA-6514)
  * Add support for DELETE ... IF EXISTS to CQL3 (CASSANDRA-5708)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5e1cbca/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index fda9819..a6912c2 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -246,7 +246,7 @@ public class StorageProxy implements StorageProxyMBean
 
             Commit proposal = Commit.newProposal(key, ballot, updates);
             Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot);
-            if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true))
+            if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos))
             {
                 if (consistencyForCommit == ConsistencyLevel.ANY)
                     sendCommit(proposal, liveEndpoints);
@@ -318,7 +318,7 @@ public class StorageProxy implements StorageProxyMBean
             // prepare
             Tracing.trace("Preparing {}", ballot);
             Commit toPrepare = Commit.newPrepare(key, metadata, ballot);
-            summary = preparePaxos(toPrepare, liveEndpoints, requiredParticipants);
+            summary = preparePaxos(toPrepare, liveEndpoints, requiredParticipants, consistencyForPaxos);
             if (!summary.promised)
             {
                 Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting");
@@ -336,7 +336,7 @@ public class StorageProxy implements StorageProxyMBean
             {
                 Tracing.trace("Finishing incomplete paxos round {}", inProgress);
                 Commit refreshedInProgress = Commit.newProposal(inProgress.key, ballot, inProgress.update);
-                if (proposePaxos(refreshedInProgress, liveEndpoints, requiredParticipants, false))
+                if (proposePaxos(refreshedInProgress, liveEndpoints, requiredParticipants, false, consistencyForPaxos))
                 {
                     commitPaxos(refreshedInProgress, ConsistencyLevel.QUORUM);
                 }
@@ -381,10 +381,10 @@ public class StorageProxy implements StorageProxyMBean
             MessagingService.instance().sendOneWay(message, target);
     }
 
-    private static PrepareCallback preparePaxos(Commit toPrepare, List<InetAddress> endpoints, int requiredParticipants)
+    private static PrepareCallback preparePaxos(Commit toPrepare, List<InetAddress> endpoints, int requiredParticipants, ConsistencyLevel consistencyForPaxos)
     throws WriteTimeoutException
     {
-        PrepareCallback callback = new PrepareCallback(toPrepare.key, toPrepare.update.metadata(), requiredParticipants);
+        PrepareCallback callback = new PrepareCallback(toPrepare.key, toPrepare.update.metadata(), requiredParticipants, consistencyForPaxos);
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PREPARE, toPrepare, Commit.serializer);
         for (InetAddress target : endpoints)
             MessagingService.instance().sendRR(message, target, callback);
@@ -392,10 +392,10 @@ public class StorageProxy implements StorageProxyMBean
         return callback;
     }
 
-    private static boolean proposePaxos(Commit proposal, List<InetAddress> endpoints, int requiredParticipants, boolean timeoutIfPartial)
+    private static boolean proposePaxos(Commit proposal, List<InetAddress> endpoints, int requiredParticipants, boolean timeoutIfPartial, ConsistencyLevel consistencyLevel)
     throws WriteTimeoutException
     {
-        ProposeCallback callback = new ProposeCallback(endpoints.size(), requiredParticipants, !timeoutIfPartial);
+        ProposeCallback callback = new ProposeCallback(endpoints.size(), requiredParticipants, !timeoutIfPartial, consistencyLevel);
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PROPOSE, proposal, Commit.serializer);
         for (InetAddress target : endpoints)
             MessagingService.instance().sendRR(message, target, callback);
@@ -406,7 +406,7 @@ public class StorageProxy implements StorageProxyMBean
             return true;
 
         if (timeoutIfPartial && !callback.isFullyRefused())
-            throw new WriteTimeoutException(WriteType.CAS, ConsistencyLevel.SERIAL, callback.getAcceptCount(), requiredParticipants);
+            throw new WriteTimeoutException(WriteType.CAS, consistencyLevel, callback.getAcceptCount(), requiredParticipants);
 
         return false;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5e1cbca/src/java/org/apache/cassandra/service/paxos/AbstractPaxosCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/AbstractPaxosCallback.java b/src/java/org/apache/cassandra/service/paxos/AbstractPaxosCallback.java
index 8197cfd..37defde 100644
--- a/src/java/org/apache/cassandra/service/paxos/AbstractPaxosCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/AbstractPaxosCallback.java
@@ -34,10 +34,12 @@ public abstract class AbstractPaxosCallback<T> implements IAsyncCallback<T>
 {
     protected final CountDownLatch latch;
     protected final int targets;
+    private final ConsistencyLevel consistency;
 
-    public AbstractPaxosCallback(int targets)
+    public AbstractPaxosCallback(int targets, ConsistencyLevel consistency)
     {
         this.targets = targets;
+        this.consistency = consistency;
         latch = new CountDownLatch(targets);
     }
 
@@ -56,7 +58,7 @@ public abstract class AbstractPaxosCallback<T> implements IAsyncCallback<T>
         try
         {
             if (!latch.await(DatabaseDescriptor.getWriteRpcTimeout(), TimeUnit.MILLISECONDS))
-                throw new WriteTimeoutException(WriteType.CAS, ConsistencyLevel.SERIAL, getResponseCount(), targets);
+                throw new WriteTimeoutException(WriteType.CAS, consistency, getResponseCount(), targets);
         }
         catch (InterruptedException ex)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5e1cbca/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index 04a18b9..a446b0b 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
+import org.apache.cassandra.db.ConsistencyLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,9 +46,9 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
 
     private final Map<InetAddress, Commit> commitsByReplica = new ConcurrentHashMap<InetAddress, Commit>();
 
-    public PrepareCallback(ByteBuffer key, CFMetaData metadata, int targets)
+    public PrepareCallback(ByteBuffer key, CFMetaData metadata, int targets, ConsistencyLevel consistency)
     {
-        super(targets);
+        super(targets, consistency);
         // need to inject the right key in the empty commit so comparing with empty commits in the reply works as expected
         mostRecentCommit = Commit.emptyCommit(key, metadata);
         mostRecentInProgressCommit = Commit.emptyCommit(key, metadata);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5e1cbca/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java b/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
index 0075840..018dab9 100644
--- a/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
@@ -23,6 +23,7 @@ package org.apache.cassandra.service.paxos;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.cassandra.db.ConsistencyLevel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,9 +50,9 @@ public class ProposeCallback extends AbstractPaxosCallback<Boolean>
     private final int requiredAccepts;
     private final boolean failFast;
 
-    public ProposeCallback(int totalTargets, int requiredTargets, boolean failFast)
+    public ProposeCallback(int totalTargets, int requiredTargets, boolean failFast, ConsistencyLevel consistency)
     {
-        super(totalTargets);
+        super(totalTargets, consistency);
         this.requiredAccepts = requiredTargets;
         this.failFast = failFast;
     }


Mime
View raw message