cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r942842 - in /cassandra/trunk: conf/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/service/ test/unit/org/apache/cassandra/locator/
Date Mon, 10 May 2010 18:53:50 GMT
Author: jbellis
Date: Mon May 10 18:53:49 2010
New Revision: 942842

URL: http://svn.apache.org/viewvc?rev=942842&view=rev
Log:
revert unreviewed patch to CASSANDRA-952

Removed:
    cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterQuorumResponseHandler.java
Modified:
    cassandra/trunk/conf/datacenters.properties
    cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/RackInferringSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
    cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
    cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterStrategyTest.java

Modified: cassandra/trunk/conf/datacenters.properties
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/datacenters.properties?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
--- cassandra/trunk/conf/datacenters.properties (original)
+++ cassandra/trunk/conf/datacenters.properties Mon May 10 18:53:49 2010
@@ -14,10 +14,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+# datacenter=replication factor
 # The sum of all the datacenter replication factor values should equal
 # the replication factor of the keyspace (i.e. sum(dc_rf) = RF)
-
-# keyspace\:datacenter=replication factor
-Keyspace1\:dc1=3
-Keyspace1\:dc2=5
-keyspace1\:dc3=1
+dc1=3
+dc2=5
+dc3=1

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Mon May 10 18:53:49 2010
@@ -20,7 +20,6 @@ package org.apache.cassandra.db;
 
 import java.util.Collection;
 import java.util.Arrays;
-import java.util.List;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.ExecutorService;
 import java.io.IOException;
@@ -38,20 +37,15 @@ import java.net.InetAddress;
 
 import org.apache.commons.lang.ArrayUtils;
 
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.*;
-import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.InvalidRequestException;
-import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.utils.FBUtilities;
 import static org.apache.cassandra.utils.FBUtilities.UTF8;
 import org.apache.cassandra.utils.WrappedRunnable;
 
-import com.google.common.collect.Multimap;
-
 
 /**
  * For each table (keyspace), there is a row in the system hints CF.
@@ -110,7 +104,7 @@ public class HintedHandOffManager
         }, "Hint delivery").start();
     }
 
-    private static boolean sendMessage(InetAddress endpoint, String tableName, byte[] key) throws IOException, UnavailableException
+    private static boolean sendMessage(InetAddress endpoint, String tableName, byte[] key) throws IOException
     {
         if (!Gossiper.instance.isKnownEndpoint(endpoint))
         {
@@ -132,12 +126,8 @@ public class HintedHandOffManager
                 rm.add(cf);
         }
         Message message = rm.makeRowMutationMessage();
-        InetAddress [] endpoints = new InetAddress[] { endpoint };
-        AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy(tableName);
-        List<InetAddress> endpointlist = Arrays.asList(endpoints);
-        Multimap<InetAddress, InetAddress> hintedEndpoints = rs.getHintedEndpoints(endpointlist);
-        WriteResponseHandler responseHandler = new WriteResponseHandler(endpointlist, hintedEndpoints, ConsistencyLevel.ALL, tableName);
-        MessagingService.instance.sendRR(message, endpoints, responseHandler);
+        WriteResponseHandler responseHandler = new WriteResponseHandler(1, tableName);
+        MessagingService.instance.sendRR(message, new InetAddress[] { endpoint }, responseHandler);
 
         try
         {
@@ -164,9 +154,8 @@ public class HintedHandOffManager
         rm.apply();
     }
 
-    /** hintStore must be the hints columnfamily from the system table 
-     * @throws UnavailableException */
-    private static void deliverAllHints() throws DigestMismatchException, IOException, InvalidRequestException, TimeoutException, UnavailableException
+    /** hintStore must be the hints columnfamily from the system table */
+    private static void deliverAllHints() throws DigestMismatchException, IOException, InvalidRequestException, TimeoutException
     {
         if (logger_.isDebugEnabled())
           logger_.debug("Started deliverAllHints");
@@ -234,7 +223,7 @@ public class HintedHandOffManager
                || (hintColumnFamily.getSortedColumns().size() == 1 && hintColumnFamily.getColumn(startColumn) != null);
     }
 
