activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Joakim von Brandis <joa...@mnemonic.no>
Subject Messages stuck in queue after queue fills up
Date Thu, 22 May 2014 16:33:21 GMT
Hi all!

I am struggeling to understand/make sense of a situation in ActiveMQ:

* a high-volume producer delivers messages to a queue (several thousand messages per second)
* one or several high-volume consumers consume messages from the same queue (CEF in the example
below)
* as long as the consumers consume messages in an orderly fashion, the producer flow control
will make sure that the queue is not flooded.
* however, if the consumers go offline/away for a short while, the producers are not limited,
and will quickly fill the queue up to its resource limits
* When the queue is full, the producers hang on send() to ActiveMQ, which makes sense. If
enabling sendFailIfNoSpaceAfterTimeout, the producers are interrupted on send() if waiting
more than 5000 millis. This is ok.
* However, the _consumers_ do not receive any messages, hanging on receive(wait) for the specified
time, and then returning null. 
* So, the queue is full with messages, but the consumers are unable to consume them!

I have tried disconnecting/restarting the consumers, producers, without luck.
I am using memoryPeristenceAdapter to provide max throughput, so restarting activemq clears
the queues, but then the problem starts all over again….
Using LevelDB persistence adapter causes the same behaviour, only that restarting activemq
brings back the queue with all the messages still there, and the consumers are still not able
to consume them!

I the producer is stopped before the queue reaches resource limits, the consumers are able
to consume the messages in the queue when they get back online.

What am I not understanding?
I have tried playing around with per-destination-policies (storeUsageHighWaterMark, cursorMemoryHighWaterMark,
memoryLimit, useCache), systemUsage limits, and different persistence adapters, but I am unable
to get this to work properly.

My current broker test config is shown below. I am using version 5.9.1.

Any suggestions are welcome.

Best regards
Joakim von Brandis

--

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="router1" dataDirectory="${activemq.data}"
useJmx="true">
        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry queue="STATS" producerFlowControl="true" storeUsageHighWaterMark="70">
</policyEntry>
                <policyEntry queue="AGGR" producerFlowControl="true" storeUsageHighWaterMark="70">
</policyEntry>
                <policyEntry queue="CEF" producerFlowControl="true" storeUsageHighWaterMark="50"
cursorMemoryHighWaterMark="50"> </policyEntry>
                <policyEntry queue=">" producerFlowControl="true" storeUsageHighWaterMark="50">
</policyEntry>
                <policyEntry topic=">" producerFlowControl="true" storeUsageHighWaterMark="50">
</policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

        <persistenceAdapter>
         <!-- <levelDB directory="activemq-data"/> -->
            <memoryPersistenceAdapter/>
        </persistenceAdapter>


          <systemUsage>
            <systemUsage sendFailIfNoSpaceAfterTimeout="5000">
                <memoryUsage>
                    <memoryUsage limit="1000 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="1000 mb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="100 mb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>
        <transportConnectors>
            <transportConnector name="listener" uri="tcp://0.0.0.0:4001?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
        </transportConnectors>

        <networkConnectors>
        </networkConnectors>

        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook"
/>
        </shutdownHooks>

    </broker>


Mime
View raw message