activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r726764 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/ activemq-core/src/test/java/org/apache/activemq/ activemq-core/src/test/java/org/apache/activemq/test/rollback/ activemq-ra/src/test/java/org/apache/activemq/ra/
Date Mon, 15 Dec 2008 18:48:52 GMT
Author: gtully
Date: Mon Dec 15 10:48:51 2008
New Revision: 726764

URL: http://svn.apache.org/viewvc?rev=726764&view=rev
Log:
fix - AMQ-2034 - have close in XA transaction deferred to synchronisation after completion,
have rollback call beforeEnd to propagate acknowledgements; add a bunch of tests

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java
    activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=726764&r1=726763&r2=726764&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Mon Dec 15 10:48:51 2008
@@ -590,11 +590,27 @@
      */
     public void close() throws JMSException {
         if (!unconsumedMessages.isClosed()) {
-            dispose();
-            this.session.asyncSendPacket(info.createRemoveCommand());
+            if (session.isTransacted() && session.getTransactionContext().getTransactionId() != null) {
+                session.getTransactionContext().addSynchronization(new Synchronization() {
+                    public void afterCommit() throws Exception {
+                        doClose();
+                    }
+
+                    public void afterRollback() throws Exception {
+                        doClose();
+                    }
+                });
+            } else {
+                doClose();
+            } 
         }
     }
 
