Return-Path: X-Original-To: apmail-activemq-users-archive@www.apache.org Delivered-To: apmail-activemq-users-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 19BE5DFFE for ; Mon, 18 Mar 2013 23:39:55 +0000 (UTC) Received: (qmail 24134 invoked by uid 500); 18 Mar 2013 23:39:54 -0000 Delivered-To: apmail-activemq-users-archive@activemq.apache.org Received: (qmail 24112 invoked by uid 500); 18 Mar 2013 23:39:54 -0000 Mailing-List: contact users-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: users@activemq.apache.org Delivered-To: mailing list users@activemq.apache.org Received: (qmail 24102 invoked by uid 99); 18 Mar 2013 23:39:54 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 18 Mar 2013 23:39:54 +0000 X-ASF-Spam-Status: No, hits=2.0 required=5.0 tests=SPF_NEUTRAL,URI_HEX X-Spam-Check-By: apache.org Received-SPF: neutral (nike.apache.org: 216.139.250.139 is neither permitted nor denied by domain of jktsavla@gmail.com) Received: from [216.139.250.139] (HELO joe.nabble.com) (216.139.250.139) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 18 Mar 2013 23:39:48 +0000 Received: from [192.168.236.139] (helo=joe.nabble.com) by joe.nabble.com with esmtp (Exim 4.72) (envelope-from ) id 1UHjeJ-00067p-Ev for users@activemq.apache.org; Mon, 18 Mar 2013 16:39:27 -0700 Date: Mon, 18 Mar 2013 16:39:27 -0700 (PDT) From: jaikit To: users@activemq.apache.org Message-ID: <1363649967456-4664856.post@n4.nabble.com> In-Reply-To: <1363644045050-4664855.post@n4.nabble.com> References: <1363410281734-4664784.post@n4.nabble.com> <1300613b.6292.13d789d9f33.Coremail.suonayi2006@163.com> <1363545152113-4664808.post@n4.nabble.com> <1363644045050-4664855.post@n4.nabble.com> Subject: Re: Random slow Subscribers - causing Topic to full - Solution ? MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Transfer-Encoding: 7bit X-Virus-Checked: Checked by ClamAV on apache.org Currently I have 2 consumers set up - which are consuming from Queues. I have disabled producerflowcontrol, setMemoryLimit to 1, queueprefetch = 10, topicprefetch = 10. I have added infinite sleep in consumer1 and I ran the load test with 900000 events. My expectation is consumer1 discarding all events and consumer2 consuming all events. However both the consumers are now blocked. Also all the events are saved in both the queue. Each queue size is 900000. I have pasted my code snippet below. Can some one please guide me. Thanks. /* configure activemq broker. */ broker.setDeleteAllMessagesOnStartup(true); broker.setUseJmx(true); broker.setAdvisorySupport(false); PolicyMap policyMap = new PolicyMap(); List entries = new ArrayList(); PolicyEntry topicPolicy = new PolicyEntry(); topicPolicy.setTopic(">"); topicPolicy.setProducerFlowControl(false); entries.add(topicPolicy); PolicyEntry queuePolicy = new PolicyEntry(); /* if this is true and if consumers are slow - producers will be throttled and worst case halted */ queuePolicy.setProducerFlowControl(false); /* set flow control for all topics */ queuePolicy.setQueue(">"); queuePolicy.setMemoryLimit(1); ConstantPendingMessageLimitStrategy constantPendingMessageLimitStrategy = new ConstantPendingMessageLimitStrategy(); constantPendingMessageLimitStrategy.setLimit(1); queuePolicy.setPendingMessageLimitStrategy(constantPendingMessageLimitStrategy); OldestMessageEvictionStrategy oldestMessageEvictionStrategy = new OldestMessageEvictionStrategy(); oldestMessageEvictionStrategy.setEvictExpiredMessagesHighWatermark(1); queuePolicy.setMessageEvictionStrategy(oldestMessageEvictionStrategy); /* send an advisory message if a consumer is deemed slow */ queuePolicy.setAdvisoryForSlowConsumers(false); /* the period (in ms) of checks for message expiry on queued messages, value of 0 disables */ queuePolicy.setExpireMessagesPeriod(1000); /* Set the PrefetchSize for all topics. You can override this value while creating consumer. */ // policy.setTopicPrefetch(10); queuePolicy.setQueuePrefetch(10); entries.add(queuePolicy); policyMap.setPolicyEntries(entries); broker.setDestinationPolicy(policyMap); /* All undeliverable messages will be sent to ActiveMQ.DLQ which has fixed size. If it reaches fixed size, * producers will be throttled. Drop dead letter queue. Enable it case by case basis. */ // DiscardingDLQBrokerPlugin dlqBrokerPlugin = new DiscardingDLQBrokerPlugin(); // dlqBrokerPlugin.setDropAll(true); // dlqBrokerPlugin.setDropTemporaryTopics(true); // dlqBrokerPlugin.setDropTemporaryQueues(true); // BrokerPlugin[] plugins = { dlqBrokerPlugin }; // broker.setPlugins(plugins); VirtualTopic virtualTopic = new VirtualTopic(); // the new config that enables selectors on the intercepter virtualTopic.setSelectorAware(true); VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); interceptor.setVirtualDestinations(new VirtualDestination[] { virtualTopic }); broker.setDestinationInterceptors(new DestinationInterceptor[] { interceptor }); broker.start(); -- View this message in context: http://activemq.2283324.n4.nabble.com/Random-slow-Subscribers-causing-Topic-to-full-Solution-tp4664784p4664856.html Sent from the ActiveMQ - User mailing list archive at Nabble.com.