activemq-dev mailing list archives

From "Gary Tully (JIRA)" <j...@apache.org>
Subject [jira] Commented: (AMQ-2123) Intermittent Test failure: DuplexNetworkTest.testDurableStoreAndForward (org.apache.activemq.network) - java.lang.IllegalStateException: Message id ID:... could not be recovered from the data store - already dispatched
Date Wed, 25 Feb 2009 22:56:00 GMT

    [ https://issues.apache.org/activemq/browse/AMQ-2123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=49997#action_49997 ]

Gary Tully commented on AMQ-2123:
---------------------------------

The problem relates to the number of consumers associated with a topic message send. If 10
consumers are present when the message is persisted and 11 when it is dispatched, the ack
from the 11th will remove the message. Redispatch to any of the others can then fail with
the above exception.
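
To make the failure mode concrete, here is a minimal toy model of it. It is not the
KahaTopicReferenceStore code; the class NaiveReferenceStore, its methods and the sub-N /
msg-1 names are hypothetical, and the "unexpected ack drops the reference" rule is an
assumption taken from the description above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only; this is NOT the KahaTopicReferenceStore implementation.
final class NaiveReferenceStore {
    // messageId -> subscribers whose acks were recorded when the reference was persisted
    private final Map<String, Set<String>> pendingAcks = new HashMap<>();

    // Persist a reference together with the subscriber set known at that moment.
    void addMessageReference(String messageId, List<String> subscribersAtPersist) {
        pendingAcks.put(messageId, new HashSet<>(subscribersAtPersist));
    }

    // Assumed naive behaviour: an ack from a subscriber with no recorded entry
    // drops the whole reference, even though other subscribers still need it.
    void acknowledge(String subscriber, String messageId) {
        Set<String> pending = pendingAcks.get(messageId);
        if (pending == null) {
            return; // nothing persisted yet, nothing to remove
        }
        boolean wasExpected = pending.remove(subscriber);
        if (!wasExpected || pending.isEmpty()) {
            pendingAcks.remove(messageId);
        }
    }

    // Redispatch needs the reference; fail the way the test failure reports.
    void recover(String messageId) {
        if (!pendingAcks.containsKey(messageId)) {
            throw new IllegalStateException("Message id " + messageId
                    + " could not be recovered from the data store - already dispatched");
        }
    }

    // Scenario from the comment: 10 subscribers persisted, an 11th acks first.
    static boolean redispatchFails() {
        NaiveReferenceStore store = new NaiveReferenceStore();
        List<String> persisted = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            persisted.add("sub-" + i);
        }
        store.addMessageReference("msg-1", persisted);
        store.acknowledge("sub-11", "msg-1"); // subscriber not in the persisted set
        try {
            store.recover("msg-1"); // redispatch to one of the original 10
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("redispatch fails: " + redispatchFails());
    }
}
```

In this model the ack from sub-11 removes the reference while sub-1 to sub-10 still have
acks outstanding, so the later recover call throws.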

With the AMQPersistenceAdapter, writes are batched, so the reference store is updated
asynchronously. At the point of update, the reference store prepares the required acks for
each subscriber in order to keep the message reference around until all subscribers have acked.
The problem arises when the consumer list is updated and another consumer (one that is not
in the count that is persisted) gets the message. The set of subscribers used during dispatch
is independent of the set persisted, so the two can disagree. The logic that sets up the acks
based on the subscription list is in
org.apache.activemq.store.kahadaptor.KahaTopicReferenceStore.addMessageReference(ConnectionContext,
MessageId, ReferenceData).
One fix is to serialize dispatch with a flush to the store and with subscriber additions, but
I think this would lock up the dispatch logic quite a bit.

The logic in org.apache.activemq.store.kahadaptor.KahaTopicReferenceStore.acknowledgeReference(ConnectionContext,
String, String, MessageId) can deal with no reference (the case where a message has not been
persisted), but it cannot deal with a persisted message and an additional subscriber.
Adding logic to not remove a reference while it is still referenced from another subscription
resolves the issue.
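
The guard described above can be sketched roughly as follows. This is an illustration of
the idea only, not the committed change: GuardedReferenceStore, hasReference and the
sub-N / msg-1 names are hypothetical, and the real acknowledgeReference signature is
simplified to two strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only; the real store keeps per-subscription ack records in Kaha containers.
final class GuardedReferenceStore {
    // messageId -> subscribers that still have to ack the persisted reference
    private final Map<String, Set<String>> pendingAcks = new HashMap<>();

    void addMessageReference(String messageId, List<String> subscribersAtPersist) {
        pendingAcks.put(messageId, new HashSet<>(subscribersAtPersist));
    }

    // Returns true only when this ack removed the reference.
    boolean acknowledgeReference(String subscriber, String messageId) {
        Set<String> pending = pendingAcks.get(messageId);
        if (pending == null) {
            return false; // no reference: the message was never persisted
        }
        pending.remove(subscriber); // no-op for a subscriber added after persist
        if (!pending.isEmpty()) {
            return false; // still referenced from another subscription, keep it
        }
        pendingAcks.remove(messageId);
        return true;
    }

    boolean hasReference(String messageId) {
        return pendingAcks.containsKey(messageId);
    }

    public static void main(String[] args) {
        GuardedReferenceStore store = new GuardedReferenceStore();
        List<String> persisted = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            persisted.add("sub-" + i);
        }
        store.addMessageReference("msg-1", persisted);

        // The 11th subscriber, unknown at persist time, acks first.
        store.acknowledgeReference("sub-11", "msg-1");
        System.out.println("after extra ack: " + store.hasReference("msg-1"));

        // The reference goes away only once every persisted subscriber has acked.
        for (String sub : persisted) {
            store.acknowledgeReference(sub, "msg-1");
        }
        System.out.println("after all acks: " + store.hasReference("msg-1"));
    }
}
```

The point of the sketch is the early return when other subscriptions still hold the
reference: the ack from the additional subscriber becomes harmless, and dispatch does not
need to be serialized with store flushes.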

> Intermittent Test failure: DuplexNetworkTest.testDurableStoreAndForward  (org.apache.activemq.network)
- java.lang.IllegalStateException: Message id ID:... could not be recovered from the data
store - already dispatched
> ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-2123
>                 URL: https://issues.apache.org/activemq/browse/AMQ-2123
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.2.0
>         Environment: linux rh
>            Reporter: Gary Tully
>            Assignee: Gary Tully
>             Fix For: 5.3.0
>
>
> {code}
> DuplexNetworkTest.testDurableStoreAndForward  (org.apache.activemq.network)
>     javax.jms.JMSException: java.lang.RuntimeException: java.lang.IllegalStateException:
Message id ID:pdrhas4_32-40202-1234575036513-14:8:1:1:3 could not be recovered from the data
store - already dispatched
>     at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:49)
>     at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1255)
>     at org.apache.activemq.ActiveMQSession.syncSendPacket(ActiveMQSession.java:1805)
>     at org.apache.activemq.ActiveMQMessageConsumer.<init>(ActiveMQMessageConsumer.java:225)
>     at org.apache.activemq.ActiveMQTopicSubscriber.<init>(ActiveMQTopicSubscriber.java:117)
>     at org.apache.activemq.ActiveMQSession.createDurableSubscriber(ActiveMQSession.java:1207)
>     at org.apache.activemq.ActiveMQSession.createDurableSubscriber(ActiveMQSession.java:1152)
>     at org.apache.activemq.network.SimpleNetworkTest.testDurableStoreAndForward(SimpleNetworkTest.java:127)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>     at java.lang.reflect.Method.invoke(Method.java:585)
>     at junit.framework.TestCase.runTest(TestCase.java:154)
>     at junit.framework.TestCase.runBare(TestCase.java:127)
>     at junit.framework.TestResult$1.protect(TestResult.java:106)
>     at junit.framework.TestResult.runProtected(TestResult.java:124)
>     at junit.framework.TestResult.run(TestResult.java:109)
>     at junit.framework.TestCase.run(TestCase.java:118)
>     at junit.framework.TestSuite.runTest(TestSuite.java:208)
>     at junit.framework.TestSuite.run(TestSuite.java:203)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>     at java.lang.reflect.Method.invoke(Method.java:585)
>     at org.apache.maven.surefire.junit.JUnitTestSet.execute(JUnitTestSet.java:210)
>     at org.apache.maven.surefire.suite.AbstractDirectoryTestSuite.executeTestSet(AbstractDirectoryTestSuite.java:135)
>     at org.apache.maven.surefire.suite.AbstractDirectoryTestSuite.execute(AbstractDirectoryTestSuite.java:160)
>     at org.apache.maven.surefire.Surefire.run(Surefire.java:81)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>     at java.lang.reflect.Method.invoke(Method.java:585)
>     at org.apache.maven.surefire.booter.SurefireBooter.runSuitesInProcess(SurefireBooter.java:182)
>     at org.apache.maven.surefire.booter.SurefireBooter.main(SurefireBooter.java:743)
>     Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.IllegalStateException:
Message id ID:pdrhas4_32-40202-1234575036513-14:8:1:1:3 could not be recovered from the data
store - already dispatched
>     at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.reset(AbstractStoreCursor.java:104)
>     at org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor.reset(StoreDurableSubscriberCursor.java:225)
>     at org.apache.activemq.broker.region.PrefetchSubscription.dispatchPending(PrefetchSubscription.java:560)
>     at org.apache.activemq.broker.region.DurableTopicSubscription.activate(DurableTopicSubscription.java:130)
>     at org.apache.activemq.broker.region.TopicRegion.addConsumer(TopicRegion.java:105)
>     at org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:376)
>     at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)
>     at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)
>     at org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:83)
>     at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:86)
>     at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:93)
>     at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:546)
>     at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:349)
>     at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:308)
>     at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:182)
>     at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:104)
>     at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
>     at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:204)
>     at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
>     at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
>     at java.lang.Thread.run(Thread.java:595)
>     Caused by: java.lang.RuntimeException: java.lang.IllegalStateException: Message id
ID:pdrhas4_32-40202-1234575036513-14:8:1:1:3 could not be recovered from the data store -
already dispatched
>     at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.fillBatch(AbstractStoreCursor.java:239)
>     at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.reset(AbstractStoreCursor.java:101)
>     ... 22 more
>     Caused by: java.lang.IllegalStateException: Message id ID:pdrhas4_32-40202-1234575036513-14:8:1:1:3
could not be recovered from the data store - already dispatched
>     at org.apache.activemq.store.amq.RecoveryListenerAdapter.recoverMessageReference(RecoveryListenerAdapter.java:58)
>     at org.apache.activemq.store.kahadaptor.KahaReferenceStore.recoverReference(KahaReferenceStore.java:82)
>     at org.apache.activemq.store.kahadaptor.KahaTopicReferenceStore.recoverNextMessages(KahaTopicReferenceStore.java:262)
>     at org.apache.activemq.store.amq.AMQTopicMessageStore.recoverNextMessages(AMQTopicMessageStore.java:59)
>     at org.apache.activemq.broker.region.cursors.TopicStorePrefetch.doFillBatch(TopicStorePrefetch.java:91)
>     at org.apache.activemq.broker.region.cursors.AbstractStoreCursor.fillBatch(AbstractStoreCursor.java:236)
>     ... 23 more
> {code}

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
