activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gary Tully <gary.tu...@gmail.com>
Subject Re: Message Cursors and Dispatch
Date Wed, 09 Mar 2011 10:58:15 GMT
you are using a durable subscriber but with non persistent messages.
Because a non persistent message is not in the brokers messages store,
it needs to be locally persisted by the durable subscription cursor,
so it uses the filebased message cursor and replays the messages to
that.
I think what you need is to have non persistent messages use a store
based cursor that is backed by your recovery kahadb instance.

You would need to extend the StoreDurableSubscriberCursor to implement
that (set nonPersistent) and provide a custom
PendingDurableSubscriberMessageStoragePolicy to have it used a
runtime.

Is there a good reason you don't use persistent messages such that a
durable topic subscriber can work with the brokers store.

On 8 March 2011 18:40, Anirudha Khanna <anirudha@yahoo-inc.com> wrote:
> I haven't explicitly configured any cursors on the Topic. From the documentation it seems
like the default is the store based cursor, but when looking at the ActiveMQ Transport thread
stack trace I noticed it was also using the "FilePendingMessageCursor". Can I explicitly configure
the broker to only use the Store based cursor?
>
> attaching a part of the stack trace,
> org.apache.activemq.broker.region.cursors.FilePendingMessageCursor.addMessageLast(FilePendingMessageCursor.java:205)
>   - locked org.apache.activemq.broker.region.cursors.FilePendingMessageCursor@56e48687
> org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor.addRecoveredMessage(StoreDurableSubscriberCursor.java:183)
>   - locked org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor@3e539fc
> org.apache.activemq.broker.region.DurableTopicSubscription.doAddRecoveredMessage(DurableTopicSubscription.java:204)
>   - locked org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor@3e539fc
> org.apache.activemq.broker.region.AbstractSubscription.addRecoveredMessage(AbstractSubscription.java:174)
> com.yahoo.servinggrid.streaming.replay.ReplaySubscriptionRecoveryPolicy$1.recoverMessage(ReplaySubscriptionRecoveryPolicy.java:224)
> org.apache.activemq.store.kahadb.KahaDBStore$KahaDBTopicMessageStore$4.execute(KahaDBStore.java:402)
> org.apache.kahadb.page.Transaction.execute(Transaction.java:728)
> org.apache.activemq.store.kahadb.KahaDBStore$KahaDBTopicMessageStore.recoverSubscription(KahaDBStore.java:394)
>   - locked java.lang.Object@4edc024a
> com.yahoo.servinggrid.streaming.replay.ReplaySubscriptionRecoveryPolicy.recover(ReplaySubscriptionRecoveryPolicy.java:214)
>
> My broker.xml
>
> <beans>
>    <broker brokerName="replay-broker" dataDirectory="${activemq.base}/data"
>        useJmx="true" persistent="true" useShutdownHook="false">
>            <!--        deleteAllMessagesOnStartup="true"> -->
> <destinations>
>        <topic physicalName="streaming.replay.topic1" />
> </destinations>
>   <destinationPolicy>
>      <policyMap>
>        <policyEntries>
>          <policyEntry topic="streaming.replay.>">
>            <subscriptionRecoveryPolicy xmlns:spring="http://www.springframework.org/schema/beans">
>              <spring:bean id="replayPolicy" class="com.yahoo.servinggrid.streaming.replay.ReplaySubscriptionRecoveryPolicy">
>                <spring:property name="kahadbDir">
>                  <spring:value>${activemq.base}/data/replay-kahadb</spring:value>
>                </spring:property>
>                <spring:property name="indexCacheSize">
>                  <spring:value>5000</spring:value>
>              </spring:bean>
>            </subscriptionRecoveryPolicy>
>          </policyEntry>
>        </policyEntries>
>      </policyMap>
>  </destinationPolicy>
>    <persistenceAdapter>
>        <kahaDB directory="${activemq.base}/data/kahadb"
>                enableIndexWriteAsync="true"
>                indexCacheSize="5000"
>                cleanupInterval="900000"
>                checkpointInterval="300000"/>
>    </persistenceAdapter>
>    <transportConnectors>
>        <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
>    </transportConnectors>
>   </broker>
> </beans>
>
>
> Thanks,
> Anirudha
>
> On 3/8/11 4:34 AM, "Gary Tully" <gary.tully@gmail.com> wrote:
>
> That is expected for the file based cursor. It tries to proxy the
> store, so the entire store is replayed through the cursor to ensure
> correct message order. This behavoour only makes sense if the
> underlying store is very slow, which in your case, kahaDB, is not.
>
> The store based cursor will be a better match a it does not attempt to
> proxy the whole store, rather it reads it in batches and only starts
> to cursor new messages in memory when the store is exhausted.
>
> On 8 March 2011 02:06, Anirudha Khanna <anirudha@yahoo-inc.com> wrote:
>> Hi,
>>
>> So I added a custom subscription recovery policy to ActiveMQ-5.3.2 that stores messages
in a different instance of Kaha Db. To load test this recovery policy we sent close to ~ 82
million messages(24 hours worth of messages) to the topic that has the recovery policy enabled
on it.
>> When connecting a single consumer after broker restart, I noticed that the consumer
takes a long time to even start getting the messages. Looking at the stack trace it seems
that ActiveMQ first attempts to add all the recovered messages to a Message Cursor(in this
case a File Message cursor) and only then start the dispatch(the dispatch thread is WAITING).
Before restarting the broker a single consumer was able to receive messages fairly quickly.
>> Is this the expected behaviour for ActiveMQ, that ALL messages will first be added
to the Message cursor before they are dispatched? Is there a way to get around this?
>>
>> Thanks,
>> Anirudha
>>
>>
>
>
>
> --
> http://blog.garytully.com
> http://fusesource.com
>
>



-- 
http://blog.garytully.com
http://fusesource.com

Mime
View raw message