activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From johnluo1 <johnl...@gmail.com>
Subject Re: persistence data files not deleted
Date Thu, 17 Dec 2009 22:32:55 GMT

(Background on my setup)
I am trying to evaluate ActiveMQ's reliability for our messaging needs. In
my three-node cluster setup, I have two producers that publish a random
number of messages between 0 and 2000 to nodes 1 and 2 in the cluster every
30 seconds, and I have two consumers that remove messages from nodes 2 and 3
in the cluster, also every 30 seconds.

I was NOT able to have this setup run without problems for more than a day or
two. One consistently reproducible problem is that persistence data files are
not being removed, even after I have purged all the messages from the admin
interface. As a result, the 1gb of disk configured as the system limit is
all used up, and because of producerFlowControl="true", the producers hang.

Following are my config and testing code. The reason I am using stomp only
is because we have an outdated compile env, e.g. gcc 2.9.6, and there is no
way we can use the shipped C++ client. We need to write a stomp C++ client
library ourselves. For now I am just testing with python.

My activemq.xml (I changed the producerFlowControl="false" in my latest
testing.)
=============
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd
  http://activemq.apache.org/camel/schema/spring
http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">

    <!-- Allows us to use system properties as variables in this
configuration file -->
    <bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
         <property name="locations">
           
<value>file:///${activemq.base}/conf/credentials.properties</value>
         </property>
    </bean>

    <broker xmlns="http://activemq.apache.org/schema/core"
brokerName="localhost" dataDirectory="${activemq.base}/data"
persistent="true" useShutdownHook="true" deleteAllMessagesOnStartup="false"
>

        <!-- The store and forward broker networks ActiveMQ will listen to
-->
        <networkConnectors>
              <networkConnector uri="multicast://default"
                dynamicOnly="true"
                networkTTL="3"
                prefetchSize="1"
                decreaseNetworkConsumerPriority="true" />

        </networkConnectors>

        <!-- The transport connectors ActiveMQ will listen to -->
        <transportConnectors>
            <!-- static
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613" />
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616" />
             end -->

            <!-- multicast -->
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613"
discoveryUri="multicast://default"/>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"
discoveryUri="multicast://default"/>

        </transportConnectors>

        <destinationInterceptors>
          <virtualDestinationInterceptor>
            <virtualDestinations>
              <compositeQueue name="relayd_composite">
                <forwardTo>
                  <queue physicalName="relayd_physical3" />
                  <queue physicalName="relayd_physical4" />
                </forwardTo>
              </compositeQueue>
            </virtualDestinations>
          </virtualDestinationInterceptor>
        </destinationInterceptors>
        <destinations>
           <queue physicalName="relayd_physical3" />
           <queue physicalName="relayd_physical4" />
        </destinations>

        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry queue=">" producerFlowControl="false"
memoryLimit="20mb">
                        <deadLetterStrategy>
                          <individualDeadLetterStrategy queuePrefix="DLQ."
useQueueForQueueMessages="true" />
                        </deadLetterStrategy>
                    </policyEntry>
                    <policyEntry topic=">" producerFlowControl="false"
memoryLimit="20mb">
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <!-- Use the following to configure how ActiveMQ is exposed in JMX
-->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>


        <persistenceAdapter>
            <kahaDB directory="${activemq.base}/data"
journalMaxFileLength="64mb" indexCacheSize="100000"/>
        </persistenceAdapter>

        <sslContext>
            <sslContext keyStore="file:${activemq.base}/conf/broker.ks"
keyStorePassword="password"
trustStore="file:${activemq.base}/conf/broker.ts"
trustStorePassword="password"/>
        </sslContext>

        <!--  The maximum amount of space the broker will use before slowing
down producers -->
        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="20 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="1 gb" name="foo"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="100 mb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

    </broker>
    <jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
        <connectors>
            <nioConnector port="8161"/>
        </connectors>

        <handlers>
            <webAppContext contextPath="/admin"
resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true"/>
            <webAppContext contextPath="/demo"
resourceBase="${activemq.base}/webapps/demo" logUrlOnStart="true"/>
            <webAppContext contextPath="/fileserver"
resourceBase="${activemq.base}/webapps/fileserver" logUrlOnStart="true"/>
        </handlers>
    </jetty>

</beans>



My stomp producer code: (I am using stomp.py-2.0.4)
====================
import time
import unittest

import stomp
import random


