cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1167300 - in /cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service: RangeSliceResponseResolver.java RowDigestResolver.java RowRepairResolver.java StorageProxy.java
Date Fri, 09 Sep 2011 17:09:37 GMT
Author: jbellis
Date: Fri Sep  9 17:09:37 2011
New Revision: 1167300

URL: http://svn.apache.org/viewvc?rev=1167300&view=rev
Log:
clean up (StorageProxy side of) read path, take 2
patch by jbellis; reviewed by slebresne for CASSANDRA-3161

Modified:
    cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
    cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RowDigestResolver.java
    cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RowRepairResolver.java
    cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1167300&r1=1167299&r2=1167300&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
(original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
Fri Sep  9 17:09:37 2011
@@ -45,7 +45,7 @@ import org.apache.cassandra.utils.MergeI
  */
 public class RangeSliceResponseResolver implements IResponseResolver<Iterable<Row>>
 {
-    private static final Logger logger_ = LoggerFactory.getLogger(RangeSliceResponseResolver.class);
+    private static final Logger logger = LoggerFactory.getLogger(RangeSliceResponseResolver.class);
 
     private static final Comparator<Pair<Row,InetAddress>> pairComparator = new
Comparator<Pair<Row, InetAddress>>()
     {

Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RowDigestResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RowDigestResolver.java?rev=1167300&r1=1167299&r2=1167300&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RowDigestResolver.java
(original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RowDigestResolver.java
Fri Sep  9 17:09:37 2011
@@ -33,7 +33,10 @@ public class RowDigestResolver extends A
     {
         super(key, table);
     }
-    
+
+    /**
+     * Special case of resolve() so that CL.ONE reads never throw DigestMismatchException
in the foreground
+     */
     public Row getData() throws IOException
     {
         for (Map.Entry<Message, ReadResponse> entry : replies.entrySet())
@@ -62,14 +65,10 @@ public class RowDigestResolver extends A
             logger.debug("resolving " + replies.size() + " responses");
 
         long startTime = System.currentTimeMillis();
-		ColumnFamily data = null;
 
         // validate digests against each other; throw immediately on mismatch.
-        // also, collects data results into versions/endpoints lists.
-        //
-        // results are cleared as we process them, to avoid unnecessary duplication of work
-        // when resolve() is called a second time for read repair on responses that were
not
-        // necessary to satisfy ConsistencyLevel.
+        // also extract the data reply, if any.
+        ColumnFamily data = null;
         ByteBuffer digest = null;
         for (Map.Entry<Message, ReadResponse> entry : replies.entrySet())
         {

Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RowRepairResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RowRepairResolver.java?rev=1167300&r1=1167299&r2=1167300&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RowRepairResolver.java
(original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RowRepairResolver.java
Fri Sep  9 17:09:37 2011
@@ -27,6 +27,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.Iterables;
+
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
@@ -59,45 +61,41 @@ public class RowRepairResolver extends A
     {
         if (logger.isDebugEnabled())
             logger.debug("resolving " + replies.size() + " responses");
-
         long startTime = System.currentTimeMillis();
-		List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
-		List<InetAddress> endpoints = new ArrayList<InetAddress>();
-
-        // case 1: validate digests against each other; throw immediately on mismatch.
-        // also, collects data results into versions/endpoints lists.
-        //
-        // results are cleared as we process them, to avoid unnecessary duplication of work
-        // when resolve() is called a second time for read repair on responses that were
not
-        // necessary to satisfy ConsistencyLevel.
-        for (Map.Entry<Message, ReadResponse> entry : replies.entrySet())
-        {
-            Message message = entry.getKey();
-            ReadResponse response = entry.getValue();
-            assert !response.isDigestQuery();
-            versions.add(response.row().cf);
-            endpoints.add(message.getFrom());
-        }
 
         ColumnFamily resolved;
-        if (versions.size() > 1)
+        if (replies.size() > 1)
         {
-            for (ColumnFamily cf : versions)
+            List<ColumnFamily> versions = new ArrayList<ColumnFamily>(replies.size());
+            List<InetAddress> endpoints = new ArrayList<InetAddress>(replies.size());
+
+            for (Map.Entry<Message, ReadResponse> entry : replies.entrySet())
             {
+                Message message = entry.getKey();
+                ReadResponse response = entry.getValue();
+                ColumnFamily cf = response.row().cf;
+                assert !response.isDigestQuery() : "Received digest response to repair read
from " + entry.getKey().getFrom();
+                versions.add(cf);
+                endpoints.add(message.getFrom());
+
+                // compute maxLiveColumns to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643
                 int liveColumns = cf.getLiveColumnCount();
                 if (liveColumns > maxLiveColumns)
                     maxLiveColumns = liveColumns;
             }
+
             resolved = resolveSuperset(versions);
             if (logger.isDebugEnabled())
                 logger.debug("versions merged");
-            // resolved can be null even if versions doesn't have all nulls because of the
call to removeDeleted in resolveSuperSet
+
+            // send updates to any replica that was missing part of the full row
+            // (resolved can be null even if versions doesn't have all nulls because of the
call to removeDeleted in resolveSuperSet)
             if (resolved != null)
                 repairResults = scheduleRepairs(resolved, table, key, versions, endpoints);
         }
         else
         {
-            resolved = versions.get(0);
+            resolved = replies.values().iterator().next().row().cf;
         }
 
         if (logger.isDebugEnabled())
@@ -138,9 +136,9 @@ public class RowRepairResolver extends A
         return results;
     }
 
-    static ColumnFamily resolveSuperset(List<ColumnFamily> versions)
+    static ColumnFamily resolveSuperset(Iterable<ColumnFamily> versions)
     {
-        assert versions.size() > 0;
+        assert Iterables.size(versions) > 0;
 
         ColumnFamily resolved = null;
         for (ColumnFamily cf : versions)

Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1167300&r1=1167299&r2=1167300&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/StorageProxy.java
Fri Sep  9 17:09:37 2011
@@ -573,24 +573,23 @@ public class StorageProxy implements Sto
      * 4. If the digests (if any) match the data return the data
      * 5. else carry out read repair by getting data from all the nodes.
      */
-    private static List<Row> fetchRows(List<ReadCommand> commands, ConsistencyLevel
consistency_level) throws IOException, UnavailableException, TimeoutException
+    private static List<Row> fetchRows(List<ReadCommand> initialCommands, ConsistencyLevel
consistency_level) throws IOException, UnavailableException, TimeoutException
     {
-        List<ReadCallback<Row>> readCallbacks = new ArrayList<ReadCallback<Row>>();
-        List<Row> rows = new ArrayList<Row>();
+        List<Row> rows = new ArrayList<Row>(initialCommands.size());
         List<ReadCommand> commandsToRetry = Collections.emptyList();
-        List<ReadCommand> repairCommands = Collections.emptyList();
 
         do
         {
-            readCallbacks.clear();
-            List<ReadCommand> commandsToSend = commandsToRetry.isEmpty() ? commands
: commandsToRetry;
+            List<ReadCommand> commands = commandsToRetry.isEmpty() ? initialCommands
: commandsToRetry;
+            ReadCallback<Row>[] readCallbacks = new ReadCallback[commands.size()];
 
             if (!commandsToRetry.isEmpty())
                 logger.debug("Retrying {} commands", commandsToRetry.size());
 
             // send out read requests
-            for (ReadCommand command : commandsToSend)
+            for (int i = 0; i < commands.size(); i++)
             {
+                ReadCommand command = commands.get(i);
                 assert !command.isDigestQuery();
                 logger.debug("Command/ConsistencyLevel is {}/{}", command, consistency_level);
 
@@ -602,7 +601,7 @@ public class StorageProxy implements Sto
                 ReadCallback<Row> handler = getReadCallback(resolver, command, consistency_level,
endpoints);
                 handler.assureSufficientLiveNodes();
                 assert !handler.endpoints.isEmpty();
-                readCallbacks.add(handler);
+                readCallbacks[i] = handler;
 
                 // The data-request message is sent to dataPoint, the node that will actually
get the data for us
                 InetAddress dataPoint = handler.endpoints.get(0);
@@ -643,15 +642,13 @@ public class StorageProxy implements Sto
                 }
             }
 
-            if (repairCommands != Collections.EMPTY_LIST)
-                repairCommands.clear();
-
             // read results and make a second pass for any digest mismatches
+            List<ReadCommand> repairCommands = null;
             List<RepairCallback> repairResponseHandlers = null;
-            for (int i = 0; i < commandsToSend.size(); i++)
+            for (int i = 0; i < commands.size(); i++)
             {
-                ReadCallback<Row> handler = readCallbacks.get(i);
-                ReadCommand command = commandsToSend.get(i);
+                ReadCallback<Row> handler = readCallbacks[i];
+                ReadCommand command = commands.get(i);
                 try
                 {
                     long startTime2 = System.currentTimeMillis();
@@ -675,17 +672,17 @@ public class StorageProxy implements Sto
                     RowRepairResolver resolver = new RowRepairResolver(command.table, command.key);
                     RepairCallback repairHandler = new RepairCallback(resolver, handler.endpoints);
 
-                    if (repairCommands == Collections.EMPTY_LIST)
+                    if (repairCommands == null)
+                    {
                         repairCommands = new ArrayList<ReadCommand>();
+                        repairResponseHandlers = new ArrayList<RepairCallback>();
+                    }
                     repairCommands.add(command);
+                    repairResponseHandlers.add(repairHandler);
 
                     MessageProducer producer = new CachingMessageProducer(command);
                     for (InetAddress endpoint : handler.endpoints)
                         MessagingService.instance().sendRR(producer, endpoint, repairHandler);
-
-                    if (repairResponseHandlers == null)
-                        repairResponseHandlers = new ArrayList<RepairCallback>();
-                    repairResponseHandlers.add(repairHandler);
                 }
             }
 



Mime
View raw message