activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Anirudha Khanna <aniru...@yahoo-inc.com>
Subject Re: Message Cursors and Dispatch
Date Tue, 08 Mar 2011 18:40:01 GMT
I haven't explicitly configured any cursors on the Topic. From the documentation it seems like
the default is the store based cursor, but when looking at the ActiveMQ Transport thread stack
trace I noticed it was also using the "FilePendingMessageCursor". Can I explicitly configure
the broker to only use the Store based cursor?

attaching a part of the stack trace,
org.apache.activemq.broker.region.cursors.FilePendingMessageCursor.addMessageLast(FilePendingMessageCursor.java:205)
   - locked org.apache.activemq.broker.region.cursors.FilePendingMessageCursor@56e48687
org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor.addRecoveredMessage(StoreDurableSubscriberCursor.java:183)
   - locked org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor@3e539fc
org.apache.activemq.broker.region.DurableTopicSubscription.doAddRecoveredMessage(DurableTopicSubscription.java:204)
   - locked org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor@3e539fc
org.apache.activemq.broker.region.AbstractSubscription.addRecoveredMessage(AbstractSubscription.java:174)
com.yahoo.servinggrid.streaming.replay.ReplaySubscriptionRecoveryPolicy$1.recoverMessage(ReplaySubscriptionRecoveryPolicy.java:224)
org.apache.activemq.store.kahadb.KahaDBStore$KahaDBTopicMessageStore$4.execute(KahaDBStore.java:402)
org.apache.kahadb.page.Transaction.execute(Transaction.java:728)
org.apache.activemq.store.kahadb.KahaDBStore$KahaDBTopicMessageStore.recoverSubscription(KahaDBStore.java:394)
   - locked java.lang.Object@4edc024a
com.yahoo.servinggrid.streaming.replay.ReplaySubscriptionRecoveryPolicy.recover(ReplaySubscriptionRecoveryPolicy.java:214)

My broker.xml

<beans>
    <broker brokerName="replay-broker" dataDirectory="${activemq.base}/data"
        useJmx="true" persistent="true" useShutdownHook="false">
            <!--        deleteAllMessagesOnStartup="true"> -->
<destinations>
        <topic physicalName="streaming.replay.topic1" />
</destinations>
   <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry topic="streaming.replay.>">
            <subscriptionRecoveryPolicy xmlns:spring="http://www.springframework.org/schema/beans">
              <spring:bean id="replayPolicy" class="com.yahoo.servinggrid.streaming.replay.ReplaySubscriptionRecoveryPolicy">
                <spring:property name="kahadbDir">
                  <spring:value>${activemq.base}/data/replay-kahadb</spring:value>
                </spring:property>
                <spring:property name="indexCacheSize">
                  <spring:value>5000</spring:value>
              </spring:bean>
            </subscriptionRecoveryPolicy>
          </policyEntry>
        </policyEntries>
      </policyMap>
  </destinationPolicy>
    <persistenceAdapter>
        <kahaDB directory="${activemq.base}/data/kahadb"
                enableIndexWriteAsync="true"
                indexCacheSize="5000"
                cleanupInterval="900000"
                checkpointInterval="300000"/>
    </persistenceAdapter>
    <transportConnectors>
        <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
    </transportConnectors>
   </broker>
</beans>


Thanks,
Anirudha

On 3/8/11 4:34 AM, "Gary Tully" <gary.tully@gmail.com> wrote:

That is expected for the file based cursor. It tries to proxy the
store, so the entire store is replayed through the cursor to ensure
correct message order. This behavoour only makes sense if the
underlying store is very slow, which in your case, kahaDB, is not.

The store based cursor will be a better match a it does not attempt to
proxy the whole store, rather it reads it in batches and only starts
to cursor new messages in memory when the store is exhausted.

On 8 March 2011 02:06, Anirudha Khanna <anirudha@yahoo-inc.com> wrote:
> Hi,
>
> So I added a custom subscription recovery policy to ActiveMQ-5.3.2 that stores messages
in a different instance of Kaha Db. To load test this recovery policy we sent close to ~ 82
million messages(24 hours worth of messages) to the topic that has the recovery policy enabled
on it.
> When connecting a single consumer after broker restart, I noticed that the consumer takes
a long time to even start getting the messages. Looking at the stack trace it seems that ActiveMQ
first attempts to add all the recovered messages to a Message Cursor(in this case a File Message
cursor) and only then start the dispatch(the dispatch thread is WAITING). Before restarting
the broker a single consumer was able to receive messages fairly quickly.
> Is this the expected behaviour for ActiveMQ, that ALL messages will first be added to
the Message cursor before they are dispatched? Is there a way to get around this?
>
> Thanks,
> Anirudha
>
>



--
http://blog.garytully.com
http://fusesource.com


Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message