cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject git commit: Fix AbstractRowResolver and RowDigestResolver for speculative retries
Date Wed, 23 Oct 2013 15:45:59 GMT
Updated Branches:
  refs/heads/cassandra-2.0 cc01b3165 -> 01370bb6c


Fix AbstractRowResolver and RowDigestResolver for speculative retries

patch by Jonathan Ellis; reviewed by Aleksey Yeschenko for
CASSANDRA-6194


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/01370bb6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/01370bb6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/01370bb6

Branch: refs/heads/cassandra-2.0
Commit: 01370bb6c7d7fa816c7162a379bee4dc710a5556
Parents: cc01b31
Author: Aleksey Yeschenko <aleksey@apache.org>
Authored: Wed Oct 23 23:40:57 2013 +0800
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Wed Oct 23 23:43:36 2013 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +-
 .../cassandra/service/AbstractReadExecutor.java |  9 +++---
 .../cassandra/service/AbstractRowResolver.java  | 20 +-----------
 .../cassandra/service/IResponseResolver.java    |  2 +-
 .../service/RangeSliceResponseResolver.java     |  3 +-
 .../apache/cassandra/service/ReadCallback.java  | 11 ++-----
 .../cassandra/service/RowDigestResolver.java    | 32 ++++++--------------
 .../apache/cassandra/service/StorageProxy.java  |  7 ++++-
 8 files changed, 27 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/01370bb6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b55e0cf..32c74aa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,7 +6,7 @@
  * Add configurable metrics reporting (CASSANDRA-4430)
  * drop queries exceeding a configurable number of tombstones (CASSANDRA-6117)
  * Track and persist sstable read activity (CASSANDRA-5515)
- * Fixes for speculative retry (CASSANDRA-5932)
+ * Fixes for speculative retry (CASSANDRA-5932, CASSANDRA-6194)
  * Improve memory usage of metadata min/max column names (CASSANDRA-6077)
  * Fix thrift validation refusing row markers on CQL3 tables (CASSANDRA-6081)
  * Fix insertion of collections with CAS (CASSANDRA-6069)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01370bb6/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index c56975c..3f57e73 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.service;
 
 import java.net.InetAddress;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -122,7 +123,7 @@ public abstract class AbstractReadExecutor
      *
      * @return target replicas + the extra replica, *IF* we speculated.
      */
-    public abstract Iterable<InetAddress> getContactedReplicas();
+    public abstract Collection<InetAddress> getContactedReplicas();
 
     /**
      * send the initial set of requests
@@ -216,7 +217,7 @@ public abstract class AbstractReadExecutor
             // no-op
         }
 
-        public Iterable<InetAddress> getContactedReplicas()
+        public Collection<InetAddress> getContactedReplicas()
         {
             return targetReplicas;
         }
@@ -286,7 +287,7 @@ public abstract class AbstractReadExecutor
             }
         }
 
-        public Iterable<InetAddress> getContactedReplicas()
+        public Collection<InetAddress> getContactedReplicas()
         {
             return speculated
                  ? targetReplicas
@@ -312,7 +313,7 @@ public abstract class AbstractReadExecutor
             // no-op
         }
 
-        public Iterable<InetAddress> getContactedReplicas()
+        public Collection<InetAddress> getContactedReplicas()
         {
             return targetReplicas;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01370bb6/src/java/org/apache/cassandra/service/AbstractRowResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractRowResolver.java b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
index 2ebaaf1..47a00da 100644
--- a/src/java/org/apache/cassandra/service/AbstractRowResolver.java
+++ b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
@@ -43,27 +43,9 @@ public abstract class AbstractRowResolver implements IResponseResolver<ReadRespo
         this.keyspaceName = keyspaceName;
     }
 
-    public boolean preprocess(MessageIn<ReadResponse> message)
+    public void preprocess(MessageIn<ReadResponse> message)
     {
-        MessageIn<ReadResponse> toReplace = null;
-        for (MessageIn<ReadResponse> reply : replies)
-        {
-            if (reply.from.equals(message.from))
-            {
-                if (!message.payload.isDigestQuery())
-                    toReplace = reply;
-                break;
-            }
-        }
-        // replace old message
-        if (toReplace != null)
-        {
-            replies.remove(toReplace);
-            replies.add(message);
-            return false;
-        }
         replies.add(message);
-        return true;
     }
 
     public Iterable<MessageIn<ReadResponse>> getMessages()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01370bb6/src/java/org/apache/cassandra/service/IResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IResponseResolver.java b/src/java/org/apache/cassandra/service/IResponseResolver.java
index 0c54690..17c8bff 100644
--- a/src/java/org/apache/cassandra/service/IResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/IResponseResolver.java
@@ -38,6 +38,6 @@ public interface IResponseResolver<TMessage, TResolved> {
      */
     public TResolved getData();
 
