cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1041951 - in /cassandra/branches/cassandra-0.7: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/thrift/ test/unit/org/apache/cassandra/service/
Date Fri, 03 Dec 2010 18:52:07 GMT
Author: jbellis
Date: Fri Dec  3 18:52:06 2010
New Revision: 1041951

URL: http://svn.apache.org/viewvc?rev=1041951&view=rev
Log:
reads at ConsistencyLevel > 1 throw UnavailableException immediately if insufficient live
nodes exist
patch by jbellis and tjake for CASSANDRA-1803

Added:
    cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraServer.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Fri Dec  3 18:52:06 2010
@@ -27,6 +27,8 @@ dev
    (CASSANDRA-1804)
  * cli support index type enum names (CASSANDRA-1810)
  * improved validation of column_metadata (CASSANDRA-1813)
+ * reads at ConsistencyLevel > 1 throw UnavailableException
+   immediately if insufficient live nodes exist (CASSANDRA-1803)
 
 
 0.7.0-rc1

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java Fri Dec  3 18:52:06 2010
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;
@@ -122,7 +123,7 @@ public class HintedHandOffManager
             rm.add(cf);
             Message message = rm.makeRowMutationMessage();
             IWriteResponseHandler responseHandler =  WriteResponseHandler.create(endpoint);
