qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rob...@apache.org
Subject svn commit: r1324655 - in /qpid/branches/0.16/qpid/java: ./ client/src/main/java/org/apache/qpid/client/ systests/src/main/java/org/apache/qpid/test/unit/close/
Date Wed, 11 Apr 2012 09:00:54 GMT
Author: robbie
Date: Wed Apr 11 09:00:54 2012
New Revision: 1324655

URL: http://svn.apache.org/viewvc?rev=1324655&view=rev
Log:
QPID-3911: Fix deadlock on concurrent invocation of MessageConsumer#close() and Session#rollback()
from consumer MessageListener
    
This patch contains the following changes:
- Add synchronization on AMSession#_messageDeliveryLock into MessageConsumer#close() in order
to block until message listener in progress has completed(as required in JMS javadoc for MessageConsumer#close()).
- Change the session dispatcher to stop messages delivery into consumer local message queue
if the consumer in the process of closing. This eliminates the need to stop the dispatcher
on rejecting pending messages for closing consumer.
- Remove the synchronization on the dispatcher lock from AMQSession.Dispatcher#rejectPending
and code to stop the dispatcher, as we are synchronizing on the deliveryLock now and incoming
messages are not dispatched into closing consumers anymore.
- Add a system test to reproduce the deadlock and verify its resolution.
    
Applied patch from Oleksandr Rudyy <orudyy@gmail.com>

merged from trunk r1310275

Added:
    qpid/branches/0.16/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
      - copied unchanged from r1310275, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
Modified:
    qpid/branches/0.16/qpid/java/   (props changed)
    qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java

Propchange: qpid/branches/0.16/qpid/java/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java:r1310275

Modified: qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1324655&r1=1324654&r2=1324655&view=diff
==============================================================================
--- qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++ qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Wed Apr 11 09:00:54 2012
@@ -3206,28 +3206,15 @@ public abstract class AMQSession<C exten
 
         public void rejectPending(C consumer)
         {
-            synchronized (_lock)
-            {
-                boolean stopped = connectionStopped();
+            // Reject messages on pre-receive queue
+            consumer.rollbackPendingMessages();
 
-                if (!stopped)
-                {
-                    setConnectionStopped(true);
-                }
+            // Reject messages on pre-dispatch queue
+            rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false);
 
-                // Reject messages on pre-receive queue
-                consumer.rollbackPendingMessages();
+            // closeConsumer
+            consumer.markClosed();
 
-                // Reject messages on pre-dispatch queue
-                rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false);
-                //Let the dispatcher deal with this when it gets to them.
-
-                // closeConsumer
-                consumer.markClosed();
-
-                setConnectionStopped(stopped);
-
-            }
         }
 
         public void rollback()
@@ -3419,7 +3406,7 @@ public abstract class AMQSession<C exten
         {
             final C consumer = _consumers.get(message.getConsumerTag());
 
-            if ((consumer == null) || consumer.isClosed())
+            if ((consumer == null) || consumer.isClosed() || consumer.isClosing())
             {
                 if (_dispatcherLogger.isInfoEnabled())
                 {

Modified: qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1324655&r1=1324654&r2=1324655&view=diff
==============================================================================
--- qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++ qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Wed Apr 11 09:00:54 2012
@@ -590,7 +590,10 @@ public abstract class BasicMessageConsum
                         // no point otherwise as the connection will be gone
                         if (!_session.isClosed() || _session.isClosing())
                         {
-                            sendCancel();
+                            synchronized(_session.getMessageDeliveryLock())
+                            {
+                                sendCancel();
+                            }
                             cleanupQueue();
                         }
                     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message