activemq-users mailing list archives

From Attila Nagy <>
Subject Re: Can this be done with ActiveMQ? (durable virtual topic with selector on STOMP with network of brokers)
Date Mon, 04 Nov 2013 20:58:26 GMT
Sure (python):

I think the problems on the two paths are:
- selectorAware is not durable-aware. I mean, if I consume from a queue 
(which is fed by a virtual topic) with a selector, it works only in 
online mode; the chain doesn't retain the selector when the consumer is 
offline. I think I understand this, and this is what I'm asking in the 
subject: can AMQ do this (or something like this, durable subscriptions 
to a topic with a selector)?
- without selectorAware, the virtual topic posts all messages from the 
topic to the queue, and the consumer only gets the ones that its selector 
allows. My problem with this is: what do I do with the bigger and bigger 
pile of messages lurking in various queues (one for each consumer with 
a different selector), and why doesn't it work? I mean it works, but 
consuming from this setup takes ages: some messages arrive quickly, then 
nothing for long seconds (nearly one minute), then again a burst and 
nothing... I don't know what causes this.

Here are two graphs:

Each of them shows the time passing by in seconds on the X axis and the 
number of messages sent on the Y.
The first graph shows what happens when a sender posts into a virtual 
topic, which has a (queue) consumer without selectors (and without 
The red sender publishes messages, and the green consumer consumes them 
with a slight delay, but with the same rate as the sender.

On the second graph, the sender has published 1000 messages, but the 
consumer had a selector, which selected only 666 (2/3) of them. You can 
see that the sender has published the 666 messages in about a second, 
while the consumer got them only in batches with long waits.
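
To make the numbers concrete, here is a small broker-free Python sketch of the second case. It assumes, purely for illustration, that every third message carries a status type the selector rejects. The `matches` function is a hand-written stand-in for the selector in the subscribe call quoted further down, not ActiveMQ's selector engine.

```python
def matches(headers):
    """Hand-written equivalent of the selector used in the subscribe call."""
    return (headers.get("Xsystem") == "wired"
            and headers.get("Xstatustype") in ("STOP", "INTERIM_UPDATE"))

# Hypothetical traffic: the status type cycles START, STOP, INTERIM_UPDATE.
types = ["START", "STOP", "INTERIM_UPDATE"]
queue = [{"Xsystem": "wired", "Xstatustype": types[i % 3]} for i in range(1000)]

# Without selectorAware the queue receives everything. The consumer takes
# only what its selector allows; the rest stays behind in the queue.
delivered = [m for m in queue if matches(m)]
leftover = [m for m in queue if not matches(m)]

print(len(delivered), len(leftover))  # 666 334
```

The 334 leftover messages are the pile mentioned above: nothing consumes them unless they expire or another consumer with a matching selector attaches to the same queue.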

I do individual ACKs, but I can't see how a program error could do this. 
I receive all selected messages after all, but only with big delays.

But this is just an interesting point, which I don't understand, and my 
real problem is the first one:
how to publish in a distributed network of brokers into a topic with 
different durable subscribers with selectors.
That's what I would like to know. :)
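
For reference, the durable topic subscription suggested in the quoted reply below (a client-id on connect plus an activemq.subscriptionName header on subscribe) would look roughly like this on the wire. This is only a sketch: the `stomp_frame` helper, the host value and the subscription name are assumptions made for illustration, and whether such a subscription behaves as wanted across a network of brokers is exactly the open question.

```python
def stomp_frame(command, headers, body=""):
    """Serialize a STOMP frame: command, header lines, blank line, body, NUL.

    Header values are written as-is (no escaping), which is enough for the
    values used here.
    """
    lines = [command] + [f"{key}:{value}" for key, value in headers.items()]
    return "\n".join(lines) + "\n\n" + body + "\x00"

# The client-id on CONNECT identifies the durable subscriber.
connect = stomp_frame("CONNECT", {
    "accept-version": "1.1",
    "host": "localhost",                      # assumption: placeholder host
    "client-id": "radiusmq",
})

# Subscribing to the topic itself (not the Consumer.* queue) with a
# subscription name asks the broker for a durable subscription.
subscribe = stomp_frame("SUBSCRIBE", {
    "destination": "/topic/VirtualTopic.radius",
    "id": "1",
    "ack": "client",
    "activemq.subscriptionName": "radiusmq",  # assumption: illustrative name
    "selector": "Xsystem = 'wired' AND "
                "(Xstatustype = 'STOP' OR Xstatustype = 'INTERIM_UPDATE')",
})

print(subscribe.replace("\x00", ""))
```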

On 11/04/2013 09:31 PM, Johan Edstrom wrote:
> Are you marking the message as persistent?
> On Nov 4, 2013, at 1:08 PM, Attila Nagy <> wrote:
>> Even on you can find the terms durable topic and queue. I think this
>> is a compact way to represent that I want to store/get my messages
>> even when the consumer is offline.
>> BTW, I'm afraid you haven't read my mail to the end, or the message
>> didn't get through.
>> I'll try to conclude it: I would like to post to a virtual topic in a
>> network of brokers and consume from this topic (queues) with different
>> clients -with durable subscriptions, but I hope you won't mind there
>> is no durable subscription for queues- with different selectors, and I
>> would like to get only those messages to those queues, which the
>> selectors match.
>> This works when the consumer is online (consuming from the queue), but
>> when I remove it, nothing will get into the queue.
>> Or the other way around: all messages got into the queue even when the
>> consumer is offline, but the consumer receives messages in very slow
>> batches when I use a selector.
>> Without a selector, the full speed can be achieved.
>> These are two problems, though.
>> On 11/04/2013 04:56 PM, Aleksandar Ivanisevic wrote:
>>> The only thing that is durable is a consumer, topic per se can not be
>>> durable. Also how is a queue durable? Perhaps you mean a queue with
>>> persistent messages on it?
>>> To have a durable consumer to a topic you need to create a durable
>>> subscription, either through the console or by sending a client-id and
>>> activemq.subscriptionName headers when connecting.
>>> Attila Nagy <> writes:
>>>> Hi,
>>>> I'm struggling with AMQ 5.9.0 to achieve my goals: durable virtual
>>>> topics with selectors on STOMP, with a network of four brokers
>>>> (connected with SSL connectors, with ACLs and certificate
>>>> authentication/authorization both on client and server side).
>>>> In English: I want to publish messages to a -more, but I think it's
>>>> irrelevant here- (virtual) topic from machines spread in many data
>>>> centers to AMQ servers in two DCs (2x2 machines, fully meshed). Any
>>>> publisher or consumer can connect to any of the servers.
>>>> The messages in the topic have several headers and I would like to
>>>> filter them into durable queues.
>>>> So for example Publisher1..10 publish Type1..10 messages, but
>>>> Consumer1 only consumes Type1, Consumer2 consumes Type1-3 and so on
>>>> from durable queues.
>>>> To protect the queues, I would like to deliver only the messages
>>>> matching the consumer's selector, and publish the message with a TTL
>>>> set, so if the consumer for the given queue is away for an extended
>>>> period of time, the messages should be dropped.
>>>> Seems to be fun, but I can't get it to work.
>>>> The two behaviours I could get -so far with only one machine and only
>>>> one publisher/consumer (one queue):
>>>> - everything works nicely, the queue gets only the relevant messages,
>>>> but it's not durable. If there is a consumer, it gets the messages,
>>>> but if nobody listens, nothing gets to the queue.
>>>> - the queue gets all of the messages (not just the ones the selector
>>>> would allow) and is durable. However, the consumer gets the messages
>>>> in bursts, like around 130 messages per second and nothing for about a
>>>> minute, then another 130 messages and nothing for a minute, while the
>>>> queue is full with messages.
>>>> The configuration I use is:
>>>> The difference between the two cases described above is the
>>>> selectorAware true setting, commented out in the pastebin config.
>>>> I use a python client, publish to /topic/VirtualTopic.radius and
>>>> consume from /queue/Consumer.radiusmq.VirtualTopic.radius with the
>>>> following code snippet:
>>>>      conn.connect(headers={'client-id': 'radiusmq'})
>>>>      conn.subscribe(headers={
>>>>          'destination': '/queue/Consumer.radiusmq.VirtualTopic.radius',
>>>>          'ack': 'client',
>>>>          'id': 1,
>>>>          'activemq.prefetchSize': 1000,
>>>>          'selector': "Xsystem = 'wired' AND ("
>>>>                      "Xstatustype = 'STOP' OR "
>>>>                      "Xstatustype = 'INTERIM_UPDATE')",
>>>>      })
>>>> I have some graphs about the latter case, if that helps. However, I
>>>> would like to get the former working, but with a durable queue.
>>>> Thanks,
