cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r945333 - in /cassandra/trunk: conf/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/service/ test/conf/ test/unit/org/apache/cassandra/locator/
Date Mon, 17 May 2010 19:58:40 GMT
Author: jbellis
Date: Mon May 17 19:58:39 2010
New Revision: 945333

URL: http://svn.apache.org/viewvc?rev=945333&view=rev
Log:
Make the DCQUORUM consistency levels (CLs) actually wait for the required nodes in each datacenter, rather than relying on a purely count-based approach that did not work as advertised.  Patch by Vijay Parthasarathy and jbellis; reviewed by Jeremy Hanna for CASSANDRA-952

Added:
    cassandra/trunk/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
    cassandra/trunk/test/conf/cassandra-rack.properties
      - copied, changed from r945329, cassandra/trunk/conf/cassandra-rack.properties
    cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterShardStrategyTest.java
Removed:
    cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterStrategyTest.java
Modified:
    cassandra/trunk/conf/cassandra-rack.properties
    cassandra/trunk/conf/datacenters.properties
    cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/RackInferringSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
    cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
    cassandra/trunk/test/conf/datacenters.properties

Modified: cassandra/trunk/conf/cassandra-rack.properties
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra-rack.properties?rev=945333&r1=945332&r2=945333&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra-rack.properties (original)
+++ cassandra/trunk/conf/cassandra-rack.properties Mon May 17 19:58:39 2010
@@ -14,9 +14,59 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# Cassandra Node IP:Port=Data Center:Rack
-192.168.1.200\:7000=dc1:r1
-192.168.2.300\:7000=dc2:rA
+# Cassandra Node IP=Data Center:Rack
+192.168.1.200=DC1:RAC1
+192.168.2.300=DC2:RAC2
+
+10.0.0.10=DC1:RAC1
+10.0.0.11=DC1:RAC1
+10.0.0.12=DC1:RAC2
+
+10.20.114.10=DC2:RAC1
+10.20.114.11=DC2:RAC1
+
+10.21.119.13=DC3:RAC1
+10.21.119.10=DC3:RAC1
+
+10.0.0.13=DC1:RAC2
+10.21.119.14=DC3:RAC2
+10.20.114.15=DC2:RAC2
+
+# default for unknown nodes
+default=DC1:r1
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Cassandra Node IP=Data Center:Rack
+192.168.1.200=DC1:RAC1
+192.168.2.300=DC2:RAC2
+
+10.0.0.10=DC1:RAC1
+10.0.0.11=DC1:RAC1
+10.0.0.12=DC1:RAC2
+
+10.20.114.10=DC2:RAC1
+10.20.114.11=DC2:RAC1
+
+10.21.119.13=DC3:RAC1
+10.21.119.10=DC3:RAC1
+
+10.0.0.13=DC1:RAC2
+10.21.119.14=DC3:RAC2
+10.20.114.15=DC2:RAC2
 
 # default for unknown nodes
-default=dc1:r1
+default=DC1:r1
\ No newline at end of file

Modified: cassandra/trunk/conf/datacenters.properties
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/datacenters.properties?rev=945333&r1=945332&r2=945333&view=diff
==============================================================================
--- cassandra/trunk/conf/datacenters.properties (original)
+++ cassandra/trunk/conf/datacenters.properties Mon May 17 19:58:39 2010
@@ -14,9 +14,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# datacenter=replication factor
 # The sum of all the datacenter replication factor values should equal
 # the replication factor of the keyspace (i.e. sum(dc_rf) = RF)
-dc1=3
-dc2=5
-dc3=1
+
+# keyspace\:datacenter=replication factor
+Keyspace1\:DC1=3
+Keyspace1\:DC2=2
+Keyspace1\:DC3=1
\ No newline at end of file

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=945333&r1=945332&r2=945333&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Mon May 17 19:58:39 2010
@@ -126,7 +126,7 @@ public class HintedHandOffManager
                 rm.add(cf);
         }
         Message message = rm.makeRowMutationMessage();
-        WriteResponseHandler responseHandler = new WriteResponseHandler(1, tableName);
+        WriteResponseHandler responseHandler = new WriteResponseHandler(endpoint);
         MessagingService.instance.sendRR(message, new InetAddress[] { endpoint }, responseHandler);
 
         try

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java?rev=945333&r1=945332&r2=945333&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractRackAwareSnitch.java Mon May 17 19:58:39 2010
@@ -37,7 +37,7 @@ public abstract class AbstractRackAwareS
      * @return string of rack
      * @throws UnknownHostException
      */
-    abstract public String getRack(InetAddress endpoint) throws UnknownHostException;
+    abstract public String getRack(InetAddress endpoint);
 
     /**
      * Return the data center for which an endpoint resides in
@@ -45,7 +45,7 @@ public abstract class AbstractRackAwareS
      * @return string of data center
      * @throws UnknownHostException
      */
