Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 92161 invoked from network); 10 Aug 2007 08:22:31 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 10 Aug 2007 08:22:31 -0000 Received: (qmail 96098 invoked by uid 500); 10 Aug 2007 08:22:30 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 96081 invoked by uid 500); 10 Aug 2007 08:22:30 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 96072 invoked by uid 99); 10 Aug 2007 08:22:30 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Aug 2007 01:22:30 -0700 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Aug 2007 08:22:31 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id CAE9A1A981A; Fri, 10 Aug 2007 01:22:10 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r564505 - in /activemq/trunk/activemq-core: pom.xml src/test/java/org/apache/activemq/bugs/MessageSender.java src/test/java/org/apache/activemq/bugs/Receiver.java src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java Date: Fri, 10 Aug 2007 08:22:10 -0000 To: commits@activemq.apache.org From: jlim@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070810082210.CAE9A1A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jlim Date: Fri Aug 10 01:22:09 2007 New Revision: 564505 URL: http://svn.apache.org/viewvc?view=rev&rev=564505 Log: added test case to simulate "javax.jms.JMSException: Transaction 'TX:ID:...' has not been started." exception this test appears to manifest consistently on a MacBook. Haven't been able to reproduce this on windows though. Is excluded by default as the test can sometime take too long to execute Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/Receiver.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java Modified: activemq/trunk/activemq-core/pom.xml Modified: activemq/trunk/activemq-core/pom.xml URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?view=diff&rev=564505&r1=564504&r2=564505 ============================================================================== --- activemq/trunk/activemq-core/pom.xml (original) +++ activemq/trunk/activemq-core/pom.xml Fri Aug 10 01:22:09 2007 @@ -309,6 +309,7 @@ + **/TransactionNotStartedErrorTest.* **/DefaultStoreBrokerTest.* **/TcpTransportBrokerTest.* **/activeio/* Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java?view=auto&rev=564505 ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageSender.java Fri Aug 10 01:22:09 2007 @@ -0,0 +1,25 @@ +package org.apache.activemq.bugs; + +import javax.jms.Connection; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Session; + +public class MessageSender { + private MessageProducer producer; + private Session session; + + public MessageSender(String queueName,Connection connection, boolean useTransactedSession) throws Exception { + session = useTransactedSession ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = session.createProducer(session.createQueue(queueName)); + } + + public void send(String payload) throws Exception { + ObjectMessage message = session.createObjectMessage(); + message.setObject(payload); + producer.send(message); + if (session.getTransacted()) { + session.commit(); + } + } +} Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/Receiver.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/Receiver.java?view=auto&rev=564505 ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/Receiver.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/Receiver.java Fri Aug 10 01:22:09 2007 @@ -0,0 +1,5 @@ +package org.apache.activemq.bugs; + +public interface Receiver { + public void receive(String s) throws Exception; +} Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java?view=auto&rev=564505 ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TransactionNotStartedErrorTest.java Fri Aug 10 01:22:09 2007 @@ -0,0 +1,303 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; + +import javax.jms.ObjectMessage; +import javax.jms.Session; +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.bugs.Receiver; +import org.apache.activemq.bugs.MessageSender; + +/* + * simulate message flow which cause the following exception + * in the broker (exception logged by client) + *

