activemq-dev mailing list archives

From "James Strachan" <james.strac...@gmail.com>
Subject Restricting queue sizes (was Re: setting a policy on a particular queue)
Date Tue, 03 Jul 2007 14:40:02 GMT
On 7/2/07, Jim Alateras <jima@comware.com.au> wrote:
> James Strachan wrote:
> > On 7/1/07, Jim Alateras <jima@comware.com.au> wrote:
> >> I have a queue which i use to publish the status of the device.
> >>
> >> The consumer will dequeue the status and send it to a central management
> >> system at a configurable frequency. The publisher will publish a new
> >> status only when there is a change in the device state.
> >>
> >> In some circumstances the queue will contain multiple device messages
> >> but only the most recent one is valid.
> >>
> >> I was wondering whether there was a way to apply the following policy to
> >> the queue  such that it only holds the most recent device status.
> >>
> >> Can I limit the size of the queue to 1 message; the most recent one.
> >
> > There's no pre-defined queue-size-limited policy yet I'm afraid.
> > Messages can be discarded on a per-consumer basis on topics; along
> > with last-image caching and so forth; but we don't yet support a fixed
> > size queue where older messages are discarded. Though the broker
> > implementation is extensible; so you could plugin your own particular
> > provider via the DestinationFactory
> >
> > http://activemq.apache.org/maven/activemq-core/apidocs/org/apache/activemq/broker/region/DestinationFactory.html
> >
> >
> James, is there an example or something similar in the code base that I
> can use to bootstrap this development effort.
>
>
> cheers
> </jima>
>


I'm afraid there's nothing that does exactly that kind of thing; it might
involve having to grok the inner workings of some classes; mostly the
Queue class
http://activemq.apache.org/maven/activemq-core/apidocs/org/apache/activemq/broker/region/Queue.html

and maybe QueueSubscription (and its base class PrefetchSubscription) might help
http://activemq.apache.org/maven/activemq-core/apidocs/org/apache/activemq/broker/region/QueueSubscription.html

There's the virtual destination stuff; but that's more of an
interceptor around a Destination, rather than tinkering with the
internals....
http://activemq.apache.org/maven/activemq-core/apidocs/org/apache/activemq/broker/region/virtual/package-summary.html

we do inject multiple policies into a Queue...
http://activemq.apache.org/maven/activemq-core/apidocs/org/apache/activemq/broker/region/policy/package-summary.html

so maybe we need to enable some kind of policy to define message
deletion on a queue or something?


If restricting the queue size was a simple matter of just stopping
more things entering the queue until sufficient space is available;
then we kinda have this already in 5.x; where destinations can have
their own usage constraints (usually size in bytes rather than number
of messages though).  What you want though is to delete old messages
on the queue when a new message arrives so this won't fit.

BTW it's vital you grok the prefetch mechanism in ActiveMQ...
http://activemq.apache.org/what-is-the-prefetch-limit-for.html
as once a message is put into a prefetch buffer (either on the broker,
on the socket or in the client's in-RAM buffer), it's pretty hard to
remove.

So to only keep 1 message in transit to the consumer at any time; you
should use a prefetch value of 1. If you pull messages via
consumer.receive() you could use zero to defer dispatching messages
until the last possible moment. i.e. only 1 message is in transit to
the consumer at any point in time. Also you might wanna use an
Exclusive Consumer to avoid multiple consumers hogging the messages.
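
As a rough sketch of the settings above: ActiveMQ lets you attach consumer
options to the destination name, so both the prefetch size and the
exclusive flag can be set where the consumer's queue is created. The
helper and class names below are made up for illustration; the queue name
is the one used later in this mail, and the resulting string is what you
would hand to session.createQueue(...).

```java
public class StatusQueueOptions {

    // Hypothetical helper: appends ActiveMQ destination options to a queue name.
    // consumer.prefetchSize limits how many messages are pushed to the consumer
    // ahead of time; consumer.exclusive asks the broker to dispatch to one
    // consumer only.
    static String withConsumerOptions(String queueName, int prefetch, boolean exclusive) {
        return queueName
                + "?consumer.prefetchSize=" + prefetch
                + "&consumer.exclusive=" + exclusive;
    }

    public static void main(String[] args) {
        // Pass the result to session.createQueue(...) when creating the consumer.
        System.out.println(withConsumerOptions("theStatusQueue", 1, true));
    }
}
```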

Once that's the case; then the broker has all of the messages in its
Queue so that it can then delete old messages when new messages come
in if it wants. The Queue class mentioned above does that work - the
code is a little complex as you might imagine as it's dealing with flow
control, transactions, paging messages in/out of RAM and optional
persistence. See if you can figure out how to tweak that to remove old
messages if a new message is about to be added (you may have to call
store.removeMessage() if you want persistent messaging - which would
involve simulating a MessageAck)
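
To make the intended behaviour concrete, here is a minimal sketch of the
eviction rule on its own: a bounded buffer that drops the oldest entry
when a new one arrives. This is NOT broker code and doesn't touch the
Queue class, flow control, transactions or the message store; the
EvictingBuffer class is purely hypothetical and only shows the policy you
would be trying to reproduce inside the broker.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration only: keeps at most `capacity` items,
// discarding the oldest when a new one is added.
public class EvictingBuffer<T> {
    private final int capacity;
    private final Deque<T> items = new ArrayDeque<>();

    EvictingBuffer(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        this.capacity = capacity;
    }

    // Make room by removing the oldest entries, then append the new one.
    synchronized void add(T item) {
        while (items.size() >= capacity) {
            items.removeFirst();
        }
        items.addLast(item);
    }

    // Returns the oldest remaining item, or null if the buffer is empty.
    synchronized T poll() {
        return items.pollFirst();
    }

    synchronized int size() {
        return items.size();
    }
}
```

With a capacity of 1 this gives the "only the most recent status" behaviour
asked for; in the broker the removal step is where store.removeMessage()
would come in for persistent messages.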


As a total aside; normally processing of a status message is pretty
quick, so it's gonna be rare for there to be loads of messages backing
up on the queue; the main time that'll happen is on startup (or if the
consumer slows down / pauses etc). So a workaround to the problem
could be - on startup your consumer consumes all of the available
messages within a time window (say a second) and then discards all but
the latest message? i.e. if it's no big deal to occasionally get
multiple status messages together, but you just want to avoid cases
like stopping and starting the system and having thousands of status
messages; you could just use a pure JMS solution, discarding messages
close to each other - purely at the JMS client side of things.
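
A minimal sketch of that client-side workaround, assuming a
BlockingQueue as a stand-in for the JMS consumer (queue.poll(timeout)
plays the role of consumer.receive(timeout)). The class and method names
are made up; with real JMS you'd also need to think about when the
discarded messages get acknowledged.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class LatestStatusDrain {

    // Consumes everything that arrives within windowMillis and returns only
    // the newest item, or null if nothing arrived in that window.
    static <T> T drainToLatest(BlockingQueue<T> queue, long windowMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(windowMillis);
        T latest = null;
        try {
            long remaining;
            while ((remaining = deadline - System.nanoTime()) > 0) {
                T next = queue.poll(remaining, TimeUnit.NANOSECONDS);
                if (next == null) {
                    break; // nothing more arrived before the window closed
                }
                latest = next; // older messages are simply dropped
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return latest;
    }

    public static void main(String[] args) {
        BlockingQueue<String> backlog = new LinkedBlockingQueue<>();
        backlog.add("status-1");
        backlog.add("status-2");
        backlog.add("status-3");
        System.out.println(drainToLatest(backlog, 200)); // prints status-3
    }
}
```

Note the call always waits out the rest of the window after the last
message, which is the same latency trade-off as any time-windowed approach.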

It's kinda like a message throttle; folks have used similar mechanisms
to slow down the huge numbers of messages on stock exchanges, by
collapsing price changes for a single stock down to a single message
per stock. You can do this using Camel with an Aggregator. The basic
idea is explained here...

http://cwiki.apache.org/CAMEL/aggregator.html
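
To show the collapsing idea in isolation (this is not how Camel
implements it, just a plain Java sketch with made-up names): given a
batch of updates that arrived within one window, keep only the last
value seen for each key, e.g. one price per stock symbol.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LatestPerKey {

    // Each update is a {key, value} pair. Later updates for the same key
    // overwrite earlier ones, so one entry per key survives the window.
    static Map<String, String> collapse(List<String[]> updates) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] update : updates) {
            latest.put(update[0], update[1]);
        }
        return latest;
    }

    public static void main(String[] args) {
        List<String[]> window = Arrays.asList(
                new String[] {"IBM", "101.5"},
                new String[] {"MSFT", "29.8"},
                new String[] {"IBM", "101.7"});
        // Two entries remain: IBM with its latest price, and MSFT.
        System.out.println(collapse(window));
    }
}
```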

So for example to consume from the status queue and aggregate messages
together (so only the latest one gets through within a window -
defined by default by a 1 second time window or 50k messages whichever
happens soonest), you could do this...

from("activemq:theStatusQueue").aggregator(header("JMSDestination")).to("activemq:theExternalSystemQueue");

The downside is this does add a bit of latency (basically whatever the
timeout is on the window); which defaults to 1 second but you could go
as low as a few millis; as mostly if there are 2 messages on a queue,
by the time the first one is actually received, there's usually
another one there pretty quickly (if prefetch is at its default
value).

-- 
James
-------
http://macstrac.blogspot.com/