+    void doClose() throws JMSException {
+        dispose();
+        this.session.asyncSendPacket(info.createRemoveCommand());
+    }
+    
     void clearMessagesInProgress() {
         // we are called from inside the transport reconnection logic
         // which involves us clearing all the connections' consumers
@@ -653,10 +669,14 @@
 //            }
 
             // Do we have any acks we need to send out before closing?
-            // Ack any delivered messages now. (session may still
-            // commit/rollback the acks).
+            // Ack any delivered messages now.
             // only processes optimized acknowledgements
-            deliverAcks();
+            if (!session.isTransacted()) { 
+                deliverAcks();
+                if (session.isDupsOkAcknowledge()) {
+                    acknowledge();
+                }
+            }
             if (executorService != null) {
                 executorService.shutdown();
                 try {
@@ -665,9 +685,7 @@
                     Thread.currentThread().interrupt();
                 }
             }
-            if (session.isTransacted() || session.isDupsOkAcknowledge()) {
-                acknowledge();
-            }
+            
             if (session.isClientAcknowledge()) {
                 if (!this.info.isBrowser()) {
                     // rollback duplicates that aren't acknowledged

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java?rev=726764&r1=726763&r2=726764&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java Mon Dec 15 10:48:51 2008
@@ -27,6 +27,7 @@
 import javax.transaction.xa.XAResource;
 
 import org.apache.activemq.command.SessionId;
+import org.apache.activemq.transaction.Synchronization;
 
 /**
  * The XASession interface extends the capability of Session by adding access
@@ -96,6 +97,24 @@
         return new ActiveMQTopicSession(this);
     }
 
+    @Override
+    public void close() throws JMSException {
+        if (getTransactionContext().isInXATransaction()) {
+            getTransactionContext().addSynchronization(new Synchronization() {
+                public void afterCommit() throws Exception {
+                    doClose();
+                }
+                
+                public void afterRollback() throws Exception {
+                    doClose();
+                }
+            });
+        }
+    }
+
+    void doClose() throws JMSException {
+        super.close();
+    }
     /**
      * This is called before transacted work is done by
      * the session.  XA Work can only be done when this

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java?rev=726764&r1=726763&r2=726764&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/TransactionContext.java Mon Dec 15 10:48:51 2008
@@ -224,6 +224,7 @@
             throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress ");
         }
         
+        beforeEnd();
         if (transactionId != null) {
             TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK);
             this.transactionId = null;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java?rev=726764&r1=726763&r2=726764&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java Mon Dec 15 10:48:51 2008
@@ -16,27 +16,39 @@
  */
 package org.apache.activemq;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 
 import javax.jms.Connection;
+import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
 import javax.jms.XAConnection;
 import javax.jms.XAQueueConnection;
 import javax.jms.XASession;
 import javax.jms.XATopicConnection;
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
 
 import org.apache.activemq.broker.BrokerRegistry;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.transport.stomp.StompTransportFilter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport {
     private static final Log LOG = LogFactory.getLog(ActiveMQXAConnectionFactoryTest.class);
+    long txGenerator = System.currentTimeMillis();
 
     public void testCopy() throws URISyntaxException, JMSException {
         ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("vm://localhost?");
@@ -117,6 +129,126 @@
         connection2.close();
     }
 
+    public void testVanilaTransactionalProduceReceive() throws Exception {
+        
+        ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
+        XAConnection connection1 = (XAConnection)cf1.createConnection();
+        connection1.start();
+        XASession session = connection1.createXASession();
+        XAResource resource = session.getXAResource();
+        Destination dest = new ActiveMQQueue(getName());
+        
+        // publish a message
+        Xid tid = createXid();
+        resource.start(tid, XAResource.TMNOFLAGS);
+        MessageProducer producer = session.createProducer(dest);
+        ActiveMQTextMessage message  = new ActiveMQTextMessage();
+        message.setText(getName());
+        producer.send(message);
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.commit(tid, true);
+        session.close();
+        
+        session = connection1.createXASession();
+        MessageConsumer consumer = session.createConsumer(dest);
+        tid = createXid();
+        resource = session.getXAResource();
+        resource.start(tid, XAResource.TMNOFLAGS);
+        TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
+        assertNotNull(receivedMessage);
+        assertEquals(getName(), receivedMessage.getText());
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.commit(tid, true);
+    }
+    
+    public void testConsumerCloseTransactionalSendReceive() throws Exception {
+        
+        ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
+        XAConnection connection1 = (XAConnection)cf1.createConnection();
+        connection1.start();
+        XASession session = connection1.createXASession();
+        XAResource resource = session.getXAResource();
+        Destination dest = new ActiveMQQueue(getName());
+        
+        // publish a message
+        Xid tid = createXid();
+        resource.start(tid, XAResource.TMNOFLAGS);
+        MessageProducer producer = session.createProducer(dest);
+        ActiveMQTextMessage message  = new ActiveMQTextMessage();
+        message.setText(getName());
+        producer.send(message);
+        producer.close();
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.commit(tid, true);
+        session.close();
+        
+        session = connection1.createXASession();
+        MessageConsumer consumer = session.createConsumer(dest);
+        tid = createXid();
+        resource = session.getXAResource();
+        resource.start(tid, XAResource.TMNOFLAGS);
+        TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
+        consumer.close();
+        assertNotNull(receivedMessage);
+        assertEquals(getName(), receivedMessage.getText());
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.commit(tid, true);
+        
+        session = connection1.createXASession();
+        consumer = session.createConsumer(dest);
+        tid = createXid();
+        resource = session.getXAResource();
+        resource.start(tid, XAResource.TMNOFLAGS);
+        assertNull(consumer.receive(1000));
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.commit(tid, true);
+        
+    }
+
+    public void testSessionCloseTransactionalSendReceive() throws Exception {
+        
+        ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
+        XAConnection connection1 = (XAConnection)cf1.createConnection();
+        connection1.start();
+        XASession session = connection1.createXASession();
+        XAResource resource = session.getXAResource();
+        Destination dest = new ActiveMQQueue(getName());
+        
+        // publish a message
+        Xid tid = createXid();
+        resource.start(tid, XAResource.TMNOFLAGS);
+        MessageProducer producer = session.createProducer(dest);
+        ActiveMQTextMessage message  = new ActiveMQTextMessage();
+        message.setText(getName());
+        producer.send(message);
+        session.close();
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.commit(tid, true);
+        
+        
+        session = connection1.createXASession();
+        MessageConsumer consumer = session.createConsumer(dest);
+        tid = createXid();
+        resource = session.getXAResource();
+        resource.start(tid, XAResource.TMNOFLAGS);
+        TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
+        session.close();
+        assertNotNull(receivedMessage);
+        assertEquals(getName(), receivedMessage.getText());
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.commit(tid, true);
+        
+        session = connection1.createXASession();
+        consumer = session.createConsumer(dest);
+        tid = createXid();
+        resource = session.getXAResource();
+        resource.start(tid, XAResource.TMNOFLAGS);
+        assertNull(consumer.receive(1000));
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.commit(tid, true);        
+    }
+
+    
     protected void assertCreateConnection(String uri) throws Exception {
         // Start up a broker with a tcp connector.
         BrokerService broker = new BrokerService();
@@ -161,5 +293,29 @@
         assertTrue("Should be an XATopicConnection", connection instanceof XATopicConnection);
         assertTrue("Should be an XAQueueConnection", connection instanceof XAQueueConnection);
     }
+    
+    public Xid createXid() throws IOException {
+        
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream os = new DataOutputStream(baos);
+        os.writeLong(++txGenerator);
+        os.close();
+        final byte[] bs = baos.toByteArray();
+
+        return new Xid() {
+            public int getFormatId() {
+                return 86;
+            }
+
+            public byte[] getGlobalTransactionId() {
+                return bs;
+            }
+
+            public byte[] getBranchQualifier() {
+                return bs;
+            }
+        };
+
+    }
 
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=726764&r1=726763&r2=726764&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Mon Dec 15 10:48:51 2008
@@ -652,5 +652,35 @@
         assertNull(redispatchConsumer.receive(500));
         redispatchSession.close();
     }
+
     
+    public void testRedispatchOfRolledbackTx() throws Exception {
+
+        connection.start();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+        
+        sendMessages(connection, destination, 1);
+        
+        MessageConsumer consumer = session.createConsumer(destination);
+        assertNotNull(consumer.receive(1000));
+        
+        // install another consumer while message dispatch is unacked/uncommitted
+        Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer redispatchConsumer = redispatchSession.createConsumer(destination);
+
+        session.rollback();
+        session.close();
+                
+        Message msg = redispatchConsumer.receive(1000);
+        assertNotNull(msg);
+        assertTrue(msg.getJMSRedelivered());
+        // should have re-delivery of 2, one for re-dispatch, one for rollback which is a little too much!
+        assertEquals(3, msg.getLongProperty("JMSXDeliveryCount"));
+        redispatchSession.commit();
+        
+        assertNull(redispatchConsumer.receive(500));
+        redispatchSession.close();
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java?rev=726764&r1=726763&r2=726764&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java Mon Dec 15 10:48:51 2008
@@ -157,7 +157,8 @@
         // Get the first.
         assertEquals(outbound[0], consumer.receive(1000));
         consumer.close();
-
+        session.commit();
+        
         QueueBrowser browser = session.createBrowser((Queue)destination);
         Enumeration enumeration = browser.getEnumeration();
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java?rev=726764&r1=726763&r2=726764&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/test/rollback/CloseRollbackRedeliveryQueueTest.java Mon Dec 15 10:48:51 2008
@@ -37,7 +37,7 @@
     protected int numberOfMessagesOnQueue = 1;
     private Connection connection;
    
-    public void testVerifyCloseRedeliveryWithFailoverTransport() throws Throwable {
+    public void testVerifySessionCloseRedeliveryWithFailoverTransport() throws Throwable {
         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
         MessageConsumer consumer = session.createConsumer(destination);
 
@@ -57,7 +57,46 @@
         assertEquals("redelivered message", id, message.getJMSMessageID());
         assertEquals(3, message.getLongProperty("JMSXDeliveryCount"));
     }
+    
+    public void testVerifyConsumerAndSessionCloseRedeliveryWithFailoverTransport() throws Throwable {
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        Message message = consumer.receive(1000);
+        String id = message.getJMSMessageID();
+        assertNotNull(message);
+        LOG.info("got message " + message);
+        consumer.close();
+        session.close();
+        session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        consumer = session.createConsumer(destination);
+
+        message = consumer.receive(1000);
+        session.commit();
+        assertNotNull(message);
+        assertEquals("redelivered message", id, message.getJMSMessageID());
+        assertEquals(3, message.getLongProperty("JMSXDeliveryCount"));
+    }
 
+    public void testVerifyConsumerCloseSessionRollbackRedeliveryWithFailoverTransport() throws Throwable {
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        Message message = consumer.receive(1000);
+        String id = message.getJMSMessageID();
+        assertNotNull(message);
+        LOG.info("got message " + message);
+        consumer.close();
+        session.rollback();
+        
+        consumer = session.createConsumer(destination);
+        message = consumer.receive(1000);
+        session.commit();
+        assertNotNull(message);
+        assertEquals("redelivered message", id, message.getJMSMessageID());
+        assertEquals(3, message.getLongProperty("JMSXDeliveryCount"));
+    }
+    
     protected void setUp() throws Exception {
         super.setUp();
 

Modified: activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java?rev=726764&r1=726763&r2=726764&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java (original)
+++ activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java Mon Dec 15 10:48:51 2008
@@ -25,7 +25,9 @@
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
+import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
@@ -188,6 +190,94 @@
     }
 
 
+    public void testMessageExceptionReDelivery() throws Exception {
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        Connection connection = factory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
+        adapter.setServerUrl("vm://localhost?broker.persistent=false");
+        adapter.start(new StubBootstrapContext());
+
+        final CountDownLatch messageDelivered = new CountDownLatch(2);
+
+        final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
+            public void onMessage(Message message) {
+                super.onMessage(message);
+                try {
+                    messageDelivered.countDown();
+                    if (!messageDelivered.await(1, TimeUnit.MILLISECONDS)) {
+                        throw new RuntimeException(getName() + " ex on first delivery");
+                    } else {
+                        try {
+                            assertTrue(message.getJMSRedelivered());
+                        } catch (JMSException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                } catch (InterruptedException ignored) {
+                }
+            };
+            
+            public void afterDelivery() throws ResourceException {
+                try {
+                    if (!messageDelivered.await(1, TimeUnit.MILLISECONDS)) {
+                        xaresource.end(xid, XAResource.TMFAIL);
+                        xaresource.rollback(xid);
+                    } else {
+                        xaresource.end(xid, XAResource.TMSUCCESS);
+                        xaresource.prepare(xid);
+                        xaresource.commit(xid, false);
+                    }
+                } catch (Throwable e) {
+                    throw new ResourceException(e);
+                }
+            }
+        };
+
+        ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
+        activationSpec.setDestinationType(Queue.class.getName());
+        activationSpec.setDestination("TEST");
+        activationSpec.setResourceAdapter(adapter);
+        activationSpec.validate();
+
+        MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
+            public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
+                endpoint.xaresource = resource;
+                return endpoint;
+            }
+
+            public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
+                return true;
+            }
+        };
+
+        // Activate an Endpoint
+        adapter.endpointActivation(messageEndpointFactory, activationSpec);
+
+        // Give endpoint a chance to setup and register its listeners
+        try {
+            Thread.sleep(1000);
+        } catch (Exception e) {
+
+        }
+
+        // Send the broker a message to that endpoint
+        MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
+        producer.send(session.createTextMessage("Hello!"));
+        connection.close();
+
+        // Wait for the message to be delivered twice.
+        assertTrue(messageDelivered.await(10000, TimeUnit.MILLISECONDS));
+
+        // Shut the Endpoint down.
+        adapter.endpointDeactivation(messageEndpointFactory, activationSpec);
+        adapter.stop();
+
+    }
+
+
     public Xid createXid() throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream os = new DataOutputStream(baos);



Mime
View raw message