activemq-issues mailing list archives

From "Christopher L. Shannon (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AMQ-5851) Unmatched acknowledge: MessageAck {commandId = 77, responseRequired = false, ackType = 2, ...Could not find Message-ID XXX in dispatched-list (start of ack)
Date Sun, 21 Jun 2015 02:27:00 GMT

    [ https://issues.apache.org/jira/browse/AMQ-5851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14594922#comment-14594922 ]

Christopher L. Shannon commented on AMQ-5851:
---------------------------------------------

[~gtully] and [~tabish121],

Sorry in advance for the long message, but this is a fairly complicated issue and it was pretty
hard to track down.

There is a race condition of sorts, caused by the unique circumstances of this test
case and the environment.  The short version is that message acks are being received out of
order because messages are handled concurrently in different threads.  Sometimes this causes
an exception because, by the time some of the acks reach the broker, the broker has
already expired the messages, so the acknowledgement check fails.  This is happening in Wildfly
because the container creates a pool of threads to handle multiple messages at the same
time for one subscription.  It could happen just as easily in other applications if someone
spins off new threads to handle multiple messages in parallel.

What's going on is that, while a message producer sends messages to the broker with a TTL value
of 10 milliseconds, the Wildfly consumer is quickly dequeuing messages in parallel.  The
run() method in ActiveMQSession is executed for each new message in a different thread.
The first thing that happens is that the message is checked to see if it has expired (around line
885 of ActiveMQSession).  If it has not expired, the code continues on and the messageListener
(in this case the MDB) eventually runs (around line 1037).  The example that was submitted
here has the MDB sleeping for 1 second, which is longer than the TTL of the messages being
sent.  So a bunch of threads sleep for 1 second, and when they are finished the
run() method continues on and, at the end, sends a normal acknowledgement back to the broker
in the finally block (around line 1052).
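
To make the timing concrete, here is a minimal sketch of the client-side decision described above. It is not the ActiveMQSession code: the class and method names are hypothetical, time is passed in as plain millisecond values, and the expiry test (now later than the expiration time) is an assumption made for illustration.

```java
public class ConsumerFlowSketch {
    public enum AckType { EXPIRED, STANDARD }

    /**
     * Ack chosen when the message is dequeued. If the message is already
     * expired, the listener never runs and an expired ack is sent instead.
     */
    public static AckType ackFor(long expiresAt, long dequeuedAt) {
        return dequeuedAt > expiresAt ? AckType.EXPIRED : AckType.STANDARD;
    }

    /**
     * True when the message passed the expiry check at dequeue time but its
     * TTL ran out while the listener was still working, so the standard ack
     * is sent for a message the broker may already consider expired.
     */
    public static boolean expiredBeforeAck(long expiresAt, long dequeuedAt, long listenerMillis) {
        return ackFor(expiresAt, dequeuedAt) == AckType.STANDARD
                && dequeuedAt + listenerMillis > expiresAt;
    }

    public static void main(String[] args) {
        // Message expires at t=10 ms, is dequeued at t=2 ms, listener sleeps 1000 ms.
        System.out.println(ackFor(10, 2));
        System.out.println(expiredBeforeAck(10, 2, 1000));
    }
}
```

With a 10 ms TTL and a 1 second listener, practically every message that passes the first check falls into the second case.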

However, as the consumer rapidly consumes more messages and spins off threads, eventually
one of the threads runs into a message that has already expired because the TTL is so short.
Since the message is expired, an expired ack is sent to the broker right away and the MDB never
fires for that message.  This triggers the broker to iterate through all of the dispatched
messages and to expire the ones that it can.  This happens in the PrefetchSubscription class around
line 325 when an expired ack is received.  So, all of the messages that were dispatched essentially
get removed from the dispatched list, since all of them are expired.  Then, when some
of the threads in Wildfly finish sleeping, they continue on and get around to sending back
a normal ack.  But by the time this acknowledgement reaches the broker, the message
has already been expired, because the thread that detected the expired message already sent
an expired ack to the broker.  The message being acked no longer exists in the dispatched
list.
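
The sequence above can be reproduced with a toy model of the dispatched list. This is only a sketch under stated assumptions: the class, its methods, and the message ids are hypothetical and do not mirror PrefetchSubscription; it only shows why a late standard ack finds nothing to match.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of a broker-side dispatched list; not ActiveMQ code. */
public class DispatchedListRace {
    // message id -> expiration time in milliseconds
    private final Map<String, Long> dispatched = new LinkedHashMap<>();

    public void dispatch(String messageId, long expiresAt) {
        dispatched.put(messageId, expiresAt);
    }

    /** An expired ack makes the broker drop every dispatched message that is expired at 'now'. */
    public void expiredAck(long now) {
        dispatched.values().removeIf(expiresAt -> now > expiresAt);
    }

    /** A standard ack must match an entry that is still in the dispatched list. */
    public void standardAck(String messageId) {
        if (dispatched.remove(messageId) == null) {
            throw new IllegalStateException(
                "Unmatched acknowledge: could not find " + messageId + " in dispatched-list");
        }
    }

    public static void main(String[] args) {
        DispatchedListRace broker = new DispatchedListRace();
        broker.dispatch("msg-4", 10);     // listener for msg-4 is still sleeping
        broker.dispatch("msg-13", 1010);  // found expired by another thread
        broker.expiredAck(1020);          // expired ack for msg-13 also removes msg-4
        try {
            broker.standardAck("msg-4");  // late standard ack from the sleeping thread
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```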

Below is a printout from my log file with some extra debug statements that I added.  The "handling
message" line is a debug statement that I added to print right after a message is dequeued
in the run() method.  You can see multiple threads executing at the same time in Wildfly.
"Expired Ack" prints out when a message is detected as being expired.  "Sending Ack" prints
out when the thread finishes the normal execution and sends a normal ack back to the broker.
As you can see below, we run into trouble when the thread handling the 4th message finishes
and sends an acknowledgement to the broker.  During the period of time that the thread handling
message 4 was sleeping for 1 second, 13 other messages were handled in parallel.  Just before
the thread handling message 4 finished, one of the other threads had detected that message 13
expired and sent back an expired ack, which triggered all of the dispatched messages to be
expired (including message 4).  So by the time the ack for the 4th message reached the broker,
that message was already removed from the dispatched list and we get an exception.

So, as far as solutions go, I'm not sure what the best approach would be.  This situation
could happen any time messages are processed slowly in parallel and there is a TTL set.  Obviously
one workaround is to not concurrently process messages off the same subscription, but that
is pretty common with MDBs and some other environments.  One solution could be to check a
second time whether the message has expired after the messageListener has been called and
send an expired ack instead of a normal ack in that case.  But we'd need to change the server
logic to not fail if it couldn't find the message to ack because it was already expired.
I think another solution could be to keep track of which messages have already been expired
on a subscription for some window of time, so that if an acknowledgement comes later
it could be detected as being for an expired message and no error would be thrown.  What do
you guys think?  I can work on a PR and introduce a fix but wanted to discuss what you thought
would be the best approach.
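
To illustrate the second idea, here is a rough sketch of a per-subscription record of recently expired message ids. Everything in it is hypothetical (the class name, the methods, the window length, and passing the clock in as a parameter); it is not existing broker code and only shows the shape of the bookkeeping.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: remembers which message ids were expired within a time window. */
public class RecentlyExpiredTracker {
    private final long windowMillis;
    // message id -> time (ms) at which the broker expired it
    private final Map<String, Long> expiredAt = new LinkedHashMap<>();

    public RecentlyExpiredTracker(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Record that a dispatched message was expired at time 'now'. */
    public synchronized void markExpired(String messageId, long now) {
        evictOld(now);
        expiredAt.put(messageId, now);
    }

    /** True if the id was expired within the window, so a late ack can be ignored. */
    public synchronized boolean wasRecentlyExpired(String messageId, long now) {
        evictOld(now);
        return expiredAt.containsKey(messageId);
    }

    // Drop entries older than the window so the map does not grow without bound.
    private void evictOld(long now) {
        expiredAt.values().removeIf(t -> now - t > windowMillis);
    }
}
```

The acknowledgement check could consult such a tracker before failing: if the id was expired recently, the late ack would be dropped quietly instead of raising "Unmatched acknowledge". The window would need to be at least as long as the slowest listener, which is a tuning question this sketch does not answer.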

{noformat}
20:20:52,347 INFO  [org.apache.activemq.ra.ActiveMQEndpointWorker] (default-threads - 1) Successfully
established connection to broker [tcp://localhost:61616]
20:21:00,745 INFO  [stdout] (default-threads - 2) handling message: ID:localhost.localdomain-53475-1434846046033-2:1:1:3:1
20:21:00,747 INFO  [stdout] (default-threads - 2) Expired Ack (ActiveMQSession) for: null,ID:localhost.localdomain-53475-1434846046033-2:1:1:3:1
20:21:00,763 INFO  [stdout] (default-threads - 3) handling message: ID:localhost.localdomain-53475-1434846046033-2:1:1:4:1
20:21:00,868 INFO  [stdout] (default-threads - 4) handling message: ID:localhost.localdomain-53475-1434846046033-2:1:1:5:1
20:21:00,980 INFO  [stdout] (default-threads - 5) handling message: ID:localhost.localdomain-53475-1434846046033-2:1:1:6:1
20:21:01,087 INFO  [stdout] (default-threads - 6) handling message: ID:localhost.localdomain-53475-1434846046033-2:1:1:7:1
20:21:01,199 INFO  [stdout] (default-threads - 7) handling message: ID:localhost.localdomain-53475-1434846046033-2:1:1:8:1
20:21:01,315 INFO  [stdout] (default-threads - 8) handling message: ID:localhost.localdomain-53475-1434846046033-2:1:1:9:1
20:21:01,316 INFO  [stdout] (default-threads - 8) Expired Ack (ActiveMQSession) for: null,ID:localhost.localdomain-53475-1434846046033-2:1:1:9:1
20:21:01,423 INFO  [stdout] (default-threads - 9) handling message: ID:localhost.localdomain-53475-1434846046033-2:1:1:10:1
20:21:01,539 INFO  [stdout] (default-threads - 10) handling message: ID:localhost.localdomain-53475-1434846046033-2:1:1:11:1
20:21:01,655 INFO  [stdout] (default-threads - 11) handling message: ID:localhost.localdomain-53475-1434846046033-2:1:1:12:1
20:21:01,768 INFO  [stdout] (default-threads - 12) handling message: ID:localhost.localdomain-53475-1434846046033-2:1:1:13:1
20:21:01,806 INFO  [stdout] (default-threads - 12) Expired Ack (ActiveMQSession) for: null,ID:localhost.localdomain-53475-1434846046033-2:1:1:13:1
20:21:01,809 INFO  [stdout] (default-threads - 3) Hello world!3
20:21:01,810 INFO  [stdout] (default-threads - 3) Sending Ack (ActiveMQSession line 1010)
for: 2; ID:localhost.localdomain-53475-1434846046033-2:1:1:4:1,ID:localhost.localdomain-53475-1434846046033-2:1:1:4:1
20:21:01,872 INFO  [stdout] (default-threads - 4) Hello world!4
20:21:01,872 INFO  [stdout] (default-threads - 4) Sending Ack (ActiveMQSession line 1010)
for: 2; ID:localhost.localdomain-53475-1434846046033-2:1:1:5:1,ID:localhost.localdomain-53475-1434846046033-2:1:1:5:1
20:21:01,874 INFO  [stdout] (default-threads - 13) handling message: ID:localhost.localdomain-53475-1434846046033-2:1:1:14:1
20:21:01,888 ERROR [org.apache.activemq.ra.ActiveMQEndpointWorker] (ActiveMQ Connection Executor:
tcp://localhost/127.0.0.1:61616@43165) Connection to broker failed: Unmatched acknowledge:
MessageAck {commandId = 16, responseRequired = false, ackType = 2, consumerId = ID:localhost.localdomain-46056-1434846051491-1:1:-1:2,
firstMessageId = ID:localhost.localdomain-53475-1434846046033-2:1:1:4:1, lastMessageId = ID:localhost.localdomain-53475-1434846046033-2:1:1:4:1,
destination = queue://TEST.FOO, transactionId = null, messageCount = 1, poisonCause = null};
Could not find Message-ID ID:localhost.localdomain-53475-1434846046033-2:1:1:4:1 in dispatched-list
(start of ack): javax.jms.JMSException: Unmatched acknowledge: MessageAck {commandId = 16,
responseRequired = false, ackType = 2, consumerId = ID:localhost.localdomain-46056-1434846051491-1:1:-1:2,
firstMessageId = ID:localhost.localdomain-53475-1434846046033-2:1:1:4:1, lastMessageId = ID:localhost.localdomain-53475-1434846046033-2:1:1:4:1,
destination = queue://TEST.FOO, transactionId = null, messageCount = 1, poisonCause = null};
Could not find Message-ID ID:localhost.localdomain-53475-1434846046033-2:1:1:4:1 in dispatched-list
(start of ack)
{noformat}


> Unmatched acknowledge: MessageAck {commandId = 77, responseRequired = false, ackType
= 2, ...Could not find Message-ID XXX in dispatched-list (start of ack)
> ------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-5851
>                 URL: https://issues.apache.org/jira/browse/AMQ-5851
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: JMS client
>    Affects Versions: 5.10.0, 5.11.0, 5.11.1
>            Reporter: Grijesh Saini
>              Labels: ttl
>         Attachments: AcknowledgeIssue.zip
>
>
> When a lot of messages expire because of the JMS client Time to Live (TTL) property, the
error below appears and the consumer freezes
> {code:xml}
>  Connection to broker failed: Unmatched acknowledge: MessageAck {commandId = 77, responseRequired
= false, ackType = 2, consumerId =XXX firstMessageId = ID:XXX
>  lastMessageId = ID:XXX
> , destination = queue://abc, transactionId = null, messageCount = 1, poisonCause = null};
Could not find Message-ID in dispatched-list (start of ack)
>          at org.apache.activemq.broker.region.PrefetchSubscription.assertAckMatchesDispatched(PrefetchSubscription.java:477)
[activemq-broker-5.11.1.jar:5.11.1]
>  at org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:212)
[activemq-broker-5.11.1.jar:5.11.1]
>  at org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:441)
[activemq-broker-5.11.1.jar:5.11.1]
>  at org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:484)
[activemq-broker-5.11.1.jar:5.11.1]
>  at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:87) [activemq-broker-5.11.1.jar:5.11.1]
>  at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:87) [activemq-broker-5.11.1.jar:5.11.1]
>  at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:277)
[activemq-broker-5.11.1.jar:5.11.1]
>  at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:97)
[activemq-broker-5.11.1.jar:5.11.1]
>  at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:550)
[activemq-broker-5.11.1.jar:5.11.1]
>  at org.apache.activemq.command.MessageAck.visit(MessageAck.java:245) [activemq-client-5.11.1.jar:5.11.1]
>  at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:334)
[activemq-broker-5.11.1.jar:5.11.1]
>  at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:188)
[activemq-broker-5.11.1.jar:5.11.1]
>  at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50) [activemq-client-5.11.1.jar:5.11.1]
>  at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
[activemq-client-5.11.1.jar:5.11.1]
>  at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
[activemq-client-5.11.1.jar:5.11.1]
>  at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
[activemq-client-5.11.1.jar:5.11.1]
>  at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214) [activemq-client-5.11.1.jar:5.11.1]
>  at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196) [activemq-client-5.11.1.jar:5.11.1]
>  at java.lang.Thread.run(Thread.java:745) [rt.jar:1.8.0_25]
> {code}
> Steps to reproduce:
> 1. Enable the TTL property for the JMS client
> 2. Keep the TTL value very low, say 5 sec
> 3. Send a lot of messages so that some messages expire
> 4. Make sure that some messages expire while they are being processed inside the MDB
> Then the above error appears in the logs



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
