cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r787765 - in /incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net: SelectionKeyHandler.java TcpConnection.java UdpConnection.java http/HttpConnection.java
Date Tue, 23 Jun 2009 17:55:39 GMT
Author: jbellis
Date: Tue Jun 23 17:55:39 2009
New Revision: 787765

URL: http://svn.apache.org/viewvc?rev=787765&view=rev
Log:
Under heavy load with large column values, we still saw lockups in TCP connections.
Here is the problem. The following code that sets the interest ops seems innocent,
but it is the source of the problem. The reason is that this read-modify-write
operation is not atomic: another thread can sneak in between the reading of the ops
and the setting of them. As a result, some wrong bits could be set.
   key_.interestOps(key_.interestOps() | SelectionKey.OP_READ)

This is a sequence that demonstrates how we can lose the OP_READ bit forever and thus jam
the read channel:
1. Thread 1: we want to write a message and in write(Message) we are about to turn on OP_WRITE
because the message can't be written in one shot.
2. Thread 2: a read comes in and in read(SelectionKey), we turn off OP_READ and submit the
read request to ReadWorkItem in Thread 3.
3. Thread 1: reads interestOps and sees OP_READ as off.
4. Thread 3: finishes processing the read request and turns OP_READ on.
5. Thread 1: resumes and turns on OP_WRITE. However, because it writes back the stale
value it read in step 3 (with OP_READ off), it also turns OP_READ off again.
The read channel is thus blocked forever after this.

patch by Jun Rao; reviewed by jbellis for CASSANDRA-220

Modified:
    incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/SelectionKeyHandler.java
    incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/TcpConnection.java
    incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/UdpConnection.java
    incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/http/HttpConnection.java

Modified: incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/SelectionKeyHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/SelectionKeyHandler.java?rev=787765&r1=787764&r2=787765&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/SelectionKeyHandler.java
(original)
+++ incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/SelectionKeyHandler.java
Tue Jun 23 17:55:39 2009
@@ -65,4 +65,20 @@
     {
         throw new UnsupportedOperationException("write() cannot be called on " + getClass().getName()
+ "!");
     }
+    
+    protected static void turnOnInterestOps(SelectionKey key, int ops)
+    {
+        synchronized(key)
+        {
+            key.interestOps(key.interestOps() | ops);
+        }
+    }
+    
+    protected static void turnOffInterestOps(SelectionKey key, int ops)
+    {
+        synchronized(key)
+        {
+            key.interestOps(key.interestOps() & (~ops) );
+        }
+    }
 }

Modified: incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/TcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/TcpConnection.java?rev=787765&r1=787764&r2=787765&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/TcpConnection.java
(original)
+++ incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/TcpConnection.java
Tue Jun 23 17:55:39 2009
@@ -183,7 +183,7 @@
                 if (buffer.remaining() > 0) 
                 {                   
                     pendingWrites_.add(buffer);
-                    key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
+                    turnOnInterestOps(key_, SelectionKey.OP_WRITE);
                 }
             }
         }
@@ -229,7 +229,7 @@
                     if (buffer.remaining() > 0)
                     {
                         pendingWrites_.add(buffer);
-                        key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
+                        turnOnInterestOps(key_, SelectionKey.OP_WRITE);
                         condition_.await();
                     }
                 }
@@ -245,7 +245,7 @@
                 */
                 if ( bytesTransferred < limit && bytesWritten != total )
                 {                    
-                    key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
+                    turnOnInterestOps(key_, SelectionKey.OP_WRITE);
                     condition_.await();
                 }
             }
@@ -346,17 +346,20 @@
     // called in the selector thread
     public void connect(SelectionKey key)
     {       
-        key.interestOps(key.interestOps() & (~SelectionKey.OP_CONNECT));
+        turnOffInterestOps(key, SelectionKey.OP_CONNECT);
         try
         {
             if (socketChannel_.finishConnect())
             {
-                key.interestOps(key.interestOps() | SelectionKey.OP_READ);
+                turnOnInterestOps(key, SelectionKey.OP_READ);
                 
-                // this will flush the pending                
-                if (!pendingWrites_.isEmpty()) 
+                synchronized(this)
                 {
-                    key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
+                    // this will flush the pending                
+                    if (!pendingWrites_.isEmpty()) 
+                    {
+                        turnOnInterestOps(key_, SelectionKey.OP_WRITE);
+                    }
                 }
                 resumeStreaming();
             } 
@@ -376,7 +379,7 @@
     // called in the selector thread
     public void write(SelectionKey key)
     {   
-        key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) );          
     
+        turnOffInterestOps(key, SelectionKey.OP_WRITE);                
         doPendingWrites();
         /*
          * This is executed only if we are in streaming mode.
@@ -415,7 +418,7 @@
             {    
                 if (!pendingWrites_.isEmpty())
                 {                    
-                    key_.interestOps(key_.interestOps() | SelectionKey.OP_WRITE);
+                    turnOnInterestOps(key_, SelectionKey.OP_WRITE);
                 }
             }
         }
@@ -424,7 +427,7 @@
     // called in the selector thread
     public void read(SelectionKey key)
     {
-        key.interestOps( key.interestOps() & ( ~SelectionKey.OP_READ ) );
+        turnOffInterestOps(key, SelectionKey.OP_READ);
         // publish this event onto to the TCPReadEvent Queue.
         MessagingService.getReadExecutor().execute(readWork_);
     }
@@ -486,7 +489,7 @@
             }
             finally
             {
-                key_.interestOps(key_.interestOps() | SelectionKey.OP_READ);
+                turnOnInterestOps(key_, SelectionKey.OP_READ);
             }
         }
         

Modified: incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/UdpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/UdpConnection.java?rev=787765&r1=787764&r2=787765&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/UdpConnection.java
(original)
+++ incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/UdpConnection.java
Tue Jun 23 17:55:39 2009
@@ -131,7 +131,7 @@
     
     public void read(SelectionKey key)
     {        
-        key.interestOps( key.interestOps() & (~SelectionKey.OP_READ) );
+        turnOffInterestOps(key, SelectionKey.OP_READ);
         ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
         try
         {
@@ -160,7 +160,7 @@
         }
         finally
         {
-            key.interestOps( key_.interestOps() | SelectionKey.OP_READ );
+            turnOnInterestOps(key_, SelectionKey.OP_READ );
         }
     }
 }

Modified: incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/http/HttpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/http/HttpConnection.java?rev=787765&r1=787764&r2=787765&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/http/HttpConnection.java
(original)
+++ incubator/cassandra/branches/cassandra-0.3/src/java/org/apache/cassandra/net/http/HttpConnection.java
Tue Jun 23 17:55:39 2009
@@ -136,7 +136,7 @@
             httpChannel_ = (SocketChannel)key.channel();
         }
         /* deregister interest for read */
-        key.interestOps( key.interestOps() & ( ~SelectionKey.OP_READ ) );
+        turnOffInterestOps(key, SelectionKey.OP_READ);
         /* Add a task to process the HTTP request */
         MessagingService.getReadExecutor().execute(httpReader_);
     }
@@ -330,7 +330,7 @@
         }
         finally
         {
-            httpKey_.interestOps(httpKey_.interestOps() | SelectionKey.OP_READ);
+            turnOnInterestOps(httpKey_, SelectionKey.OP_READ);
         }
     }
 



Mime
View raw message