+ * 2007-07-24 13:51:23,624 com.easynet.halo.Halo ERROR (LoggingErrorHandler.java: 23) JMS failure + * javax.jms.JMSException: Transaction 'TX:ID:dmt-53625-1185281414694-1:0:344' has not been started. + * at org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:230) + * + * + * This appears to be consistent in a MacBook. Haven't been able to replicate it on Windows though + */ +public class TransactionNotStartedErrorTest extends TestCase { + + private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory + .getLog(TransactionNotStartedErrorTest.class); + private String hectorToHalo = "hectorToHalo"; + private String xenaToHalo = "xenaToHalo"; + private String troyToHalo = "troyToHalo"; + + private String haloToHector = "haloToHector"; + private String haloToXena = "haloToXena"; + private String haloToTroy = "haloToTroy"; + + private static int counter = 500; + + private static int hectorToHaloCtr = 0; + private static int xenaToHaloCtr = 0; + private static int troyToHaloCtr = 0; + + private static int haloToHectorCtr = 0; + private static int haloToXenaCtr = 0; + private static int haloToTroyCtr = 0; + + private BrokerService broker; + + private Connection hectorConnection; + private Connection xenaConnection; + private Connection troyConnection; + private Connection haloConnection; + + private final Object lock = new Object(); + + public Connection createConnection() throws JMSException { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( + "tcp://localhost:61616"); + return factory.createConnection(); + } + + public Session createSession(Connection connection, boolean transacted) + throws JMSException { + return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + } + + public void startBroker() throws Exception { + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(true); + broker.setPersistent(true); + broker.setUseJmx(true); + broker.addConnector("tcp://localhost:61616").setName("Default"); + broker.start(); + log.info("Starting broker.."); + } + + public void tearDown() throws Exception { + hectorConnection.close(); + xenaConnection.close(); + troyConnection.close(); + haloConnection.close(); + broker.stop(); + } + + public void testTransactionNotStartedError() throws Exception { + startBroker(); + hectorConnection = createConnection(); + Thread hectorThread = buildProducer(hectorConnection, hectorToHalo); + Receiver hHectorReceiver = new Receiver() { + public void receive(String s) throws Exception { + haloToHectorCtr++; + if (haloToHectorCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + } + }; + buildReceiver(hectorConnection, haloToHector, false, hHectorReceiver); + + troyConnection = createConnection(); + Thread troyThread = buildProducer(troyConnection, troyToHalo); + Receiver hTroyReceiver = new Receiver() { + public void receive(String s) throws Exception { + haloToTroyCtr++; + if (haloToTroyCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + } + }; + buildReceiver(hectorConnection, haloToTroy, false, hTroyReceiver); + + xenaConnection = createConnection(); + Thread xenaThread = buildProducer(xenaConnection, xenaToHalo); + Receiver hXenaReceiver = new Receiver() { + public void receive(String s) throws Exception { + haloToXenaCtr++; + if (haloToXenaCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + } + }; + buildReceiver(xenaConnection, haloToXena, false, hXenaReceiver); + + haloConnection = createConnection(); + final MessageSender hectorSender = buildTransactionalProducer( + haloToHector, haloConnection); + final MessageSender troySender = buildTransactionalProducer(haloToTroy, + haloConnection); + final MessageSender xenaSender = buildTransactionalProducer(haloToXena, + haloConnection); + Receiver hectorReceiver = new Receiver() { + public void receive(String s) throws Exception { + hectorToHaloCtr++; + troySender.send("halo to troy because of hector"); + if (hectorToHaloCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + } + }; + Receiver xenaReceiver = new Receiver() { + public void receive(String s) throws Exception { + xenaToHaloCtr++; + hectorSender.send("halo to hector because of xena"); + if (xenaToHaloCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + } + }; + Receiver troyReceiver = new Receiver() { + public void receive(String s) throws Exception { + troyToHaloCtr++; + xenaSender.send("halo to xena because of troy"); + if (troyToHaloCtr >= counter) { + synchronized (lock) { + lock.notifyAll(); + } + } + } + }; + buildReceiver(haloConnection, hectorToHalo, true, hectorReceiver); + buildReceiver(haloConnection, xenaToHalo, true, xenaReceiver); + buildReceiver(haloConnection, troyToHalo, true, troyReceiver); + + haloConnection.start(); + + troyConnection.start(); + troyThread.start(); + + xenaConnection.start(); + xenaThread.start(); + + hectorConnection.start(); + hectorThread.start(); + waitForMessagesToBeDelivered(); + //number of messages received should match messages sent + assertEquals(hectorToHaloCtr, counter); + log.info("hectorToHalo received " + hectorToHaloCtr + " messages"); + assertEquals(xenaToHaloCtr, counter); + log.info("xenaToHalo received " + xenaToHaloCtr + " messages"); + assertEquals(troyToHaloCtr, counter); + log.info("troyToHalo received " + troyToHaloCtr + " messages"); + assertEquals(haloToHectorCtr, counter); + log.info("haloToHector received " + haloToHectorCtr + " messages"); + assertEquals(haloToXenaCtr, counter); + log.info("haloToXena received " + haloToXenaCtr + " messages"); + assertEquals(haloToTroyCtr, counter); + log.info("haloToTroy received " + haloToTroyCtr + " messages"); + + } + + protected void waitForMessagesToBeDelivered() { + // let's give the listeners enough time to read all messages + long maxWaitTime = counter * 3000; + long waitTime = maxWaitTime; + long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis(); + + synchronized (lock) { + boolean hasMessages = true; + while (hasMessages && waitTime >= 0) { + try { + lock.wait(200); + } catch (InterruptedException e) { + log.error(e); + } + //check if all messages have been received + hasMessages = hectorToHaloCtr < counter + || xenaToHaloCtr < counter || troyToHaloCtr < counter || haloToHectorCtr < counter + || haloToXenaCtr < counter || haloToTroyCtr < counter; + waitTime = maxWaitTime - (System.currentTimeMillis() - start); + } + } + } + + public MessageSender buildTransactionalProducer(String queueName, + Connection connection) throws Exception { + + return new MessageSender(queueName, connection, true); + } + + public Thread buildProducer(Connection connection, final String queueName) + throws Exception { + + final Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + final MessageSender producer = new MessageSender(queueName, connection, + false); + Thread thread = new Thread() { + + public synchronized void run() { + for (int i = 0; i < counter; i++) { + try { + producer.send(queueName); + if (session.getTransacted()) { + session.commit(); + } + + } catch (Exception e) { + throw new RuntimeException("on " + queueName + " send", + e); + } + } + } + }; + return thread; + } + + public void buildReceiver(Connection connection, final String queueName, + boolean transacted, final Receiver receiver) throws Exception { + final Session session = transacted ? connection.createSession(true, + Session.SESSION_TRANSACTED) : connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + MessageConsumer inputMessageConsumer = session.createConsumer(session + .createQueue(queueName)); + MessageListener messageListener = new MessageListener() { + + public void onMessage(Message message) { + try { + ObjectMessage objectMessage = (ObjectMessage) message; + String s = (String) objectMessage.getObject(); + receiver.receive(s); + if (session.getTransacted()) { + session.commit(); + } + + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + inputMessageConsumer.setMessageListener(messageListener); + } + +}