Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4D55EDC27 for ; Mon, 11 Mar 2013 19:18:45 +0000 (UTC) Received: (qmail 64222 invoked by uid 500); 11 Mar 2013 19:18:45 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 64193 invoked by uid 500); 11 Mar 2013 19:18:45 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 64185 invoked by uid 99); 11 Mar 2013 19:18:45 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 11 Mar 2013 19:18:45 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 11 Mar 2013 19:18:43 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 5305B238889B; Mon, 11 Mar 2013 19:18:24 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1455290 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java Date: Mon, 11 Mar 2013 19:18:24 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130311191824.5305B238889B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Mon Mar 11 19:18:23 2013 New Revision: 1455290 URL: http://svn.apache.org/r1455290 Log: 
Added a test case and fix for AMQ-4351. Avoids deadlock by not holding on to the pending and dispatch locks when we try to detach a durable sub from a topic. Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1455290&r1=1455289&r2=1455290&view=diff ============================================================================== --- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original) +++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Mon Mar 11 19:18:23 2013 @@ -17,6 +17,7 @@ package org.apache.activemq.broker.region; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -188,6 +189,8 @@ public class DurableTopicSubscription ex active.set(false); offlineTimestamp.set(System.currentTimeMillis()); this.usageManager.getMemoryUsage().removeUsageListener(this); + + ArrayList topicsToDeactivate = new ArrayList(); synchronized (pendingLock) { pending.stop(); @@ -195,7 +198,7 @@ public class DurableTopicSubscription ex for (Destination destination : durableDestinations.values()) { Topic topic = (Topic) destination; if (!keepDurableSubsActive) { - topic.deactivate(context, this); + topicsToDeactivate.add(topic); } else { topic.getDestinationStatistics().getInflight().subtract(dispatched.size()); } @@ -236,6 +239,9 @@ public class DurableTopicSubscription ex } } } + for(Topic topic: 
topicsToDeactivate) { + topic.deactivate(context, this); + } prefetchExtension.set(0); } Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java?rev=1455290&view=auto ============================================================================== --- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java (added) +++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java Mon Mar 11 19:18:23 2013 @@ -0,0 +1,233 @@ +package org.apache.activemq.broker; + +import junit.framework.Test; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; +import org.apache.derby.jdbc.EmbeddedDataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.*; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Implements the test case attached to: + * https://issues.apache.org/jira/browse/AMQ-4351 + * + * This version avoids the spring deps. + */ +public class AMQ4351Test extends BrokerTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(AMQ4351Test.class); + + public static Test suite() { + return suite(AMQ4351Test.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + + protected BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + + // Lets clean up often. 
+ broker.setOfflineDurableSubscriberTaskSchedule(500); + broker.setOfflineDurableSubscriberTimeout(2000); // lets delete durable subs much faster. + + JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); + EmbeddedDataSource dataSource = new EmbeddedDataSource(); + dataSource.setDatabaseName("derbyDb"); + dataSource.setCreateDatabase("create"); + jdbc.setDataSource(dataSource); + + jdbc.deleteAllMessages(); + broker.setPersistenceAdapter(jdbc); + return broker; + } + + ActiveMQConnectionFactory connectionFactory; + ActiveMQTopic destination = new ActiveMQTopic("TEST"); + + @Override + protected void setUp() throws Exception { + super.setUp(); + connectionFactory = new ActiveMQConnectionFactory(broker.getVmConnectorURI()); + } + + class ProducingClient implements Runnable { + final AtomicLong size = new AtomicLong(); + final AtomicBoolean done = new AtomicBoolean(); + CountDownLatch doneLatch = new CountDownLatch(1); + + Connection connection; + Session session; + MessageProducer producer; + + ProducingClient() throws JMSException { + connection = connectionFactory.createConnection(); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = session.createProducer(destination); + } + + private void sendMessage() { + try { + producer.send(session.createTextMessage("Test")); + long i = size.incrementAndGet(); + if( (i % 1000) == 0 ) { + LOG.info("produced " + i + "."); + } + } catch (JMSException e) { + e.printStackTrace(); + } + } + + public void start() { + new Thread(this, "ProducingClient").start(); + } + + public void stop() throws InterruptedException { + done.set(true); + if( !doneLatch.await(20, TimeUnit.MILLISECONDS) ) { + try { + connection.close(); + doneLatch.await(); + } catch (JMSException e) { + } + } + } + + @Override + public void run() { + try { + try { + while (!done.get()) { + sendMessage(); + Thread.sleep(10); + } + } finally { + connection.close(); + } + } catch (Exception e) { + 
e.printStackTrace(); + done.set(true); + } finally { + doneLatch.countDown(); + } + } + } + + class ConsumingClient implements Runnable { + final String name; + final AtomicLong size = new AtomicLong(); + final AtomicBoolean done = new AtomicBoolean(); + CountDownLatch doneLatch = new CountDownLatch(1); + + public ConsumingClient(String name) { + this.name = name; + } + + public void start() { + LOG.info("Starting JMS listener " + name); + new Thread(this, "ConsumingClient: "+name).start(); + } + + public void stopAsync() { + done.set(true); + } + + public void stop() throws InterruptedException { + stopAsync(); + doneLatch.await(); + } + + @Override + public void run() { + try { + Connection connection = connectionFactory.createConnection(); + connection.setClientID(name); + connection.start(); + try { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createDurableSubscriber(destination, name, null, false); + while( !done.get() ) { + Message msg = consumer.receive(100); + if(msg!=null ) { + size.incrementAndGet(); + session.commit(); + } + } + } finally { + connection.close(); + LOG.info("Stopped JMS listener " + name); + } + } catch (Exception e) { + e.printStackTrace(); + done.set(true); + } finally { + doneLatch.countDown(); + } + } + + } + + public void testAMQ4351() throws InterruptedException, JMSException { + LOG.info("Start test."); + + ProducingClient producer = new ProducingClient(); + ConsumingClient listener1 = new ConsumingClient("subscriber-1"); + ConsumingClient listener2 = new ConsumingClient("subscriber-2"); + ConsumingClient listener3 = new ConsumingClient("subscriber-3"); + try { + + listener1.start(); + listener2.start(); + listener3.start(); + int subs = 100; + + List subscribers = new ArrayList(subs); + for (int i = 4; i < subs; i++) { + ConsumingClient client = new ConsumingClient("subscriber-" + i); + subscribers.add(client); + client.start(); + } + + LOG.info("All 
subscribers started."); + producer.sendMessage(); + + LOG.info("Stopping 97 subscribers...."); + for (ConsumingClient client : subscribers) { + client.stopAsync(); + } + + // Start producing messages for 10 minutes, at high rate + LOG.info("Starting mass message producer..."); + producer.start(); + + + long lastSize = listener1.size.get(); + for( int i=0 ; i < 10; i++ ) { + Thread.sleep(1000); + long size = listener1.size.get(); + LOG.info("Listener 1: consumed: "+(size - lastSize)); + assertTrue( size > lastSize ); + lastSize = size; + } + } finally { + LOG.info("Stopping clients"); + listener1.stop(); + listener2.stop(); + listener3.stop(); + producer.stop(); + } + } + +}