Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 73862 invoked from network); 10 Feb 2009 17:39:08 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 10 Feb 2009 17:39:08 -0000 Received: (qmail 73529 invoked by uid 500); 10 Feb 2009 17:39:08 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 73507 invoked by uid 500); 10 Feb 2009 17:39:07 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 73497 invoked by uid 99); 10 Feb 2009 17:39:07 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Feb 2009 09:39:07 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 10 Feb 2009 17:39:05 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id A969823888A5; Tue, 10 Feb 2009 17:38:30 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r743027 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Date: Tue, 10 Feb 2009 17:38:29 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090210173830.A969823888A5@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gtully Date: Tue Feb 10 17:38:28 2009 New Revision: 743027 URL: http://svn.apache.org/viewvc?rev=743027&view=rev Log: test for AMQ-2100 Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=743027&r1=743026&r2=743027&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Tue Feb 10 17:38:28 2009 @@ -16,12 +16,18 @@ */ package org.apache.activemq; +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.BytesMessage; import javax.jms.DeliveryMode; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; @@ -36,6 +42,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import edu.emory.mathcs.backport.java.util.Collections; + /** * Test cases used to test the JMS message consumer. 
* @@ -109,6 +117,73 @@ assertEquals(2, counter.get()); } + + public void testMessageListenerWithConsumerCanBeStoppedConcurently() throws Exception { + + final AtomicInteger counter = new AtomicInteger(0); + final CountDownLatch closeDone = new CountDownLatch(1); + + connection.start(); + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE); + + // preload the queue + sendMessages(session, destination, 2000); + + + final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)session.createConsumer(destination); + + final Map<Thread, Throwable> exceptions = + Collections.synchronizedMap(new HashMap<Thread, Throwable>()); + Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() { + public void uncaughtException(Thread t, Throwable e) { + LOG.error("Uncaught exception:", e); + exceptions.put(t, e); + } + }); + + final class AckAndClose implements Runnable { + private Message message; + + public AckAndClose(Message m) { + this.message = m; + } + + public void run() { + try { + int count = counter.incrementAndGet(); + if (count == 590) { + // close in a separate thread is ok by jms + consumer.close(); + closeDone.countDown(); + } + if (count % 200 == 0) { + // ensure there are some outstanding messages + // ack every 200 + message.acknowledge(); + } + } catch (Exception e) { + LOG.error("Exception on close or ack:", e); + exceptions.put(Thread.currentThread(), e); + } + } + }; + + final ExecutorService executor = Executors.newCachedThreadPool(); + consumer.setMessageListener(new MessageListener() { + public void onMessage(Message m) { + // ack and close eventually in separate thread + executor.execute(new AckAndClose(m)); + } + }); + + assertTrue(closeDone.await(20, TimeUnit.SECONDS)); + // await possible exceptions + Thread.sleep(1000); + assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); + } + + public void initCombosForTestMutiReceiveWithPrefetch1() { 
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),