Date: Fri, 7 May 2010 22:19:08 +0100
Subject: Spring && DefaultMessageListenerContainer && JmsTemplate && maxMessagesPerTask == 1 && Network of Brokers && maxConcurrentConsumer > 1 == Crumple
From: Jamie McCrindle
To: users@activemq.apache.org

Greetings all,

After some weeks of scratching my head, I _believe_ I have found the magic combination that appears to be causing messages to become stuck in our network of brokers. It's so convoluted that it's entirely likely that the error isn't what I think it is but I have managed to create a test case that mirrors the behaviour we're seeing live. And it goes something like this:

We have two brokers in a network of brokers. Producers were publishing to a queue on one of the brokers and consumers reading off the queue in the other broker. After a short while, messages would suddenly pile up on the 'producer' broker and not get read off on the 'consumer' broker.

Pause for lots of random testing...

It appeared that the network bridge subscription from the 'consumer' broker was disappearing from the 'producer' broker, causing the pile up.

More testing later...

And it looks like the problem occurs when we have a DefaultMessageListenerContainer with the following configuration:

maxMessagesPerTask: 1
cacheLevel: CONSUMER
maxConcurrentConsumers: 3 (more than 1, basically)
concurrentConsumers: 1
sessionAcknowledgeMode: Session.AUTO_ACKNOWLEDGE

When Spring scales back the dynamic number of consumers, the network subscription appears to get lost and messages pile up on the producer side.

Workaround: use concurrentConsumers instead of maxConcurrentConsumers so that there is a static number of consumers (not setting maxMessagesPerTask also seems to work).

I'll add this to Jira if you'd like, but the test case to replicate the behaviour is as follows:

package org.example.activemq;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import junit.framework.TestCase;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

public class NetworkTest extends TestCase {

    public void testNetworkOfBrokers() throws Exception {
        BrokerService brokerService1 = null;
        BrokerService brokerService2 = null;

        final int total = 100;
        final CountDownLatch latch = new CountDownLatch(total);

        try {
            {
                // Broker "one" listens on 61616 and bridges to broker "two"
                brokerService1 = new BrokerService();
                brokerService1.setBrokerName("one");
                brokerService1.setUseJmx(false);
                brokerService1.setPersistenceAdapter(new MemoryPersistenceAdapter());
                brokerService1.addConnector("tcp://0.0.0.0:61616");
                NetworkConnector network1 = brokerService1.addNetworkConnector("static:(tcp://localhost:51515)");
                network1.setName("network1");
                network1.setDynamicOnly(true);
                network1.setNetworkTTL(3);
                network1.setPrefetchSize(1);
                brokerService1.start();
            }
            {
                // Broker "two" listens on 51515 and bridges to broker "one"
                brokerService2 = new BrokerService();
                brokerService2.setBrokerName("two");
                brokerService2.setUseJmx(false);
                brokerService2.setPersistenceAdapter(new MemoryPersistenceAdapter());
                brokerService2.addConnector("tcp://0.0.0.0:51515");
                NetworkConnector network2 = brokerService2.addNetworkConnector("static:(tcp://localhost:61616)");
                network2.setName("network1");
                network2.setDynamicOnly(true);
                network2.setNetworkTTL(3);
                network2.setPrefetchSize(1);
                brokerService2.start();
            }

            ExecutorService pool = Executors.newSingleThreadExecutor();

            // The consumer prefers broker "one" (61616)
            ActiveMQConnectionFactory connectionFactory1 = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:51515)?randomize=false");

            final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
            container.setConnectionFactory(connectionFactory1);
            container.setMaxConcurrentConsumers(10);
            container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
            container.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
            container.setDestination(new ActiveMQQueue("testingqueue"));
            container.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    latch.countDown();
                }
            });
            container.setMaxMessagesPerTask(1);
            container.afterPropertiesSet();
            container.start();

            pool.submit(new Callable<Object>() {
                public Object call() throws Exception {
                    try {
                        final int batch = 10;
                        // The producer prefers broker "two" (51515)
                        ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory("failover:(tcp://localhost:51515,tcp://localhost:61616)?randomize=false");
                        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(connectionFactory2);
                        JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
                        ActiveMQQueue queue = new ActiveMQQueue("testingqueue");
                        for(int b = 0; b < batch; b++) {
                            for(int i = 0; i < (total / batch); i++) {
                                template.send(queue, new MessageCreator() {
                                    public Message createMessage(Session session) throws JMSException {
                                        TextMessage message = session.createTextMessage();
                                        message.setText("Hello World!");
                                        return message;
                                    }
                                });
                            }
                            // give spring time to scale back again
                            while(container.getActiveConsumerCount() > 1) {
                                System.out.println("active consumer count: " + container.getActiveConsumerCount());
                                System.out.println("concurrent consumer count: " + container.getConcurrentConsumers());
                                Thread.sleep(1000);
                            }
                        }
                        pooledConnectionFactory.stop();
                    } catch(Throwable t) {
                        t.printStackTrace();
                    }
                    return null;
                }
            });

            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);

            int count = 0;

            // give it 20 seconds
            while(!latch.await(1, TimeUnit.SECONDS) && count++ < 20) {
                System.out.println("count " + latch.getCount());
            }

            container.destroy();

        } finally {
            try { if(brokerService1 != null) { brokerService1.stop(); }} catch(Throwable t) { t.printStackTrace(); }
            try { if(brokerService2 != null) { brokerService2.stop(); }} catch(Throwable t) { t.printStackTrace(); }
        }

        if(latch.getCount() > 0) {
            fail("latch should have gone down to 0 but was " + latch.getCount());
        }
    }
}
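
For comparison, the workaround mentioned above (a static number of consumers, with maxMessagesPerTask left unset) might be configured roughly as follows. This is only a configuration sketch and needs Spring JMS and the JMS API on the classpath. The class name StaticConsumerConfig and the method buildContainer are hypothetical and not part of the test case. It relies on setConcurrentConsumers raising the upper limit to match when that limit is lower, which should keep the consumer count fixed. Whether this avoids the stuck messages in any particular setup is as reported in this message, not independently verified.

```java
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.springframework.jms.listener.DefaultMessageListenerContainer;

// Hypothetical helper illustrating the workaround; not part of the original test.
public final class StaticConsumerConfig {

    private StaticConsumerConfig() {
    }

    public static DefaultMessageListenerContainer buildContainer(
            ConnectionFactory connectionFactory,
            Destination destination,
            MessageListener listener,
            int consumers) {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setDestination(destination);
        container.setMessageListener(listener);
        container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
        container.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);

        // Only concurrentConsumers is set, so the container keeps a fixed
        // number of consumers instead of scaling between a minimum and a maximum.
        container.setConcurrentConsumers(consumers);

        // setMaxConcurrentConsumers(...) and setMaxMessagesPerTask(...) are
        // deliberately not called here.

        container.afterPropertiesSet();
        container.start();
        return container;
    }
}
```

In the test case, this would correspond to replacing the setMaxConcurrentConsumers(10) and setMaxMessagesPerTask(1) calls with a single setConcurrentConsumers call.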