cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject git commit: Always send Paxos commit to all replicas
Date Mon, 25 Aug 2014 13:56:58 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 618441b97 -> 5d4740c58


Always send Paxos commit to all replicas

patch by kohlisankalp; reviewed by slebresne for CASSANDRA-7479


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5d4740c5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5d4740c5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5d4740c5

Branch: refs/heads/cassandra-2.0
Commit: 5d4740c5841c9c3a8d6c24578c1c6fb512524321
Parents: 618441b
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Mon Aug 25 15:55:43 2014 +0200
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Mon Aug 25 15:55:43 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +++
 .../apache/cassandra/service/StorageProxy.java  | 24 +++++++++++++-------
 2 files changed, 19 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d4740c5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9aeeb29..4d5d851 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,6 @@
+2.0.11:
+ * Always send Paxos commit to all replicas (CASSANDRA-7479)
+
 2.0.10
  * Don't send schema change responses and events for no-op DDL
    statements (CASSANDRA-7600)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5d4740c5/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index c606d75..904d602 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -248,10 +248,7 @@ public class StorageProxy implements StorageProxyMBean
             Tracing.trace("CAS precondition is met; proposing client-requested updates for
{}", ballot);
             if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos))
             {
-                if (consistencyForCommit == ConsistencyLevel.ANY)
-                    sendCommit(proposal, liveEndpoints);
-                else
-                    commitPaxos(proposal, consistencyForCommit);
+                commitPaxos(proposal, consistencyForCommit);
                 Tracing.trace("CAS successful");
                 return null;
             }
@@ -413,23 +410,34 @@ public class StorageProxy implements StorageProxyMBean
 
     private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel) throws
WriteTimeoutException
     {
+        boolean shouldBlock = consistencyLevel != ConsistencyLevel.ANY;
         Keyspace keyspace = Keyspace.open(proposal.update.metadata().ksName);
 
         Token tk = StorageService.getPartitioner().getToken(proposal.key);
         List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspace.getName(),
tk);
         Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk,
keyspace.getName());
 
-        AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
-        AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints,
pendingEndpoints, consistencyLevel, null, WriteType.SIMPLE);
+        AbstractWriteResponseHandler responseHandler = null;
+        if (shouldBlock)
+        {
+            AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
+            responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints,
consistencyLevel, null, WriteType.SIMPLE);
+        }
 
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT,
proposal, Commit.serializer);
         for (InetAddress destination : Iterables.concat(naturalEndpoints, pendingEndpoints))
         {
             if (FailureDetector.instance.isAlive(destination))
-                MessagingService.instance().sendRR(message, destination, responseHandler);
+            {
+                if (shouldBlock)
+                    MessagingService.instance().sendRR(message, destination, responseHandler);
+                else
+                    MessagingService.instance().sendOneWay(message, destination);
+            }
         }
 
-        responseHandler.get();
+        if (shouldBlock)
+            responseHandler.get();
     }
 
     /**


Mime
View raw message