activemq-users mailing list archives

From coryplusplus <coryplusp...@gmail.com>
Subject Remove Unacknowledged Heartbeat Message
Date Fri, 03 Oct 2014 16:08:03 GMT
Hi,
I am currently using apache-activemq-5.5.1 and am having trouble with
memory usage filling up for a particular topic.

2014-08-12 22:33:44,881 | INFO  | Usage Manager memory limit reached.
Stopping producer (XXXXX) to prevent flooding topic://SERVER_PONG. See
http://activemq.apache.org/producer-flow-control.html for more info
(blocking for: 1s) | org.apache.activemq.broker.region.Topic | ActiveMQ
Transport: tcp:///127.0.0.1:51252

This blocking continues until ActiveMQ is restarted.

We have a client/server application where ActiveMQ is hosted on the same
machine as the server.
The server has multiple components, each of which produces a SERVER_PONG
message in response to a SERVER_PING message from the client. Our current
theory is that the client is somehow getting into an unexpected state in
which it can still send SERVER_PING messages but cannot consume
SERVER_PONG messages. The SERVER_PONG messages then back up and eventually
exhaust our memory resources, as shown in the log message above.

Our first attempt at a solution was to set timeToLive on each SERVER_PONG
message that was sent. That would have ensured that we weren’t keeping
unacknowledged SERVER_PONG messages around and would have stopped us from
flooding the broker. However, this was not an option because our clients
can be in different time zones than our server. I saw a lot of posts
recommending the timestamp plugin, but that does not work in our case: our
producer and broker are on the same server, and it is our consumer that
checks the expiration time before processing. (see
http://activemq.2283324.n4.nabble.com/timeToLive-with-out-of-sync-clocks-td3009739.html)
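To make the clock problem concrete: as far as the JMS contract goes, a
timeToLive is turned into an absolute expiration timestamp using the
sender's clock, and whoever checks it later compares it against its own
clock. The sketch below models only that arithmetic in plain Java. The
class and method names are hypothetical and are not part of the ActiveMQ
API, and the 5 second TTL and 1 minute clock offsets are made-up values.

```java
class TtlSkewSketch {
    /** Absolute expiration on the sender's clock; 0 means "never expires". */
    static long expiration(long senderNowMillis, long timeToLiveMillis) {
        return timeToLiveMillis <= 0 ? 0L : senderNowMillis + timeToLiveMillis;
    }

    /** The check a receiver would make against its own clock. */
    static boolean isExpired(long expirationMillis, long receiverNowMillis) {
        return expirationMillis != 0L && receiverNowMillis > expirationMillis;
    }

    public static void main(String[] args) {
        long serverNow = System.currentTimeMillis();
        long exp = expiration(serverNow, 5_000L);        // assumed 5 s TTL

        // Receiver clock readings (hypothetical offsets):
        long inSync    = serverNow + 1_000L;             // 1 s after send, clocks agree
        long fastClock = serverNow + 1_000L + 60_000L;   // 1 s after send, clock 1 min ahead
        long slowClock = serverNow + 10_000L - 60_000L;  // 10 s after send, clock 1 min behind

        System.out.println("in sync:    expired=" + isExpired(exp, inSync));
        System.out.println("fast clock: expired=" + isExpired(exp, fastClock));
        System.out.println("slow clock: expired=" + isExpired(exp, slowClock));
    }
}
```

With a fast receiver clock a fresh message already looks expired, and with
a slow one a stale message still looks valid, which is why a check made on
the consumer side cannot be relied on when the clocks disagree.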

So the question now is: is there a way to remove these SERVER_PONG
messages that have not been acknowledged after x amount of time? Does
ActiveMQ provide an API that would let us check the number of in-flight
messages per topic and remove them if there are too many?
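The kind of check meant here might look like the sketch below, which reads
a per-topic counter over JMX using only the JDK's javax.management classes.
Several things in it are assumptions rather than confirmed facts: that the
destination MBean exposes an "InFlightCount" attribute in this broker
version, that the 5.5.x MBean name uses the BrokerName/Type/Destination
keys (newer releases are known to use a different naming scheme), and that
the JMX service URL is passed on the command line. The class name and the
threshold of 50 are hypothetical. The sketch only reads the counter; it
does not remove anything.

```java
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

class InFlightCheckSketch {
    /** Builds the topic MBean name. Assumed 5.5.x-style keys; verify in jconsole. */
    static ObjectName topicName(String brokerName, String topic) {
        try {
            return new ObjectName("org.apache.activemq:BrokerName=" + brokerName
                    + ",Type=Topic,Destination=" + topic);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("bad broker or topic name", e);
        }
    }

    /** Reads the (assumed) InFlightCount attribute of the topic MBean. */
    static long inFlight(MBeanServerConnection conn, ObjectName name) throws Exception {
        return ((Number) conn.getAttribute(name, "InFlightCount")).longValue();
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            // With createConnector="true" the URL is typically something like
            // service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi (assumption).
            System.out.println("usage: InFlightCheckSketch <jmx-service-url>");
            return;
        }
        try (JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(args[0]))) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            long count = inFlight(conn, topicName("localhost", "SERVER_PONG"));
            long threshold = 50L; // hypothetical limit
            System.out.println("in-flight=" + count + " overLimit=" + (count > threshold));
        }
    }
}
```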

Here is our activemq.xml configuration file if needed.


<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.base}/conf/credentials.properties</value>
        </property>
    </bean>

    <broker xmlns="http://activemq.apache.org/schema/core"
            brokerName="localhost" dataDirectory="${activemq.base}/data"
            persistent="false">

        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry topic="SERVER_PONG.>">
                        <pendingMessageLimitStrategy>
                            <constantPendingMessageLimitStrategy limit="50" />
                        </pendingMessageLimitStrategy>
                    </policyEntry>
                    <policyEntry topic="FROM_STATUS_SERVER.>">
                        <pendingMessageLimitStrategy>
                            <constantPendingMessageLimitStrategy limit="5000" />
                        </pendingMessageLimitStrategy>
                    </policyEntry>
                    <policyEntry topic=">" producerFlowControl="false" memoryLimit="1 kb">
                        <pendingSubscriberPolicy>
                            <vmCursor />
                        </pendingSubscriberPolicy>
                    </policyEntry>
                    <policyEntry queue=">" producerFlowControl="false" memoryLimit="1 kb">
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <managementContext>
            <managementContext createConnector="true"/>
        </managementContext>

        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="3072 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="5 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="8 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <transportConnectors>
            <transportConnector name="openwire"
                uri="tcp://0.0.0.0:60000?transport.keepAlive=true&amp;wireformat.maxInactivityDuration=0&amp;transport.connectionTimeout=0"/>
        </transportConnectors>

    </broker>

</beans>




