From: jbellis@apache.org
To: commits@cassandra.apache.org
Reply-To: dev@cassandra.apache.org
Date: Fri, 03 Dec 2010 18:52:07 -0000
Subject: svn commit: r1041951 - in /cassandra/branches/cassandra-0.7: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/thrift/ test/unit/org/apache/cassandra/service/
Message-Id: <20101203185207.690B123889B1@eris.apache.org>

Author: jbellis
Date: Fri Dec 3 18:52:06 2010
New Revision: 1041951

URL: http://svn.apache.org/viewvc?rev=1041951&view=rev
Log:
reads at ConsistencyLevel > 1 throw UnavailableException immediately if insufficient live nodes exist
patch by jbellis and tjake for CASSANDRA-1803

Added:
    cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraServer.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Fri Dec 3 18:52:06 2010
@@ -27,6 +27,8 @@ dev
    (CASSANDRA-1804)
  * cli support index type enum names (CASSANDRA-1810)
  * improved validation of column_metadata (CASSANDRA-1813)
+ * reads at ConsistencyLevel > 1 throw UnavailableException
+   immediately if insufficient live nodes exist (CASSANDRA-1803)
 
 
 0.7.0-rc1

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/HintedHandOffManager.java Fri Dec 3 18:52:06 2010
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;
@@ -122,7 +123,7 @@ public class HintedHandOffManager
         rm.add(cf);
         Message message = rm.makeRowMutationMessage();
         IWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint);
-        MessagingService.instance.sendRR(message, new InetAddress[] { endpoint }, responseHandler);
+        MessagingService.instance.sendRR(message, Arrays.asList(endpoint), responseHandler);
         try
         {
             responseHandler.get();

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java Fri Dec 3 18:52:06 2010
@@ -29,9 +29,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.ServerSocketChannel;
 import java.security.MessageDigest;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -226,7 +224,7 @@ public class MessagingService implements
      * @return an reference to an IAsyncResult which can be queried for the
      * response
      */
-    public String sendRR(Message message, InetAddress[] to, IAsyncCallback cb)
+    public String sendRR(Message message, Collection<InetAddress> to, IAsyncCallback cb)
     {
         String messageId = message.getMessageId();
         addCallback(cb, messageId);
@@ -273,18 +271,16 @@ public class MessagingService implements
      * suggest that a timeout occured to the invoker of the send().
      * @return an reference to message id used to match with the result
      */
-    public String sendRR(Message[] messages, InetAddress[] to, IAsyncCallback cb)
+    public String sendRR(Message[] messages, List<InetAddress> to, IAsyncCallback cb)
     {
-        if ( messages.length != to.length )
-        {
+        if (messages.length != to.size())
             throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
-        }
         String groupId = GuidGenerator.guid();
         addCallback(cb, groupId);
         for ( int i = 0; i < messages.length; ++i )
         {
             messages[i].setMessageId(groupId);
-            sendOneWay(messages[i], to[i]);
+            sendOneWay(messages[i], to.get(i));
         }
         return groupId;
     }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java Fri Dec 3 18:52:06 2010
@@ -156,7 +156,6 @@ class ConsistencyChecker implements Runn
 
     static class DataRepairHandler implements IAsyncCallback
     {
-        private final Collection<Message> responses_ = new LinkedBlockingQueue<Message>();
         private final ReadResponseResolver readResponseResolver_;
         private final int majority_;
 
@@ -167,7 +166,6 @@ class ConsistencyChecker implements Runn
             // wrap localRow in a response Message so it doesn't need to be special-cased in the resolver
             ReadResponse readResponse = new ReadResponse(localRow);
             Message fakeMessage = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);
-            responses_.add(fakeMessage);
             readResponseResolver_.injectPreProcessed(fakeMessage, readResponse);
         }
 
@@ -176,15 +174,14 @@ class ConsistencyChecker implements Runn
         {
             if (logger_.isDebugEnabled())
                 logger_.debug("Received response in DataRepairHandler : " + message.toString());
-            responses_.add(message);
             readResponseResolver_.preprocess(message);
-            if (responses_.size() == majority_)
+            if (readResponseResolver_.getMessageCount() == majority_)
             {
                 Runnable runnable = new WrappedRunnable()
                 {
                     public void runMayThrow() throws IOException, DigestMismatchException
                     {
-                        readResponseResolver_.resolve(responses_);
+                        readResponseResolver_.resolve();
                     }
                 };
                 // give remaining replicas until timeout to reply and get added to responses_

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java Fri Dec 3 18:52:06 2010
@@ -21,6 +21,9 @@ package org.apache.cassandra.service;
  */
 
 
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -29,6 +32,7 @@ import org.apache.cassandra.locator.IEnd
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -49,14 +53,14 @@ public class DatacenterQuorumResponseHan
     @Override
     public void response(Message message)
     {
-        responses.add(message); // we'll go ahead and resolve a reply from anyone, even if it's not from this dc
+        resolver.preprocess(message);
 
         int n;
         n = localdc.equals(snitch.getDatacenter(message.getFrom()))
                 ? localResponses.decrementAndGet()
                 : localResponses.get();
 
-        if (n == 0 && responseResolver.isDataPresent(responses))
+        if (n == 0 && resolver.isDataPresent())
         {
             condition.signal();
         }
@@ -68,4 +72,18 @@ public class DatacenterQuorumResponseHan
         NetworkTopologyStrategy stategy = (NetworkTopologyStrategy) Table.open(table).getReplicationStrategy();
         return (stategy.getReplicationFactor(localdc) / 2) + 1;
     }
+
+    @Override
+    public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException
+    {
+        int localEndpoints = 0;
+        for (InetAddress endpoint : endpoints)
+        {
+            if (localdc.equals(snitch.getDatacenter(endpoint)))
+                localEndpoints++;
+        }
+
+        if(localEndpoints < blockfor)
+            throw new UnavailableException();
+    }
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IResponseResolver.java Fri Dec 3 18:52:06 2010
@@ -34,8 +34,10 @@ public interface IResponseResolver<T> {
      * repairs . Hence you need to derive a response resolver based on your
      * needs from this interface.
      */
-    public T resolve(Collection<Message> responses) throws DigestMismatchException, IOException;
-    public boolean isDataPresent(Collection<Message> responses);
+    public T resolve() throws DigestMismatchException, IOException;
+    public boolean isDataPresent();
 
     public void preprocess(Message message);
+    public Iterable<Message> getMessages();
+    public int getMessageCount();
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Fri Dec 3 18:52:06 2010
@@ -18,11 +18,11 @@
 
 package org.apache.cassandra.service;
 
+import java.io.IOException;
+import java.net.InetAddress;
 import java.util.Collection;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.io.IOException;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Table;
@@ -30,8 +30,8 @@ import org.apache.cassandra.net.IAsyncCa
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.SimpleCondition;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,19 +39,20 @@ public class QuorumResponseHandler<T> im
 {
     protected static final Logger logger = LoggerFactory.getLogger( QuorumResponseHandler.class );
     protected final SimpleCondition condition = new SimpleCondition();
-    protected final Collection<Message> responses = new LinkedBlockingQueue<Message>();;
-    protected IResponseResolver<T> responseResolver;
+    protected final IResponseResolver<T> resolver;
     private final long startTime;
-    protected int blockfor;
+    protected final int blockfor;
 
     /**
      * Constructor when response count has to be calculated and blocked for.
      */
-    public QuorumResponseHandler(IResponseResolver<T> responseResolver, ConsistencyLevel consistencyLevel, String table)
+    public QuorumResponseHandler(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, String table)
     {
         this.blockfor = determineBlockFor(consistencyLevel, table);
-        this.responseResolver = responseResolver;
+        this.resolver = resolver;
         this.startTime = System.currentTimeMillis();
+
+        logger.debug("QuorumResponseHandler blocking for {} responses", blockfor);
     }
 
     public T get() throws TimeoutException, DigestMismatchException, IOException
@@ -72,35 +73,31 @@ public class QuorumResponseHandler<T> im
             if (!success)
             {
                 StringBuilder sb = new StringBuilder("");
-                for (Message message : responses)
+                for (Message message : resolver.getMessages())
                 {
                     sb.append(message.getFrom());
                 }
-                throw new TimeoutException("Operation timed out - received only " + responses.size() + " responses from " + sb.toString() + " .");
+                throw new TimeoutException("Operation timed out - received only " + resolver.getMessageCount() + " responses from " + sb.toString() + " .");
             }
         }
         finally
         {
-            for (Message response : responses)
+            for (Message response : resolver.getMessages())
             {
                 MessagingService.removeRegisteredCallback(response.getMessageId());
             }
         }
 
-        return responseResolver.resolve(responses);
+        return resolver.resolve();
     }
 
     public void response(Message message)
     {
-        responses.add(message);
-        responseResolver.preprocess(message);
-        if (responses.size() < blockfor) {
+        resolver.preprocess(message);
+        if (resolver.getMessageCount() < blockfor)
             return;
-        }
-        if (responseResolver.isDataPresent(responses))
-        {
+        if (resolver.isDataPresent())
             condition.signal();
-        }
     }
 
     public int determineBlockFor(ConsistencyLevel consistencyLevel, String table)
@@ -115,7 +112,13 @@ public class QuorumResponseHandler<T> im
             case ALL:
                 return Table.open(table).getReplicationStrategy().getReplicationFactor();
             default:
-                throw new UnsupportedOperationException("invalid consistency level: " + table.toString());
+                throw new UnsupportedOperationException("invalid consistency level: " + consistencyLevel);
         }
     }
+
+    public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException
+    {
+        if (endpoints.size() < blockfor)
+            throw new UnavailableException();
+    }
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Fri Dec 3 18:52:06 2010
@@ -21,6 +21,7 @@ package org.apache.cassandra.service;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +46,7 @@ public class RangeSliceResponseResolver
     private static final Logger logger_ = LoggerFactory.getLogger(RangeSliceResponseResolver.class);
     private final String table;
     private final List<InetAddress> sources;
+    protected final Collection<Message> responses = new LinkedBlockingQueue<Message>();;
 
     public RangeSliceResponseResolver(String table, List<InetAddress> sources)
     {
@@ -53,7 +55,7 @@ public class RangeSliceResponseResolver
         this.table = table;
     }
 
-    public List<Row> resolve(Collection<Message> responses) throws DigestMismatchException, IOException
+    public List<Row> resolve() throws DigestMismatchException, IOException
     {
         CollatingIterator collator = new CollatingIterator(new Comparator<Pair<Row,InetAddress>>()
         {
@@ -110,11 +112,12 @@ public class RangeSliceResponseResolver
 
     public void preprocess(Message message)
     {
+        responses.add(message);
     }
 
-    public boolean isDataPresent(Collection<Message> responses)
+    public boolean isDataPresent()
     {
-        return responses.size() >= sources.size();
+        return !responses.isEmpty();
     }
 
     private static class RowIterator extends AbstractIterator<Pair<Row,InetAddress>>
@@ -134,4 +137,14 @@ public class RangeSliceResponseResolver
             return iter.hasNext() ? new Pair<Row, InetAddress>(iter.next(), source) : endOfData();
         }
     }
+
+    public Iterable<Message> getMessages()
+    {
+        return responses;
+    }
+
+    public int getMessageCount()
+    {
+        return responses.size();
+    }
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java Fri Dec 3 18:52:06 2010
@@ -58,14 +58,14 @@ public class ReadResponseResolver implem
      * repair request should be scheduled.
      *
      */
-    public Row resolve(Collection<Message> responses) throws DigestMismatchException, IOException
+    public Row resolve() throws DigestMismatchException, IOException
     {
         if (logger_.isDebugEnabled())
-            logger_.debug("resolving " + responses.size() + " responses");
+            logger_.debug("resolving " + results.size() + " responses");
 
         long startTime = System.currentTimeMillis();
-        List<ColumnFamily> versions = new ArrayList<ColumnFamily>(responses.size());
-        List<InetAddress> endpoints = new ArrayList<InetAddress>(responses.size());
+        List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
+        List<InetAddress> endpoints = new ArrayList<InetAddress>();
         DecoratedKey key = null;
         ByteBuffer digest = FBUtilities.EMPTY_BYTE_BUFFER;
         boolean isDigestQuery = false;
@@ -76,11 +76,10 @@ public class ReadResponseResolver implem
          * query exists then we need to compare the digest with
          * the digest of the data that is received.
          */
-        for (Message message : responses)
-        {
-            ReadResponse result = results.get(message);
-            if (result == null)
-                continue; // arrived after quorum already achieved
+        for (Map.Entry<Message, ReadResponse> entry : results.entrySet())
+        {
+            ReadResponse result = entry.getValue();
+            Message message = entry.getKey();
             if (result.isDigestQuery())
             {
                 digest = result.digest();
@@ -187,6 +186,8 @@ public class ReadResponseResolver implem
         try
         {
             ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+            if (logger_.isDebugEnabled())
+                logger_.debug("Preprocessed {} response", result.isDigestQuery() ? "digest" : "data");
             results.put(message, result);
         }
         catch (IOException e)
@@ -201,16 +202,23 @@ public class ReadResponseResolver implem
         results.put(message, result);
     }
 
-    public boolean isDataPresent(Collection<Message> responses)
+    public boolean isDataPresent()
     {
-        for (Message message : responses)
+        for (ReadResponse result : results.values())
         {
-            ReadResponse result = results.get(message);
-            if (result == null)
-                continue; // arrived concurrently
             if (!result.isDigestQuery())
                 return true;
         }
         return false;
     }
+
+    public Iterable<Message> getMessages()
+    {
+        return results.keySet();
+    }
+
+    public int getMessageCount()
+    {
+        return results.size();
+    }
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Fri Dec 3 18:52:06 2010
@@ -314,7 +314,7 @@ public class StorageProxy implements Sto
     private static List<Row> strongRead(List<ReadCommand> commands, ConsistencyLevel consistency_level) throws IOException, UnavailableException, TimeoutException
     {
         List<QuorumResponseHandler<Row>> quorumResponseHandlers = new ArrayList<QuorumResponseHandler<Row>>();
-        List<InetAddress[]> commandEndpoints = new ArrayList<InetAddress[]>();
+        List<List<InetAddress>> commandEndpoints = new ArrayList<List<InetAddress>>();
         List<Row> rows = new ArrayList<Row>();
 
         // send out read requests
@@ -327,25 +327,25 @@ public class StorageProxy implements Sto
             Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
 
             InetAddress dataPoint = StorageService.instance.findSuitableEndpoint(command.table, command.key);
-            List<InetAddress> endpointList = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
+            List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
 
-            InetAddress[] endpoints = new InetAddress[endpointList.size()];
-            Message messages[] = new Message[endpointList.size()];
+            AbstractReplicationStrategy rs = Table.open(command.table).getReplicationStrategy();
+            QuorumResponseHandler<Row> handler = rs.getQuorumResponseHandler(new ReadResponseResolver(command.table), consistency_level);
+            handler.assureSufficientLiveNodes(endpoints);
+
+            Message messages[] = new Message[endpoints.size()];
             // data-request message is sent to dataPoint, the node that will actually get
             // the data for us. The other replicas are only sent a digest query.
             int n = 0;
-            for (InetAddress endpoint : endpointList)
+            for (InetAddress endpoint : endpoints)
             {
                 Message m = endpoint.equals(dataPoint) ? message : messageDigestOnly;
-                endpoints[n] = endpoint;
                 messages[n++] = m;
                 if (logger.isDebugEnabled())
                     logger.debug("strongread reading " + (m == message ? "data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
             }
-            AbstractReplicationStrategy rs = Table.open(command.table).getReplicationStrategy();
-            QuorumResponseHandler<Row> quorumResponseHandler = rs.getQuorumResponseHandler(new ReadResponseResolver(command.table), consistency_level);
-            MessagingService.instance.sendRR(messages, endpoints, quorumResponseHandler);
-            quorumResponseHandlers.add(quorumResponseHandler);
+            MessagingService.instance.sendRR(messages, endpoints, handler);
+            quorumResponseHandlers.add(handler);
             commandEndpoints.add(endpoints);
         }
 
@@ -369,14 +369,14 @@ public class StorageProxy implements Sto
             catch (DigestMismatchException ex)
             {
                 AbstractReplicationStrategy rs = Table.open(command.table).getReplicationStrategy();
-                QuorumResponseHandler<Row> qrhRepair = rs.getQuorumResponseHandler(new ReadResponseResolver(command.table), ConsistencyLevel.QUORUM);
+                QuorumResponseHandler<Row> handler = rs.getQuorumResponseHandler(new ReadResponseResolver(command.table), ConsistencyLevel.QUORUM);
                 if (logger.isDebugEnabled())
                     logger.debug("Digest mismatch:", ex);
                 Message messageRepair = command.makeReadMessage();
-                MessagingService.instance.sendRR(messageRepair, commandEndpoints.get(i), qrhRepair);
+                MessagingService.instance.sendRR(messageRepair, commandEndpoints.get(i), handler);
                 if (repairResponseHandlers == null)
                     repairResponseHandlers = new ArrayList<QuorumResponseHandler<Row>>();
-                repairResponseHandlers.add(qrhRepair);
+                repairResponseHandlers.add(handler);
             }
         }
 
@@ -498,7 +498,7 @@ public class StorageProxy implements Sto
         final Message msg = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.SCHEMA_CHECK, ArrayUtils.EMPTY_BYTE_ARRAY);
         final CountDownLatch latch = new CountDownLatch(liveHosts.size());
         // an empty message acts as a request to the SchemaCheckVerbHandler.
-        MessagingService.instance.sendRR(msg, liveHosts.toArray(new InetAddress[]{}), new IAsyncCallback()
+        MessagingService.instance.sendRR(msg, liveHosts, new IAsyncCallback()
         {
             public void response(Message msg)
             {
@@ -775,7 +775,7 @@ public class StorageProxy implements Sto
         logger.debug("Starting to send truncate messages to hosts {}", allEndpoints);
         Truncation truncation = new Truncation(keyspace, cfname);
         Message message = truncation.makeTruncationMessage();
-        MessagingService.instance.sendRR(message, allEndpoints.toArray(new InetAddress[]{}), responseHandler);
+        MessagingService.instance.sendRR(message, allEndpoints, responseHandler);
 
         // Wait for all
         logger.debug("Sent all truncate messages, now waiting for {} responses", blockFor);

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1041951&r1=1041950&r2=1041951&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/thrift/CassandraServer.java Fri Dec 3 18:52:06 2010
@@ -138,6 +138,7 @@ public class CassandraServer implements
         }
         catch (TimeoutException e)
         {
+            logger.debug("... timed out");
             throw new TimedOutException();
         }
         catch (IOException e)
@@ -442,11 +443,12 @@ public class CassandraServer implements
 
             try
             {
-                StorageProxy.mutate(mutations, consistency_level);
+                StorageProxy.mutate(mutations, consistency_level);
             }
             catch (TimeoutException e)
             {
-                throw new TimedOutException();
+                logger.debug("... timed out");
+                throw new TimedOutException();
             }
         }
         finally
@@ -512,6 +514,7 @@ public class CassandraServer implements
         }
         catch (TimeoutException e)
         {
+            logger.debug("... timed out");
             throw new TimedOutException();
         }
         catch (IOException e)
@@ -556,6 +559,7 @@ public class CassandraServer implements
         }
         catch (TimeoutException e)
         {
+            logger.debug("... timed out");
             throw new TimedOutException();
         }
         return thriftifyKeySlices(rows, column_parent, column_predicate);

Added: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java?rev=1041951&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java (added)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java Fri Dec 3 18:52:06 2010
@@ -0,0 +1,145 @@
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.HashMultimap;
+import org.junit.Test;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.SimpleSnitch;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ConsistencyLevelTest extends CleanupHelper
+{
+    @Test
+    public void testReadWriteConsistencyChecks() throws Exception
+    {
+        StorageService ss = StorageService.instance;
+        final int RING_SIZE = 3;
+
+        TokenMetadata tmd = ss.getTokenMetadata();
+        tmd.clearUnsafe();
+        IPartitioner partitioner = new RandomPartitioner();
+
+        ss.setPartitionerUnsafe(partitioner);
+
+        ArrayList<Token> endpointTokens = new ArrayList<Token>();
+        ArrayList<Token> keyTokens = new ArrayList<Token>();
+        List<InetAddress> hosts = new ArrayList<InetAddress>();
+
+        Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, RING_SIZE);
+
+        HashMultimap<InetAddress, InetAddress> hintedNodes = HashMultimap.create();
+
+
+        AbstractReplicationStrategy strategy;
+
+        for (String table : DatabaseDescriptor.getNonSystemTables())
+        {
+            strategy = getStrategy(table, tmd);
+            StorageService.calculatePendingRanges(strategy, table);
+            int replicationFactor = strategy.getReplicationFactor();
+            if (replicationFactor < 2)
+                continue;
+
+            for (ConsistencyLevel c : ConsistencyLevel.values())
+            {
+
+                if (c == ConsistencyLevel.EACH_QUORUM || c == ConsistencyLevel.LOCAL_QUORUM)
+                    continue;
+
+                for (int i = 0; i < replicationFactor; i++)
+                {
+                    hintedNodes.clear();
+
+                    for (int j = 0; j < i; j++)
+                    {
+                        hintedNodes.put(hosts.get(j), hosts.get(j));
+                    }
+
+                    IWriteResponseHandler writeHandler = strategy.getWriteResponseHandler(hosts, hintedNodes, c);
+
+                    QuorumResponseHandler<Row> readHandler = strategy.getQuorumResponseHandler(new ReadResponseResolver(table), c);
+
+                    boolean isWriteUnavailable = false;
+                    boolean isReadUnavailable = false;
+                    try
+                    {
+                        writeHandler.assureSufficientLiveNodes();
+                    }
+                    catch (UnavailableException e)
+                    {
+                        isWriteUnavailable = true;
+                    }
+
+                    try
+                    {
+                        readHandler.assureSufficientLiveNodes(hintedNodes.asMap().keySet());
+                    }
+                    catch (UnavailableException e)
+                    {
+                        isReadUnavailable = true;
+                    }
+
+                    //these should always match (in this kind of test)
+                    assertTrue(isWriteUnavailable == isReadUnavailable);
+
+                    switch (c)
+                    {
+                        case ALL:
+                            if (isWriteUnavailable)
+                                assertTrue(hintedNodes.size() < replicationFactor);
+                            else
+                                assertTrue(hintedNodes.size() >= replicationFactor);
+
+                            break;
+                        case ONE:
+                        case ANY:
+                            if (isWriteUnavailable)
+                                assertTrue(hintedNodes.size() == 0);
+                            else
+                                assertTrue(hintedNodes.size() > 0);
+                            break;
+                        case QUORUM:
+                            if (isWriteUnavailable)
+                                assertTrue(hintedNodes.size() < (replicationFactor / 2 + 1));
+                            else
+                                assertTrue(hintedNodes.size() >= (replicationFactor / 2 + 1));
+                            break;
+                        default:
+                            fail("Unhandled CL: " + c);
+
+                    }
+                }
+            }
+            return;
+        }
+
+        fail("Test requires at least one table with RF > 1");
+    }
+
+    private AbstractReplicationStrategy getStrategy(String table, TokenMetadata tmd) throws ConfigurationException
+    {
+        return AbstractReplicationStrategy.createReplicationStrategy(table,
+                                                                    "org.apache.cassandra.locator.SimpleStrategy",
+                                                                    tmd,
+                                                                    new SimpleSnitch(),
+                                                                    null);
+    }
+
+}
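
Illustrative note (not part of the commit): the fail-fast rule that this change adds to the read path can be sketched in isolation as below. This is a minimal, hypothetical sketch rather than Cassandra's code. The names LiveNodeCheckSketch, Level, UnavailableSketchException and canRead are invented for illustration, endpoints are plain strings instead of InetAddress, and the thresholds for ONE and QUORUM are assumed from the assertions in ConsistencyLevelTest above (1 and replicationFactor / 2 + 1 respectively).

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/** Hypothetical stand-alone sketch; none of these types are Cassandra's own. */
final class LiveNodeCheckSketch
{
    /** Stand-in for the consistency levels the commit's test exercises on reads. */
    enum Level { ONE, QUORUM, ALL }

    /** Stand-in for org.apache.cassandra.thrift.UnavailableException. */
    static final class UnavailableSketchException extends Exception {}

    /** Number of replica responses a read at the given level has to wait for. */
    static int blockFor(Level level, int replicationFactor)
    {
        switch (level)
        {
            case ONE:
                return 1;
            case QUORUM:
                return (replicationFactor / 2) + 1;
            case ALL:
                return replicationFactor;
            default:
                throw new UnsupportedOperationException("invalid consistency level: " + level);
        }
    }

    /** Throws before any message is sent if too few replicas are alive. */
    static void assureSufficientLiveNodes(Collection<String> liveEndpoints, int blockFor)
        throws UnavailableSketchException
    {
        if (liveEndpoints.size() < blockFor)
            throw new UnavailableSketchException();
    }

    /** Convenience wrapper: true when the read could proceed, false when it would be rejected. */
    static boolean canRead(Level level, int replicationFactor, Collection<String> liveEndpoints)
    {
        try
        {
            assureSufficientLiveNodes(liveEndpoints, blockFor(level, replicationFactor));
            return true;
        }
        catch (UnavailableSketchException e)
        {
            return false;
        }
    }

    public static void main(String[] args)
    {
        // Two of three replicas alive (addresses are made up).
        List<String> live = Arrays.asList("10.0.0.1", "10.0.0.2");
        System.out.println(canRead(Level.QUORUM, 3, live)); // prints true: 2 live >= quorum of 2
        System.out.println(canRead(Level.ALL, 3, live));    // prints false: 2 live < 3
    }
}
```

The point of checking before sending is that a request which cannot possibly gather enough responses is rejected at once, instead of waiting for the RPC timeout and surfacing as a TimedOutException.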