-    abstract public String getDatacenter(InetAddress endpoint) throws UnknownHostException;
+    abstract public String getDatacenter(InetAddress endpoint);
 
     /**
      * Sorts the <tt>Collection</tt> of node addresses by proximity to the given address
@@ -72,35 +72,28 @@ public abstract class AbstractRackAwareS
         {
             public int compare(InetAddress a1, InetAddress a2)
             {
-                try
-                {
-                    if (address.equals(a1) && !address.equals(a2))
-                        return -1;
-                    if (address.equals(a2) && !address.equals(a1))
-                        return 1;
-
-                    String addressRack = getRack(address);
-                    String a1Rack = getRack(a1);
-                    String a2Rack = getRack(a2);
-                    if (addressRack.equals(a1Rack) && !addressRack.equals(a2Rack))
-                        return -1;
-                    if (addressRack.equals(a2Rack) && !addressRack.equals(a1Rack))
-                        return 1;
-
-                    String addressDatacenter = getDatacenter(address);
-                    String a1Datacenter = getDatacenter(a1);
-                    String a2Datacenter = getDatacenter(a2);
-                    if (addressDatacenter.equals(a1Datacenter) && !addressDatacenter.equals(a2Datacenter))
-                        return -1;
-                    if (addressDatacenter.equals(a2Datacenter) && !addressDatacenter.equals(a1Datacenter))
-                        return 1;
-
-                    return 0;
-                }
-                catch (UnknownHostException e)
-                {
-                    throw new RuntimeException(e);
-                }
+                if (address.equals(a1) && !address.equals(a2))
+                    return -1;
+                if (address.equals(a2) && !address.equals(a1))
+                    return 1;
+
+                String addressRack = getRack(address);
+                String a1Rack = getRack(a1);
+                String a2Rack = getRack(a2);
+                if (addressRack.equals(a1Rack) && !addressRack.equals(a2Rack))
+                    return -1;
+                if (addressRack.equals(a2Rack) && !addressRack.equals(a1Rack))
+                    return 1;
+
+                String addressDatacenter = getDatacenter(address);
+                String a1Datacenter = getDatacenter(a1);
+                String a2Datacenter = getDatacenter(a2);
+                if (addressDatacenter.equals(a1Datacenter) && !addressDatacenter.equals(a2Datacenter))
+                    return -1;
+                if (addressDatacenter.equals(a2Datacenter) && !addressDatacenter.equals(a1Datacenter))
+                    return 1;
+
+                return 0;
             }
         });
         return addresses;

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=945333&r1=945332&r2=945333&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Mon May 17 19:58:39 2010
@@ -22,16 +22,21 @@ import java.net.InetAddress;
 import java.util.*;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.AbstractWriteResponseHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.service.IResponseResolver;
+import org.apache.cassandra.service.QuorumResponseHandler;
 import org.apache.cassandra.service.WriteResponseHandler;
 import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -41,7 +46,7 @@ public abstract class AbstractReplicatio
 {
     protected static final Logger logger_ = LoggerFactory.getLogger(AbstractReplicationStrategy.class);
 
-    private TokenMetadata tokenMetadata_;
+    protected TokenMetadata tokenMetadata_;
     protected final IEndpointSnitch snitch_;
 
     AbstractReplicationStrategy(TokenMetadata tokenMetadata, IEndpointSnitch snitch)
@@ -62,9 +67,12 @@ public abstract class AbstractReplicatio
         return getNaturalEndpoints(token, tokenMetadata_, table);
     }
 
-    public WriteResponseHandler getWriteResponseHandler(int blockFor, ConsistencyLevel consistency_level, String table)
+    public AbstractWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> writeEndpoints,
+                                                                Multimap<InetAddress, InetAddress> hintedEndpoints,
+                                                                ConsistencyLevel consistencyLevel,
+                                                                String table)
     {
-        return new WriteResponseHandler(blockFor, table);
+        return new WriteResponseHandler(writeEndpoints, hintedEndpoints, consistencyLevel, table);
     }
     
     /**
@@ -190,4 +198,8 @@ public abstract class AbstractReplicatio
         return getAddressRanges(temp, table).get(pendingAddress);
     }
 
+    public QuorumResponseHandler getQuorumResponseHandler(IResponseResolver responseResolver, ConsistencyLevel consistencyLevel, String table)
+    {
+        return new QuorumResponseHandler(responseResolver, consistencyLevel, table);
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java?rev=945333&r1=945332&r2=945333&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java Mon May 17 19:58:39 2010
@@ -30,17 +30,20 @@ import java.net.UnknownHostException;
 import java.util.*;
 import java.util.Map.Entry;
 
+import com.google.common.collect.Multimap;
 import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * This Replication Strategy takes a property file that gives the intended
  * replication factor in each datacenter.  The sum total of the datacenter
  * replication factor values should be equal to the keyspace replication
  * factor.
- * <p>
+ * <p/>
  * So for example, if the keyspace replication factor is 6, the
  * datacenter replication factors could be 3, 2, and 1 - so 3 replicas in
  * one datacenter, 2 in another, and 1 in another - totalling 6.
@@ -50,91 +53,74 @@ import org.apache.cassandra.thrift.Consi
  */
 public class DatacenterShardStrategy extends AbstractReplicationStrategy
 {
-    private static Map<String, List<Token>> dcMap = new HashMap<String, List<Token>>();
-    private static Map<String, Integer> dcReplicationFactor = new HashMap<String, Integer>();
-    private static Map<String, Integer> quorumRepFactor = new HashMap<String, Integer>();
-    private static int locQFactor = 0;
-    ArrayList<Token> tokens;
-
-    private List<InetAddress> localEndpoints = new ArrayList<InetAddress>();
-    private static final String DATACENTER_PROPERTIES_FILENAME = "datacenters.properties";
-
-    private List<InetAddress> getLocalEndpoints()
-    {
-        return new ArrayList<InetAddress>(localEndpoints);
-    }
-
-    private Map<String, Integer> getQuorumRepFactor()
-    {
-        return new HashMap<String, Integer>(quorumRepFactor);
-    }
+    private static final String DATACENTER_PROPERTY_FILENAME = "datacenters.properties";
+    private Map<String, List<Token>> dcTokens;
+    private int tokensize = 0;
+    private AbstractRackAwareSnitch snitch;
+    private Map<String, Map<String, Integer>> datacenters = new HashMap<String, Map<String, Integer>>();
 
     private synchronized void loadEndpoints(TokenMetadata metadata) throws UnknownHostException
     {
-        this.tokens = new ArrayList<Token>(metadata.sortedTokens());
-        String localDC = ((AbstractRackAwareSnitch)snitch_).getDatacenter(InetAddress.getLocalHost());
-        dcMap = new HashMap<String, List<Token>>();
-        for (Token token : this.tokens)
-        {
-            InetAddress endpoint = metadata.getEndpoint(token);
-            String dataCenter = ((AbstractRackAwareSnitch)snitch_).getDatacenter(endpoint);
-            if (dataCenter.equals(localDC))
-            {
-                localEndpoints.add(endpoint);
-            }
-            List<Token> lst = dcMap.get(dataCenter);
+        String localDC = snitch.getDatacenter(DatabaseDescriptor.getListenAddress());
+        assert (localDC != null) : "Invalid configuration, Coldn't find the host: " + FBUtilities.getLocalAddress();
+        // re-init the map
+        dcTokens = new HashMap<String, List<Token>>();
+        List<Token> tokens = metadata.sortedTokens();
+        for (Token token : tokens)
+        {
+            InetAddress endPoint = metadata.getEndpoint(token);
+            String dataCenter = snitch.getDatacenter(endPoint);
+            // add tokens to dcmap.
+            List<Token> lst = dcTokens.get(dataCenter);
             if (lst == null)
             {
                 lst = new ArrayList<Token>();
             }
             lst.add(token);
-            dcMap.put(dataCenter, lst);
+            dcTokens.put(dataCenter, lst);
         }
-        for (Entry<String, List<Token>> entry : dcMap.entrySet())
+        for (Entry<String, List<Token>> entry : dcTokens.entrySet())
         {
             List<Token> valueList = entry.getValue();
             Collections.sort(valueList);
-            dcMap.put(entry.getKey(), valueList);
-        }
-        for (Entry<String, Integer> entry : dcReplicationFactor.entrySet())
-        {
-            String datacenter = entry.getKey();
-            int qFactor = (entry.getValue() / 2 + 1);
-            quorumRepFactor.put(datacenter, qFactor);
-            if (datacenter.equals(localDC))
-            {
-                locQFactor = qFactor;
-            }
+            dcTokens.put(entry.getKey(), valueList);
         }
+        tokensize = tokens.size();
     }
 
     public DatacenterShardStrategy(TokenMetadata tokenMetadata, IEndpointSnitch snitch) throws ConfigurationException
     {
         super(tokenMetadata, snitch);
-        if (!(snitch instanceof AbstractRackAwareSnitch))
-        {
+        if ((!(snitch instanceof AbstractRackAwareSnitch)))
             throw new IllegalArgumentException("DatacenterShardStrategy requires a rack-aware endpointsnitch");
-        }
+        this.snitch = (AbstractRackAwareSnitch)snitch;
 
-        // load replication factors for each DC
         ClassLoader loader = PropertyFileSnitch.class.getClassLoader();
-        URL scpurl = loader.getResource(DATACENTER_PROPERTIES_FILENAME);
+        URL scpurl = loader.getResource(DATACENTER_PROPERTY_FILENAME);
         if (scpurl == null)
-            throw new ConfigurationException("unable to locate " + DATACENTER_PROPERTIES_FILENAME);
-
-        String rackPropertyFilename = scpurl.getFile();
+        {
+            throw new RuntimeException("unable to locate " + DATACENTER_PROPERTY_FILENAME);
+        }
+        String dcPropertyFile = scpurl.getFile();
         try
         {
-            Properties p = new Properties();
-            p.load(new FileReader(rackPropertyFilename));
-            for (Entry<Object, Object> entry : p.entrySet())
-            {
-                dcReplicationFactor.put((String)entry.getKey(), Integer.valueOf((String)entry.getValue()));
+            Properties props = new Properties();
+            props.load(new FileReader(dcPropertyFile));
+            for (Object key : props.keySet())
+            {
+                String[] keys = ((String)key).split(":");
+                Map<String, Integer> map = datacenters.get(keys[0]);
+                if (null == map)
+                {
+                    map = new HashMap<String, Integer>();
+                }
+                map.put(keys[1], Integer.parseInt((String)props.get(key)));
+                datacenters.put(keys[0], map);
             }
         }
         catch (IOException ioe)
         {
-            throw new ConfigurationException("Could not process " + rackPropertyFilename, ioe);
+            throw new IOError(ioe);
         }
     }
 
@@ -142,7 +128,7 @@ public class DatacenterShardStrategy ext
     {
         try
         {
-            return getNaturalEndpointsInternal(token, metadata);
+            return getNaturalEndpointsInternal(token, metadata, table);
         }
         catch (IOException e)
         {
@@ -150,37 +136,37 @@ public class DatacenterShardStrategy ext
         }
     }
 
-    private ArrayList<InetAddress> getNaturalEndpointsInternal(Token searchToken, TokenMetadata metadata) throws IOException
+    private ArrayList<InetAddress> getNaturalEndpointsInternal(Token searchToken, TokenMetadata metadata, String table) throws UnknownHostException
     {
         ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>();
 
-        if (metadata.sortedTokens().size() == 0)
+        if (metadata.sortedTokens().isEmpty())
             return endpoints;
 
-        if (null == tokens || tokens.size() != metadata.sortedTokens().size())
-        {
+        if (tokensize != metadata.sortedTokens().size())
             loadEndpoints(metadata);
-        }
 
-        for (String dc : dcMap.keySet())
+        for (String dc : dcTokens.keySet())
         {
-            int replicas_ = dcReplicationFactor.get(dc);
-            ArrayList<InetAddress> forloopReturn = new ArrayList<InetAddress>(replicas_);
-            List<Token> tokens = dcMap.get(dc);
+            int replicas = getReplicationFactor(dc, table);
+        	int dcEpCount = 0;
+            List<Token> tokens = dcTokens.get(dc);
             boolean bOtherRack = false;
             boolean doneDataCenterItr;
             // Add the node at the index by default
             Iterator<Token> iter = TokenMetadata.ringIterator(tokens, searchToken);
             InetAddress primaryHost = metadata.getEndpoint(iter.next());
-            forloopReturn.add(primaryHost);
-
-            while (forloopReturn.size() < replicas_ && iter.hasNext())
+            endpoints.add(primaryHost);
+            dcEpCount++;
+            
+            while (dcEpCount < replicas && iter.hasNext())
             {
                 Token t = iter.next();
-                InetAddress endpointOfInterest = metadata.getEndpoint(t);
-                if (forloopReturn.size() < replicas_ - 1)
+                InetAddress endPointOfInterest = metadata.getEndpoint(t);
+                if (dcEpCount < replicas - 1)
                 {
-                    forloopReturn.add(endpointOfInterest);
+                    endpoints.add(endPointOfInterest);
+                    dcEpCount++;
                     continue;
                 }
                 else
@@ -191,10 +177,10 @@ public class DatacenterShardStrategy ext
                 // Now try to find one on a different rack
                 if (!bOtherRack)
                 {
-                    AbstractRackAwareSnitch snitch = (AbstractRackAwareSnitch)snitch_;
-                    if (!snitch.getRack(primaryHost).equals(snitch.getRack(endpointOfInterest)))
+                    if (!snitch.getRack(primaryHost).equals(snitch.getRack(endPointOfInterest)))
                     {
-                        forloopReturn.add(metadata.getEndpoint(t));
+                        endpoints.add(metadata.getEndpoint(t));
+                        dcEpCount++;
                         bOtherRack = true;
                     }
                 }
@@ -210,24 +196,34 @@ public class DatacenterShardStrategy ext
             * exit. Otherwise just loop through the list and add until we
             * have N nodes.
             */
-            if (forloopReturn.size() < replicas_)
+            if (dcEpCount < replicas)
             {
                 iter = TokenMetadata.ringIterator(tokens, searchToken);
-                while (forloopReturn.size() < replicas_ && iter.hasNext())
+                while (dcEpCount < replicas && iter.hasNext())
                 {
                     Token t = iter.next();
-                    if (!forloopReturn.contains(metadata.getEndpoint(t)))
+                    if (!endpoints.contains(metadata.getEndpoint(t)))
                     {
-                        forloopReturn.add(metadata.getEndpoint(t));
+                    	endpoints.add(metadata.getEndpoint(t));
+                    	dcEpCount++;
                     }
                 }
             }
-            endpoints.addAll(forloopReturn);
         }
 
         return endpoints;
     }
 
+    public int getReplicationFactor(String dc, String table)
+    {
+        return datacenters.get(table).get(dc);
+    }
+
+    public Set<String> getDatacenters(String table)
+    {
+        return datacenters.get(table).keySet();
+    }
+
     /**
      * This method will generate the QRH object and returns. If the Consistency
      * level is DCQUORUM then it will return a DCQRH with a map of local rep
@@ -235,21 +231,31 @@ public class DatacenterShardStrategy ext
      * return a DCQRH with a map of all the DC rep factor.
      */
     @Override
-    public WriteResponseHandler getWriteResponseHandler(int blockFor, ConsistencyLevel consistency_level, String table)
+    public AbstractWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistency_level, String table)
     {
         if (consistency_level == ConsistencyLevel.DCQUORUM)
         {
-            return new DatacenterWriteResponseHandler(locQFactor, table);
+            // block for in this context will be localnodes block.
+            return new DatacenterWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level, table);
         }
         else if (consistency_level == ConsistencyLevel.DCQUORUMSYNC)
         {
-            return new DatacenterSyncWriteResponseHandler(getQuorumRepFactor(), table);
+            return new DatacenterSyncWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level, table);
         }
-        return super.getWriteResponseHandler(blockFor, consistency_level, table);
+        return super.getWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level, table);
     }
 