class TestBasicSend(unittest.TestCase):
    """Producer-side test: enqueue a random-sized batch of messages over STOMP."""

    def setUp(self):
        pass

    def testbasic(self):
        """Send 1..2000 messages to /queue/relayd and print the elapsed time.

        Each message body is a short "message <i> , " prefix followed by a
        1000-character payload, sent with the ``persistent='true'`` header.
        """
        conn = stomp.Connection([('10.244.186.37', 61613)])

        conn.start()
        conn.connect(wait=True)

        try:
            # message payload is 1000 characters
            message = '0' * 1000

            clock_time = time.time()
            num_message = random.randint(1, 2000)
            for i in range(num_message):
                conn.send("message " + str(i) + ' , ' + message,
                          destination='/queue/relayd', persistent='true')
                # Single-argument print() emits the same text as the original
                # Python 2 `print a, b` statement and also parses on Python 3.
                print("%s %s" % ("sending message ", str(i)))

            delta_clock_time = time.time() - clock_time
            # Kept inside parentheses so the long expression can span lines.
            print("it takes " + str(delta_clock_time) + " seconds to enqueue "
                  + str(num_message) + " messages")
        finally:
            # Release the connection even if sending fails part-way through.
            conn.disconnect()


# Collect every test method of TestBasicSend and run them with verbose output.
loader = unittest.TestLoader()
suite = loader.loadTestsFromTestCase(TestBasicSend)
runner = unittest.TextTestRunner(verbosity=2)
runner.run(suite)


My stomp consumer code:
====================
import time
import unittest

import stomp
from stomp import ConnectionListener
import random


class MyListener(ConnectionListener):
    """Listener that counts connection events and times a batch of messages.

    The batch size (``num_messages``) is picked at random between 0 and 2000
    when the listener is constructed.
    """

    # Class-level defaults; __init__ rebinds each of these per instance.
    start_time = 0
    num_messages = 0
    messages = 0

    def __init__(self):
        self.errors = 0
        self.connections = 0
        self.messages = 0
        self.start_time = 0
        self.num_messages = random.randint(0, 2000)

    def on_error(self, headers, message):
        """Print the error body and bump the error counter."""
        print('received an error %s' % message)
        self.errors = self.errors + 1

    def on_connecting(self, host_and_port):
        """Print the target host/port pair and bump the connection counter."""
        # host_and_port is used directly as the two-value format argument.
        print('connecting %s %s' % host_and_port)
        self.connections = self.connections + 1

    def on_message(self, headers, message):
        """Count one received message; report the elapsed time at the target."""
        if self.messages == 0:
            # The clock starts on the first message received.
            self.start_time = time.time()
        # Single-argument print() emits the same text as the original
        # Python 2 `print a, b` statement and also parses on Python 3.
        print("%s %s" % ('received message ', str(self.messages)))
        self.messages = self.messages + 1
        if self.messages == self.num_messages:
            delta_clock_time = time.time() - self.start_time
            print("it takes " + str(delta_clock_time) + " seconds to dequeue "
                  + str(self.num_messages) + " messages")
            # NOTE(review): this blocks the callback for 10 s once the target
            # count is reached; the reason is not evident here -- confirm.
            time.sleep(10)

class TestBasicSend(unittest.TestCase):
    """Consumer-side test: dequeue a random-sized batch of messages over STOMP."""

    def setUp(self):
        pass

    def testbasic(self):
        """Subscribe to /queue/relayd and wait for the listener's target count.

        Polls every 10 seconds until the listener has counted at least
        ``num_messages`` messages, then disconnects and checks that no error
        frames were reported.
        """
        conn = stomp.Connection([('10.244.186.38', 61613)])

        listener = MyListener()
        conn.set_listener('', listener)
        conn.start()
        conn.connect(wait=True)

        try:
            conn.subscribe(destination='/queue/relayd', ack='auto')

            # The listener's counters are updated by its callbacks; poll them.
            while listener.num_messages > listener.messages:
                time.sleep(10)
        finally:
            # Release the connection even if subscribing or waiting fails.
            conn.disconnect()

        # assertEqual replaces the deprecated assert_ alias.
        self.assertEqual(listener.errors, 0,
                         'should not have received any errors')

# Collect every test method of TestBasicSend and run them with verbose output.
loader = unittest.TestLoader()
suite = loader.loadTestsFromTestCase(TestBasicSend)
runner = unittest.TextTestRunner(verbosity=2)
runner.run(suite)


-- 
View this message in context: http://old.nabble.com/persistence-data-files-not-deleted-tp26821266p26836222.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message