activemq-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Tomas Pavelka (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (AMQ-6863) NPE when expiring messages with FilePendingMessageCursor and durable topic subscriptions
Date Mon, 27 Nov 2017 12:11:00 GMT

     [ https://issues.apache.org/jira/browse/AMQ-6863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Tomas Pavelka updated AMQ-6863:
-------------------------------
    Attachment: TestActiveSubscriptions.java
                TestHighMemoryWaterMark.java

I have attached two tests:
*TestActiveSubscriptions* shows that if we set org.apache.activemq.broker.BrokerService#setKeepDurableSubsActive
to false, then messages do not flow on durable topics. The test passes if the setting is back
at the default "true".
*TestHighMemoryWatermark* shows how we would expect the highWatermark settings to work, i.e.
when one topic hits its own watermark we can still send messages on another topic.

Let me know if there is something we are not setting correctly.

Thanks,
Tomas 

> NPE when expiring messages with FilePendingMessageCursor and durable topic subscriptions
> ----------------------------------------------------------------------------------------
>
>                 Key: AMQ-6863
>                 URL: https://issues.apache.org/jira/browse/AMQ-6863
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.15.2
>            Reporter: Tomas Pavelka
>         Attachments: TestActiveSubscriptions.java, TestCursors.java, TestHighMemoryWaterMark.java
>
>
> I am using a file based cursor with durable topic subscriptions because in my
> tests the broker would run out of memory when dealing with large numbers of
> messages without an active consumer.
> I have run into a NullPointerException when the messages meant for the topic
> with an active durable subscription expire. Here is part of the stack trace:
> java.lang.NullPointerException: null
>         at org.apache.activemq.broker.region.Topic.acknowledge(Topic.java:586)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.region.Topic.messageExpired(Topic.java:810)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at
> org.apache.activemq.broker.region.cursors.FilePendingMessageCursor.discardExpiredMessage(FilePendingMessageCursor.java:489)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at
> org.apache.activemq.broker.region.cursors.FilePendingMessageCursor.tryAddMessageLast(FilePendingMessageCursor.java:247)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at
> org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor.addMessageLast(AbstractPendingMessageCursor.java:93)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at
> org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:157)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at
> org.apache.activemq.broker.region.DurableTopicSubscription.add(DurableTopicSubscription.java:279)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.region.Topic$2.recoverMessage(Topic.java:314)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at
> org.apache.activemq.store.kahadb.KahaDBStore$KahaDBTopicMessageStore$6.execute(KahaDBStore.java:1012)
> [activemq-kahadb-store-5.15.2.jar:5.15.2]
>         at
> org.apache.activemq.store.kahadb.disk.page.Transaction.execute(Transaction.java:779)
> [activemq-kahadb-store-5.15.2.jar:5.15.2]
>         at
> org.apache.activemq.store.kahadb.KahaDBStore$KahaDBTopicMessageStore.recoverSubscription(KahaDBStore.java:999)
> [activemq-kahadb-store-5.15.2.jar:5.15.2]
>         at
> org.apache.activemq.store.ProxyTopicMessageStore.recoverSubscription(ProxyTopicMessageStore.java:108)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.region.Topic.activate(Topic.java:307)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at
> org.apache.activemq.broker.region.DurableTopicSubscription.add(DurableTopicSubscription.java:123)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.region.Topic.addSubscription(Topic.java:164)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at
> org.apache.activemq.broker.region.TopicRegion.addSubscriptionsForDestination(TopicRegion.java:287)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at
> org.apache.activemq.broker.region.AbstractRegion.addDestination(AbstractRegion.java:162)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at
> org.apache.activemq.broker.region.RegionBroker.addDestination(RegionBroker.java:339)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at
> org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at
> org.apache.activemq.advisory.AdvisoryBroker.addDestination(AdvisoryBroker.java:239)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at
> org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at
> org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at
> org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at
> org.apache.activemq.broker.region.AbstractRegion.start(AbstractRegion.java:104)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at
> org.apache.activemq.broker.region.RegionBroker.start(RegionBroker.java:200)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.BrokerFilter.start(BrokerFilter.java:189)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.BrokerFilter.start(BrokerFilter.java:189)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at
> org.apache.activemq.broker.TransactionBroker.start(TransactionBroker.java:119)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at
> org.apache.activemq.broker.BrokerService$6.start(BrokerService.java:2370)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at
> org.apache.activemq.broker.BrokerService.doStartBroker(BrokerService.java:747)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at
> org.apache.activemq.broker.BrokerService.startBroker(BrokerService.java:733)
> [activemq-broker-5.15.2.jar:5.15.2]
>         at org.apache.activemq.broker.BrokerService.start(BrokerService.java:636)
> [activemq-broker-5.15.2.jar:5.15.2]
> I looked at the code and it seems to me that this is caused by the method
> org.apache.activemq.broker.region.cursors.FilePendingMessageCursor#discardExpiredMessage:
>    private void discardExpiredMessage(MessageReference reference) {
>         LOG.debug("Discarding expired message {}", reference);
>         if (reference.isExpired() && broker.isExpired(reference)) {
>             ConnectionContext context = new ConnectionContext(new
> NonCachedMessageEvaluationContext());
>             context.setBroker(broker);
>            
> ((Destination)reference.getRegionDestination()).messageExpired(context,
> null, new IndirectMessageReference(reference.getMessage()));
>         }
>     }
> There the subscription passed to Destination#messageExpired is set to null.
> If the destination is a topic, then later in
> org.apache.activemq.broker.region.Topic#acknowledge:
>     @Override
>     public void acknowledge(ConnectionContext context, Subscription sub,
> final MessageAck ack,
>             final MessageReference node) throws IOException {
>         if (topicStore != null && node.isPersistent()) {
>             DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
>             SubscriptionKey key = dsub.getSubscriptionKey();
>             topicStore.acknowledge(context, key.getClientId(),
> key.getSubscriptionName(), node.getMessageId(),
>                     convertToNonRangedAck(ack, node));
>         }
>         messageConsumed(context, node);
>     }
> The code dsub.getSubscriptionKey() throws an NPE. 
> I suspect that this problem also applies to the default org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor
which uses org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor internally
but so far I was not able to reproduce it. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message