Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 57494 invoked from network); 22 Jul 2009 09:41:59 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 22 Jul 2009 09:41:59 -0000 Received: (qmail 93993 invoked by uid 500); 22 Jul 2009 09:43:04 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 93935 invoked by uid 500); 22 Jul 2009 09:43:04 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 93908 invoked by uid 99); 22 Jul 2009 09:43:04 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Jul 2009 09:43:04 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Jul 2009 09:42:55 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 1C1782388896; Wed, 22 Jul 2009 09:42:35 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r796646 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2183Test.java Date: Wed, 22 Jul 2009 09:42:34 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090722094235.1C1782388896@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gtully Date: Wed Jul 22 09:42:34 2009 New Revision: 796646 URL: http://svn.apache.org/viewvc?rev=796646&view=rev Log: test case for https://issues.apache.org/activemq/browse/AMQ-2183 Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2183Test.java (with props) Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2183Test.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2183Test.java?rev=796646&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2183Test.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2183Test.java Wed Jul 22 09:42:34 2009 @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + + +import java.lang.Thread.UncaughtExceptionHandler; +import java.net.URI; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.Wait; +import org.apache.activemq.util.Wait.Condition; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class AMQ2183Test extends TestCase implements UncaughtExceptionHandler, MessageListener { + + private static final Log LOG = LogFactory.getLog(AMQ2183Test.class); + private static final int maxSent = 2000; + private final Map exceptions = new ConcurrentHashMap(); + + BrokerService master = new BrokerService(); + BrokerService slave = new BrokerService(); + URI masterUrl, slaveUrl; + + public void onException(JMSException e) { + exceptions.put(Thread.currentThread(), e); + } + + public void setUp() throws Exception { + + master = new BrokerService(); + slave = new BrokerService(); + + master.setBrokerName("Master"); + master.addConnector("tcp://localhost:0"); + master.deleteAllMessages(); + master.setWaitForSlave(true); + + Thread t = new Thread() { + public void run() { + try { + master.start(); + } catch (Exception e) { + e.printStackTrace(); + exceptions.put(Thread.currentThread(), e); + } + } + }; + t.start(); + Thread.sleep(2000); + masterUrl = master.getTransportConnectors().get(0).getConnectUri(); + + slave.setBrokerName("Slave"); + slave.deleteAllMessages(); + slave.addConnector("tcp://localhost:0"); + slave.setMasterConnectorURI(masterUrl.toString()); + slave.start(); + slaveUrl = slave.getTransportConnectors().get(0).getConnectUri(); + } + + public void tearDown() throws Exception { + master.stop(); + slave.stop(); + exceptions.clear(); + } + + class MessageCounter implements MessageListener { + int count = 0; + public void onMessage(Message message) { + count++; + } + + int getCount() { + return count; + } + } + + public void testMasterSlaveBugWithStopStartConsumers() throws Exception { + + Thread.setDefaultUncaughtExceptionHandler(this); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( + "failover:(" + masterUrl + ")?randomize=false"); + + Connection connection = connectionFactory.createConnection(); + connection.start(); + final MessageCounter counterA = new MessageCounter(); + connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(new ActiveMQQueue("Consumer.A.VirtualTopic.T")).setMessageListener(counterA); + + MessageCounter counterB = new MessageCounter(); + connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(new ActiveMQQueue("Consumer.B.VirtualTopic.T")).setMessageListener(counterB); + + Thread.sleep(2000); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(new ActiveMQTopic("VirtualTopic.T")); + for (int i=0; i