cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r921456 - in /incubator/cassandra/branches/cassandra-0.6: ./ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/
Date Wed, 10 Mar 2010 17:17:39 GMT
Author: jbellis
Date: Wed Mar 10 17:17:38 2010
New Revision: 921456

URL: http://svn.apache.org/viewvc?rev=921456&view=rev
Log:
fix thread-safety problems with IAsyncCallback.  patch by jbellis; reviewed by Roger Schildmeijer
and gdusbabek for CASSANDRA-864

Modified:
    incubator/cassandra/branches/cassandra-0.6/CHANGES.txt
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncCallback.java
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyManager.java
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/IResponseResolver.java
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/WriteResponseHandler.java

Modified: incubator/cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=921456&r1=921455&r2=921456&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ incubator/cassandra/branches/cassandra-0.6/CHANGES.txt Wed Mar 10 17:17:38 2010
@@ -18,6 +18,8 @@
  * fix classpath in cassandra-cli.bat for Windows (CASSANDRA-858)
  * allow re-specifying host, port to cassandra-cli if invalid ones
    are first tried (CASSANDRA-867)
+ * fix race condition handling rpc timeout in the coordinator
+   (CASSANDRA-864)
 
 
 0.6.0-beta1/beta2

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncCallback.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncCallback.java?rev=921456&r1=921455&r2=921456&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncCallback.java
(original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncCallback.java
Wed Mar 10 17:17:38 2010
@@ -18,11 +18,16 @@
 
 package org.apache.cassandra.net;
 
+/**
+ * implementors of IAsyncCallback need to make sure that any public methods
+ * are threadsafe with respect to response() being called from the message
+ * service.  In particular, if any shared state is referenced, making
+ * response alone synchronized will not suffice.
+ */
 public interface IAsyncCallback 
 {
 	/**
 	 * @param msg response received.
-     * Calls to response() are serialized by ResponseVerbHandler.
 	 */
 	public void response(Message msg);
 }

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=921456&r1=921455&r2=921456&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
(original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
Wed Mar 10 17:17:38 2010
@@ -32,10 +32,7 @@ public class ResponseVerbHandler impleme
         {
             if (logger_.isDebugEnabled())
                 logger_.debug("Processing response on a callback from " + message.getMessageId()
+ "@" + message.getFrom());
-            synchronized (cb)
-            {
-                cb.response(message);
-            }
+            cb.response(message);
         }
         else
         {

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=921456&r1=921455&r2=921456&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyManager.java
(original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyManager.java
Wed Mar 10 17:17:38 2010
@@ -21,23 +21,26 @@ package org.apache.cassandra.service;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.cache.ICacheExpungeHook;
+import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.Row;
-import org.apache.cassandra.db.ColumnFamily;
-import java.net.InetAddress;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.*;
-
-import org.apache.log4j.Logger;
-import org.apache.commons.lang.StringUtils;
+import org.apache.cassandra.utils.ExpiringMap;
+import org.apache.cassandra.utils.FBUtilities;
 
 
 class ConsistencyManager implements Runnable
@@ -47,9 +50,10 @@ class ConsistencyManager implements Runn
 
     class DigestResponseHandler implements IAsyncCallback
 	{
-		List<Message> responses_ = new ArrayList<Message>();
+		Collection<Message> responses_ = new LinkedBlockingQueue<Message>();
 
-		public void response(Message msg)
+        // syncronized so "size() == " works
+		public synchronized void response(Message msg)
 		{
 			responses_.add(msg);
             if (responses_.size() == ConsistencyManager.this.replicas_.size())
@@ -94,17 +98,18 @@ class ConsistencyManager implements Runn
 	
 	static class DataRepairHandler implements IAsyncCallback, ICacheExpungeHook<String, String>
 	{
-		private List<Message> responses_ = new ArrayList<Message>();
-		private IResponseResolver<Row> readResponseResolver_;
-		private int majority_;
+		private final Collection<Message> responses_ = new LinkedBlockingQueue<Message>();
+		private final IResponseResolver<Row> readResponseResolver_;
+		private final int majority_;
 		
 		DataRepairHandler(int responseCount, IResponseResolver<Row> readResponseResolver)
 		{
 			readResponseResolver_ = readResponseResolver;
 			majority_ = (responseCount / 2) + 1;  
 		}
-		
-		public void response(Message message)
+
+        // synchronized so the " == majority" is safe
+		public synchronized void response(Message message)
 		{
 			if (logger_.isDebugEnabled())
 			  logger_.debug("Received responses in DataRepairHandler : " + message.toString());
@@ -120,7 +125,7 @@ class ConsistencyManager implements Runn
 		{
             try
 			{
-				readResponseResolver_.resolve(new ArrayList<Message>(responses_));
+				readResponseResolver_.resolve(responses_);
             }
             catch (Exception ex)
             {

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java?rev=921456&r1=921455&r2=921456&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
(original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
Wed Mar 10 17:17:38 2010
@@ -53,12 +53,11 @@ public class DatacenterSyncWriteResponse
     }
 
     @Override
-    public void response(Message message)
+    // synchronized for the benefit of dcResponses and responseCounts.  "responses" itself
+    // is inherited from WRH and is concurrent.
+    // TODO can we use concurrent structures instead?
+    public synchronized void response(Message message)
     {
-        if (condition.isSignaled())
-        {
-            return;
-        }
         try
         {
             String dataCenter = endPointSnitch.getLocation(message.getFrom());
@@ -89,8 +88,7 @@ public class DatacenterSyncWriteResponse
             throw new RuntimeException(e);
         }
         responses.add(message);
-        // If done then the response count will be empty after removing
-        // everything.
+        // If done then the response count will be empty
         if (responseCounts.isEmpty())
         {
             condition.signal();

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java?rev=921456&r1=921455&r2=921456&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
(original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
Wed Mar 10 17:17:38 2010
@@ -26,6 +26,7 @@ package org.apache.cassandra.service;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.locator.IEndPointSnitch;
@@ -40,15 +41,15 @@ import org.apache.cassandra.utils.FBUtil
  */
 public class DatacenterWriteResponseHandler extends WriteResponseHandler
 {
-    private int blockFor;
-    private DatacenterEndPointSnitch endpointsnitch;
-    private InetAddress localEndpoint;
+    private final AtomicInteger blockFor;
+    private final DatacenterEndPointSnitch endpointsnitch;
+    private final InetAddress localEndpoint;
 
     public DatacenterWriteResponseHandler(int blockFor, String table)
     {
         // Response is been managed by the map so the waitlist size really doesnt matter.
         super(blockFor, table);
-        this.blockFor = blockFor;
+        this.blockFor = new AtomicInteger(blockFor);
         endpointsnitch = (DatacenterEndPointSnitch) DatabaseDescriptor.getEndPointSnitch(table);
         localEndpoint = FBUtilities.getLocalAddress();
     }
@@ -56,17 +57,13 @@ public class DatacenterWriteResponseHand
     @Override
     public void response(Message message)
     {
-        // IF done look no futher.
-        if (condition.isSignaled())
-        {
-            return;
-        }
-            //Is optimal to check if same datacenter than comparing Arrays.
+        //Is optimal to check if same datacenter than comparing Arrays.
+        int b = -1;
         try
         {
             if (endpointsnitch.isInSameDataCenter(localEndpoint, message.getFrom()))
             {
-                blockFor--;
+                b = blockFor.decrementAndGet();
             }
         }
         catch (UnknownHostException e)
@@ -74,7 +71,7 @@ public class DatacenterWriteResponseHand
             throw new RuntimeException(e);
         }
         responses.add(message);
-        if (blockFor <= 0)
+        if (b == 0)
         {
             //Singnal when Quorum is recived.
             condition.signal();

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/IResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=921456&r1=921455&r2=921456&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/IResponseResolver.java
(original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/IResponseResolver.java
Wed Mar 10 17:17:38 2010
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.service;
 
+import java.util.Collection;
 import java.util.List;
 import java.io.IOException;
 
@@ -33,7 +34,7 @@ public interface IResponseResolver<T> {
 	 * repairs . Hence you need to derive a response resolver based on your
 	 * needs from this interface.
 	 */
-	public T resolve(List<Message> responses) throws DigestMismatchException, IOException;
-	public boolean isDataPresent(List<Message> responses);
+	public T resolve(Collection<Message> responses) throws DigestMismatchException, IOException;
+	public boolean isDataPresent(Collection<Message> responses);
 
 }

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=921456&r1=921455&r2=921456&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
(original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
Wed Mar 10 17:17:38 2010
@@ -18,8 +18,11 @@
 
 package org.apache.cassandra.service;
 
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.io.IOException;
@@ -36,14 +39,14 @@ public class QuorumResponseHandler<T> im
 {
     protected static final Logger logger = Logger.getLogger( QuorumResponseHandler.class
);
     protected final SimpleCondition condition = new SimpleCondition();
-    protected final List<Message> responses;
+    protected final Collection<Message> responses;
     private IResponseResolver<T> responseResolver;
     private final long startTime;
 
     public QuorumResponseHandler(int responseCount, IResponseResolver<T> responseResolver)
     {
-        responses = new ArrayList<Message>(responseCount);
-        this.responseResolver =  responseResolver;
+        responses = new LinkedBlockingQueue<Message>();
+        this.responseResolver = responseResolver;
         startTime = System.currentTimeMillis();
     }
     
@@ -85,9 +88,6 @@ public class QuorumResponseHandler<T> im
     
     public void response(Message message)
     {
-        if (condition.isSignaled())
-            return;
-
         responses.add(message);
         if (responseResolver.isDataPresent(responses))
         {

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=921456&r1=921455&r2=921456&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
(original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
Wed Mar 10 17:17:38 2010
@@ -51,7 +51,7 @@ public class RangeSliceResponseResolver 
         this.table = table;
     }
 
-    public List<Row> resolve(List<Message> responses) throws DigestMismatchException,
IOException
+    public List<Row> resolve(Collection<Message> responses) throws DigestMismatchException,
IOException
     {
         CollatingIterator collator = new CollatingIterator(new Comparator<Pair<Row,InetAddress>>()
         {
@@ -99,7 +99,7 @@ public class RangeSliceResponseResolver 
         return resolvedRows;
     }
 
-    public boolean isDataPresent(List<Message> responses)
+    public boolean isDataPresent(Collection<Message> responses)
     {
         return responses.size() >= sources.size();
     }

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=921456&r1=921455&r2=921456&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
(original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
Wed Mar 10 17:17:38 2010
@@ -23,6 +23,7 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.cassandra.db.ColumnFamily;
@@ -63,7 +64,7 @@ public class ReadResponseResolver implem
       * repair request should be scheduled.
       *
       */
-	public Row resolve(List<Message> responses) throws DigestMismatchException, IOException
+	public Row resolve(Collection<Message> responses) throws DigestMismatchException,
IOException
     {
         long startTime = System.currentTimeMillis();
 		List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
@@ -159,7 +160,7 @@ public class ReadResponseResolver implem
         return resolved;
     }
 
-	public boolean isDataPresent(List<Message> responses)
+	public boolean isDataPresent(Collection<Message> responses)
 	{
         if (responses.size() < responseCount)
             return false;

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/WriteResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/WriteResponseHandler.java?rev=921456&r1=921455&r2=921456&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/WriteResponseHandler.java
(original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/WriteResponseHandler.java
Wed Mar 10 17:17:38 2010
@@ -18,11 +18,15 @@
 
 package org.apache.cassandra.service;
 
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.IAsyncCallback;
@@ -37,8 +41,8 @@ public class WriteResponseHandler implem
     protected static final Logger logger = Logger.getLogger( WriteResponseHandler.class );
     protected final SimpleCondition condition = new SimpleCondition();
     private final int responseCount;
-    protected final List<Message> responses;
-    protected int localResponses;
+    protected final Collection<Message> responses;
+    protected AtomicInteger localResponses = new AtomicInteger(0);
     private final long startTime;
 
     public WriteResponseHandler(int responseCount, String table)
@@ -49,7 +53,7 @@ public class WriteResponseHandler implem
             : "invalid response count " + responseCount;
 
         this.responseCount = responseCount;
-        responses = new ArrayList<Message>(responseCount);
+        responses = new LinkedBlockingQueue<Message>();
         startTime = System.currentTimeMillis();
     }
 
@@ -82,25 +86,21 @@ public class WriteResponseHandler implem
         }
     }
 
-    public synchronized void response(Message message)
+    public void response(Message message)
     {
-        if (condition.isSignaled())
-            return;
         responses.add(message);
         maybeSignal();
     }
 
-    public synchronized void localResponse()
+    public void localResponse()
     {
-        if (condition.isSignaled())
-            return;
-        localResponses++;
+        localResponses.addAndGet(1);
         maybeSignal();
     }
 
     private void maybeSignal()
     {
-        if (responses.size() + localResponses >= responseCount)
+        if (responses.size() + localResponses.get() >= responseCount)
         {
             condition.signal();
         }



Mime
View raw message