-            MessagingService.instance.sendRR(message, new InetAddress[] { endpoint }, responseHandler);
+            MessagingService.instance.sendRR(message, Arrays.asList(endpoint), responseHandler);
             try
             {
                 responseHandler.get();

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
Fri Dec  3 18:52:06 2010
@@ -29,9 +29,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.ServerSocketChannel;
 import java.security.MessageDigest;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -226,7 +224,7 @@ public class MessagingService implements
      * @return an reference to an IAsyncResult which can be queried for the
      * response
      */
-    public String sendRR(Message message, InetAddress[] to, IAsyncCallback cb)
+    public String sendRR(Message message, Collection<InetAddress> to, IAsyncCallback cb)
     {
         String messageId = message.getMessageId();
         addCallback(cb, messageId);
@@ -273,18 +271,16 @@ public class MessagingService implements
      *           suggest that a timeout occured to the invoker of the send().
      * @return an reference to message id used to match with the result
      */
-    public String sendRR(Message[] messages, InetAddress[] to, IAsyncCallback cb)
+    public String sendRR(Message[] messages, List<InetAddress> to, IAsyncCallback cb)
     {
-        if ( messages.length != to.length )
-        {
+        if (messages.length != to.size())
             throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
-        }
         String groupId = GuidGenerator.guid();
         addCallback(cb, groupId);
         for ( int i = 0; i < messages.length; ++i )
         {
             messages[i].setMessageId(groupId);
-            sendOneWay(messages[i], to[i]);
+            sendOneWay(messages[i], to.get(i));
         }
         return groupId;
     } 

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
Fri Dec  3 18:52:06 2010
@@ -156,7 +156,6 @@ class ConsistencyChecker implements Runn
 
 	static class DataRepairHandler implements IAsyncCallback
 	{
-		private final Collection<Message> responses_ = new LinkedBlockingQueue<Message>();
 		private final ReadResponseResolver readResponseResolver_;
 		private final int majority_;
 		
@@ -167,7 +166,6 @@ class ConsistencyChecker implements Runn
             // wrap localRow in a response Message so it doesn't need to be special-cased
in the resolver
             ReadResponse readResponse = new ReadResponse(localRow);
             Message fakeMessage = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE,
ArrayUtils.EMPTY_BYTE_ARRAY);
-            responses_.add(fakeMessage);
             readResponseResolver_.injectPreProcessed(fakeMessage, readResponse);
         }
 
@@ -176,15 +174,14 @@ class ConsistencyChecker implements Runn
 		{
 			if (logger_.isDebugEnabled())
 			  logger_.debug("Received response in DataRepairHandler : " + message.toString());
-			responses_.add(message);
             readResponseResolver_.preprocess(message);
-            if (responses_.size() == majority_)
+            if (readResponseResolver_.getMessageCount() == majority_)
             {
                 Runnable runnable = new WrappedRunnable()
                 {
                     public void runMayThrow() throws IOException, DigestMismatchException
                     {
-                        readResponseResolver_.resolve(responses_);
+                        readResponseResolver_.resolve();
                     }
                 };
                 // give remaining replicas until timeout to reply and get added to responses_

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
Fri Dec  3 18:52:06 2010
@@ -21,6 +21,9 @@ package org.apache.cassandra.service;
  */
 
 
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -29,6 +32,7 @@ import org.apache.cassandra.locator.IEnd
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -49,14 +53,14 @@ public class DatacenterQuorumResponseHan
     @Override
     public void response(Message message)
     {
-        responses.add(message); // we'll go ahead and resolve a reply from anyone, even if
it's not from this dc
+        resolver.preprocess(message);
 
         int n;
         n = localdc.equals(snitch.getDatacenter(message.getFrom())) 
                 ? localResponses.decrementAndGet()
                 : localResponses.get();
 
-        if (n == 0 && responseResolver.isDataPresent(responses))
+        if (n == 0 && resolver.isDataPresent())
         {
             condition.signal();
         }
@@ -68,4 +72,18 @@ public class DatacenterQuorumResponseHan
         NetworkTopologyStrategy stategy = (NetworkTopologyStrategy) Table.open(table).getReplicationStrategy();
 		return (stategy.getReplicationFactor(localdc) / 2) + 1;
 	}
+
+    @Override
+    public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException
+    {
+        int localEndpoints = 0;
+        for (InetAddress endpoint : endpoints)
+        {
+            if (localdc.equals(snitch.getDatacenter(endpoint)))
+                localEndpoints++;
+        }
+        
+        if(localEndpoints < blockfor)
+            throw new UnavailableException();
+    }
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
Fri Dec  3 18:52:06 2010
@@ -34,8 +34,10 @@ public interface IResponseResolver<T> {
 	 * repairs . Hence you need to derive a response resolver based on your
 	 * needs from this interface.
 	 */
-	public T resolve(Collection<Message> responses) throws DigestMismatchException, IOException;
-	public boolean isDataPresent(Collection<Message> responses);
+	public T resolve() throws DigestMismatchException, IOException;
+	public boolean isDataPresent();
 
     public void preprocess(Message message);
+    public Iterable<Message> getMessages();
+    public int getMessageCount();
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
Fri Dec  3 18:52:06 2010
@@ -18,11 +18,11 @@
 
 package org.apache.cassandra.service;
 
+import java.io.IOException;
+import java.net.InetAddress;
 import java.util.Collection;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.io.IOException;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Table;
@@ -30,8 +30,8 @@ import org.apache.cassandra.net.IAsyncCa
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.SimpleCondition;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,19 +39,20 @@ public class QuorumResponseHandler<T> im
 {
     protected static final Logger logger = LoggerFactory.getLogger( QuorumResponseHandler.class
);
     protected final SimpleCondition condition = new SimpleCondition();
-    protected final Collection<Message> responses = new LinkedBlockingQueue<Message>();;
-    protected IResponseResolver<T> responseResolver;
+    protected final IResponseResolver<T> resolver;
     private final long startTime;
-    protected int blockfor;
+    protected final int blockfor;
     
     /**
      * Constructor when response count has to be calculated and blocked for.
      */
-    public QuorumResponseHandler(IResponseResolver<T> responseResolver, ConsistencyLevel
consistencyLevel, String table)
+    public QuorumResponseHandler(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel,
String table)
     {
         this.blockfor = determineBlockFor(consistencyLevel, table);
-        this.responseResolver = responseResolver;
+        this.resolver = resolver;
         this.startTime = System.currentTimeMillis();
+
+        logger.debug("QuorumResponseHandler blocking for {} responses", blockfor);
     }
     
     public T get() throws TimeoutException, DigestMismatchException, IOException
@@ -72,35 +73,31 @@ public class QuorumResponseHandler<T> im
             if (!success)
             {
                 StringBuilder sb = new StringBuilder("");
-                for (Message message : responses)
+                for (Message message : resolver.getMessages())
                 {
                     sb.append(message.getFrom());
                 }
-                throw new TimeoutException("Operation timed out - received only " + responses.size()
+ " responses from " + sb.toString() + " .");
+                throw new TimeoutException("Operation timed out - received only " + resolver.getMessageCount()
+ " responses from " + sb.toString() + " .");
             }
         }
         finally
         {
-            for (Message response : responses)
+            for (Message response : resolver.getMessages())
             {
                 MessagingService.removeRegisteredCallback(response.getMessageId());
             }
         }
 
-        return responseResolver.resolve(responses);
+        return resolver.resolve();
     }
     
     public void response(Message message)
     {
-        responses.add(message);
-        responseResolver.preprocess(message);
-        if (responses.size() < blockfor) {
+        resolver.preprocess(message);
+        if (resolver.getMessageCount() < blockfor)
             return;
-        }
-        if (responseResolver.isDataPresent(responses))
-        {
+        if (resolver.isDataPresent())
             condition.signal();
-        }
     }
     
     public int determineBlockFor(ConsistencyLevel consistencyLevel, String table)
@@ -115,7 +112,13 @@ public class QuorumResponseHandler<T> im
             case ALL:
                 return Table.open(table).getReplicationStrategy().getReplicationFactor();
             default:
-                throw new UnsupportedOperationException("invalid consistency level: " + table.toString());
+                throw new UnsupportedOperationException("invalid consistency level: " + consistencyLevel);
         }
     }
+
+    public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException
+    {
+        if (endpoints.size() < blockfor)
+            throw new UnavailableException();
+    }
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
Fri Dec  3 18:52:06 2010
@@ -21,6 +21,7 @@ package org.apache.cassandra.service;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +46,7 @@ public class RangeSliceResponseResolver 
     private static final Logger logger_ = LoggerFactory.getLogger(RangeSliceResponseResolver.class);
     private final String table;
     private final List<InetAddress> sources;
+    protected final Collection<Message> responses = new LinkedBlockingQueue<Message>();;
 
     public RangeSliceResponseResolver(String table, List<InetAddress> sources)
     {
@@ -53,7 +55,7 @@ public class RangeSliceResponseResolver 
         this.table = table;
     }
 
-    public List<Row> resolve(Collection<Message> responses) throws DigestMismatchException,
IOException
+    public List<Row> resolve() throws DigestMismatchException, IOException
     {
         CollatingIterator collator = new CollatingIterator(new Comparator<Pair<Row,InetAddress>>()
         {
@@ -110,11 +112,12 @@ public class RangeSliceResponseResolver 
 
     public void preprocess(Message message)
     {
+        responses.add(message);
     }
 
-    public boolean isDataPresent(Collection<Message> responses)
+    public boolean isDataPresent()
     {
-        return responses.size() >= sources.size();
+        return !responses.isEmpty();
     }
 
     private static class RowIterator extends AbstractIterator<Pair<Row,InetAddress>>
@@ -134,4 +137,14 @@ public class RangeSliceResponseResolver 
             return iter.hasNext() ? new Pair<Row, InetAddress>(iter.next(), source)
: endOfData();
         }
     }
+
+    public Iterable<Message> getMessages()
+    {
+        return responses;
+    }
+
+    public int getMessageCount()
+    {
+        return responses.size();
+    }
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
Fri Dec  3 18:52:06 2010
@@ -58,14 +58,14 @@ public class ReadResponseResolver implem
       * repair request should be scheduled.
       *
       */
-	public Row resolve(Collection<Message> responses) throws DigestMismatchException,
IOException
+	public Row resolve() throws DigestMismatchException, IOException
     {
         if (logger_.isDebugEnabled())
-            logger_.debug("resolving " + responses.size() + " responses");
+            logger_.debug("resolving " + results.size() + " responses");
 
         long startTime = System.currentTimeMillis();
-		List<ColumnFamily> versions = new ArrayList<ColumnFamily>(responses.size());
-		List<InetAddress> endpoints = new ArrayList<InetAddress>(responses.size());
+		List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
+		List<InetAddress> endpoints = new ArrayList<InetAddress>();
 		DecoratedKey key = null;
 		ByteBuffer digest = FBUtilities.EMPTY_BYTE_BUFFER;
 		boolean isDigestQuery = false;
@@ -76,11 +76,10 @@ public class ReadResponseResolver implem
          * query exists then we need to compare the digest with 
          * the digest of the data that is received.
         */
-		for (Message message : responses)
-		{
-            ReadResponse result = results.get(message);
-            if (result == null)
-                continue; // arrived after quorum already achieved
+        for (Map.Entry<Message, ReadResponse> entry : results.entrySet())
+        {
+            ReadResponse result = entry.getValue();
+            Message message = entry.getKey();
             if (result.isDigestQuery())
             {
                 digest = result.digest();
@@ -187,6 +186,8 @@ public class ReadResponseResolver implem
         try
         {
             ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+            if (logger_.isDebugEnabled())
+                logger_.debug("Preprocessed {} response", result.isDigestQuery() ? "digest"
: "data");
             results.put(message, result);
         }
         catch (IOException e)
@@ -201,16 +202,23 @@ public class ReadResponseResolver implem
         results.put(message, result);
     }
 
-    public boolean isDataPresent(Collection<Message> responses)
+    public boolean isDataPresent()
 	{
-        for (Message message : responses)
+        for (ReadResponse result : results.values())
         {
-            ReadResponse result = results.get(message);
-            if (result == null)
-                continue; // arrived concurrently
             if (!result.isDigestQuery())
                 return true;
         }
         return false;
     }
+
+    public Iterable<Message> getMessages()
+    {
+        return results.keySet();
+    }
+
+    public int getMessageCount()
+    {
+        return results.size();
+    }
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
Fri Dec  3 18:52:06 2010
@@ -314,7 +314,7 @@ public class StorageProxy implements Sto
     private static List<Row> strongRead(List<ReadCommand> commands, ConsistencyLevel
consistency_level) throws IOException, UnavailableException, TimeoutException
     {
         List<QuorumResponseHandler<Row>> quorumResponseHandlers = new ArrayList<QuorumResponseHandler<Row>>();
-        List<InetAddress[]> commandEndpoints = new ArrayList<InetAddress[]>();
+        List<List<InetAddress>> commandEndpoints = new ArrayList<List<InetAddress>>();
         List<Row> rows = new ArrayList<Row>();
 
         // send out read requests
@@ -327,25 +327,25 @@ public class StorageProxy implements Sto
             Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
 
             InetAddress dataPoint = StorageService.instance.findSuitableEndpoint(command.table,
command.key);
-            List<InetAddress> endpointList = StorageService.instance.getLiveNaturalEndpoints(command.table,
command.key);
+            List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table,
command.key);
 
-            InetAddress[] endpoints = new InetAddress[endpointList.size()];
-            Message messages[] = new Message[endpointList.size()];
+            AbstractReplicationStrategy rs = Table.open(command.table).getReplicationStrategy();
+            QuorumResponseHandler<Row> handler = rs.getQuorumResponseHandler(new ReadResponseResolver(command.table),
consistency_level);
+            handler.assureSufficientLiveNodes(endpoints);
+
+            Message messages[] = new Message[endpoints.size()];
             // data-request message is sent to dataPoint, the node that will actually get
             // the data for us. The other replicas are only sent a digest query.
             int n = 0;
-            for (InetAddress endpoint : endpointList)
+            for (InetAddress endpoint : endpoints)
             {
                 Message m = endpoint.equals(dataPoint) ? message : messageDigestOnly;
-                endpoints[n] = endpoint;
                 messages[n++] = m;
                 if (logger.isDebugEnabled())
                     logger.debug("strongread reading " + (m == message ? "data" : "digest")
+ " for " + command + " from " + m.getMessageId() + "@" + endpoint);
             }
-            AbstractReplicationStrategy rs = Table.open(command.table).getReplicationStrategy();
-            QuorumResponseHandler<Row> quorumResponseHandler = rs.getQuorumResponseHandler(new
ReadResponseResolver(command.table), consistency_level);
-            MessagingService.instance.sendRR(messages, endpoints, quorumResponseHandler);
-            quorumResponseHandlers.add(quorumResponseHandler);
+            MessagingService.instance.sendRR(messages, endpoints, handler);
+            quorumResponseHandlers.add(handler);
             commandEndpoints.add(endpoints);
         }
 
@@ -369,14 +369,14 @@ public class StorageProxy implements Sto
             catch (DigestMismatchException ex)
             {
                 AbstractReplicationStrategy rs = Table.open(command.table).getReplicationStrategy();
-                QuorumResponseHandler<Row> qrhRepair = rs.getQuorumResponseHandler(new
ReadResponseResolver(command.table), ConsistencyLevel.QUORUM);
+                QuorumResponseHandler<Row> handler = rs.getQuorumResponseHandler(new
ReadResponseResolver(command.table), ConsistencyLevel.QUORUM);
                 if (logger.isDebugEnabled())
                     logger.debug("Digest mismatch:", ex);
                 Message messageRepair = command.makeReadMessage();
-                MessagingService.instance.sendRR(messageRepair, commandEndpoints.get(i),
qrhRepair);
+                MessagingService.instance.sendRR(messageRepair, commandEndpoints.get(i),
handler);
                 if (repairResponseHandlers == null)
                     repairResponseHandlers = new ArrayList<QuorumResponseHandler<Row>>();
-                repairResponseHandlers.add(qrhRepair);
+                repairResponseHandlers.add(handler);
             }
         }
 
@@ -498,7 +498,7 @@ public class StorageProxy implements Sto
         final Message msg = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.SCHEMA_CHECK,
ArrayUtils.EMPTY_BYTE_ARRAY);
         final CountDownLatch latch = new CountDownLatch(liveHosts.size());
         // an empty message acts as a request to the SchemaCheckVerbHandler.
-        MessagingService.instance.sendRR(msg, liveHosts.toArray(new InetAddress[]{}), new
IAsyncCallback() 
+        MessagingService.instance.sendRR(msg, liveHosts, new IAsyncCallback()
         {
             public void response(Message msg)
             {
@@ -775,7 +775,7 @@ public class StorageProxy implements Sto
         logger.debug("Starting to send truncate messages to hosts {}", allEndpoints);
         Truncation truncation = new Truncation(keyspace, cfname);
         Message message = truncation.makeTruncationMessage();
-        MessagingService.instance.sendRR(message, allEndpoints.toArray(new InetAddress[]{}),
responseHandler);
+        MessagingService.instance.sendRR(message, allEndpoints, responseHandler);
 
         // Wait for all
         logger.debug("Sent all truncate messages, now waiting for {} responses", blockFor);

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraServer.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraServer.java
Fri Dec  3 18:52:06 2010
@@ -138,6 +138,7 @@ public class CassandraServer implements 
         }
         catch (TimeoutException e) 
         {
+            logger.debug("... timed out");
         	throw new TimedOutException();
         }
         catch (IOException e)
@@ -442,11 +443,12 @@ public class CassandraServer implements 
 
             try
             {
-              StorageProxy.mutate(mutations, consistency_level);
+                StorageProxy.mutate(mutations, consistency_level);
             }
             catch (TimeoutException e)
             {
-              throw new TimedOutException();
+                logger.debug("... timed out");
+                throw new TimedOutException();
             }
         }
         finally
@@ -512,6 +514,7 @@ public class CassandraServer implements 
         }
         catch (TimeoutException e)
         {
+            logger.debug("... timed out");
         	throw new TimedOutException();
         }
         catch (IOException e)
@@ -556,6 +559,7 @@ public class CassandraServer implements 
         }
         catch (TimeoutException e)
         {
+            logger.debug("... timed out");
             throw new TimedOutException();
         }
         return thriftifyKeySlices(rows, column_parent, column_predicate);

Added: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java?rev=1041951&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
(added)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
Fri Dec  3 18:52:06 2010
@@ -0,0 +1,145 @@
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.HashMultimap;
+import org.junit.Test;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.SimpleSnitch;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ConsistencyLevelTest extends CleanupHelper
+{
+    @Test
+    public void testReadWriteConsistencyChecks() throws Exception
+    {
+        StorageService ss = StorageService.instance;
+        final int RING_SIZE = 3;
+
+        TokenMetadata tmd = ss.getTokenMetadata();
+        tmd.clearUnsafe();
+        IPartitioner partitioner = new RandomPartitioner();
+
+        ss.setPartitionerUnsafe(partitioner);
+
+        ArrayList<Token> endpointTokens = new ArrayList<Token>();
+        ArrayList<Token> keyTokens = new ArrayList<Token>();
+        List<InetAddress> hosts = new ArrayList<InetAddress>();
+
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, RING_SIZE);
+
+        HashMultimap<InetAddress, InetAddress> hintedNodes = HashMultimap.create();
+
+
+        AbstractReplicationStrategy strategy;
+
+        for (String table : DatabaseDescriptor.getNonSystemTables())
+        {
+            strategy = getStrategy(table, tmd);
+            StorageService.calculatePendingRanges(strategy, table);
+            int replicationFactor = strategy.getReplicationFactor();
+            if (replicationFactor < 2)
+                continue;
+
+            for (ConsistencyLevel c : ConsistencyLevel.values())
+            {
+
+                if (c == ConsistencyLevel.EACH_QUORUM || c == ConsistencyLevel.LOCAL_QUORUM)
+                    continue;
+
+                for (int i = 0; i < replicationFactor; i++)
+                {
+                    hintedNodes.clear();
+
+                    for (int j = 0; j < i; j++)
+                    {
+                        hintedNodes.put(hosts.get(j), hosts.get(j));
+                    }
+
+                    IWriteResponseHandler writeHandler = strategy.getWriteResponseHandler(hosts, hintedNodes, c);
+
+                    QuorumResponseHandler<Row> readHandler = strategy.getQuorumResponseHandler(new ReadResponseResolver(table), c);
+
+                    boolean isWriteUnavailable = false;
+                    boolean isReadUnavailable = false;
+                    try
+                    {
+                        writeHandler.assureSufficientLiveNodes();
+                    }
+                    catch (UnavailableException e)
+                    {
+                        isWriteUnavailable = true;
+                    }
+
+                    try
+                    {
+                        readHandler.assureSufficientLiveNodes(hintedNodes.asMap().keySet());
+                    }
+                    catch (UnavailableException e)
+                    {
+                        isReadUnavailable = true;
+                    }
+
+                    //these should always match (in this kind of test)
+                    assertTrue(isWriteUnavailable == isReadUnavailable);
+
+                    switch (c)
+                    {
+                        case ALL:
+                            if (isWriteUnavailable)
+                                assertTrue(hintedNodes.size() < replicationFactor);
+                            else
+                                assertTrue(hintedNodes.size() >= replicationFactor);
+
+                            break;
+                        case ONE:
+                        case ANY:
+                            if (isWriteUnavailable)
+                                assertTrue(hintedNodes.size() == 0);
+                            else
+                                assertTrue(hintedNodes.size() > 0);
+                            break;
+                        case QUORUM:
+                            if (isWriteUnavailable)
+                                assertTrue(hintedNodes.size() < (replicationFactor / 2 + 1));
+                            else
+                                assertTrue(hintedNodes.size() >= (replicationFactor / 2 + 1));
+                            break;
+                        default:
+                            fail("Unhandled CL: " + c);
+
+                    }
+                }
+            }
+            return;
+        }
+
+        fail("Test requires at least one table with RF > 1");
+    }
+
+    private AbstractReplicationStrategy getStrategy(String table, TokenMetadata tmd) throws ConfigurationException
+    {
+        return AbstractReplicationStrategy.createReplicationStrategy(table,
+                                                                     "org.apache.cassandra.locator.SimpleStrategy",
+                                                                     tmd,
+                                                                     new SimpleSnitch(),
+                                                                     null);
+    }
+
+}



Mime
View raw message