Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 86568 invoked from network); 15 Dec 2008 18:49:17 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 15 Dec 2008 18:49:17 -0000 Received: (qmail 57099 invoked by uid 500); 15 Dec 2008 18:49:30 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 57075 invoked by uid 500); 15 Dec 2008 18:49:30 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 57066 invoked by uid 99); 15 Dec 2008 18:49:30 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 15 Dec 2008 10:49:30 -0800 X-ASF-Spam-Status: No, hits=0.0 required=10.0 tests= X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 15 Dec 2008 18:49:14 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 5502423888A0; Mon, 15 Dec 2008 10:48:53 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r726764 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/ activemq-core/src/test/java/org/apache/activemq/ activemq-core/src/test/java/org/apache/activemq/test/rollback/ activemq-ra/src/test/java/org/apache/activemq/ra/ Date: Mon, 15 Dec 2008 18:48:52 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20081215184853.5502423888A0@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gtully Date: Mon Dec 15 10:48:51 2008 New Revision: 726764 URL: http://svn.apache.org/viewvc?rev=726764&view=rev Log: fix - AMQ-2034 - have close in XA transaction deferred to synchronisation after completion, have rollback call beforeEnd to propagate acknowledgements; add a bunch of tests Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=726764&r1=726763&r2=726764&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Mon Dec 15 10:48:51 2008 @@ -590,11 +590,27 @@ */ public void close() throws JMSException { if (!unconsumedMessages.isClosed()) { - dispose(); - this.session.asyncSendPacket(info.createRemoveCommand()); + if (session.isTransacted() && session.getTransactionContext().getTransactionId() != null) { + session.getTransactionContext().addSynchronization(new Synchronization() { + public void afterCommit() throws Exception { + doClose(); + } + + public void afterRollback() throws Exception { + doClose(); + } + }); + } else { + doClose(); + } } } + void doClose() throws JMSException { + dispose(); + this.session.asyncSendPacket(info.createRemoveCommand()); + } + void clearMessagesInProgress() { // we are called from inside the transport reconnection logic // which involves us clearing all the connections' consumers @@ -653,10 +669,14 @@ // } // Do we have any acks we need to send out before closing? - // Ack any delivered messages now. (session may still - // commit/rollback the acks). + // Ack any delivered messages now. // only processes optimized acknowledgements - deliverAcks(); + if (!session.isTransacted()) { + deliverAcks(); + if (session.isDupsOkAcknowledge()) { + acknowledge(); + } + } if (executorService != null) { executorService.shutdown(); try { @@ -665,9 +685,7 @@ Thread.currentThread().interrupt(); } } - if (session.isTransacted() || session.isDupsOkAcknowledge()) { - acknowledge(); - } + if (session.isClientAcknowledge()) { if (!this.info.isBrowser()) { // rollback duplicates that aren't acknowledged Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java?rev=726764&r1=726763&r2=726764&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java Mon Dec 15 10:48:51 2008 @@ -27,6 +27,7 @@ import javax.transaction.xa.XAResource; import org.apache.activemq.command.SessionId; +import org.apache.activemq.transaction.Synchronization; /** * The XASession interface extends the capability of Session by adding access @@ -96,6 +97,24 @@ return new ActiveMQTopicSession(this); } + @Override + public void close() throws JMSException { + if (getTransactionContext().isInXATransaction()) { + getTransactionContext().addSynchronization(new Synchronization() { + public void afterCommit() throws Exception { + doClose(); + } + + public void afterRollback() throws Exception { + doClose(); + } + }); + } + } + + void doClose() throws JMSException { + super.close(); + } /** * This is called before transacted work is done by * the session. XA Work can only be done when this Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java?rev=726764&r1=726763&r2=726764&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java Mon Dec 15 10:48:51 2008 @@ -224,6 +224,7 @@ throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress "); } + beforeEnd(); if (transactionId != null) { TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK); this.transactionId = null; Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java?rev=726764&r1=726763&r2=726764&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java Mon Dec 15 10:48:51 2008 @@ -16,27 +16,39 @@ */ package org.apache.activemq; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import javax.jms.Connection; +import javax.jms.Destination; import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; import javax.jms.XAConnection; import javax.jms.XAQueueConnection; import javax.jms.XASession; import javax.jms.XATopicConnection; import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; import org.apache.activemq.broker.BrokerRegistry; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.transport.stomp.StompTransportFilter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport { private static final Log LOG = LogFactory.getLog(ActiveMQXAConnectionFactoryTest.class); + long txGenerator = System.currentTimeMillis(); public void testCopy() throws URISyntaxException, JMSException { ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("vm://localhost?"); @@ -117,6 +129,126 @@ connection2.close(); } + public void testVanilaTransactionalProduceReceive() throws Exception { + + ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false"); + XAConnection connection1 = (XAConnection)cf1.createConnection(); + connection1.start(); + XASession session = connection1.createXASession(); + XAResource resource = session.getXAResource(); + Destination dest = new ActiveMQQueue(getName()); + + // publish a message + Xid tid = createXid(); + resource.start(tid, XAResource.TMNOFLAGS); + MessageProducer producer = session.createProducer(dest); + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText(getName()); + producer.send(message); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + session.close(); + + session = connection1.createXASession(); + MessageConsumer consumer = session.createConsumer(dest); + tid = createXid(); + resource = session.getXAResource(); + resource.start(tid, XAResource.TMNOFLAGS); + TextMessage receivedMessage = (TextMessage) consumer.receive(1000); + assertNotNull(receivedMessage); + assertEquals(getName(), receivedMessage.getText()); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + } + + public void testConsumerCloseTransactionalSendReceive() throws Exception { + + ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false"); + XAConnection connection1 = (XAConnection)cf1.createConnection(); + connection1.start(); + XASession session = connection1.createXASession(); + XAResource resource = session.getXAResource(); + Destination dest = new ActiveMQQueue(getName()); + + // publish a message + Xid tid = createXid(); + resource.start(tid, XAResource.TMNOFLAGS); + MessageProducer producer = session.createProducer(dest); + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText(getName()); + producer.send(message); + producer.close(); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + session.close(); + + session = connection1.createXASession(); + MessageConsumer consumer = session.createConsumer(dest); + tid = createXid(); + resource = session.getXAResource(); + resource.start(tid, XAResource.TMNOFLAGS); + TextMessage receivedMessage = (TextMessage) consumer.receive(1000); + consumer.close(); + assertNotNull(receivedMessage); + assertEquals(getName(), receivedMessage.getText()); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + + session = connection1.createXASession(); + consumer = session.createConsumer(dest); + tid = createXid(); + resource = session.getXAResource(); + resource.start(tid, XAResource.TMNOFLAGS); + assertNull(consumer.receive(1000)); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + + } + + public void testSessionCloseTransactionalSendReceive() throws Exception { + + ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false"); + XAConnection connection1 = (XAConnection)cf1.createConnection(); + connection1.start(); + XASession session = connection1.createXASession(); + XAResource resource = session.getXAResource(); + Destination dest = new ActiveMQQueue(getName()); + + // publish a message + Xid tid = createXid(); + resource.start(tid, XAResource.TMNOFLAGS); + MessageProducer producer = session.createProducer(dest); + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText(getName()); + producer.send(message); + session.close(); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + + + session = connection1.createXASession(); + MessageConsumer consumer = session.createConsumer(dest); + tid = createXid(); + resource = session.getXAResource(); + resource.start(tid, XAResource.TMNOFLAGS); + TextMessage receivedMessage = (TextMessage) consumer.receive(1000); + session.close(); + assertNotNull(receivedMessage); + assertEquals(getName(), receivedMessage.getText()); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + + session = connection1.createXASession(); + consumer = session.createConsumer(dest); + tid = createXid(); + resource = session.getXAResource(); + resource.start(tid, XAResource.TMNOFLAGS); + assertNull(consumer.receive(1000)); + resource.end(tid, XAResource.TMSUCCESS); + resource.commit(tid, true); + } + + protected void assertCreateConnection(String uri) throws Exception { // Start up a broker with a tcp connector. BrokerService broker = new BrokerService(); @@ -161,5 +293,29 @@ assertTrue("Should be an XATopicConnection", connection instanceof XATopicConnection); assertTrue("Should be an XAQueueConnection", connection instanceof XAQueueConnection); } + + public Xid createXid() throws IOException { + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream os = new DataOutputStream(baos); + os.writeLong(++txGenerator); + os.close(); + final byte[] bs = baos.toByteArray(); + + return new Xid() { + public int getFormatId() { + return 86; + } + + public byte[] getGlobalTransactionId() { + return bs; + } + + public byte[] getBranchQualifier() { + return bs; + } + }; + + } } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=726764&r1=726763&r2=726764&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Mon Dec 15 10:48:51 2008 @@ -652,5 +652,35 @@ assertNull(redispatchConsumer.receive(500)); redispatchSession.close(); } + + public void testRedispatchOfRolledbackTx() throws Exception { + + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE); + + sendMessages(connection, destination, 1); + + MessageConsumer consumer = session.createConsumer(destination); + assertNotNull(consumer.receive(1000)); + + // install another consumer while message dispatch is unacked/uncommitted + Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer redispatchConsumer = redispatchSession.createConsumer(destination); + + session.rollback(); + session.close(); + + Message msg = redispatchConsumer.receive(1000); + assertNotNull(msg); + assertTrue(msg.getJMSRedelivered()); + // should have re-delivery of 2, one for re-dispatch, one for rollback which is a little too much! + assertEquals(3, msg.getLongProperty("JMSXDeliveryCount")); + redispatchSession.commit(); + + assertNull(redispatchConsumer.receive(500)); + redispatchSession.close(); + } + } Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java?rev=726764&r1=726763&r2=726764&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java Mon Dec 15 10:48:51 2008 @@ -157,7 +157,8 @@ // Get the first. assertEquals(outbound[0], consumer.receive(1000)); consumer.close(); - + session.commit(); + QueueBrowser browser = session.createBrowser((Queue)destination); Enumeration enumeration = browser.getEnumeration(); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java?rev=726764&r1=726763&r2=726764&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java Mon Dec 15 10:48:51 2008 @@ -37,7 +37,7 @@ protected int numberOfMessagesOnQueue = 1; private Connection connection; - public void testVerifyCloseRedeliveryWithFailoverTransport() throws Throwable { + public void testVerifySessionCloseRedeliveryWithFailoverTransport() throws Throwable { Session session = connection.createSession(true, Session.SESSION_TRANSACTED); MessageConsumer consumer = session.createConsumer(destination); @@ -57,7 +57,46 @@ assertEquals("redelivered message", id, message.getJMSMessageID()); assertEquals(3, message.getLongProperty("JMSXDeliveryCount")); } + + public void testVerifyConsumerAndSessionCloseRedeliveryWithFailoverTransport() throws Throwable { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(destination); + + Message message = consumer.receive(1000); + String id = message.getJMSMessageID(); + assertNotNull(message); + LOG.info("got message " + message); + consumer.close(); + session.close(); + session = connection.createSession(true, Session.SESSION_TRANSACTED); + consumer = session.createConsumer(destination); + + message = consumer.receive(1000); + session.commit(); + assertNotNull(message); + assertEquals("redelivered message", id, message.getJMSMessageID()); + assertEquals(3, message.getLongProperty("JMSXDeliveryCount")); + } + public void testVerifyConsumerCloseSessionRollbackRedeliveryWithFailoverTransport() throws Throwable { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(destination); + + Message message = consumer.receive(1000); + String id = message.getJMSMessageID(); + assertNotNull(message); + LOG.info("got message " + message); + consumer.close(); + session.rollback(); + + consumer = session.createConsumer(destination); + message = consumer.receive(1000); + session.commit(); + assertNotNull(message); + assertEquals("redelivered message", id, message.getJMSMessageID()); + assertEquals(3, message.getLongProperty("JMSXDeliveryCount")); + } + protected void setUp() throws Exception { super.setUp(); Modified: activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java?rev=726764&r1=726763&r2=726764&view=diff ============================================================================== --- activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java (original) +++ activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java Mon Dec 15 10:48:51 2008 @@ -25,7 +25,9 @@ import java.util.concurrent.TimeUnit; import javax.jms.Connection; +import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; @@ -188,6 +190,94 @@ } + public void testMessageExceptionReDelivery() throws Exception { + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter(); + adapter.setServerUrl("vm://localhost?broker.persistent=false"); + adapter.start(new StubBootstrapContext()); + + final CountDownLatch messageDelivered = new CountDownLatch(2); + + final StubMessageEndpoint endpoint = new StubMessageEndpoint() { + public void onMessage(Message message) { + super.onMessage(message); + try { + messageDelivered.countDown(); + if (!messageDelivered.await(1, TimeUnit.MILLISECONDS)) { + throw new RuntimeException(getName() + " ex on first delivery"); + } else { + try { + assertTrue(message.getJMSRedelivered()); + } catch (JMSException e) { + e.printStackTrace(); + } + } + } catch (InterruptedException ignored) { + } + }; + + public void afterDelivery() throws ResourceException { + try { + if (!messageDelivered.await(1, TimeUnit.MILLISECONDS)) { + xaresource.end(xid, XAResource.TMFAIL); + xaresource.rollback(xid); + } else { + xaresource.end(xid, XAResource.TMSUCCESS); + xaresource.prepare(xid); + xaresource.commit(xid, false); + } + } catch (Throwable e) { + throw new ResourceException(e); + } + } + }; + + ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec(); + activationSpec.setDestinationType(Queue.class.getName()); + activationSpec.setDestination("TEST"); + activationSpec.setResourceAdapter(adapter); + activationSpec.validate(); + + MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() { + public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException { + endpoint.xaresource = resource; + return endpoint; + } + + public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException { + return true; + } + }; + + // Activate an Endpoint + adapter.endpointActivation(messageEndpointFactory, activationSpec); + + // Give endpoint a chance to setup and register its listeners + try { + Thread.sleep(1000); + } catch (Exception e) { + + } + + // Send the broker a message to that endpoint + MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST")); + producer.send(session.createTextMessage("Hello!")); + connection.close(); + + // Wait for the message to be delivered twice. + assertTrue(messageDelivered.await(10000, TimeUnit.MILLISECONDS)); + + // Shut the Endpoint down. + adapter.endpointDeactivation(messageEndpointFactory, activationSpec); + adapter.stop(); + + } + + public Xid createXid() throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream os = new DataOutputStream(baos);