activemq-dev mailing list archives

From "Tommy Lindgren (Issue Comment Edited) (JIRA)" <j...@apache.org>
Subject [jira] [Issue Comment Edited] (AMQ-3605) NullPointerException in TransportConnection
Date Mon, 12 Dec 2011 14:34:30 GMT

    [ https://issues.apache.org/jira/browse/AMQ-3605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13167478#comment-13167478 ]

Tommy Lindgren edited comment on AMQ-3605 at 12/12/11 2:32 PM:
---------------------------------------------------------------

I've done some testing against 5.6-SNAPSHOT (as of 2011-11-23). I no longer see the NPE,
but clients that close the socket uncleanly still seem to be a problem.

It looks like my affected client now hangs on the SEND call. I can reproduce this problem
within minutes by stressing my clients while a script repeatedly forks, connects to AMQ, subscribes
to a topic and then calls _exit() (to make sure the socket isn't closed by the garbage collector).
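
A minimal sketch of such a reproduction script, written in Python rather than the Perl/Net::Stomp setup used here. The host, port 61613, destination name and all function names are assumptions for illustration; this is not the actual script:

```python
import os
import socket

NUL = b"\x00"


def stomp_frame(command, headers=None):
    """Build a body-less STOMP 1.0 frame: command, headers, blank line, NUL."""
    lines = [command]
    for key, value in (headers or {}).items():
        lines.append(f"{key}:{value}")
    return ("\n".join(lines) + "\n\n").encode("utf-8") + NUL


def abandon_connection(host, port, destination):
    """Fork a child that subscribes and then dies without a clean shutdown."""
    pid = os.fork()
    if pid == 0:
        try:
            sock = socket.create_connection((host, port), timeout=5)
            sock.sendall(stomp_frame("CONNECT"))
            sock.recv(4096)  # wait for the broker's reply (CONNECTED or ERROR)
            sock.sendall(stomp_frame(
                "SUBSCRIBE", {"destination": destination, "ack": "auto"}))
        finally:
            # _exit skips interpreter cleanup: no DISCONNECT frame and no
            # explicit close(); the child never returns into the parent's code.
            os._exit(0)
    os.waitpid(pid, 0)


def stress(host, port, destination, iterations):
    """Repeat the fork/subscribe/_exit cycle (hypothetical driver loop)."""
    for _ in range(iterations):
        abandon_connection(host, port, destination)
```

Something like stress("localhost", 61613, "/topic/change.TRANSFER", 1000) would run alongside the normal client load; the iteration count is arbitrary.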

The resulting AMQ log is 200MB+ and may contain sensitive information, so I can't attach it
as is. Just before the client hangs I see the following (log excerpts slightly edited; message
bodies and headers have been removed):

{noformat}
2011-12-12 10:56:21,776 | TRACE | Received: 
SEND
[...]

2011-12-12 10:56:21,776 | DEBUG | message is ActiveMQTextMessage {commandId = 2262, responseRequired = false, messageId = ID:dev01-54928-1323379526715-4:15980:-1:1:2254, originalDestination = null, originalTransactionId = null, producerId = ID:dev01-54928-1323379526715-4:15980:-1:1, destination = topic://change.TRANSFER, transactionId = null, expiration = 0, timestamp = 1323683781775, arrival = 0, brokerInTime = 1323683781775, brokerOutTime = 1323683781775, correlationId = null, replyTo = null, persistent = false, type = EVENT.ResourceChangeEvent, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1507, properties = {[...]}, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = <?xml version="1.0" encoding="UTF-8"?>
<atom:...atom:entry>
}
2011-12-12 10:56:21,776 | DEBUG | Message available, but continuation is already resumed. Buffer for next time.
2011-12-12 10:56:21,776 | DEBUG | Transport failed: java.io.EOFException
java.io.EOFException
        at java.io.DataInputStream.readByte(DataInputStream.java:250)
        at org.apache.activemq.transport.stomp.StompWireFormat.readHeaderLine(StompWireFormat.java:155)
        at org.apache.activemq.transport.stomp.StompWireFormat.readLine(StompWireFormat.java:148)
        at org.apache.activemq.transport.stomp.StompWireFormat.parseAction(StompWireFormat.java:170)
        at org.apache.activemq.transport.stomp.StompWireFormat.unmarshal(StompWireFormat.java:98)
        at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:229)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:221)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:204)
        at java.lang.Thread.run(Thread.java:662)
2011-12-12 10:56:21,776 | DEBUG | message for ActiveMQMessageConsumer { value=ID:dev01-54928-1323379526715-8:58:1:1, started=true } continuation=org.eclipse.jetty.server.AsyncContinuation@e9b412d@IDLE,initial
2011-12-12 10:56:21,776 | DEBUG | message for ActiveMQMessageConsumer { value=ID:dev01-54928-1323379526715-8:57:1:1, started=true } continuation=org.eclipse.jetty.server.AsyncContinuation@50afeae@IDLE,initial
2011-12-12 10:56:21,776 | TRACE | org.apache.activemq.broker.region.cursors.QueueStorePrefetch@73a2335d:daemon.archived,batchResetNeeded=false,storeHasMessages=true,size=0,cacheEnabled=true - fillBatch
2011-12-12 10:56:21,777 | TRACE | ID:dev01-54928-1323379526715-8:58:1:1 received message: MessageDispatch {commandId = 391, responseRequired = false, consumerId = ID:dev01-54928-1323379526715-8:58:1:1, destination = topic://change.TRANSFER, message = ActiveMQTextMessage {commandId = 2262, responseRequired = false, messageId = ID:dev01-54928-1323379526715-4:15980:-1:1:2254, originalDestination = null, originalTransactionId = null, producerId = ID:dev01-54928-1323379526715-4:15980:-1:1, destination = topic://change.TRANSFER, transactionId = null, expiration = 0, timestamp = 1323683781775, arrival = 0, brokerInTime = 1323683781775, brokerOutTime = 1323683781775, correlationId = null, replyTo = null, persistent = false, type = EVENT.ResourceChangeEvent, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1507, properties = {[...]}, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = <?xml version="1.0" encoding="UTF-8"?>
<atom:...atom:entry>
}, redeliveryCounter = 0}
2011-12-12 10:56:21,776 | DEBUG | Transport failed: java.net.SocketException: Broken pipe
java.net.SocketException: Broken pipe
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
        at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:115)
        at java.io.DataOutputStream.flush(DataOutputStream.java:106)
        at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:184)
        at org.apache.activemq.transport.stomp.StompTransportFilter.sendToStomp(StompTransportFilter.java:97)
        at org.apache.activemq.transport.stomp.StompSubscription.onMessageDispatch(StompSubscription.java:99)
        at org.apache.activemq.transport.stomp.ProtocolConverter.onActiveMQCommand(ProtocolConverter.java:746)
        at org.apache.activemq.transport.stomp.StompTransportFilter.oneway(StompTransportFilter.java:64)
        at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:262)
        at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:244)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
        at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1281)
        at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:830)
        at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:866)
        at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:98)
        at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)
{noformat}

The "Received: SEND" message is from my affected client. I see a bunch of such messages after
the SocketException before the client hangs two seconds later. I assume the client hangs because
some buffer at some level is full. I can't see anything obvious explaining why AMQ stops processing
the client's messages.
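
To illustrate the "buffer is full" assumption (and only that; it does not explain why the broker stops reading), here is a small Python sketch using a local socket pair instead of a real broker connection. The function name and chunk size are made up for the example. A writer whose peer never reads can send a finite number of bytes, after which a blocking send would stall indefinitely:

```python
import socket


def bytes_until_blocked(chunk_size=4096):
    """Write to a peer that never reads; return bytes accepted before send would block."""
    writer, reader = socket.socketpair()
    writer.setblocking(False)  # non-blocking so a full buffer raises instead of hanging
    sent = 0
    try:
        while True:
            try:
                sent += writer.send(b"x" * chunk_size)
            except BlockingIOError:
                # The kernel buffers are full; a blocking socket would hang here.
                return sent
    finally:
        writer.close()
        reader.close()
```

The exact byte count depends on the operating system's socket buffer sizes. With a blocking socket, as a client library would typically use for SEND, the same condition would show up as a hang rather than an exception.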

Occasionally I also see the following exception:

{noformat}
java.lang.IllegalStateException: Cannot lookup a producer from a connection that had not been registered: ID:dev01-54928-1323379526715-4:59
        at org.apache.activemq.broker.SingleTransportConnectionStateRegister.lookupConnectionState(SingleTransportConnectionStateRegister.java:93)
        at org.apache.activemq.broker.TransportConnection.lookupConnectionState(TransportConnection.java:1438)
        at org.apache.activemq.broker.TransportConnection.getProducerBrokerExchange(TransportConnection.java:1314)
        at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:468)
        at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:681)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:318)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:181)
        at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
        at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:229)
        at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:87)
        at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:166)
        at org.apache.activemq.transport.stomp.ProtocolConverter.onStompSend(ProtocolConverter.java:292)
        at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:204)
        at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:76)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:222)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:204)
{noformat}

This exception is triggered more often when I run the stress test, but sometimes it doesn't
appear to have been thrown at all by the time the client has hung.

Not sure how useful these log excerpts are. I haven't tested the attached patch yet. If you
think it will help I can try to build AMQ myself.


                

                  
> NullPointerException in TransportConnection
> -------------------------------------------
>
>                 Key: AMQ-3605
>                 URL: https://issues.apache.org/jira/browse/AMQ-3605
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Transport
>    Affects Versions: 5.5.0
>         Environment: SLES 11 SP1
> java version "1.6.0_23"
> perl 5.10.0 + Net::Stomp 0.38_99
>            Reporter: Tommy Lindgren
>            Priority: Critical
>              Labels: stomp
>         Attachments: AMQ-3605-Patch.txt, StompDedicatedTaskRunnerTests.java, StompDedicatedTaskRunnerTests.java
>
>
> I'm running ActiveMQ 5.5.0 and clients using Net::Stomp 0.38_99 and I'm seeing infrequent
> NullPointerExceptions in TransportConnection:
> {noformat}
> Exception in thread "ActiveMQ Connection Dispatcher: /172.31.201.11:50607" java.lang.NullPointerException
>         at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:327)
>         at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
>         at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
>         at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:81)
>         at org.apache.activemq.transport.stomp.StompSubscription.onMessageDispatch(StompSubscription.java:79)
>         at org.apache.activemq.transport.stomp.ProtocolConverter.onActiveMQCommand(ProtocolConverter.java:596)
>         at org.apache.activemq.transport.stomp.StompTransportFilter.oneway(StompTransportFilter.java:58)
>         at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
>         at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1270)
>         at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:815)
>         at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:851)
>         at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:104)
>         at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:42)
> {noformat}
> This seems to happen 1-2 times per month or so but the result is dire: new messages aren't
> delivered to the affected client (you can see the number of pending messages increasing in
> the admin web interface) until the client or ActiveMQ is restarted.
> Relevant code snippet from TransportConnection.java,
> {noformat}
> 326         if (context != null) {
> 327             if (context.isDontSendReponse()) {
> {noformat}
> implies that we are dealing with a race condition. I'm not familiar with the ActiveMQ
> code base, but it looks like it grabs a lock (serviceLock) before entering that function, so
> I'm not sure what's going on.
> Since there's no timestamp associated with the stack trace, I'm not completely sure what's
> happening on the client side. I've tried to reproduce it by writing a small script that uses
> Net::Stomp in a similar way to my real clients, but no luck so far.
> No idea if it's relevant, but my affected clients have been both consuming and producing,
> and sending/receiving on both topics and queues.
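
For readers unfamiliar with this pattern, the following Python sketch shows why a null check on one line followed by a failure on the next points to a race: the shared field is read twice, and something else can clear it in between. All class and function names here are hypothetical, and the callback merely stands in for a second thread; this is not ActiveMQ code and not necessarily how the attached patch addresses the problem.

```python
class Context:
    def is_dont_send_response(self):
        return False


class Connection:
    def __init__(self):
        self.context = Context()  # shared field that another thread may clear

    def service_unsafe(self, between_check_and_use=lambda: None):
        if self.context is not None:          # first read of the field
            between_check_and_use()           # stands in for another thread running here
            return self.context.is_dont_send_response()  # second read may see None
        return None

    def service_safe(self, between_check_and_use=lambda: None):
        context = self.context                # read the shared field exactly once
        if context is not None:
            between_check_and_use()
            return context.is_dont_send_response()
        return None


def clear_context(conn):
    """Simulate a concurrent shutdown path that resets the field."""
    conn.context = None
```

Calling service_unsafe with clear_context as the callback raises AttributeError (the Python counterpart of the NullPointerException), while service_safe keeps working on its local copy. Whether the local copy is still valid to use is a separate question that depends on the surrounding locking.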


        
