activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gary Tully <gary.tu...@gmail.com>
Subject Re: How to configure 5.3 broker over KahaDB to support lots of unconsumed persistent msgs?
Date Tue, 16 Feb 2010 11:38:03 GMT
There is something not right here. let me build a test case to investigate a
bit.

On 16 February 2010 01:19, scot.hale <scot.hale@gmail.com> wrote:

>
> A.) I tried using the FilePendingQueueMessageStoragePolicy.  I assume that
> this needs to be added to the queue destination policy specifically.
> However I added it to default and Topic just to be sure (not shown here).
>
> I turned on flow control, but was unable to figure out what memory settings
> are needed.  What I gathered from your post is that I need to set the queue
> destination memory limit higher than the default SystemUsage memory limit.
> Is that right?  For example:
>
>
>
>
> brokerService.getSystemUsage().getMemoryUsage().setLimit(32*1024*1024);
>
> brokerService.getSystemUsage().getTempUsage().setLimit(128*1024*1024);
>
>        PolicyMap policyMap = new PolicyMap();
>
>        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
>
>        PolicyEntry policy = new PolicyEntry();
>        policy.setProducerFlowControl(true);
>         policy.setPendingQueuePolicy(new
> FilePendingQueueMessageStoragePolicy());
>        policy.setQueue(">");
>        policy.setMemoryLimit(64*1024*1024);
>         entries.add(policy);
>        policyMap.setPolicyEntries(entries);
>
>        brokerService.setDestinationPolicy(policyMap);
>
>
> I tried it the other way around as well and it still stops (meaning
> producers are blocked or resourceAllocationExceptions are thrown from
> Queue.send()) when it gets to the lower of the two memory limits. I am
> definitely missing something.
>
> B.) Would using the StorePendingQueueMessageStoragePolicy provide the same
> behavior I am looking for?
>
> C.) I didn't understand the last sentence in your post.  Does this mean
> that
> when the brokerService.getSystemUsage().getTempUsage() is the disk usage
> limit that should generate ResourceAllocationExceptions (assuming
> SendFailIfNoSpace is set to true)?  In my configureation, it would mean
> when
> the 128MB is used up by the temp cursor references on disk, then no more
> resources are available?
>
>
>
>
>
>
> Gary Tully wrote:
> >
> > First thing is you need to use the FilePendingMessageStoragePolicy() as
> > that
> > will off load message references to the file system when
> > SystemUsage.MemoryUsage limit is reached.
> >
> > So 1) add the following to the broker policy entry
> >         PendingQueueMessageStoragePolicy pendingQueuePolicy = new
> > FilePendingQueueMessageStoragePolicy();
> >         policy.setPendingQueuePolicy(pendingQueuePolicy);
> >
> > With flow control on, you need to configure a lower SystemUsage as the
> use
> > of disk space by the file based cursors is determined by the shared
> > SystemUsage.memoryLimit, which by default is the same value as the memory
> > limit for a destination. With a single destination, the flowcontroll
> kicks
> > in before the system usage so no spooling to disk occurs.
> >
> > 2) Configure a SystemUsage.MemoryLimit that is less than the default
> > destination memory limit of 64M
> >    brokerService.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 *
> > 63);
> >
> > This should do it once you add a TempStore() limit to implement 5.
> >
> >
> > On 15 February 2010 17:22, scot.hale <scot.hale@gmail.com> wrote:
> >
> >>
> >> I am trying to setup a queue with the following requirements:
> >>
> >>
> >> ActiveMQ 5.1 or 5.3  ( I have been testing with 5.3 )
> >> 1. ) VM Transport
> >> 2. ) Persistent with KahaPersistenceAdaptor
> >> 4. ) JVM Memory usage is capped at something like 64MB
> >>        - When this limit is reached the producers should continue to
> >> store
> >> incoming messages to disk  (StoreBasedCursor or FileBasedCursor will
> >> work,
> >> since the former is the default that is the one I have been using.)
> >> 5. ) File System usage is capped at something like 10GB
> >>        - When this limit is reached the producers should start throwing
> >> javax.jms.ResourceAllocationExceptions to the Producers
> >>
> >> Number 5 is the least important, as it will be difficult to fill up disk
> >> space in production. My current setup configures ActiveMQ
> >> programatically.
> >> I don't think this is introducing problems, let me know if there are
> >> issues
> >> with programatic configuration.
> >>
> >>
> >> Default settings:
> >>        If I do not configure the SystemUsage or the Flow control, then
> >> 64MB
> >> default memory usage is reached and the producers are halted even though
> >> the
> >> queues are persistent and have much more space.  Should the default
> >> StoreBasedCursor behave this way?
> >>
> >>
> >> Turn off Flow Control:
> >>        When I turn off Flow Control with default SystemUseage settings,
> >> then the
> >> JVM memory is not capped.  After about 5 million messages with no
> >> consumers
> >> the JVM assigned 1GB of memory starts returning OutOfMemoryErrors.
> >>
> >>
> >> So what setting do I need to cap the memory and allow the messages to be
> >> stored to disk even when the cap is reached?
> >>
> >>
> >>
> >> This is how I programtically configure my BrokerService
> >>
> >>        System.setProperty("defaultBinSize", "16384");//Only way to set
> >> HashIndex bin size for KahaPersistenceAdapter
> >>        try {
> >>            uri = new URI("vm://"+brokerName);
> >>        } catch (URISyntaxException e) {
> >>            throw new RuntimeException(e);
> >>        }
> >>        brokerService = new BrokerService();
> >>        brokerService.setBrokerName(brokerName);
> >>        brokerService.setUseJmx(true);
> >>        brokerService.setUseLoggingForShutdownErrors(true);
> >>
> >>
> >>        PolicyMap policyMap = new PolicyMap();
> >>        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
> >>        PolicyEntry policy = new PolicyEntry();
> >>        policy.setProducerFlowControl(true);
> >>        policy.setQueue(">");
> >>        entries.add(policy);
> >>        policyMap.setPolicyEntries(entries);
> >>        brokerService.setDestinationPolicy(policyMap);
> >>
> >>        //PERSISTENCE
> >>        brokerService.setPersistent(true);
> >>        KahaPersistenceAdapter persistenceAdapter = new
> >> KahaPersistenceAdapter();
> >>        persistenceAdapter.setDirectory(new
> >> File("/tmp/activemq-"+brokerName+"/kaha"));
> >>        brokerService.setDataDirectoryFile(new
> >> File("/tmp/activemq-"+brokerName+"/data"));
> >>        brokerService.setTmpDataDirectory(new
> >> File("/tmp/activemq-"+brokerName+"/temp"));
> >>        persistenceAdapter.setMaxDataFileLength(500L*1024*1024);
> >>
> >>        try {
> >>            brokerService.setPersistenceAdapter(persistenceAdapter);
> >>        } catch (IOException e) {
> >>            throw new RuntimeException(e);
> >>        }
> >>        try {
> >>            brokerService.getSystemUsage().setSendFailIfNoSpace(true);
> >>            brokerService.addConnector(uri);
> >>            brokerService.start();
> >>        } catch (Exception e) {
> >>            throw new RuntimeException(e);
> >>        }
> >>
> >>
> >>
> >> Here is a Producer:
> >>
> >> public class Producer implements Runnable{
> >>
> >>    private BrokerService brokerService;
> >>    private long numberOfMessages;
> >>
> >>    public Producer(BrokerService brokerService, long n){
> >>        this.brokerService = brokerService;
> >>        this.numberOfMessages = n;
> >>    }
> >>
> >>    public void run(){
> >>        ActiveMQConnectionFactory factory = new
> >> ActiveMQConnectionFactory(brokerService.getVmConnectorURI());
> >>        try {
> >>            Connection conn = factory.createConnection();
> >>            conn.start();
> >>            for (int i = 0; i < numberOfMessages; i++) {
> >>                Session session = conn.createSession(false,
> >> Session.AUTO_ACKNOWLEDGE);
> >>                Destination destination =
> >> session.createQueue("test-queue");
> >>                MessageProducer producer =
> >> session.createProducer(destination);
> >>                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
> >>                BytesMessage message = session.createBytesMessage();
> >>                message.writeBytes(new
> >>
> >>
> byte[]{0,0,0,66,0,0,0,5,0,0,0,0,0,0,0,3,0,0,0,49,51,49,51,51,53,53,48,51,51,54,0,0,0,49,50,51,52,53,0,0,0,0,0,0,0,0,0,0,17,116,114,97,99,101,32,109,101,32,112,108,101,97,115,101,32,50,});
> >>                try {
> >>                    producer.send(message);
> >>                } catch (ResourceAllocationException e) {
> >>                    e.printStackTrace();
> >>                }
> >>                session.close();
> >>            }
> >>        } catch (JMSException e) {
> >>            throw new RuntimeException(e);
> >>         }
> >>
> >>    }
> >> }
> >>
> >>
> >> rajdavies wrote:
> >> >
> >> > Hi Scott,
> >> >
> >> > just change the below config to enable flow control - i.e:
> >> >
> >> > <policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
> >> >   <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
> >> >
> >> > in 5.3 - producerFlowControl is on by default - so just remove the
> >> > producerFlowControl entry from your configuration.
> >> >
> >> > If this all sounds double dutch - send in your config - and we'll help
> >> > with the correct settings :)
> >> >
> >> >
> >> > On 12 Feb 2010, at 20:49, scot.hale wrote:
> >> >
> >> >>
> >> >> Fred,
> >> >>
> >> >> Were you able to configure ActiveMQ to grow without surpassing the
> >> >> memory
> >> >> setting?  I am trying to figure out how to do the same thing.
> >> >>
> >> >> -Scot
> >> >>
> >> >>
> >> >> Fred Moore-3 wrote:
> >> >>>
> >> >>> Hi,
> >> >>>
> >> >>> going back to Cursors and
> >> >>>
> >>
> http://activemq.apache.org/how-do-i-configure-activemq-to-hold-100s-of-millions-of-queue-messages-.html
> >> >>> ...
> >> >>>
> >> >>> ...can anyone shed some light on the actual role of memoryLimit
in:
> >> >>>   <policyEntry topic=">" producerFlowControl="false"
> >> >>> memoryLimit="1mb">
> >> >>>   <policyEntry queue=">" producerFlowControl="false"
> >> >>> memoryLimit="1mb">
> >> >>>
> >> >>> ...moreover: *when* will producerFlowControl start slowing down
> >> >>> consumers?
> >> >>>
> >> >>> Cheers,
> >> >>> F.
> >> >>>
> >> >>>
> >> >>
> >> >> --
> >> >> View this message in context:
> >> >>
> >>
> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27569119.html
> >> >> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> >> >>
> >> >
> >> > Rob Davies
> >> > http://twitter.com/rajdavies
> >> > I work here: http://fusesource.com
> >> > My Blog: http://rajdavies.blogspot.com/
> >> > I'm writing this: http://www.manning.com/snyder/
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >>
> >> --
> >> View this message in context:
> >>
> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27597050.html
> >> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> >>
> >>
> >
> >
> > --
> > http://blog.garytully.com
> >
> > Open Source Integration
> > http://fusesource.com
> >
> >
>
> --
> View this message in context:
> http://old.nabble.com/How-to-configure-5.3-broker-over-KahaDB-to-support-lots-of-unconsumed--persistent-msgs--tp27277849p27602503.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>


-- 
http://blog.garytully.com

Open Source Integration
http://fusesource.com

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message