cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [1/2] cassandra git commit: Clean up ARE sendXRequests() methods and ReadCommand#setDigestQuery()
Date Tue, 20 Jan 2015 21:59:48 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk a792a7bae -> 9c6cf3cdf


Clean up ARE sendXRequests() methods and ReadCommand#setDigestQuery()

patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-8647


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e2d140ff
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e2d140ff
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e2d140ff

Branch: refs/heads/trunk
Commit: e2d140fff752e757f40c812dcd1b7bed3ea5fed2
Parents: 2445d4d
Author: Aleksey Yeschenko <aleksey@apache.org>
Authored: Wed Jan 21 00:52:53 2015 +0300
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Wed Jan 21 00:54:34 2015 +0300

----------------------------------------------------------------------
 .../org/apache/cassandra/db/ReadCommand.java    |  3 +-
 .../db/RetriedSliceFromReadCommand.java         |  4 +-
 .../cassandra/db/SliceByNamesReadCommand.java   |  8 +--
 .../cassandra/db/SliceFromReadCommand.java      |  8 +--
 .../cassandra/service/AbstractReadExecutor.java | 56 +++++++++-----------
 5 files changed, 31 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2d140ff/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 299693e..dedff6f 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -89,9 +89,10 @@ public abstract class ReadCommand implements IReadCommand, Pageable
         return isDigestQuery;
     }
 
-    public void setDigestQuery(boolean isDigestQuery)
+    public ReadCommand setIsDigestQuery(boolean isDigestQuery)
     {
         this.isDigestQuery = isDigestQuery;
+        return this;
     }
 
     public String getColumnFamilyName()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2d140ff/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java b/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
index fe54917..41f5a50 100644
--- a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
@@ -38,9 +38,7 @@ public class RetriedSliceFromReadCommand extends SliceFromReadCommand
     @Override
     public ReadCommand copy()
     {
-        ReadCommand readCommand = new RetriedSliceFromReadCommand(ksName, key, cfName, timestamp, filter, originalCount);
-        readCommand.setDigestQuery(isDigestQuery());
-        return readCommand;
+        return new RetriedSliceFromReadCommand(ksName, key, cfName, timestamp, filter, originalCount).setIsDigestQuery(isDigestQuery());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2d140ff/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
index b1829f3..22f795e 100644
--- a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
@@ -44,9 +44,7 @@ public class SliceByNamesReadCommand extends ReadCommand
 
     public ReadCommand copy()
     {
-        ReadCommand readCommand= new SliceByNamesReadCommand(ksName, key, cfName, timestamp, filter);
-        readCommand.setDigestQuery(isDigestQuery());
-        return readCommand;
+        return new SliceByNamesReadCommand(ksName, key, cfName, timestamp, filter).setIsDigestQuery(isDigestQuery());
     }
 
     public Row getRow(Keyspace keyspace)
@@ -97,9 +95,7 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
         long timestamp = in.readLong();
         CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
         NamesQueryFilter filter = metadata.comparator.namesQueryFilterSerializer().deserialize(in, version);
-        ReadCommand command = new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, filter);
-        command.setDigestQuery(isDigest);
-        return command;
+        return new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, filter).setIsDigestQuery(isDigest);
     }
 
     public long serializedSize(ReadCommand cmd, int version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2d140ff/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index f06b9dc..2259f22 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -48,9 +48,7 @@ public class SliceFromReadCommand extends ReadCommand
 
     public ReadCommand copy()
     {
-        ReadCommand readCommand = new SliceFromReadCommand(ksName, key, cfName, timestamp, filter);
-        readCommand.setDigestQuery(isDigestQuery());
-        return readCommand;
+        return new SliceFromReadCommand(ksName, key, cfName, timestamp, filter).setIsDigestQuery(isDigestQuery());
     }
 
     public Row getRow(Keyspace keyspace)
@@ -151,9 +149,7 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
         long timestamp = in.readLong();
         CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
         SliceQueryFilter filter = metadata.comparator.sliceQueryFilterSerializer().deserialize(in, version);
-        ReadCommand command = new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, filter);
-        command.setDigestQuery(isDigest);
-        return command;
+        return new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, filter).setIsDigestQuery(isDigest);
     }
 
     public long serializedSize(ReadCommand cmd, int version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2d140ff/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 2c3261f..0546e27 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -77,43 +77,38 @@ public abstract class AbstractReadExecutor
 
     protected void makeDataRequests(Iterable<InetAddress> endpoints)
     {
-        boolean readLocal = false;
-        for (InetAddress endpoint : endpoints)
-        {
-            if (isLocalRequest(endpoint))
-            {
-                readLocal = true;
-            }
-            else
-            {
-                logger.trace("reading data from {}", endpoint);
-                MessagingService.instance().sendRR(command.createMessage(), endpoint, handler);
-            }
-        }
-        if (readLocal)
-        {
-            logger.trace("reading data locally");
-            StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalReadRunnable(command, handler));
-        }
+        makeRequests(command, endpoints);
     }
 
     protected void makeDigestRequests(Iterable<InetAddress> endpoints)
     {
-        ReadCommand digestCommand = command.copy();
-        digestCommand.setDigestQuery(true);
-        MessageOut<?> message = digestCommand.createMessage();
+        makeRequests(command.copy().setIsDigestQuery(true), endpoints);
+    }
+
+    private void makeRequests(ReadCommand readCommand, Iterable<InetAddress> endpoints)
+    {
+        MessageOut<ReadCommand> message = null;
+        boolean hasLocalEndpoint = false;
+
         for (InetAddress endpoint : endpoints)
         {
             if (isLocalRequest(endpoint))
             {
-                logger.trace("reading digest locally");
-                StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
-            }
-            else
-            {
-                logger.trace("reading digest from {}", endpoint);
-                MessagingService.instance().sendRR(message, endpoint, handler);
+                hasLocalEndpoint = true;
+                continue;
             }
+
+            logger.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
+            if (message == null)
+                message = readCommand.createMessage();
+            MessagingService.instance().sendRR(message, endpoint, handler);
+        }
+
+        // We delay the local (potentially blocking) read till the end to avoid stalling remote requests.
+        if (hasLocalEndpoint)
+        {
+            logger.trace("reading {} locally", readCommand.isDigestQuery() ? "digest" : "data");
+            StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalReadRunnable(command, handler));
         }
     }
 
@@ -278,10 +273,7 @@ public abstract class AbstractReadExecutor
                 // Could be waiting on the data, or on enough digests.
                 ReadCommand retryCommand = command;
                 if (resolver.getData() != null)
-                {
-                    retryCommand = command.copy();
-                    retryCommand.setDigestQuery(true);
-                }
+                    retryCommand = command.copy().setIsDigestQuery(true);
 
                 InetAddress extraReplica = Iterables.getLast(targetReplicas);
                 logger.trace("speculating read retry on {}", extraReplica);


Mime
View raw message