-    int getReplicationFactor(String datacenter)
+    /**
+     * Returns the read (quorum) response handler for this strategy. If the consistency
+     * level is DCQUORUM or DCQUORUMSYNC, returns a DatacenterQuorumResponseHandler.
+     */
+    @Override
+    public QuorumResponseHandler getQuorumResponseHandler(IResponseResolver responseResolver, ConsistencyLevel consistencyLevel, String table)
     {
-        return dcReplicationFactor.get(datacenter);
+        if (consistencyLevel.equals(ConsistencyLevel.DCQUORUM) || consistencyLevel.equals(ConsistencyLevel.DCQUORUMSYNC))
+        {
+            return new DatacenterQuorumResponseHandler(responseResolver, consistencyLevel, table);
+        }
+        return super.getQuorumResponseHandler(responseResolver, consistencyLevel, table);
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java?rev=945333&r1=945332&r2=945333&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java Mon May 17 19:58:39 2010
@@ -78,7 +78,7 @@ public class PropertyFileSnitch extends 
      * @return a array of string with the first index being the data center and the second being the rack
      */
     public String[] getEndpointInfo(InetAddress endpoint) {
-        String key = endpoint.toString();
+        String key = endpoint.getHostAddress();
         String value = hostProperties.getProperty(key);
         if (value == null)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java?rev=945333&r1=945332&r2=945333&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/RackAwareStrategy.java Mon May 17 19:58:39 2010
@@ -60,37 +60,30 @@ public class RackAwareStrategy extends A
         boolean bOtherRack = false;
         while (endpoints.size() < replicas && iter.hasNext())
         {
-            try
-            {
-                AbstractRackAwareSnitch snitch = (AbstractRackAwareSnitch)snitch_;
+            AbstractRackAwareSnitch snitch = (AbstractRackAwareSnitch)snitch_;
 
-                // First try to find one in a different data center
-                Token t = iter.next();
-                if (!snitch.getDatacenter(metadata.getEndpoint(primaryToken)).equals(snitch.getDatacenter(metadata.getEndpoint(t))))
-                {
-                    // If we have already found something in a diff datacenter no need to find another
-                    if (!bDataCenter)
-                    {
-                        endpoints.add(metadata.getEndpoint(t));
-                        bDataCenter = true;
-                    }
-                    continue;
-                }
-                // Now  try to find one on a different rack
-                if (!snitch.getRack(metadata.getEndpoint(primaryToken)).equals(snitch.getRack(metadata.getEndpoint(t))) &&
-                    snitch.getDatacenter(metadata.getEndpoint(primaryToken)).equals(snitch.getDatacenter(metadata.getEndpoint(t))))
+            // First try to find one in a different data center
+            Token t = iter.next();
+            if (!snitch.getDatacenter(metadata.getEndpoint(primaryToken)).equals(snitch.getDatacenter(metadata.getEndpoint(t))))
+            {
+                // If we have already found something in a diff datacenter no need to find another
+                if (!bDataCenter)
                 {
-                    // If we have already found something in a diff rack no need to find another
-                    if (!bOtherRack)
-                    {
-                        endpoints.add(metadata.getEndpoint(t));
-                        bOtherRack = true;
-                    }
+                    endpoints.add(metadata.getEndpoint(t));
+                    bDataCenter = true;
                 }
+                continue;
             }
-            catch (UnknownHostException e)
+            // Now  try to find one on a different rack
+            if (!snitch.getRack(metadata.getEndpoint(primaryToken)).equals(snitch.getRack(metadata.getEndpoint(t))) &&
+                snitch.getDatacenter(metadata.getEndpoint(primaryToken)).equals(snitch.getDatacenter(metadata.getEndpoint(t))))
             {
-                throw new RuntimeException(e);
+                // If we have already found something in a diff rack no need to find another
+                if (!bOtherRack)
+                {
+                    endpoints.add(metadata.getEndpoint(t));
+                    bOtherRack = true;
+                }
             }
 
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/RackInferringSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/RackInferringSnitch.java?rev=945333&r1=945332&r2=945333&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/RackInferringSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/RackInferringSnitch.java Mon May 17 19:58:39 2010
@@ -19,8 +19,6 @@
 package org.apache.cassandra.locator;
 
 import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.*;
 
 /**
  * A simple endpoint snitch implementation that assumes datacenter and rack information is encoded
@@ -28,12 +26,12 @@ import java.util.*;
  */
 public class RackInferringSnitch extends AbstractRackAwareSnitch
 {
-    public String getRack(InetAddress endpoint) throws UnknownHostException
+    public String getRack(InetAddress endpoint)
     {
         return Byte.toString(endpoint.getAddress()[2]);
     }
 
-    public String getDatacenter(InetAddress endpoint) throws UnknownHostException
+    public String getDatacenter(InetAddress endpoint)
     {
         return Byte.toString(endpoint.getAddress()[1]);
     }

Added: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java?rev=945333&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java Mon May 17 19:58:39 2010
@@ -0,0 +1,55 @@
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.collect.Multimap;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.SimpleCondition;
+
+/**
+ * Base class for write-path callbacks.  Holds the endpoints a mutation is sent to and a condition
+ * that subclasses signal from response() once their consistency requirement has been satisfied;
+ * the coordinator blocks on get() until then or until the RPC timeout expires.
+ */
+public abstract class AbstractWriteResponseHandler implements IAsyncCallback
+{
+    // signalled by subclasses once enough acknowledgements have arrived
+    protected final SimpleCondition condition = new SimpleCondition();
+    // wall-clock creation time; get() deducts the time already elapsed from the RPC timeout
+    protected final long startTime;
+    protected final Collection<InetAddress> writeEndpoints;
+    // NOTE(review): presumably live destination -> endpoints it carries hints for; confirm against
+    // AbstractReplicationStrategy.getHintedEndpoints
+    protected final Multimap<InetAddress, InetAddress> hintedEndpoints;
+    protected final ConsistencyLevel consistencyLevel;
+
+    public AbstractWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel)
+    {
+        startTime = System.currentTimeMillis();
+        this.consistencyLevel = consistencyLevel;
+        this.hintedEndpoints = hintedEndpoints;
+        this.writeEndpoints = writeEndpoints;
+    }
+
+    /**
+     * Blocks until the condition is signalled or the remainder of the RPC timeout (measured from
+     * construction) elapses.
+     *
+     * @throws TimeoutException if the condition was not signalled in time
+     */
+    public void get() throws TimeoutException
+    {
+        long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
+        boolean success;
+        try
+        {
+            success = condition.await(timeout, TimeUnit.MILLISECONDS);
+        }
+        catch (InterruptedException ex)
+        {
+            // interruption is not expected on this path and is treated as a programming error
+            throw new AssertionError(ex);
+        }
+
+        if (!success)
+        {
+            throw new TimeoutException();
+        }
+    }
+
+    /** null message means "response from local write" */
+    public abstract void response(Message msg);
+
+    /** Throws UnavailableException if the write cannot possibly reach the required consistency. */
+    public abstract void assureSufficientLiveNodes() throws UnavailableException;
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=945333&r1=945332&r2=945333&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java Mon May 17 19:58:39 2010
@@ -115,7 +115,7 @@ class ConsistencyChecker implements Runn
 
                 if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest))
                 {
-                    IResponseResolver<Row> readResponseResolver = new ReadResponseResolver(table_, replicas_.size());
+                    IResponseResolver<Row> readResponseResolver = new ReadResponseResolver(table_);
                     IAsyncCallback responseHandler;
                     if (replicas_.contains(FBUtilities.getLocalAddress()))
                         responseHandler = new DataRepairHandler(row_, replicas_.size(), readResponseResolver);

Added: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java?rev=945333&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java Mon May 17 19:58:39 2010
@@ -0,0 +1,50 @@
+package org.apache.cassandra.service;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.AbstractRackAwareSnitch;
+import org.apache.cassandra.locator.DatacenterShardStrategy;
+import org.apache.cassandra.locator.RackInferringSnitch;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Datacenter Quorum response handler blocks for a quorum of responses from the local DC
+ */
+public class DatacenterQuorumResponseHandler<T> extends QuorumResponseHandler<T>
+{
+    private static final AbstractRackAwareSnitch snitch = (AbstractRackAwareSnitch) DatabaseDescriptor.getEndpointSnitch();
+    private static final String localdc = snitch.getDatacenter(FBUtilities.getLocalAddress());
+
+    // local-datacenter responses still required before the read may complete
+    private final AtomicInteger localResponses;
+
+    /**
+     * @param responseResolver resolver used to decide whether a data (non-digest) response is present
+     * @param consistencyLevel requested consistency level, forwarded to determineBlockFor
+     * @param table            keyspace whose DatacenterShardStrategy supplies the local replication factor
+     */
+    public DatacenterQuorumResponseHandler(IResponseResolver<T> responseResolver, ConsistencyLevel consistencyLevel, String table)
+    {
+        super(responseResolver, consistencyLevel, table);
+        // blockfor was filled in by the superclass constructor via the determineBlockFor override below
+        localResponses = new AtomicInteger(blockfor);
+    }
+
+    /**
+     * Records a response.  Replies from every datacenter are kept for resolution, but only replies
+     * from the local datacenter count toward the quorum.  Signals once the local quorum has been
+     * reached and a data response is present.
+     */
+    @Override
+    public void response(Message message)
+    {
+        responses.add(message); // we'll go ahead and resolve a reply from anyone, even if it's not from this dc
+
+        int remaining = localdc.equals(snitch.getDatacenter(message.getFrom()))
+                        ? localResponses.decrementAndGet()
+                        : localResponses.get();
+
+        // "<= 0" rather than "== 0": after the local quorum is met, further local replies (possibly
+        // the data response itself) push the counter below zero and must still be able to signal.
+        if (remaining <= 0 && responseResolver.isDataPresent(responses))
+        {
+            condition.signal();
+        }
+    }
+
+    /**
+     * Blocks for a quorum of the local datacenter's replicas; the consistency level argument is
+     * not consulted.  Invoked from the superclass constructor, so it only uses static state.
+     */
+    @Override
+    public int determineBlockFor(ConsistencyLevel consistency_level, String table)
+    {
+        DatacenterShardStrategy strategy = (DatacenterShardStrategy) StorageService.instance.getReplicationStrategy(table);
+        return (strategy.getReplicationFactor(localdc, table) / 2) + 1;
+    }
+}
\ No newline at end of file

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java?rev=945333&r1=945332&r2=945333&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java Mon May 17 19:58:39 2010
@@ -24,73 +24,92 @@ package org.apache.cassandra.service;
  */
 
 
+import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.locator.AbstractRackAwareSnitch;
+import org.apache.cassandra.locator.DatacenterShardStrategy;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
+
+import com.google.common.collect.Multimap;
+import org.apache.cassandra.utils.FBUtilities;
 
 /**
- * This class will block for the replication factor which is
- * provided in the input map. it will block till we recive response from
- * n nodes in each of our data centers.
+ * This class blocks for a quorum of responses _in all datacenters_ (CL.DCQUORUMSYNC).
  */
-public class DatacenterSyncWriteResponseHandler extends WriteResponseHandler
+public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHandler
 {
-    private final Map<String, Integer> dcResponses = new HashMap<String, Integer>();
-    private final Map<String, Integer> responseCounts;
-    private final AbstractRackAwareSnitch endpointSnitch;
+    private static final AbstractRackAwareSnitch snitch = (AbstractRackAwareSnitch) DatabaseDescriptor.getEndpointSnitch();
+
+    private static final String localdc;
+    static
+    {
+        localdc = snitch.getDatacenter(FBUtilities.getLocalAddress());
+    }
 
-    public DatacenterSyncWriteResponseHandler(Map<String, Integer> responseCounts, String table)
+	private final DatacenterShardStrategy strategy;
+    private HashMap<String, AtomicInteger> responses = new HashMap<String, AtomicInteger>();
+    private final String table;
+
+    /**
+     * Creates a handler that waits for a quorum of acknowledgements in every datacenter of the keyspace.
+     *
+     * @param writeEndpoints   endpoints the mutation targets
+     * @param hintedEndpoints  destinations the mutation is sent to, as computed by the replication strategy
+     * @param consistencyLevel must be DCQUORUMSYNC
+     * @param table            keyspace; its replication strategy must be a DatacenterShardStrategy
+     */
+    public DatacenterSyncWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
     {
-        // Response is been managed by the map so make it 1 for the superclass.
-        super(1, table);
-        this.responseCounts = responseCounts;
-        endpointSnitch = (AbstractRackAwareSnitch) DatabaseDescriptor.getEndpointSnitch();
+        super(writeEndpoints, hintedEndpoints, consistencyLevel);
+        // this handler implements DCQUORUMSYNC (quorum in every DC); plain DCQUORUM uses DatacenterWriteResponseHandler
+        assert consistencyLevel == ConsistencyLevel.DCQUORUMSYNC;
+
+        this.table = table;
+        strategy = (DatacenterShardStrategy) StorageService.instance.getReplicationStrategy(table);
+
+        // per-datacenter countdown of acknowledgements still required: a quorum of that DC's replication factor
+        for (String dc : strategy.getDatacenters(table))
+        {
+            int rf = strategy.getReplicationFactor(dc, table);
+            responses.put(dc, new AtomicInteger((rf / 2) + 1));
+        }
     }
 
-    @Override
-    // synchronized for the benefit of dcResponses and responseCounts.  "responses" itself
-    // is inherited from WRH and is concurrent.
-    // TODO can we use concurrent structures instead?
-    public synchronized void response(Message message)
+    /**
+     * Counts down the responding node's datacenter and signals once every datacenter has reached its
+     * quorum.  A null message is the local write and is credited to the local datacenter.
+     * The map is only populated in the constructor; the per-DC counters are AtomicIntegers.
+     * NOTE(review): responses.get(dataCenter) returns null (NPE) if the responder's datacenter is not one
+     * of strategy.getDatacenters(table), the only keys the constructor inserts.
+     */
+    public void response(Message message)
     {
-        try
+        String dataCenter = message == null
+                            ? localdc
+                            : snitch.getDatacenter(message.getFrom());
+
+        // counters may go below zero once a DC's quorum is met; only a positive value means "still waiting"
+        responses.get(dataCenter).getAndDecrement();
+
+        for (AtomicInteger i : responses.values())
         {
-            String dataCenter = endpointSnitch.getDatacenter(message.getFrom());
-            Object blockFor = responseCounts.get(dataCenter);
-            // If this DC needs to be blocked then do the below.
-            if (blockFor != null)
-            {
-                Integer quorumCount = dcResponses.get(dataCenter);
-                if (quorumCount == null)
-                {
-                    // Intialize and recognize the first response
-                    dcResponses.put(dataCenter, 1);
-                }
-                else if ((Integer) blockFor > quorumCount)
-                {
-                    // recognize the consequtive responses.
-                    dcResponses.put(dataCenter, quorumCount + 1);
-                }
-                else
-                {
-                    // No need to wait on it anymore so remove it.
-                    responseCounts.remove(dataCenter);
-                }
-            }
+            if (0 < i.get())
+                return;
         }
-        catch (UnknownHostException e)
+
+        // all the per-datacenter quorum conditions are met
+        condition.signal();
+    }
+
+    /**
+     * Fails fast unless every datacenter of the keyspace has at least its quorum of destinations.
+     * Destinations are the keys of hintedEndpoints (the nodes the mutation is actually sent to),
+     * bucketed by datacenter and compared with that datacenter's required acknowledgements.
+     * Meant to be called before any response is recorded, because it compares against the
+     * remaining per-DC counts.
+     *
+     * @throws UnavailableException if some datacenter has fewer destinations than its quorum
+     */
+    public void assureSufficientLiveNodes() throws UnavailableException
+    {
+        Map<String, AtomicInteger> dcEndpoints = new HashMap<String, AtomicInteger>();
+        for (String dc: strategy.getDatacenters(table))
+            dcEndpoints.put(dc, new AtomicInteger());
+        for (InetAddress destination : hintedEndpoints.keySet())
         {
-            throw new RuntimeException(e);
+            assert writeEndpoints.contains(destination);
+            // figure out the destination dc
+            String destinationDC = snitch.getDatacenter(destination);
+            dcEndpoints.get(destinationDC).incrementAndGet();
         }
-        responses.add(message);
-        // If done then the response count will be empty
-        if (responseCounts.isEmpty())
+
+        // Throw if any DC lacks enough live nodes to accept the write at quorum.
+        for (String dc: strategy.getDatacenters(table))
         {
-            condition.signal();
+            // "<" rather than "!=": more live replicas than the quorum is perfectly fine
+            if (dcEndpoints.get(dc).get() < responses.get(dc).get())
+                throw new UnavailableException();
         }
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java?rev=945333&r1=945332&r2=945333&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java Mon May 17 19:58:39 2010
@@ -26,56 +26,70 @@ package org.apache.cassandra.service;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.Collection;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.locator.AbstractRackAwareSnitch;
+import org.apache.cassandra.locator.DatacenterShardStrategy;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.FBUtilities;
 
+import com.google.common.collect.Multimap;
+
 /**
- * This class will basically will block for the replication factor which is
- * provided in the input map. it will block till we recive response from (DC, n)
- * nodes.
+ * This class blocks for a quorum of responses _in the local datacenter only_ (CL.DCQUORUM).
  */
 public class DatacenterWriteResponseHandler extends WriteResponseHandler
 {
-    private final AtomicInteger blockFor;
-    private final AbstractRackAwareSnitch endpointsnitch;
-    private final InetAddress localEndpoint;
+    private static final AbstractRackAwareSnitch snitch = (AbstractRackAwareSnitch) DatabaseDescriptor.getEndpointSnitch();
+
+    private static final String localdc;
+    static
+    {
+        localdc = snitch.getDatacenter(FBUtilities.getLocalAddress());
+    }
+
+    public DatacenterWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
+    {
+        super(writeEndpoints, hintedEndpoints, consistencyLevel, table);
+        assert consistencyLevel == ConsistencyLevel.DCQUORUM;
+    }
 
-    public DatacenterWriteResponseHandler(int blockFor, String table)
+
+    @Override
+    protected int determineBlockFor(String table)
     {
-        // Response is been managed by the map so the waitlist size really doesnt matter.
-        super(blockFor, table);
-        this.blockFor = new AtomicInteger(blockFor);
-        endpointsnitch = (AbstractRackAwareSnitch) DatabaseDescriptor.getEndpointSnitch();
-        localEndpoint = FBUtilities.getLocalAddress();
+        DatacenterShardStrategy strategy = (DatacenterShardStrategy) StorageService.instance.getReplicationStrategy(table);
+        return (strategy.getReplicationFactor(localdc, table) / 2) + 1;
     }
 
+
     @Override
     public void response(Message message)
     {
-        //Is optimal to check if same datacenter than comparing Arrays.
-        int b = -1;
-        try
+        if (message == null || localdc.equals(snitch.getDatacenter(message.getFrom())))
         {
-            if (endpointsnitch.getDatacenter(localEndpoint).equals(endpointsnitch.getDatacenter(message.getFrom())))
-            {
-                b = blockFor.decrementAndGet();
-            }
+            if (responses.decrementAndGet() == 0)
+                condition.signal();
         }
-        catch (UnknownHostException e)
+    }
+    
+    /**
+     * Fails fast unless the local datacenter has enough live destinations to meet the required count.
+     * Only nodes the mutation is actually sent to (hintedEndpoints' keys) that are also write
+     * endpoints and reside in the local datacenter are counted.  Hint-only stand-ins are excluded,
+     * as are replicas that are down.  Meant to be called before any response is recorded.
+     *
+     * @throws UnavailableException if fewer local destinations exist than responses still required
+     */
+    @Override
+    public void assureSufficientLiveNodes() throws UnavailableException
+    {
+        int liveNodes = 0;
+        // iterate live destinations, not every write endpoint, so down local replicas are detected
+        for (InetAddress destination : hintedEndpoints.keySet())
         {
-            throw new RuntimeException(e);
+            if (writeEndpoints.contains(destination) && localdc.equals(snitch.getDatacenter(destination)))
+                liveNodes++;
         }
-        responses.add(message);
-        if (b == 0)
+
+        if (liveNodes < responses.get())
         {
-            //Singnal when Quorum is recived.
-            condition.signal();
+            throw new UnavailableException();
         }
-        if (logger.isDebugEnabled())
-            logger.debug("Processed Message: " + message.toString());
     }
-}
+}
\ No newline at end of file

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=945333&r1=945332&r2=945333&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Mon May 17 19:58:39 2010
@@ -19,9 +19,6 @@
 package org.apache.cassandra.service;
 
 import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.ArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -31,6 +28,7 @@ import org.apache.cassandra.config.Datab
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.utils.SimpleCondition;
 
 import org.slf4j.Logger;
@@ -40,15 +38,19 @@ public class QuorumResponseHandler<T> im
 {
     protected static final Logger logger = LoggerFactory.getLogger( QuorumResponseHandler.class );
     protected final SimpleCondition condition = new SimpleCondition();
-    protected final Collection<Message> responses;
-    private IResponseResolver<T> responseResolver;
+    protected final Collection<Message> responses = new LinkedBlockingQueue<Message>();;
+    protected IResponseResolver<T> responseResolver;
     private final long startTime;
-
-    public QuorumResponseHandler(int responseCount, IResponseResolver<T> responseResolver)
+    protected int blockfor;
+    
+    /**
+     * Constructor when response count has to be calculated and blocked for.
+     */
+    public QuorumResponseHandler(IResponseResolver<T> responseResolver, ConsistencyLevel consistencyLevel, String table)
     {
-        responses = new LinkedBlockingQueue<Message>();
+        this.blockfor = determineBlockFor(consistencyLevel, table);
         this.responseResolver = responseResolver;
-        startTime = System.currentTimeMillis();
+        this.startTime = System.currentTimeMillis();
     }
     
     public T get() throws TimeoutException, DigestMismatchException, IOException
@@ -90,9 +92,28 @@ public class QuorumResponseHandler<T> im
     public void response(Message message)
     {
         responses.add(message);
+        if (responses.size() < blockfor) {
+            return;
+        }
         if (responseResolver.isDataPresent(responses))
         {
             condition.signal();
         }
     }
+    
+    /**
+     * Computes how many responses a read at the given consistency level must wait for.
+     * Called from the constructor; subclasses may override it but must rely only on static state.
+     *
+     * @throws UnsupportedOperationException for consistency levels this handler does not support
+     */
+    public int determineBlockFor(ConsistencyLevel consistencyLevel, String table)
+    {
+        switch (consistencyLevel)
+        {
+            case ONE:
+            case ANY:
+                return 1;
+            case QUORUM:
+                // a majority of the replication factor; halving an already-computed quorum under-counts
+                return (DatabaseDescriptor.getReplicationFactor(table) / 2) + 1;
+            case ALL:
+                return DatabaseDescriptor.getReplicationFactor(table);
+            default:
+                throw new UnsupportedOperationException("invalid consistency level: " + consistencyLevel);
+        }
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=945333&r1=945332&r2=945333&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Mon May 17 19:58:39 2010
@@ -32,7 +32,6 @@ import java.net.InetAddress;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.config.DatabaseDescriptor;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,17 +44,12 @@ public class ReadResponseResolver implem
 {
 	private static Logger logger_ = LoggerFactory.getLogger(ReadResponseResolver.class);
     private final String table;
-    private final int responseCount;
 
-    public ReadResponseResolver(String table, int responseCount)
+    public ReadResponseResolver(String table)
     {
-        assert 1 <= responseCount && responseCount <= DatabaseDescriptor.getReplicationFactor(table)
-            : "invalid response count " + responseCount;
-
-        this.responseCount = responseCount;
         this.table = table;
     }
-
+    
     /*
       * This method for resolving read data should look at the timestamps of each
       * of the columns that are read and should pick up columns with the latest
@@ -170,9 +164,6 @@ public class ReadResponseResolver implem
 
 	public boolean isDataPresent(Collection<Message> responses)
 	{
-        if (responses.size() < responseCount)
-            return false;
-
         boolean isDataPresent = false;
         for (Message response : responses)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=945333&r1=945332&r2=945333&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon May 17 19:58:39 2010
@@ -32,7 +32,6 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -43,7 +42,6 @@ import javax.management.ObjectName;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.RangeSliceCommand;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponse;
@@ -198,10 +196,11 @@ public class StorageProxy implements Sto
     public static void mutateBlocking(List<RowMutation> mutations, ConsistencyLevel consistency_level) throws UnavailableException, TimeoutException
     {
         long startTime = System.nanoTime();
-        ArrayList<WriteResponseHandler> responseHandlers = new ArrayList<WriteResponseHandler>();
+        ArrayList<AbstractWriteResponseHandler> responseHandlers = new ArrayList<AbstractWriteResponseHandler>();
 
         RowMutation mostRecentRowMutation = null;
         StorageService ss = StorageService.instance;
+        
         try
         {
             for (RowMutation rm : mutations)
@@ -213,13 +212,11 @@ public class StorageProxy implements Sto
                 List<InetAddress> naturalEndpoints = ss.getNaturalEndpoints(table, rm.key());
                 Collection<InetAddress> writeEndpoints = rs.getWriteEndpoints(StorageService.getPartitioner().getToken(rm.key()), table, naturalEndpoints);
                 Multimap<InetAddress, InetAddress> hintedEndpoints = rs.getHintedEndpoints(writeEndpoints);
-                int blockFor = determineBlockFor(writeEndpoints.size(), consistency_level);
-
-                // avoid starting a write we know can't achieve the required consistency
-                assureSufficientLiveNodes(blockFor, writeEndpoints, hintedEndpoints, consistency_level);
                 
                 // send out the writes, as in mutate() above, but this time with a callback that tracks responses
-                final WriteResponseHandler responseHandler = ss.getWriteResponseHandler(blockFor, consistency_level, table);
+                final AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level, table);
+                responseHandler.assureSufficientLiveNodes();
+
                 responseHandlers.add(responseHandler);
                 Message unhintedMessage = null;
                 for (Map.Entry<InetAddress, Collection<InetAddress>> entry : hintedEndpoints.asMap().entrySet())
@@ -268,7 +265,7 @@ public class StorageProxy implements Sto
                 }
             }
             // wait for writes.  throws timeoutexception if necessary
-            for( WriteResponseHandler responseHandler : responseHandlers )
+            for (AbstractWriteResponseHandler responseHandler : responseHandlers)
             {
                 responseHandler.get();
             }
@@ -287,30 +284,7 @@ public class StorageProxy implements Sto
 
     }
 
-    private static void assureSufficientLiveNodes(int blockFor, Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel)
-            throws UnavailableException
-    {
-        if (consistencyLevel == ConsistencyLevel.ANY)
-        {
-            // ensure there are blockFor distinct living nodes (hints are ok).
-            if (hintedEndpoints.keySet().size() < blockFor)
-                throw new UnavailableException();
-        }
-
-        // count destinations that are part of the desired target set
-        int liveNodes = 0;
-        for (InetAddress destination : hintedEndpoints.keySet())
-        {
-            if (writeEndpoints.contains(destination))
-                liveNodes++;
-        }
-        if (liveNodes < blockFor)
-        {
-            throw new UnavailableException();
-        }
-    }
-
-    private static void insertLocalMessage(final RowMutation rm, final WriteResponseHandler responseHandler)
+    private static void insertLocalMessage(final RowMutation rm, final AbstractWriteResponseHandler responseHandler)
     {
         if (logger.isDebugEnabled())
             logger.debug("insert writing local key " + FBUtilities.bytesToHex(rm.key()) + " (keyspace: " + rm.getTable() + ", CFs:" + rm.columnFamilyNames() + ")");
@@ -319,32 +293,12 @@ public class StorageProxy implements Sto
             public void runMayThrow() throws IOException
             {
                 rm.apply();
-                responseHandler.localResponse();
+                responseHandler.response(null);
             }
         };
         StageManager.getStage(StageManager.MUTATION_STAGE).execute(runnable);
     }
 
-    private static int determineBlockFor(int expandedTargets, ConsistencyLevel consistency_level)
-    {
-        switch (consistency_level)
-        {
-            case ONE:
-            case ANY:
-                return 1;
-            case QUORUM:
-                return (expandedTargets / 2) + 1;
-            case DCQUORUM:
-            case DCQUORUMSYNC:
-                // TODO this is broken
-                return expandedTargets;
-            case ALL:
-                return expandedTargets;
-            default:
-                throw new UnsupportedOperationException("invalid consistency level " + consistency_level);
-        }
-    }
-
     /**
      * Read the data from one replica.  When we get
      * the data we perform consistency checks and figure out if any repairs need to be done to the replicas.
@@ -461,10 +415,6 @@ public class StorageProxy implements Sto
 
             InetAddress dataPoint = StorageService.instance.findSuitableEndpoint(command.table, command.key);
             List<InetAddress> endpointList = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
-            final String table = command.table;
-            int responseCount = determineBlockFor(DatabaseDescriptor.getReplicationFactor(table), consistency_level);
-            if (endpointList.size() < responseCount)
-                throw new UnavailableException();
 
             InetAddress[] endpoints = new InetAddress[endpointList.size()];
             Message messages[] = new Message[endpointList.size()];
@@ -479,7 +429,9 @@ public class StorageProxy implements Sto
                 if (logger.isDebugEnabled())
                     logger.debug("strongread reading " + (m == message ? "data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
             }
-            QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(command.table), new ReadResponseResolver(command.table, responseCount));
+            AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy(command.table);
+            ReadResponseResolver resolver = new ReadResponseResolver(command.table);
+            QuorumResponseHandler<Row> quorumResponseHandler = rs.getQuorumResponseHandler(resolver, consistency_level, command.table);
             MessagingService.instance.sendRR(messages, endpoints, quorumResponseHandler);
             quorumResponseHandlers.add(quorumResponseHandler);
             commandEndpoints.add(endpoints);
@@ -503,10 +455,9 @@ public class StorageProxy implements Sto
             {
                 if (randomlyReadRepair(command))
                 {
-                    IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver(command.table, DatabaseDescriptor.getQuorum(command.table));
-                    QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>(
-                            DatabaseDescriptor.getQuorum(command.table),
-                            readResponseResolverRepair);
+                    IResponseResolver<Row> resolver = new ReadResponseResolver(command.table);
+                    AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy(command.table);
+                    QuorumResponseHandler<Row> quorumResponseHandlerRepair = rs.getQuorumResponseHandler(resolver, ConsistencyLevel.QUORUM, command.table);
                     logger.info("DigestMismatchException: " + ex.getMessage());
                     Message messageRepair = command.makeReadMessage();
                     MessagingService.instance.sendRR(messageRepair, commandEndpoints.get(commandIndex), quorumResponseHandlerRepair);
@@ -566,9 +517,7 @@ public class StorageProxy implements Sto
         long startTime = System.nanoTime();
 
         final String table = command.keyspace;
-        int responseCount = determineBlockFor(DatabaseDescriptor.getReplicationFactor(table), consistency_level);
-
-        List<Pair<AbstractBounds, List<InetAddress>>> ranges = getRestrictedRanges(command.range, command.keyspace, responseCount);
+        List<Pair<AbstractBounds, List<InetAddress>>> ranges = getRestrictedRanges(command.range, command.keyspace);
 
         // now scan until we have enough results
         List<Row> rows = new ArrayList<Row>(command.max_keys);
@@ -581,8 +530,8 @@ public class StorageProxy implements Sto
 
             // collect replies and resolve according to consistency level
             RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, endpoints);
-            QuorumResponseHandler<List<Row>> handler = new QuorumResponseHandler<List<Row>>(responseCount, resolver);
-
+            AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy(table);
+            QuorumResponseHandler<List<Row>> handler = rs.getQuorumResponseHandler(resolver, consistency_level, table);
             for (InetAddress endpoint : endpoints)
             {
                 MessagingService.instance.sendRR(message, endpoint, handler);
@@ -678,7 +627,7 @@ public class StorageProxy implements Sto
      *     D, but we don't want any other results from it until after the (D, T] range.  Unwrapping so that
      *     the ranges we consider are (D, T], (T, MIN], (MIN, D] fixes this.
      */
-    private static List<Pair<AbstractBounds, List<InetAddress>>> getRestrictedRanges(AbstractBounds queryRange, String keyspace, int responseCount)
+    private static List<Pair<AbstractBounds, List<InetAddress>>> getRestrictedRanges(AbstractBounds queryRange, String keyspace)
     throws UnavailableException
     {
         TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata();
@@ -689,11 +638,8 @@ public class StorageProxy implements Sto
             Token nodeToken = iter.next();
             Range nodeRange = new Range(tokenMetadata.getPredecessor(nodeToken), nodeToken);
             List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(keyspace, nodeToken);
-            if (endpoints.size() < responseCount)
-                throw new UnavailableException();
 
             DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), endpoints);
-            List<InetAddress> endpointsForCL = endpoints.subList(0, responseCount);
             Set<AbstractBounds> restrictedRanges = queryRange.restrictTo(nodeRange);
             for (AbstractBounds range : restrictedRanges)
             {
@@ -701,7 +647,7 @@ public class StorageProxy implements Sto
                 {
                     if (logger.isDebugEnabled())
                         logger.debug("Adding to restricted ranges " + unwrapped + " for " + nodeRange);
-                    ranges.add(new Pair<AbstractBounds, List<InetAddress>>(unwrapped, endpointsForCL));
+                    ranges.add(new Pair<AbstractBounds, List<InetAddress>>(unwrapped, endpoints));
                 }
             }
         }
@@ -811,13 +757,13 @@ public class StorageProxy implements Sto
 
         Set<InetAddress> allEndpoints = Gossiper.instance.getLiveMembers();
         int blockFor = allEndpoints.size();
-        allEndpoints.remove(FBUtilities.getLocalAddress());
-        final TruncateResponseHandler responseHandler = new TruncateResponseHandler(blockFor, keyspace);
+        final TruncateResponseHandler responseHandler = new TruncateResponseHandler(blockFor);
 
         // Send out the truncate calls and track the responses with the callbacks.
         logger.debug("Starting to send truncate messages to hosts {}", allEndpoints);
-        truncateLocal(keyspace, cfname, responseHandler);
-        truncateRemotes(keyspace, cfname, allEndpoints, responseHandler);
+        Truncation truncation = new Truncation(keyspace, cfname);
+        Message message = truncation.makeTruncationMessage();
+        MessagingService.instance.sendRR(message, allEndpoints.toArray(new InetAddress[]{}), responseHandler);
 
         // Wait for all
         logger.debug("Sent all truncate messages, now waiting for {} responses", blockFor);
@@ -825,14 +771,6 @@ public class StorageProxy implements Sto
         logger.debug("truncate done");
     }
 
-    private static void truncateRemotes(String keyspace, String cfname, Set<InetAddress> hosts,
-                                        TruncateResponseHandler responseHandler) throws IOException
-    {
-        Truncation truncation = new Truncation(keyspace, cfname);
-        Message message = truncation.makeTruncationMessage();
-        MessagingService.instance.sendRR(message, hosts.toArray(new InetAddress[]{}), responseHandler);
-    }
-
     /**
      * Asks the gossiper if there are any nodes that are currently down.
      * @return true if the gossiper thinks all nodes are up.
@@ -841,22 +779,4 @@ public class StorageProxy implements Sto
     {
         return !Gossiper.instance.getUnreachableMembers().isEmpty();
     }
-
-    private static void truncateLocal(final String keyspace, final String cfname,
-                                      final TruncateResponseHandler responseHandler)
-    {
-        logger.debug("truncating locally (keyspace: {}, CF: {})", keyspace, cfname);
-        Runnable runnable = new WrappedRunnable()
-        {
-            public void runMayThrow() throws IOException, InterruptedException, ExecutionException
-            {
-                // truncate, blocking
-                Table.open(keyspace).truncate(cfname);
-                responseHandler.localResponse();
-                logger.debug("Finished truncating local (keyspace: {}, CF: {})", keyspace, cfname);
-            }
-        };
-        // TODO(ran): Is this correct? Should the truncate operation operate on the row mutation stage?
-        StageManager.getStage(StageManager.MUTATION_STAGE).execute(runnable);
-    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=945333&r1=945332&r2=945333&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon May 17 19:58:39 2010
@@ -50,7 +50,6 @@ import org.apache.cassandra.locator.*;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
 import org.apache.cassandra.streaming.*;
-import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
@@ -1521,11 +1520,6 @@ public class StorageService implements I
         Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_LEFT + Delimiter + REMOVE_TOKEN + Delimiter + token.toString()));
     }
 
-    public WriteResponseHandler getWriteResponseHandler(int blockFor, ConsistencyLevel consistency_level, String table)
-    {
-        return getReplicationStrategy(table).getWriteResponseHandler(blockFor, consistency_level, table);
-    }
-
     public boolean isClientMode()
     {
         return isClientMode;

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/TruncateResponseHandler.java?rev=945333&r1=945332&r2=945333&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/TruncateResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/TruncateResponseHandler.java Mon May 17 19:58:39 2010
@@ -18,8 +18,6 @@
 
 package org.apache.cassandra.service;
 
-import java.util.Collection;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -27,7 +25,6 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.SimpleCondition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,67 +34,42 @@ public class TruncateResponseHandler imp
     protected static final Logger logger = LoggerFactory.getLogger(TruncateResponseHandler.class);
     protected final SimpleCondition condition = new SimpleCondition();
     private final int responseCount;
-    protected final Collection<Message> responses;
-    protected AtomicInteger localResponses = new AtomicInteger(0);
+    protected AtomicInteger responses = new AtomicInteger(0);
     private final long startTime;
 
-    public TruncateResponseHandler(int responseCount, String table)
+    public TruncateResponseHandler(int responseCount)
     {
         // at most one node per range can bootstrap at a time, and these will be added to the write until
         // bootstrap finishes (at which point we no longer need to write to the old ones).
         assert 1 <= responseCount: "invalid response count " + responseCount;
 
         this.responseCount = responseCount;
-        responses = new LinkedBlockingQueue<Message>();
         startTime = System.currentTimeMillis();
     }
 
     public void get() throws TimeoutException
     {
+        long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
+        boolean success;
         try
         {
-            long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
-            boolean success;
-            try
-            {
-                success = condition.await(timeout, TimeUnit.MILLISECONDS);
-            }
-            catch (InterruptedException ex)
-            {
-                throw new AssertionError(ex);
-            }
-
-            if (!success)
-            {
-                throw new TimeoutException("Operation timed out - received only " + responses.size() + localResponses + " responses");
-            }
+            success = condition.await(timeout, TimeUnit.MILLISECONDS); // TODO truncate needs a much longer timeout
         }
-        finally
+        catch (InterruptedException ex)
         {
-            for (Message response : responses)
-            {
-                MessagingService.removeRegisteredCallback(response.getMessageId());
-            }
+            throw new AssertionError(ex);
         }
-    }
-
-    public void response(Message message)
-    {
-        responses.add(message);
-        maybeSignal();
-    }
 
-    public void localResponse()
-    {
-        localResponses.addAndGet(1);
-        maybeSignal();
+        if (!success)
+        {
+            throw new TimeoutException("Truncate timed out - received only " + responses.get() + " responses");
+        }
     }
 
-    private void maybeSignal()
+    public void response(Message message)
     {
-        if (responses.size() + localResponses.get() >= responseCount)
-        {
+        responses.incrementAndGet();
+        if (responses.get() >= responseCount)
             condition.signal();
-        }
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java?rev=945333&r1=945332&r2=945333&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java Mon May 17 19:58:39 2010
@@ -18,92 +18,95 @@
 
 package org.apache.cassandra.service;
 
+import java.net.InetAddress;
+import java.util.Arrays;
 import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.SimpleCondition;
-
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class WriteResponseHandler implements IAsyncCallback
+/**
+ * Handles blocking writes for ONE, ANY, QUORUM, and ALL consistency levels.
+ */
+public class WriteResponseHandler extends AbstractWriteResponseHandler
 {
-    protected static final Logger logger = LoggerFactory.getLogger( WriteResponseHandler.class );
-    protected final SimpleCondition condition = new SimpleCondition();
-    private final int responseCount;
-    protected final Collection<Message> responses;
-    protected AtomicInteger localResponses = new AtomicInteger(0);
-    private final long startTime;
+    protected static final Logger logger = LoggerFactory.getLogger(WriteResponseHandler.class);
 
-    public WriteResponseHandler(int responseCount, String table)
-    {
-        // at most one node per range can bootstrap at a time, and these will be added to the write until
-        // bootstrap finishes (at which point we no longer need to write to the old ones).
-        assert 1 <= responseCount && responseCount <= 2 * DatabaseDescriptor.getReplicationFactor(table)
-            : "invalid response count " + responseCount;
+    protected AtomicInteger responses;
 
-        this.responseCount = responseCount;
-        responses = new LinkedBlockingQueue<Message>();
-        startTime = System.currentTimeMillis();
+    public WriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
+    {
+        super(writeEndpoints, hintedEndpoints, consistencyLevel);
+        responses = new AtomicInteger(determineBlockFor(table));
     }
 
-    public void get() throws TimeoutException
+    public WriteResponseHandler(InetAddress endpoint)
     {
-        try
-        {
-            long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
-            boolean success;
-            try
-            {
-                success = condition.await(timeout, TimeUnit.MILLISECONDS);
-            }
-            catch (InterruptedException ex)
-            {
-                throw new AssertionError(ex);
-            }
-
-            if (!success)
-            {
-                throw new TimeoutException("Operation timed out - received only " + responses.size() + localResponses + " responses");
-            }
-        }
-        finally
-        {
-            for (Message response : responses)
-            {
-                MessagingService.removeRegisteredCallback(response.getMessageId());
-            }
-        }
+        super(Arrays.asList(endpoint),
+              ImmutableMultimap.<InetAddress, InetAddress>builder().put(endpoint, endpoint).build(),
+              ConsistencyLevel.ALL);
+        responses = new AtomicInteger(1);
     }
 
-    public void response(Message message)
+    public void response(Message m)
     {
-        responses.add(message);
-        maybeSignal();
+        if (responses.decrementAndGet() == 0)
+            condition.signal();
     }
 
-    public void localResponse()
+    protected int determineBlockFor(String table)
     {
-        localResponses.addAndGet(1);
-        maybeSignal();
+        int blockFor = 0;
+        switch (consistencyLevel)
+        {
+            case ONE:
+                blockFor = 1;
+                break;
+            case ANY:
+                blockFor = 1;
+                break;
+            case QUORUM:
+                blockFor = (writeEndpoints.size() / 2) + 1;
+                break;
+            case ALL:
+                blockFor = writeEndpoints.size();
+                break;
+            default:
+                throw new UnsupportedOperationException("invalid consistency level: " + consistencyLevel.toString());
+        }
+        // at most one node per range can bootstrap at a time, and these will be added to the write until
+        // bootstrap finishes (at which point we no longer need to write to the old ones).
+        assert 1 <= blockFor && blockFor <= 2 * DatabaseDescriptor.getReplicationFactor(table)
+            : "invalid response count " + blockFor;
+        return blockFor;
     }
 
-    private void maybeSignal()
+    public void assureSufficientLiveNodes() throws UnavailableException
     {
-        if (responses.size() + localResponses.get() >= responseCount)
+        if (consistencyLevel == ConsistencyLevel.ANY)
         {
-            condition.signal();
+            // ensure there are blockFor distinct living nodes (hints are ok).
+            if (hintedEndpoints.keySet().size() < responses.get())
+                throw new UnavailableException();
+        }
+
+        // count destinations that are part of the desired target set
+        int liveNodes = 0;
+        for (InetAddress destination : hintedEndpoints.keySet())
+        {
+            if (writeEndpoints.contains(destination))
+                liveNodes++;
+        }
+        if (liveNodes < responses.get())
+        {
+            throw new UnavailableException();
         }
     }
 }

Copied: cassandra/trunk/test/conf/cassandra-rack.properties (from r945329, cassandra/trunk/conf/cassandra-rack.properties)
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra-rack.properties?p2=cassandra/trunk/test/conf/cassandra-rack.properties&p1=cassandra/trunk/conf/cassandra-rack.properties&r1=945329&r2=945333&rev=945333&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra-rack.properties (original)
+++ cassandra/trunk/test/conf/cassandra-rack.properties Mon May 17 19:58:39 2010
@@ -14,9 +14,23 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# Cassandra Node IP:Port=Data Center:Rack
-192.168.1.200\:7000=dc1:r1
-192.168.2.300\:7000=dc2:rA
+# Cassandra Node IP=Data Center:Rack
+192.168.1.200=DC1:RAC1
+192.168.2.300=DC2:RAC2
+
+10.0.0.10=DC1:RAC1
+10.0.0.11=DC1:RAC1
+10.0.0.12=DC1:RAC2
+
+10.20.114.10=DC2:RAC1
+10.20.114.11=DC2:RAC1
+
+10.21.119.13=DC3:RAC1
+10.21.119.10=DC3:RAC1
+
+10.0.0.13=DC1:RAC2
+10.21.119.14=DC3:RAC2
+10.20.114.15=DC2:RAC2
 
 # default for unknown nodes
-default=dc1:r1
+default=DC1:r1
\ No newline at end of file

Modified: cassandra/trunk/test/conf/datacenters.properties
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/datacenters.properties?rev=945333&r1=945332&r2=945333&view=diff
==============================================================================
--- cassandra/trunk/test/conf/datacenters.properties (original)
+++ cassandra/trunk/test/conf/datacenters.properties Mon May 17 19:58:39 2010
@@ -14,7 +14,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# datacenter=replication factor
-dc1=3
-dc2=5
-dc3=1
+# The sum of all the datacenter replication factor values should equal
+# the replication factor of the keyspace (i.e. sum(dc_rf) = RF)
+
+# keyspace\:datacenter=replication factor
+Keyspace1\:DC1=3
+Keyspace1\:DC2=2
+Keyspace1\:DC3=1
\ No newline at end of file

Added: cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterShardStrategyTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterShardStrategyTest.java?rev=945333&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterShardStrategyTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterShardStrategyTest.java Mon May 17 19:58:39 2010
@@ -0,0 +1,62 @@
+package org.apache.cassandra.locator;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.dht.StringToken;
+import org.apache.cassandra.dht.Token;
+
+import org.junit.Test;
+
+import org.xml.sax.SAXException;
+
+public class DatacenterShardStrategyTest
+{
+    private String table = "Keyspace1";
+
+    @Test
+    public void testProperties() throws IOException, ParserConfigurationException, SAXException, ConfigurationException
+    {
+        PropertyFileSnitch snitch = new PropertyFileSnitch();
+        TokenMetadata metadata = new TokenMetadata();
+        createDummyTokens(metadata);
+        // Set the localhost to the tokenmetadata. Embeded cassandra way?
+        DatacenterShardStrategy strategy = new DatacenterShardStrategy(metadata, snitch);
+        assert strategy.getReplicationFactor("DC1", table) == 3;
+        assert strategy.getReplicationFactor("DC2", table) == 2;
+        assert strategy.getReplicationFactor("DC3", table) == 1;
+        // Query for the natural hosts
+        ArrayList<InetAddress> endpoints = strategy.getNaturalEndpoints(new StringToken("123"), table);
+        assert 6 == endpoints.size();
+    }
+
+    public void createDummyTokens(TokenMetadata metadata) throws UnknownHostException
+    {
+        // DC 1
+        tokenFactory(metadata, "123", new byte[]{ 10, 0, 0, 10 });
+        tokenFactory(metadata, "234", new byte[]{ 10, 0, 0, 11 });
+        tokenFactory(metadata, "345", new byte[]{ 10, 0, 0, 12 });
+        // Tokens for DC 2
+        tokenFactory(metadata, "789", new byte[]{ 10, 20, 114, 10 });
+        tokenFactory(metadata, "890", new byte[]{ 10, 20, 114, 11 });
+        //tokens for DC3
+        tokenFactory(metadata, "456", new byte[]{ 10, 21, 119, 13 });
+        tokenFactory(metadata, "567", new byte[]{ 10, 21, 119, 10 });
+        // Extra Tokens
+        tokenFactory(metadata, "90A", new byte[]{ 10, 0, 0, 13 });
+        tokenFactory(metadata, "0AB", new byte[]{ 10, 21, 119, 14 });
+        tokenFactory(metadata, "ABC", new byte[]{ 10, 20, 114, 15 });
+    }
+
+    public void tokenFactory(TokenMetadata metadata, String token, byte[] bytes) throws UnknownHostException
+    {
+        Token token1 = new StringToken(token);
+        InetAddress add1 = InetAddress.getByAddress(bytes);
+        metadata.updateNormalToken(token1, add1);
+    }
+}



Mime
View raw message