activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Tom Kaitchuck (JIRA)" <j...@apache.org>
Subject [jira] Created: (AMQ-1070) Deadlock in Queue.java
Date Fri, 24 Nov 2006 23:13:02 GMT
Deadlock in Queue.java
----------------------

                 Key: AMQ-1070
                 URL: https://issues.apache.org/activemq/browse/AMQ-1070
             Project: ActiveMQ
          Issue Type: Bug
          Components: Broker
    Affects Versions: 4.0.2
            Reporter: Tom Kaitchuck


It is possible to have a deadlock as follows:

"ActiveMQ Transport: tcp:///127.0.0.1:53335":
        at org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:66)
        - waiting to lock <0x90786240> (a org.apache.activemq.broker.region.QueueSubscription)
        at org.apache.activemq.broker.region.Queue.addSubscription(Queue.java:192)
        - locked <0x908fa480> (a java.util.LinkedList)
        at org.apache.activemq.broker.region.AbstractRegion.addDestination(AbstractRegion.java:93)
        - locked <0x903b9b40> (a java.lang.Object)
        at org.apache.activemq.broker.region.RegionBroker.addDestination(RegionBroker.java:221)
        at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:130)
        at org.apache.activemq.advisory.AdvisoryBroker.addDestination(AdvisoryBroker.java:142)
        at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:130)
        at org.apache.activemq.broker.MutableBrokerFilter.addDestination(MutableBrokerFilter.java:143)
        at org.apache.activemq.broker.region.AbstractRegion.addConsumer(AbstractRegion.java:182)
        - locked <0x908e6cb8> (a java.lang.Object)
        at org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:297)
        at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:74)
        at org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:78)
        at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:74)
        at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:87)
        at org.apache.activemq.broker.AbstractConnection.processAddConsumer(AbstractConnection.java:538)
        at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:296)
        at org.apache.activemq.broker.AbstractConnection.service(AbstractConnection.java:237)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:63)
        at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:92)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:124)
        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:123)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:88)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:138)
        at java.lang.Thread.run(Thread.java:595)
"ActiveMQ Transport: tcp:///127.0.0.1:53315":
        at org.apache.activemq.broker.region.Queue.dropEvent(Queue.java:321)
        - waiting to lock <0x908fa480> (a java.util.LinkedList)
        at org.apache.activemq.broker.region.Queue.dropEvent(Queue.java:315)
        at org.apache.activemq.broker.region.QueueSubscription.acknowledge(QueueSubscription.java:54)
        at org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:125)
        - locked <0x90786240> (a org.apache.activemq.broker.region.QueueSubscription)
        at org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:265)
        at org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:366)
        at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:177)
        at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:66)
        at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:66)
        at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:79)
        at org.apache.activemq.broker.AbstractConnection.processMessageAck(AbstractConnection.java:445)
        at org.apache.activemq.command.MessageAck.visit(MessageAck.java:179)
        at org.apache.activemq.broker.AbstractConnection.service(AbstractConnection.java:237)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:63)
        at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:92)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:124)
        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:123)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:88)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:138)
        at java.lang.Thread.run(Thread.java:595)


The simple solution is in AbstractReagion.java:
@@ -89,10 +89,12 @@
             // Add all consumers that are interested in the destination.
             for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) {
                 Subscription sub = (Subscription) iter.next();
+                synchronized (sub) {
                 if( sub.matches(destination) ) {
                     dest.addSubscription(context, sub);
                 }
             }
+            }
             return dest;
         }
     }
@@ -104,11 +106,13 @@
         if( timeout == 0 ) {
             for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){
                 Subscription sub=(Subscription) iter.next();
+                synchronized (sub) {
                 if(sub.matches(destination)){
                     throw new JMSException("Destination still has an active subscription:
"+destination);
                 }
             }
         }
+        }

         if( timeout > 0 ) {
             // TODO: implement a way to notify the subscribers that we want to take the down
@@ -125,10 +129,12 @@
                 // timeout<0 or we timed out, we now force any remaining subscriptions
to un-subscribe.
                 for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){
                     Subscription sub=(Subscription) iter.next();
+                    synchronized (sub) {
                     if(sub.matches(destination)){
                         dest.removeSubscription(context, sub);
                     }
                 }
+                }

                 destinationMap.removeAll(destination);
                 dest.dispose(context);
@@ -173,7 +179,8 @@
             // broker will not see a destination that exists in persistent store.  We may
want to
             // eagerly load all destinations into the broker but have an inactive state for
the
             // destination which has reduced memory usage.
-            //
+            synchronized (sub)
+            {
             if( persistenceAdapter!=null ) {
                 Set inactiveDests = getInactiveDestinations();
                 for (Iterator iter = inactiveDests.iterator(); iter.hasNext();) {
@@ -183,6 +190,7 @@
                     }
                 }
             }
+            }

             subscriptions.put(info.getConsumerId(), sub);

@@ -193,16 +201,16 @@
             // no mutex held. Remove is only essentially run once
             // so everything after this point would be leaked.

+            synchronized (sub) {
             // Add the subscription to all the matching queues.
             for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();)
{
                 Destination dest = (Destination) iter.next();
                 dest.addSubscription(context, sub);
             }
-
             if( info.isBrowser() ) {
                 ((QueueBrowserSubscription)sub).browseDone();
             }
-
+            }
             return sub;
         }
     }


-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: https://issues.apache.org/activemq/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Mime
View raw message