-    public boolean preprocess(MessageIn<TMessage> message);
+    public void preprocess(MessageIn<TMessage> message);
     public Iterable<MessageIn<TMessage>> getMessages();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01370bb6/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
index 72ea69c..640681b 100644
--- a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
@@ -93,10 +93,9 @@ public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceR
         return resolvedRows;
     }
 
-    public boolean preprocess(MessageIn message)
+    public void preprocess(MessageIn message)
     {
         responses.add(message);
-        return true;
     }
 
     public boolean isDataPresent()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01370bb6/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index b7d5380..d4cc7f5 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -67,7 +67,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
             logger.trace(String.format("Blockfor is %s; setting up requests to %s", blockfor, StringUtils.join(this.endpoints, ",")));
     }
 
-    private ReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, int blockfor, IReadCommand command, Keyspace keyspace, List<InetAddress> endpoints)
+    public ReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, int blockfor, IReadCommand command, Keyspace keyspace, List<InetAddress> endpoints)
     {
         this.command = command;
         this.keyspace = keyspace;
@@ -78,11 +78,6 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
         this.endpoints = endpoints;
     }
 
-    public ReadCallback<TMessage, TResolved> withNewResolver(IResponseResolver<TMessage, TResolved> newResolver)
-    {
-        return new ReadCallback<TMessage, TResolved>(newResolver, consistencyLevel, blockfor, command, keyspace, endpoints);
-    }
-
     public boolean await(long timePastStart, TimeUnit unit)
     {
         long time = unit.toNanos(timePastStart) - (System.nanoTime() - start);
@@ -111,8 +106,8 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
 
     public void response(MessageIn<TMessage> message)
     {
-        boolean hasAdded = resolver.preprocess(message);
-        int n = (waitingFor(message) && hasAdded)
+        resolver.preprocess(message);
+        int n = waitingFor(message)
               ? received.incrementAndGet()
               : received.get();
         if (n >= blockfor && resolver.isDataPresent())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01370bb6/src/java/org/apache/cassandra/service/RowDigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowDigestResolver.java b/src/java/org/apache/cassandra/service/RowDigestResolver.java
index bc4cf49..ec9f0d3 100644
--- a/src/java/org/apache/cassandra/service/RowDigestResolver.java
+++ b/src/java/org/apache/cassandra/service/RowDigestResolver.java
@@ -71,37 +71,23 @@ public class RowDigestResolver extends AbstractRowResolver
         for (MessageIn<ReadResponse> message : replies)
         {
             ReadResponse response = message.payload;
+
+            ByteBuffer newDigest;
             if (response.isDigestQuery())
             {
-                if (digest == null)
-                {
-                    digest = response.digest();
-                }
-                else
-                {
-                    ByteBuffer digest2 = response.digest();
-                    if (!digest.equals(digest2))
-                        throw new DigestMismatchException(key, digest, digest2);
-                }
+                newDigest = response.digest();
             }
             else
             {
+                // note that this allows for multiple data replies, post-CASSANDRA-5932
                 data = response.row().cf;
+                newDigest = ColumnFamily.digest(data);
             }
-        }
 
-        // Compare digest (only one, since we threw earlier if there were different replies)
-        // with the data response. If there is a mismatch then throw an exception so that read repair can happen.
-        //
-        // It's important to note that we do not consider the possibility of multiple data responses --
-        // that can only happen when we're doing the repair post-mismatch, and will be handled by RowDataResolver.
-        if (digest != null)
-        {
-            ByteBuffer digest2 = ColumnFamily.digest(data);
-            if (!digest.equals(digest2))
-                throw new DigestMismatchException(key, digest, digest2);
-            if (logger.isDebugEnabled())
-                logger.debug("digests verified");
+            if (digest == null)
+                digest = newDigest;
+            else if (!digest.equals(newDigest))
+                throw new DigestMismatchException(key, digest, newDigest);
         }
 
         if (logger.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01370bb6/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index e177eed..6dd702b 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1292,7 +1292,12 @@ public class StorageProxy implements StorageProxyMBean
 
                     // Do a full data read to resolve the correct response (and repair node that need be)
                     RowDataResolver resolver = new RowDataResolver(exec.command.ksName, exec.command.key, exec.command.filter(), exec.command.timestamp);
-                    ReadCallback<ReadResponse, Row> repairHandler = exec.handler.withNewResolver(resolver);
+                    ReadCallback<ReadResponse, Row> repairHandler = new ReadCallback<>(resolver,
+                                                                                       ConsistencyLevel.ALL,
+                                                                                       exec.getContactedReplicas().size(),
+                                                                                       exec.command,
+                                                                                       Keyspace.open(exec.command.getKeyspace()),
+                                                                                       exec.handler.endpoints);
 
                     if (repairCommands == null)
                     {


Mime
View raw message