activemq-dev mailing list archives

From "Timothy Bish (JIRA)" <j...@apache.org>
Subject [jira] Assigned: (AMQ-3185) Closing a VMTransport can cause all other VMTransports to be prematurely closed
Date Thu, 24 Feb 2011 16:50:38 GMT

     [ https://issues.apache.org/jira/browse/AMQ-3185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timothy Bish reassigned AMQ-3185:
---------------------------------

    Assignee: Timothy Bish

> Closing a VMTransport can cause all other VMTransports to be prematurely closed
> -------------------------------------------------------------------------------
>
>                 Key: AMQ-3185
>                 URL: https://issues.apache.org/jira/browse/AMQ-3185
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Transport
>    Affects Versions: 5.4.2
>            Reporter: Stirling Chow
>            Assignee: Timothy Bish
>            Priority: Critical
>         Attachments: VMTransportClosureTest.java, patch.diff
>
>
> Symptom
> =======
> We have eight servers running AMQ 5.3.1 connected in a network-of-brokers over HTTP.  Each broker maintains local connections to internal consumers using the VM transport.  We noticed that about once a day, all the local VM connections on a broker and the outbound network bridge connections would fail with the following error:
> 2010-12-10 04:29:11,663 [processBroker-process-pool-thread-4] ERROR - The worker encountered an exception and will pause for 5 seconds before continuing.
> javax.jms.JMSException: Peer (vm://broker-mbus-200005#1052452) disposed.
> 	at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:62)
> 	at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:453)
> 	at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:570)
> 	at com.invoqsystems.foundation.component.communication.jms.source.JMSMessageSource.getMessage(JMSMessageSource.java:33)
> 	at com.invoqsystems.foundation.component.communication.jms.source.JMSMessageSource.getMessage(JMSMessageSource.java:95)
> 	at com.invoqsystems.foundation.component.communication.jms.worker.MessageProcessingWorker.getTask(MessageProcessingWorker.java:9)
> 	at com.invoqsystems.foundation.component.communication.jms.worker.AbstractWorker.iterate(AbstractWorker.java:14)
> 	at com.invoqsystems.foundation.component.communication.jms.worker.AbstractWorker.runUntilStop(AbstractWorker.java:17)
> 	at com.invoqsystems.foundation.component.communication.jms.worker.AbstractWorker.run(AbstractWorker.java:41)
> 	at java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.activemq.transport.TransportDisposedIOException: Peer (vm://broker-mbus-200005#1052452) disposed.
> 	at org.apache.activemq.transport.vm.VMTransport.stop(VMTransport.java:70)
> 	at org.apache.activemq.transport.TransportFilter.stop(TransportFilter.java:64)
> 	at org.apache.activemq.transport.TransportFilter.stop(TransportFilter.java:64)
> 	at org.apache.activemq.transport.ResponseCorrelator.stop(ResponseCorrelator.java:132)
> 	at org.apache.activemq.broker.TransportConnection.doStop(TransportConnection.java:956)
> 	at org.apache.activemq.broker.TransportConnection$3.run(TransportConnection.java:918)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> 	... 1 more
> This was quite unexpected since we create the VM transport connection at broker startup and maintain it (the single connection) throughout the life of the application.  The connection is only closed when the process terminates and the broker is stopped.
> Since we wrote our consumers against the JMS spec, we handle periodic connection failures by creating a new connection.  This sometimes works; however, because of AMQ-3127, the sudden re-registration of our consumers at the same time as bridge re-creation frequently causes a deadlock that can only be resolved by restarting the systems.
> Cause
> =====
> We were unable to reliably recreate the failure, which suggested that a timing issue was involved.  Eventually, we determined that the VM transport failure was caused by the following code in VMTransportServer:
> {code:title=VMTransportServer.java}
>     public VMTransport connect() throws IOException {
>         TransportAcceptListener al;
>         synchronized (this) {
>             if (disposed) {
>                 throw new IOException("Server has been disposed.");
>             }
>             al = acceptListener;
>         }
>         if (al == null) {
>             throw new IOException("Server TransportAcceptListener is null.");
>         }
>         connectionCount.incrementAndGet();
>         VMTransport client = new VMTransport(location) {
>             public void stop() throws Exception {
>                 if (disposed) {
>                     return;
>                 }
>                 super.stop();
>                 if (connectionCount.decrementAndGet() == 0 && disposeOnDisconnect) {
>                     VMTransportServer.this.stop();
>                 }
>             };
>         };
>         VMTransport server = new VMTransport(location);
>         client.setPeer(server);
>         server.setPeer(client);
>         al.onAccept(configure(server));
>         return client;
>     }
> {code}
> At issue is the overridden VMTransport.stop() method:
> {code:title=VMTransportServer.java}
>             public void stop() throws Exception {
>                 if (disposed) {
>                     return;
>                 }
>                 super.stop();
>                 if (connectionCount.decrementAndGet() == 0 && disposeOnDisconnect) {
>                     VMTransportServer.this.stop();
>                 }
>             };
> {code}
> Note that VMTransport.disposed is used to protect against multiple calls and, consequently, multiple decrements of connectionCount.  However, in the implementation of super.stop(), the disposed flag is only set after the peer transport is informed of the stop:
> {code:title=VMTransport.java}
>     public void stop() throws Exception {
>         stopping.set(true);
>         
>         // If stop() is called while being start()ed.. then we can't stop until we return to the start() method.
>         if( enqueueValve.isOn() ) {
>         	
>             // let the peer know that we are disconnecting..
>             try {
> >>>                peer.transportListener.onCommand(new ShutdownInfo());
>             } catch (Exception ignore) {
>             }
>         	
>         	
>             TaskRunner tr = null;
>             try {
>                 enqueueValve.turnOff();
>                 if (!disposed) {
>                     started = false;
> >>>                    disposed = true;
>                     if (taskRunner != null) {
>                         tr = taskRunner;
>                         taskRunner = null;
>                     }
>                 }
>             } finally {
>                 stopping.set(false);
>                 enqueueValve.turnOn();
>             }
>             if (tr != null) {
>                 tr.shutdown(1000);
>             }
>             
>         }
>         
>     }
> {code}
> TransportConnection implements peer.transportListener.onCommand(new ShutdownInfo()) by launching an asynchronous task that eventually calls back to the same transport that initiated the closure.  If the timing is right, VMTransportServer's VMTransport.stop() method is called a second time before the disposed flag is set to true.  As a result, the connectionCount is decremented *TWICE* instead of just once.
> In other words, the disposed check and decrement as implemented by VMTransportServer's anonymous VMTransport subclass are not thread-safe.  If VMTransportServer miscounts the connections, it can end up stopping itself while there are still live connections.  The result is that the live connections see their peer (the server part of the VMTransport) unexpectedly closed.
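The double decrement described above can be reproduced deterministically in a small stand-alone sketch. This is not ActiveMQ code: FlawedTransport, peerNotified, and run() are hypothetical names, and the callback thread is joined on purpose to force the timing that the report describes as occurring only occasionally.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class DoubleDecrementDemo {

    /** Hypothetical stand-in for the anonymous VMTransport subclass in connect(). */
    static class FlawedTransport {
        private final AtomicInteger connectionCount;
        private final AtomicBoolean peerNotified = new AtomicBoolean();
        // volatile, so that visibility is not the problem; the ordering is.
        private volatile boolean disposed;

        FlawedTransport(AtomicInteger connectionCount) {
            this.connectionCount = connectionCount;
        }

        // Mirrors the overridden stop(): check the flag, call super, then decrement.
        void stop() {
            if (disposed) {
                return;
            }
            superStop();
            connectionCount.decrementAndGet();
        }

        // Mirrors the ordering in VMTransport.stop(): peer first, disposed flag last.
        private void superStop() {
            if (peerNotified.compareAndSet(false, true)) {
                // The peer reacts on another thread by stopping this same transport.
                Thread callback = new Thread(this::stop);
                callback.start();
                try {
                    // Force the unlucky timing: the callback runs stop() after the
                    // first caller passed the disposed check but before it set the flag.
                    callback.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
            }
            disposed = true;
        }
    }

    /** Closes one of two connections and returns what the counter says afterwards. */
    static int run() {
        AtomicInteger connectionCount = new AtomicInteger(2); // two live connections
        new FlawedTransport(connectionCount).stop();          // close just one of them
        return connectionCount.get();
    }

    public static void main(String[] args) {
        System.out.println("connections counted after one close: " + run());
    }
}
```

With the forced timing, one closure takes the counter from 2 to 0, which is the condition under which VMTransportServer would stop itself while another connection is still live.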
> Solution
> ========
> The attached patch prevents multiple decrements of the connectionCount by preventing reentrant calls to VMTransportServer's VMTransport stop() method.
> The attached VMTransportClosureTest.java demonstrates the problem with the existing AMQ trunk code.
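One way to keep the decrement from happening more than once is an atomic check-and-set performed before anything else in stop(). The sketch below is an assumption about how such a guard could look, not the contents of the attached patch.diff; GuardedTransport, stopCalled, and run() are hypothetical names, and superStop() only imitates a peer that calls back into stop().

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class GuardedStopSketch {

    /** Hypothetical stand-in for the client VMTransport created by connect(). */
    static class GuardedTransport {
        private final AtomicInteger connectionCount;
        private final AtomicBoolean stopCalled = new AtomicBoolean();

        GuardedTransport(AtomicInteger connectionCount) {
            this.connectionCount = connectionCount;
        }

        void stop() {
            // Atomic check-and-set: only the first caller gets past this line,
            // whether later calls are reentrant or arrive from other threads.
            if (!stopCalled.compareAndSet(false, true)) {
                return;
            }
            superStop();
            connectionCount.decrementAndGet();
        }

        // Stand-in for VMTransport.stop(): the peer's reaction calls back into stop().
        private void superStop() {
            stop();
        }
    }

    /** Calls stop() from several threads at once and returns the resulting count. */
    static int run() {
        AtomicInteger connectionCount = new AtomicInteger(2); // two live connections
        GuardedTransport transport = new GuardedTransport(connectionCount);
        Thread[] callers = new Thread[8];
        for (int i = 0; i < callers.length; i++) {
            callers[i] = new Thread(transport::stop);
            callers[i].start();
        }
        for (Thread caller : callers) {
            try {
                caller.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return connectionCount.get();
    }

    public static void main(String[] args) {
        System.out.println("connections counted after one close: " + run());
    }
}
```

Because the guard no longer depends on when super.stop() sets its own disposed flag, eight concurrent callers plus the reentrant callback still leave the counter at 1 for the remaining connection.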

-- 
This message is automatically generated by JIRA.
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
