cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject svn commit: r1206095 - in /cassandra/branches/cassandra-1.0: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/
Date Fri, 25 Nov 2011 09:40:01 GMT
Author: slebresne
Date: Fri Nov 25 09:39:58 2011
New Revision: 1206095

URL: http://svn.apache.org/viewvc?rev=1206095&view=rev
Log:
Avoid race in OutboundTcpConnection in multi-DC setups
patch by jbellis; reviewed by slebresne for CASSANDRA-3530

Modified:
    cassandra/branches/cassandra-1.0/CHANGES.txt
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/Header.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/Message.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1206095&r1=1206094&r2=1206095&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Fri Nov 25 09:39:58 2011
@@ -12,6 +12,7 @@
  * fix incorrect query results due to invalid max timestamp (CASSANDRA-3510)
  * fix ConcurrentModificationException in Table.all() (CASSANDRA-3529)
  * make sstableloader recognize compressed sstables (CASSANDRA-3521)
+ * avoids race in OutboundTcpConnection in multi-DC setups (CASSANDRA-3530)
 Merged from 0.8:
  * fix concurrence issue in the FailureDetector (CASSANDRA-3519)
  * fix array out of bounds error in counter shard removal (CASSANDRA-3514)

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1206095&r1=1206094&r2=1206095&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Fri Nov 25 09:39:58 2011
@@ -18,20 +18,16 @@
 
 package org.apache.cassandra.db;
 
-import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
 
@@ -69,7 +65,7 @@ public class RowMutationVerbHandler impl
     private void forwardToLocalNodes(Message message, byte[] forwardBytes) throws UnknownHostException
     {
         // remove fwds from message to avoid infinite loop
-        message.removeHeader(RowMutation.FORWARD_HEADER);
+        Message messageCopy = message.withHeaderRemoved(RowMutation.FORWARD_HEADER);
 
         int bytesPerInetAddress = FBUtilities.getBroadcastAddress().getAddress().length;
         assert forwardBytes.length >= bytesPerInetAddress;
@@ -89,7 +85,7 @@ public class RowMutationVerbHandler impl
 
             // Send the original message to the address specified by the FORWARD_HINT
             // Let the response go back to the coordinator
-            MessagingService.instance().sendOneWay(message, address);
+            MessagingService.instance().sendOneWay(messageCopy, address);
 
             offset += bytesPerInetAddress;
         }

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/Header.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/Header.java?rev=1206095&r1=1206094&r2=1206095&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/Header.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/Header.java Fri Nov 25 09:39:58 2011
@@ -20,6 +20,7 @@ package org.apache.cassandra.net;
 
 import java.io.*;
 import java.net.InetAddress;
+import java.util.Collections;
 import java.util.Hashtable;
 import java.util.Map;
 
@@ -27,6 +28,9 @@ import org.apache.cassandra.io.IVersione
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
 public class Header
 {
     private static IVersionedSerializer<Header> serializer_;
@@ -46,21 +50,21 @@ public class Header
     // and RowMutationVerbHandler.forwardToLocalNodes)
     private final InetAddress from_;
     private final StorageService.Verb verb_;
-    protected Map<String, byte[]> details_ = new Hashtable<String, byte[]>();
+    protected final Map<String, byte[]> details_;
 
     Header(InetAddress from, StorageService.Verb verb)
     {
+        this(from, verb, Collections.<String, byte[]>emptyMap());
+    }
+
+    Header(InetAddress from, StorageService.Verb verb, Map<String, byte[]> details)
+    {
         assert from != null;
         assert verb != null;
 
         from_ = from;
         verb_ = verb;
-    }
-
-    Header(InetAddress from, StorageService.Verb verb, Map<String, byte[]> details)
-    {
-        this(from, verb);
-        details_ = details;
+        details_ = ImmutableMap.copyOf(details);
     }
 
     InetAddress getFrom()
@@ -78,14 +82,20 @@ public class Header
         return details_.get(key);
     }
 
-    void setDetail(String key, byte[] value)
+    Header withDetailsAdded(String key, byte[] value)
     {
-        details_.put(key, value);
+        Map<String, byte[]> detailsCopy = Maps.newHashMap(details_);
+        detailsCopy.put(key, value);
+        return new Header(from_, verb_, detailsCopy);
     }
 
-    void removeDetail(String key)
+    Header withDetailsRemoved(String key)
     {
-        details_.remove(key);
+        if (!details_.containsKey(key))
+            return this;
+        Map<String, byte[]> detailsCopy = Maps.newHashMap(details_);
+        detailsCopy.remove(key);
+        return new Header(from_, verb_, detailsCopy);
     }
 
     public int serializedSize()

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/Message.java?rev=1206095&r1=1206094&r2=1206095&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/Message.java Fri Nov 25 09:39:58 2011
@@ -50,14 +50,14 @@ public class Message
         return header_.getDetail(key);
     }
     
-    public void setHeader(String key, byte[] value)
+    public Message withHeaderAdded(String key, byte[] value)
     {
-        header_.setDetail(key, value);
+        return new Message(header_.withDetailsAdded(key, value), body_, version);
     }
     
-    public void removeHeader(String key)
+    public Message withHeaderRemoved(String key)
     {
-        header_.removeDetail(key);
+        return new Message(header_.withDetailsRemoved(key), body_, version);
     }
 
     public byte[] getMessageBody()

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=1206095&r1=1206094&r2=1206095&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/net/OutboundTcpConnection.java Fri Nov 25 09:39:58 2011
@@ -118,9 +118,12 @@ public class OutboundTcpConnection exten
                 out.flush();
             }
         }
-        catch (IOException e)
+        catch (Exception e)
         {
-            if (logger.isDebugEnabled())
+            // Non IO exceptions is likely a programming error so let's not silence it
+            if (!(e instanceof IOException))
+                logger.error("error writing to " + poolReference.endPoint(), e);
+            else if (logger.isDebugEnabled())
                 logger.debug("error writing to " + poolReference.endPoint(), e);
             disconnect();
         }

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1206095&r1=1206094&r2=1206095&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java Fri Nov 25 09:39:58 2011
@@ -397,7 +397,7 @@ public class StorageProxy implements Sto
                 Message message = messages.getKey();
                 // a single message object is used for unhinted writes, so clean out any forwards
                 // from previous loop iterations
-                message.removeHeader(RowMutation.FORWARD_HEADER);
+                message = message.withHeaderRemoved(RowMutation.FORWARD_HEADER);
 
                 if (dataCenter.equals(localDataCenter))
                 {
@@ -411,21 +411,14 @@ public class StorageProxy implements Sto
                     Iterator<InetAddress> iter = messages.getValue().iterator();
                     InetAddress target = iter.next();
                     // Add all the other destinations of the same message as a header in the primary message.
+                    FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
+                    DataOutputStream dos = new DataOutputStream(bos);
                     while (iter.hasNext())
                     {
                         InetAddress destination = iter.next();
-                        // group all nodes in this DC as forward headers on the primary message
-                        FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
-                        DataOutputStream dos = new DataOutputStream(bos);
-
-                        // append to older addresses
-                        byte[] previousHints = message.getHeader(RowMutation.FORWARD_HEADER);
-                        if (previousHints != null)
-                            dos.write(previousHints);
-
                         dos.write(destination.getAddress());
-                        message.setHeader(RowMutation.FORWARD_HEADER, bos.toByteArray());
                     }
+                    message = message.withHeaderAdded(RowMutation.FORWARD_HEADER, bos.toByteArray());
                     // send the combined message + forward headers
                     MessagingService.instance().sendRR(message, target, handler);
                 }



Mime
View raw message