-    private static void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException, UnavailableException
+    private static void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException
     {
         if (logger_.isDebugEnabled())
           logger_.debug("Started hinted handoff for endpoint " + endpoint);

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Mon May 10 18:53:49 2010
@@ -27,15 +27,11 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
-
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.service.IResponseResolver;
-import org.apache.cassandra.service.QuorumResponseHandler;
 import org.apache.cassandra.service.WriteResponseHandler;
 import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -45,7 +41,7 @@ public abstract class AbstractReplicatio
 {
     protected static final Logger logger_ = LoggerFactory.getLogger(AbstractReplicationStrategy.class);
 
-    protected TokenMetadata tokenMetadata_;
+    private TokenMetadata tokenMetadata_;
     protected final IEndpointSnitch snitch_;
 
     AbstractReplicationStrategy(TokenMetadata tokenMetadata, IEndpointSnitch snitch)
@@ -194,8 +190,4 @@ public abstract class AbstractReplicatio
         return getAddressRanges(temp, table).get(pendingAddress);
     }
 
-    public QuorumResponseHandler getQuorumResponseHandler(IResponseResolver responseResolver, ConsistencyLevel consistencyLevel, String table)
-    {
-        return new QuorumResponseHandler(responseResolver, consistencyLevel, table);
-    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/DatacenterShardStrategy.java Mon May 10 18:53:49 2010
@@ -150,36 +150,37 @@ public class DatacenterShardStrategy ext
         }
     }
 
