activemq-users mailing list archives

From Gary Tully <gary.tu...@gmail.com>
Subject Re: Random slow Subscribers - causing Topic to full - Solution ?
Date Tue, 19 Mar 2013 17:02:12 GMT
Note: the only way to discard messages from a queue is to consume them (that
is, acknowledge them) or to have them expire by setting a message expiry time.
The setPendingMessageLimitStrategy and setMessageEvictionStrategy policies
apply only to non-durable topic subscriptions.
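
A minimal sketch of both options, assuming ActiveMQ 5.x with the javax.jms
API on the classpath. The class name, broker name "sketch", queue name
"EXAMPLE.QUEUE", the limit of 10 and the 5 second time-to-live are all
illustrative assumptions, not values from this thread. The pending-limit and
eviction strategies are attached to a topic policy, and the queue relies on
message expiry instead:

```java
import java.util.ArrayList;
import java.util.List;

import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;

public class SlowConsumerSketch {
    public static void main(String[] args) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setBrokerName("sketch");   // hypothetical name, used by the vm:// URL below
        broker.setPersistent(false);
        broker.setUseJmx(false);

        // Topics: cap the pending messages per non-durable subscriber and
        // evict the oldest ones once the cap is exceeded.
        PolicyEntry topicPolicy = new PolicyEntry();
        topicPolicy.setTopic(">");
        ConstantPendingMessageLimitStrategy pendingLimit = new ConstantPendingMessageLimitStrategy();
        pendingLimit.setLimit(10);        // illustrative value
        topicPolicy.setPendingMessageLimitStrategy(pendingLimit);
        topicPolicy.setMessageEvictionStrategy(new OldestMessageEvictionStrategy());

        // Queues: no eviction is available, so only schedule the expiry check.
        PolicyEntry queuePolicy = new PolicyEntry();
        queuePolicy.setQueue(">");
        queuePolicy.setExpireMessagesPeriod(1000); // check for expired messages every second

        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
        entries.add(topicPolicy);
        entries.add(queuePolicy);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setPolicyEntries(entries);
        broker.setDestinationPolicy(policyMap);
        broker.start();

        // Producer side: give each queue message an expiry time.
        Connection connection =
                new ActiveMQConnectionFactory("vm://sketch?create=false").createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(session.createQueue("EXAMPLE.QUEUE"));
        producer.setTimeToLive(5000);     // message expires 5 s after it is sent
        producer.send(session.createTextMessage("dropped by the broker if not consumed in time"));

        connection.close();
        broker.stop();
    }
}
```

With this arrangement a stalled queue consumer still holds on to whatever it
has prefetched, but messages waiting in the queue are removed once their
time-to-live has passed. Depending on the dead letter strategy in use, expired
messages may be moved to a dead letter queue rather than dropped outright, so
that is worth checking on your broker.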


On 18 March 2013 23:39, jaikit <jktsavla@gmail.com> wrote:

> Currently I have 2 consumers set up, both consuming from queues. I have
> disabled producerFlowControl and set memoryLimit = 1, queuePrefetch = 10,
> topicPrefetch = 10.
>
> I have added an infinite sleep in consumer1 and ran a load test with 900000
> events. My expectation was that consumer1 would discard all events and
> consumer2 would consume all events. However, both consumers are now blocked.
> Also, all the events are saved in both queues. Each queue size is 900000.
>
> I have pasted my code snippet below. Can someone please guide me? Thanks.
>
>         /* configure activemq broker. */
>         broker.setDeleteAllMessagesOnStartup(true);
>         broker.setUseJmx(true);
>         broker.setAdvisorySupport(false);
>         PolicyMap policyMap = new PolicyMap();
>         List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
>         PolicyEntry topicPolicy = new PolicyEntry();
>         topicPolicy.setTopic(">");
>         topicPolicy.setProducerFlowControl(false);
>         entries.add(topicPolicy);
>
>         PolicyEntry queuePolicy = new PolicyEntry();
>         /* if this is true and consumers are slow, producers will be
>          * throttled and, in the worst case, halted */
>         queuePolicy.setProducerFlowControl(false);
>         /* apply this policy to all queues */
>         queuePolicy.setQueue(">");
>
>         queuePolicy.setMemoryLimit(1);
>         ConstantPendingMessageLimitStrategy constantPendingMessageLimitStrategy =
>                 new ConstantPendingMessageLimitStrategy();
>         constantPendingMessageLimitStrategy.setLimit(1);
>         queuePolicy.setPendingMessageLimitStrategy(constantPendingMessageLimitStrategy);
>
>         OldestMessageEvictionStrategy oldestMessageEvictionStrategy =
>                 new OldestMessageEvictionStrategy();
>         oldestMessageEvictionStrategy.setEvictExpiredMessagesHighWatermark(1);
>         queuePolicy.setMessageEvictionStrategy(oldestMessageEvictionStrategy);
>         /* send an advisory message if a consumer is deemed slow */
>         queuePolicy.setAdvisoryForSlowConsumers(false);
>         /* the period (in ms) of checks for message expiry on queued
>          * messages; a value of 0 disables the check */
>         queuePolicy.setExpireMessagesPeriod(1000);
>         /* Set the PrefetchSize for all topics. You can override this value
>          * while creating a consumer. */
>         // policy.setTopicPrefetch(10);
>         queuePolicy.setQueuePrefetch(10);
>         entries.add(queuePolicy);
>         policyMap.setPolicyEntries(entries);
>         broker.setDestinationPolicy(policyMap);
>
>         /* All undeliverable messages will be sent to ActiveMQ.DLQ, which has
>          * a fixed size. If it reaches that size, producers will be throttled.
>          * Drop the dead letter queue. Enable it on a case-by-case basis. */
>         // DiscardingDLQBrokerPlugin dlqBrokerPlugin = new DiscardingDLQBrokerPlugin();
>         // dlqBrokerPlugin.setDropAll(true);
>         // dlqBrokerPlugin.setDropTemporaryTopics(true);
>         // dlqBrokerPlugin.setDropTemporaryQueues(true);
>         // BrokerPlugin[] plugins = { dlqBrokerPlugin };
>         // broker.setPlugins(plugins);
>
>         VirtualTopic virtualTopic = new VirtualTopic();
>         // the new config that enables selectors on the interceptor
>         virtualTopic.setSelectorAware(true);
>         VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
>         interceptor.setVirtualDestinations(new VirtualDestination[] { virtualTopic });
>         broker.setDestinationInterceptors(new DestinationInterceptor[] { interceptor });
>         broker.start();
>
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/Random-slow-Subscribers-causing-Topic-to-full-Solution-tp4664784p4664856.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>



-- 
http://redhat.com
http://blog.garytully.com