-    private ArrayList<InetAddress> getNaturalEndpointsInternal(Token searchToken, TokenMetadata metadata, String table) throws UnknownHostException
+    private ArrayList<InetAddress> getNaturalEndpointsInternal(Token searchToken, TokenMetadata metadata) throws IOException
     {
         ArrayList<InetAddress> endpoints = new ArrayList<InetAddress>();
 
         if (metadata.sortedTokens().size() == 0)
             return endpoints;
 
-        if (tokensize != metadata.sortedTokens().size())
+        if (null == tokens || tokens.size() != metadata.sortedTokens().size())
         {
-            loadEndPoints(metadata);
+            loadEndpoints(metadata);
         }
 
-        for (String dc : dcTokens.keySet())
+        for (String dc : dcMap.keySet())
         {
-            int replicas_ = getReplicationFactor(dc, table);
-            List<Token> tokens = dcTokens.get(dc);
+            int replicas_ = dcReplicationFactor.get(dc);
+            ArrayList<InetAddress> forloopReturn = new ArrayList<InetAddress>(replicas_);
+            List<Token> tokens = dcMap.get(dc);
             boolean bOtherRack = false;
             boolean doneDataCenterItr;
             // Add the node at the index by default
             Iterator<Token> iter = TokenMetadata.ringIterator(tokens, searchToken);
             InetAddress primaryHost = metadata.getEndpoint(iter.next());
-            endpoints.add(primaryHost);
+            forloopReturn.add(primaryHost);
 
-            while (endpoints.size() < replicas_ && iter.hasNext())
+            while (forloopReturn.size() < replicas_ && iter.hasNext())
             {
                 Token t = iter.next();
-                InetAddress endPointOfInterest = metadata.getEndpoint(t);
-                if (endpoints.size() < replicas_ - 1)
+                InetAddress endpointOfInterest = metadata.getEndpoint(t);
+                if (forloopReturn.size() < replicas_ - 1)
                 {
-                	endpoints.add(endPointOfInterest);
+                    forloopReturn.add(endpointOfInterest);
                     continue;
                 }
                 else
@@ -190,9 +191,10 @@ public class DatacenterShardStrategy ext
                 // Now try to find one on a different rack
                 if (!bOtherRack)
                 {
-                    if (!snitch.getRack(primaryHost).equals(snitch.getRack(endPointOfInterest)))
+                    AbstractRackAwareSnitch snitch = (AbstractRackAwareSnitch)snitch_;
+                    if (!snitch.getRack(primaryHost).equals(snitch.getRack(endpointOfInterest)))
                     {
-                    	endpoints.add(metadata.getEndpoint(t));
+                        forloopReturn.add(metadata.getEndpoint(t));
                         bOtherRack = true;
                     }
                 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/RackInferringSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/RackInferringSnitch.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/RackInferringSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/RackInferringSnitch.java Mon May 10 18:53:49 2010
@@ -19,6 +19,8 @@
 package org.apache.cassandra.locator;
 
 import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.*;
 
 /**
  * A simple endpoint snitch implementation that assumes datacenter and rack information is encoded
@@ -26,12 +28,12 @@ import java.net.InetAddress;
  */
 public class RackInferringSnitch extends AbstractRackAwareSnitch
 {
-    public String getRack(InetAddress endpoint)
+    public String getRack(InetAddress endpoint) throws UnknownHostException
     {
         return Byte.toString(endpoint.getAddress()[2]);
     }
 
-    public String getDatacenter(InetAddress endpoint)
+    public String getDatacenter(InetAddress endpoint) throws UnknownHostException
     {
         return Byte.toString(endpoint.getAddress()[1]);
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java Mon May 10 18:53:49 2010
@@ -112,7 +112,7 @@ class ConsistencyChecker implements Runn
 
                 if (!Arrays.equals(ColumnFamily.digest(row_.cf), digest))
                 {
-                    IResponseResolver<Row> readResponseResolver = new ReadResponseResolver(table_);
+                    IResponseResolver<Row> readResponseResolver = new ReadResponseResolver(table_, replicas_.size());
                     IAsyncCallback responseHandler;
                     if (replicas_.contains(FBUtilities.getLocalAddress()))
                         responseHandler = new DataRepairHandler(row_, replicas_.size(), readResponseResolver);

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java Mon May 10 18:53:49 2010
@@ -24,19 +24,13 @@ package org.apache.cassandra.service;
  */
 
 
-import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.cassandra.locator.DatacenterShardStrategy;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.AbstractRackAwareSnitch;
 import org.apache.cassandra.net.Message;
-import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.thrift.UnavailableException;
-
-import com.google.common.collect.Multimap;
 
 /**
  * This class will block for the replication factor which is
@@ -45,85 +39,58 @@ import com.google.common.collect.Multima
  */
 public class DatacenterSyncWriteResponseHandler extends WriteResponseHandler
 {
-	private final DatacenterShardStrategy stategy = (DatacenterShardStrategy) StorageService.instance.getReplicationStrategy(table);
-    private HashMap<String, AtomicInteger> dcResponses;
+    private final Map<String, Integer> dcResponses = new HashMap<String, Integer>();
+    private final Map<String, Integer> responseCounts;
+    private final AbstractRackAwareSnitch endpointSnitch;
 
-    public DatacenterSyncWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
-    throws UnavailableException
+    public DatacenterSyncWriteResponseHandler(Map<String, Integer> responseCounts, String table)
     {
         // Response is been managed by the map so make it 1 for the superclass.
-        super(writeEndpoints, hintedEndpoints, consistencyLevel, table);
+        super(1, table);
+        this.responseCounts = responseCounts;
+        endpointSnitch = (AbstractRackAwareSnitch) DatabaseDescriptor.getEndpointSnitch();
     }
 
+    @Override
     // synchronized for the benefit of dcResponses and responseCounts.  "responses" itself
     // is inherited from WRH and is concurrent.
-    @Override
-    public void response(Message message)
+    // TODO can we use concurrent structures instead?
+    public synchronized void response(Message message)
     {
-        responses.add(message);
         try
         {
-            String dataCenter = endpointsnitch.getDatacenter(message.getFrom());
+            String dataCenter = endpointSnitch.getDatacenter(message.getFrom());
+            Object blockFor = responseCounts.get(dataCenter);
             // If this DC needs to be blocked then do the below.
-            dcResponses.get(dataCenter).getAndDecrement();
+            if (blockFor != null)
+            {
+                Integer quorumCount = dcResponses.get(dataCenter);
+                if (quorumCount == null)
+                {
+                    // Initialize and recognize the first response
+                    dcResponses.put(dataCenter, 1);
+                }
+                else if ((Integer) blockFor > quorumCount)
+                {
+                    // recognize the consecutive responses.
+                    dcResponses.put(dataCenter, quorumCount + 1);
+                }
+                else
+                {
+                    // No need to wait on it anymore so remove it.
+                    responseCounts.remove(dataCenter);
+                }
+            }
         }
         catch (UnknownHostException e)
         {
             throw new RuntimeException(e);
         }
-        maybeSignal();
-    }
-    
-    private void maybeSignal()
-    {
-    	for(AtomicInteger i : dcResponses.values()) {
-    		if (0 < i.get()) {
-    			return;
-    		}
-    	}
-    	// If all the quorum conditionas are met then return back.
-    	condition.signal();
-    }
-    
-    @Override
-    public int determineBlockFor(Collection<InetAddress> writeEndpoints)
-    {        
-        this.dcResponses = new HashMap<String, AtomicInteger>();
-        for (String dc: stategy.getDatacenters(table)) {
-        	int rf = stategy.getReplicationFactor(dc, table);
-        	dcResponses.put(dc, new AtomicInteger((rf/2) + 1));
-        }
-    	// Do nothing, there is no 'one' integer to block for
-        return 0;
-    }
-    
-    @Override
-    public void assureSufficientLiveNodes(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints) throws UnavailableException
-    {   
-		Map<String, AtomicInteger> dcEndpoints = new HashMap<String, AtomicInteger>();
-		try
-		{
-			for (String dc: stategy.getDatacenters(table))
-				dcEndpoints.put(dc, new AtomicInteger());
-			for (InetAddress destination : hintedEndpoints.keySet())
-			{
-				// If not just go to the next endpoint
-				if (!writeEndpoints.contains(destination))
-					continue;
-				// figure out the destination dc
-				String destinationDC = endpointsnitch.getDatacenter(destination);
-				dcEndpoints.get(destinationDC).incrementAndGet();
-			}
-		}
-		catch (UnknownHostException e)
-		{
-			throw new UnavailableException();
-		}
-        // Throw exception if any of the DC doesnt have livenodes to accept write.
-        for (String dc: stategy.getDatacenters(table)) {
-        	if (dcEndpoints.get(dc).get() != dcResponses.get(dc).get()) {
-                throw new UnavailableException();
-        	}
+        responses.add(message);
+        // If done then the response count will be empty
+        if (responseCounts.isEmpty())
+        {
+            condition.signal();
         }
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java Mon May 10 18:53:49 2010
@@ -26,19 +26,13 @@ package org.apache.cassandra.service;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.Collection;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.locator.DatacenterShardStrategy;
-import org.apache.cassandra.locator.RackInferringSnitch;
+import org.apache.cassandra.locator.AbstractRackAwareSnitch;
 import org.apache.cassandra.net.Message;
-import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.FBUtilities;
 
-import com.google.common.collect.Multimap;
-
 /**
  * This class will basically will block for the replication factor which is
  * provided in the input map. it will block till we recive response from (DC, n)
@@ -46,26 +40,27 @@ import com.google.common.collect.Multima
  */
 public class DatacenterWriteResponseHandler extends WriteResponseHandler
 {
-    private static final RackInferringSnitch snitch = (RackInferringSnitch) DatabaseDescriptor.getEndpointSnitch();
-	private static final String localdc = snitch.getDatacenter(FBUtilities.getLocalAddress());
-	private final AtomicInteger blockFor;
+    private final AtomicInteger blockFor;
+    private final AbstractRackAwareSnitch endpointsnitch;
+    private final InetAddress localEndpoint;
 
-    public DatacenterWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table) throws UnavailableException
+    public DatacenterWriteResponseHandler(int blockFor, String table)
     {
         // Response is been managed by the map so the waitlist size really doesnt matter.
-        super(writeEndpoints, hintedEndpoints, consistencyLevel, table);
-        blockFor = new AtomicInteger(responseCount);
+        super(blockFor, table);
+        this.blockFor = new AtomicInteger(blockFor);
+        endpointsnitch = (AbstractRackAwareSnitch) DatabaseDescriptor.getEndpointSnitch();
+        localEndpoint = FBUtilities.getLocalAddress();
     }
 
     @Override
     public void response(Message message)
     {
-        responses.add(message);
         //Is optimal to check if same datacenter than comparing Arrays.
         int b = -1;
         try
         {
-            if (localdc.equals(endpointsnitch.getDatacenter(message.getFrom())))
+            if (endpointsnitch.getDatacenter(localEndpoint).equals(endpointsnitch.getDatacenter(message.getFrom())))
             {
                 b = blockFor.decrementAndGet();
             }
@@ -74,6 +69,7 @@ public class DatacenterWriteResponseHand
         {
             throw new RuntimeException(e);
         }
+        responses.add(message);
         if (b == 0)
         {
             //Singnal when Quorum is recived.
@@ -82,35 +78,4 @@ public class DatacenterWriteResponseHand
         if (logger.isDebugEnabled())
             logger.debug("Processed Message: " + message.toString());
     }
-    
-    @Override
-    public void assureSufficientLiveNodes(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints)
-    throws UnavailableException
-    {
-        int liveNodes = 0;
-        try {
-            // count destinations that are part of the desired target set
-            for (InetAddress destination : hintedEndpoints.keySet())
-            {
-                if (writeEndpoints.contains(destination) && localdc.equals(endpointsnitch.getDatacenter(destination)))
-                    liveNodes++;
-            }
-        } catch (Exception ex) {
-            throw new UnavailableException();
-        }
-        if (liveNodes < responseCount)
-        {
-            throw new UnavailableException();
-        }
-    }
-    
-    @Override
-    public int determineBlockFor(Collection<InetAddress> writeEndpoints)
-    {
-    	DatacenterShardStrategy stategy = (DatacenterShardStrategy) StorageService.instance.getReplicationStrategy(table);
-        if (consistencyLevel.equals(ConsistencyLevel.DCQUORUM)) {
-            return (stategy.getReplicationFactor(localdc, table)/2) + 1;
-        }
-        return super.determineBlockFor(writeEndpoints);
-    }
-}
\ No newline at end of file
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Mon May 10 18:53:49 2010
@@ -19,6 +19,9 @@
 package org.apache.cassandra.service;
 
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -28,7 +31,6 @@ import org.apache.cassandra.config.Datab
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.utils.SimpleCondition;
 
 import org.slf4j.Logger;
@@ -38,19 +40,15 @@ public class QuorumResponseHandler<T> im
 {
     protected static final Logger logger = LoggerFactory.getLogger( QuorumResponseHandler.class );
     protected final SimpleCondition condition = new SimpleCondition();
-    protected final Collection<Message> responses = new LinkedBlockingQueue<Message>();;
-    protected IResponseResolver<T> responseResolver;
+    protected final Collection<Message> responses;
+    private IResponseResolver<T> responseResolver;
     private final long startTime;
-    protected int blockfor;
-    
-    /**
-     * Constructor when response count has to be calculated and blocked for.
-     */
-    public QuorumResponseHandler(IResponseResolver<T> responseResolver, ConsistencyLevel consistencyLevel, String table)
+
+    public QuorumResponseHandler(int responseCount, IResponseResolver<T> responseResolver)
     {
-        this.blockfor = determineBlockFor(consistencyLevel, table);
+        responses = new LinkedBlockingQueue<Message>();
         this.responseResolver = responseResolver;
-        this.startTime = System.currentTimeMillis();
+        startTime = System.currentTimeMillis();
     }
     
     public T get() throws TimeoutException, DigestMismatchException, IOException
@@ -92,28 +90,9 @@ public class QuorumResponseHandler<T> im
     public void response(Message message)
     {
         responses.add(message);
-        if (responses.size() < blockfor) {
-            return;
-        }
         if (responseResolver.isDataPresent(responses))
         {
             condition.signal();
         }
     }
-    
-    public int determineBlockFor(ConsistencyLevel consistencyLevel, String table)
-    {
-        switch (consistencyLevel)
-        {
-            case ONE:
-            case ANY:
-                return 1;
-            case QUORUM:
-                return (DatabaseDescriptor.getQuorum(table)/ 2) + 1;
-            case ALL:
-                return DatabaseDescriptor.getReplicationFactor(table);
-            default:
-                throw new UnsupportedOperationException("invalid consistency level: " + table.toString());
-        }
-    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Mon May 10 18:53:49 2010
@@ -30,6 +30,7 @@ import org.apache.cassandra.db.*;
 import java.net.InetAddress;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.config.DatabaseDescriptor;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,12 +43,17 @@ public class ReadResponseResolver implem
 {
 	private static Logger logger_ = LoggerFactory.getLogger(ReadResponseResolver.class);
     private final String table;
+    private final int responseCount;
 
-    public ReadResponseResolver(String table)
+    public ReadResponseResolver(String table, int responseCount)
     {
+        assert 1 <= responseCount && responseCount <= DatabaseDescriptor.getReplicationFactor(table)
+            : "invalid response count " + responseCount;
+
+        this.responseCount = responseCount;
         this.table = table;
     }
-    
+
     /*
       * This method for resolving read data should look at the timestamps of each
       * of the columns that are read and should pick up columns with the latest
@@ -153,6 +159,9 @@ public class ReadResponseResolver implem
 
 	public boolean isDataPresent(Collection<Message> responses)
 	{
+        if (responses.size() < responseCount)
+            return false;
+
         boolean isDataPresent = false;
         for (Message response : responses)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon May 10 18:53:49 2010
@@ -213,9 +213,13 @@ public class StorageProxy implements Sto
                 List<InetAddress> naturalEndpoints = ss.getNaturalEndpoints(table, rm.key());
                 Collection<InetAddress> writeEndpoints = rs.getWriteEndpoints(StorageService.getPartitioner().getToken(rm.key()), table, naturalEndpoints);
                 Multimap<InetAddress, InetAddress> hintedEndpoints = rs.getHintedEndpoints(writeEndpoints);
+                int blockFor = determineBlockFor(writeEndpoints.size(), consistency_level);
+
+                // avoid starting a write we know can't achieve the required consistency
+                assureSufficientLiveNodes(blockFor, writeEndpoints, hintedEndpoints, consistency_level);
                 
                 // send out the writes, as in mutate() above, but this time with a callback that tracks responses
-                final WriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level, table);
+                final WriteResponseHandler responseHandler = ss.getWriteResponseHandler(blockFor, consistency_level, table);
                 responseHandlers.add(responseHandler);
                 Message unhintedMessage = null;
                 for (Map.Entry<InetAddress, Collection<InetAddress>> entry : hintedEndpoints.asMap().entrySet())
@@ -283,6 +287,29 @@ public class StorageProxy implements Sto
 
     }
 
+    private static void assureSufficientLiveNodes(int blockFor, Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel)
+            throws UnavailableException
+    {
+        if (consistencyLevel == ConsistencyLevel.ANY)
+        {
+            // ensure there are blockFor distinct living nodes (hints are ok).
+            if (hintedEndpoints.keySet().size() < blockFor)
+                throw new UnavailableException();
+        }
+
+        // count destinations that are part of the desired target set
+        int liveNodes = 0;
+        for (InetAddress destination : hintedEndpoints.keySet())
+        {
+            if (writeEndpoints.contains(destination))
+                liveNodes++;
+        }
+        if (liveNodes < blockFor)
+        {
+            throw new UnavailableException();
+        }
+    }
+
     private static void insertLocalMessage(final RowMutation rm, final WriteResponseHandler responseHandler)
     {
         if (logger.isDebugEnabled())
@@ -298,6 +325,26 @@ public class StorageProxy implements Sto
         StageManager.getStage(StageManager.MUTATION_STAGE).execute(runnable);
     }
 
+    private static int determineBlockFor(int expandedTargets, ConsistencyLevel consistency_level)
+    {
+        switch (consistency_level)
+        {
+            case ONE:
+            case ANY:
+                return 1;
+            case QUORUM:
+                return (expandedTargets / 2) + 1;
+            case DCQUORUM:
+            case DCQUORUMSYNC:
+                // TODO this is broken
+                return expandedTargets;
+            case ALL:
+                return expandedTargets;
+            default:
+                throw new UnsupportedOperationException("invalid consistency level " + consistency_level);
+        }
+    }
+
     /**
      * Read the data from one replica.  When we get
      * the data we perform consistency checks and figure out if any repairs need to be done to the replicas.
@@ -414,6 +461,10 @@ public class StorageProxy implements Sto
 
             InetAddress dataPoint = StorageService.instance.findSuitableEndpoint(command.table, command.key);
             List<InetAddress> endpointList = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
+            final String table = command.table;
+            int responseCount = determineBlockFor(DatabaseDescriptor.getReplicationFactor(table), consistency_level);
+            if (endpointList.size() < responseCount)
+                throw new UnavailableException();
 
             InetAddress[] endpoints = new InetAddress[endpointList.size()];
             Message messages[] = new Message[endpointList.size()];
@@ -428,9 +479,7 @@ public class StorageProxy implements Sto
                 if (logger.isDebugEnabled())
                     logger.debug("strongread reading " + (m == message ? "data" : "digest") + " for " + command + " from " + m.getMessageId() + "@" + endpoint);
             }
-            AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy(command.table);
-            QuorumResponseHandler<Row> quorumResponseHandler = rs.getQuorumResponseHandler(new ReadResponseResolver(command.table), 
-                                                                                               consistency_level, command.table);
+            QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(command.table), new ReadResponseResolver(command.table, responseCount));
             MessagingService.instance.sendRR(messages, endpoints, quorumResponseHandler);
             quorumResponseHandlers.add(quorumResponseHandler);
             commandEndpoints.add(endpoints);
@@ -454,10 +503,10 @@ public class StorageProxy implements Sto
             {
                 if (randomlyReadRepair(command))
                 {
-                    IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver(command.table);
-                    AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy(command.table);
-                    QuorumResponseHandler<Row> quorumResponseHandlerRepair = rs.getQuorumResponseHandler(readResponseResolverRepair, ConsistencyLevel.QUORUM, 
-                                                                                                         command.table);
+                    IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver(command.table, DatabaseDescriptor.getQuorum(command.table));
+                    QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>(
+                            DatabaseDescriptor.getQuorum(command.table),
+                            readResponseResolverRepair);
                     logger.info("DigestMismatchException: " + ex.getMessage());
                     Message messageRepair = command.makeReadMessage();
                     MessagingService.instance.sendRR(messageRepair, commandEndpoints.get(commandIndex), quorumResponseHandlerRepair);
@@ -517,7 +566,9 @@ public class StorageProxy implements Sto
         long startTime = System.nanoTime();
 
         final String table = command.keyspace;
-        List<Pair<AbstractBounds, List<InetAddress>>> ranges = getRestrictedRanges(command.range, command.keyspace);
+        int responseCount = determineBlockFor(DatabaseDescriptor.getReplicationFactor(table), consistency_level);
+
+        List<Pair<AbstractBounds, List<InetAddress>>> ranges = getRestrictedRanges(command.range, command.keyspace, responseCount);
 
         // now scan until we have enough results
         List<Row> rows = new ArrayList<Row>(command.max_keys);
@@ -530,8 +581,8 @@ public class StorageProxy implements Sto
 
             // collect replies and resolve according to consistency level
             RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, endpoints);
-            AbstractReplicationStrategy rs = StorageService.instance.getReplicationStrategy(table);
-            QuorumResponseHandler<List<Row>> handler = rs.getQuorumResponseHandler(resolver, consistency_level, table);
+            QuorumResponseHandler<List<Row>> handler = new QuorumResponseHandler<List<Row>>(responseCount, resolver);
+
             for (InetAddress endpoint : endpoints)
             {
                 MessagingService.instance.sendRR(message, endpoint, handler);
@@ -627,7 +678,7 @@ public class StorageProxy implements Sto
      *     D, but we don't want any other results from it until after the (D, T] range.  Unwrapping so that
      *     the ranges we consider are (D, T], (T, MIN], (MIN, D] fixes this.
      */
-    private static List<Pair<AbstractBounds, List<InetAddress>>> getRestrictedRanges(AbstractBounds queryRange, String keyspace)
+    private static List<Pair<AbstractBounds, List<InetAddress>>> getRestrictedRanges(AbstractBounds queryRange, String keyspace, int responseCount)
     throws UnavailableException
     {
         TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata();
@@ -638,8 +689,11 @@ public class StorageProxy implements Sto
             Token nodeToken = iter.next();
             Range nodeRange = new Range(tokenMetadata.getPredecessor(nodeToken), nodeToken);
             List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(keyspace, nodeToken);
+            if (endpoints.size() < responseCount)
+                throw new UnavailableException();
 
             DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), endpoints);
+            List<InetAddress> endpointsForCL = endpoints.subList(0, responseCount);
             Set<AbstractBounds> restrictedRanges = queryRange.restrictTo(nodeRange);
             for (AbstractBounds range : restrictedRanges)
             {
@@ -647,7 +701,7 @@ public class StorageProxy implements Sto
                 {
                     if (logger.isDebugEnabled())
                         logger.debug("Adding to restricted ranges " + unwrapped + " for " + nodeRange);
-                    ranges.add(new Pair<AbstractBounds, List<InetAddress>>(unwrapped, endpoints));
+                    ranges.add(new Pair<AbstractBounds, List<InetAddress>>(unwrapped, endpointsForCL));
                 }
             }
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon May 10 18:53:49 2010
@@ -50,6 +50,7 @@ import org.apache.cassandra.locator.*;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
 import org.apache.cassandra.streaming.*;
+import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
@@ -1520,6 +1521,11 @@ public class StorageService implements I
         Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_LEFT + Delimiter + REMOVE_TOKEN + Delimiter + token.toString()));
     }
 
+    public WriteResponseHandler getWriteResponseHandler(int blockFor, ConsistencyLevel consistency_level, String table)
+    {
+        return getReplicationStrategy(table).getWriteResponseHandler(blockFor, consistency_level, table);
+    }
+
     public boolean isClientMode()
     {
         return isClientMode;

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java Mon May 10 18:53:49 2010
@@ -19,45 +19,41 @@
 package org.apache.cassandra.service;
 
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.net.InetAddress;
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.locator.AbstractRackAwareSnitch;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.SimpleCondition;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Multimap;
-
 public class WriteResponseHandler implements IAsyncCallback
 {
     protected static final Logger logger = LoggerFactory.getLogger( WriteResponseHandler.class );
-    protected static final AbstractRackAwareSnitch endpointsnitch = (AbstractRackAwareSnitch) DatabaseDescriptor.getEndpointSnitch();
     protected final SimpleCondition condition = new SimpleCondition();
-    protected final int responseCount;
+    private final int responseCount;
     protected final Collection<Message> responses;
     protected AtomicInteger localResponses = new AtomicInteger(0);
-    protected final long startTime;
-	protected final ConsistencyLevel consistencyLevel;
-	protected final String table;
+    private final long startTime;
 
-    public WriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table) 
-    throws UnavailableException
+    public WriteResponseHandler(int responseCount, String table)
     {
-    	this.table = table;
-    	this.consistencyLevel = consistencyLevel;
-        this.responseCount = determineBlockFor(writeEndpoints);
-        assureSufficientLiveNodes(writeEndpoints, hintedEndpoints);
+        // at most one node per range can bootstrap at a time, and these will be added to the write until
+        // bootstrap finishes (at which point we no longer need to write to the old ones).
+        assert 1 <= responseCount && responseCount <= 2 * DatabaseDescriptor.getReplicationFactor(table)
+            : "invalid response count " + responseCount;
+
+        this.responseCount = responseCount;
         responses = new LinkedBlockingQueue<Message>();
         startTime = System.currentTimeMillis();
     }
@@ -110,54 +106,4 @@ public class WriteResponseHandler implem
             condition.signal();
         }
     }
-    
-    public int determineBlockFor(Collection<InetAddress> writeEndpoints)
-    {
-        int blockFor = 0;
-        switch (consistencyLevel)
-        {
-            case ONE:
-                blockFor = 1;
-                break;
-            case ANY:
-                blockFor = 1;
-                break;
-            case QUORUM:
-                blockFor = (writeEndpoints.size() / 2) + 1;
-                break;
-            case ALL:
-                blockFor = writeEndpoints.size();
-                break;
-            default:
-                throw new UnsupportedOperationException("invalid consistency level: " + consistencyLevel.toString());
-        }
-        // at most one node per range can bootstrap at a time, and these will be added to the write until
-        // bootstrap finishes (at which point we no longer need to write to the old ones).
-        assert 1 <= blockFor && blockFor <= 2 * DatabaseDescriptor.getReplicationFactor(table)
-            : "invalid response count " + responseCount;
-        return blockFor;
-    }
-    
-    public void assureSufficientLiveNodes(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints)
-    throws UnavailableException
-    {
-        if (consistencyLevel == ConsistencyLevel.ANY)
-        {
-            // ensure there are blockFor distinct living nodes (hints are ok).
-            if (hintedEndpoints.keySet().size() < responseCount)
-                throw new UnavailableException();
-        }
-        
-        // count destinations that are part of the desired target set
-        int liveNodes = 0;
-        for (InetAddress destination : hintedEndpoints.keySet())
-        {
-            if (writeEndpoints.contains(destination))
-                liveNodes++;
-        }
-        if (liveNodes < responseCount)
-        {
-            throw new UnavailableException();
-        }
-    }
-}
\ No newline at end of file
+}

Modified: cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterStrategyTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterStrategyTest.java?rev=942842&r1=942841&r2=942842&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterStrategyTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/locator/DatacenterStrategyTest.java Mon May 10 18:53:49 2010
@@ -1,31 +1,18 @@
 package org.apache.cassandra.locator;
 
-import java.io.IOException;
-import java.net.InetAddress;
-
-import javax.xml.parsers.ParserConfigurationException;
-
 import org.junit.Test;
-import org.xml.sax.SAXException;
+
+import org.apache.cassandra.config.ConfigurationException;
 
 public class DatacenterStrategyTest
 {
-    private String table = "Keyspace1";
-
-	@Test
-    public void testProperties() throws IOException, ParserConfigurationException, SAXException
+    @Test
+    public void testProperties() throws ConfigurationException
     {
-    	XMLFileSnitch snitch = new XMLFileSnitch();
-    	TokenMetadata metadata = new TokenMetadata();
-    	InetAddress localhost = InetAddress.getLocalHost();
-    	// Set the localhost to the tokenmetadata. Embeded cassandra way?
-    	// metadata.addBootstrapToken();
-        DatacenterShardStrategy strategy = new DatacenterShardStrategy(new TokenMetadata(), snitch);
-        assert strategy.getReplicationFactor("dc1", table) == 3;
-        assert strategy.getReplicationFactor("dc2", table) == 5;
-        assert strategy.getReplicationFactor("dc3", table) == 1;
-        // Query for the natural hosts
-        // strategy.getNaturalEndpoints(token, table)
+        DatacenterShardStrategy strategy = new DatacenterShardStrategy(new TokenMetadata(), new RackInferringSnitch());
+        assert strategy.getReplicationFactor("dc1") == 3;
+        assert strategy.getReplicationFactor("dc2") == 5;
+        assert strategy.getReplicationFactor("dc3") == 1;
     }
 
 }



Mime
View raw message