cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject git commit: CXF-5543 Made tests more solid
Date Thu, 03 Apr 2014 09:23:17 GMT
Repository: cxf
Updated Branches:
  refs/heads/master 12e8613a9 -> c62ac164b


CXF-5543 Made tests more solid


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/c62ac164
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/c62ac164
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/c62ac164

Branch: refs/heads/master
Commit: c62ac164b892b2a7d075eb936ceee9dc935f4b7f
Parents: 12e8613
Author: Christian Schneider <chris@die-schneider.net>
Authored: Thu Apr 3 11:23:09 2014 +0200
Committer: Christian Schneider <chris@die-schneider.net>
Committed: Thu Apr 3 11:23:09 2014 +0200

----------------------------------------------------------------------
 .../jms/util/MessageListenerContainer.java      |  44 ++++-
 .../transport/jms/util/MessageListenerTest.java | 160 ++++++++++++-------
 2 files changed, 138 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/c62ac164/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
index fd49d4a..01d9ae7 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
@@ -78,7 +78,7 @@ public class MessageListenerContainer implements JMSListenerContainer {
         this.messageSelector = messageSelector;
     }
 
-    private Executor getExecutor() {
+    protected Executor getExecutor() {
         if (executor == null) {
             executor = Executors.newFixedThreadPool(10);
         }
@@ -118,8 +118,9 @@ public class MessageListenerContainer implements JMSListenerContainer
{
             }
             
             MessageListener intListener = (transactionManager != null)
-                ? new TransactionalMessageListener(transactionManager, session, listenerHandler)
-                : new DispachingListener(getExecutor(), listenerHandler);
+                ? new XATransactionalMessageListener(transactionManager, session, listenerHandler)
+                : new LocalTransactionalMessageListener(session, listenerHandler); 
+            // new DispachingListener(getExecutor(), listenerHandler);
             consumer.setMessageListener(intListener);
             
             running = true;
@@ -163,7 +164,6 @@ public class MessageListenerContainer implements JMSListenerContainer
{
         public DispachingListener(Executor executor, MessageListener listenerHandler) {
             this.executor = executor;
             this.listenerHandler = listenerHandler;
-
         }
 
         @Override
@@ -180,13 +180,43 @@ public class MessageListenerContainer implements JMSListenerContainer
{
 
     }
     
+    static class LocalTransactionalMessageListener implements MessageListener {
+        private MessageListener listenerHandler;
+        private Session session;
+        
+        public LocalTransactionalMessageListener(Session session, MessageListener listenerHandler)
{
+            this.session = session;
+            this.listenerHandler = listenerHandler;
+        }
+
+        @Override
+        public void onMessage(Message message) {
+            try {
+                listenerHandler.onMessage(message);
+                session.commit();
+            } catch (Throwable e) {
+                safeRollback(e);
+            }
+        }
+        
+        private void safeRollback(Throwable t) {
+            LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling
back" , t);
+            try {
+                session.rollback();
+            } catch (Exception e) {
+                LOG.log(Level.WARNING, "Rollback of Local transaction failed", e);
+            }
+        }
+        
+    }
+    
     @SuppressWarnings("PMD")
-    static class TransactionalMessageListener implements MessageListener {
+    static class XATransactionalMessageListener implements MessageListener {
         private TransactionManager tm;
         private MessageListener listenerHandler;
         private XASession session;
         
-        public TransactionalMessageListener(TransactionManager tm, Session session, MessageListener
listenerHandler) {
+        public XATransactionalMessageListener(TransactionManager tm, Session session, MessageListener
listenerHandler) {
             if (tm == null) {
                 throw new IllegalArgumentException("Must supply a transaction manager");
             }
@@ -211,7 +241,7 @@ public class MessageListenerContainer implements JMSListenerContainer
{
         }
         
         private void safeRollback(Throwable t) {
-            LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling
back");
+            LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling
back" , t);
             try {
                 tm.rollback();
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/cxf/blob/c62ac164/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
index a800e1c..fec9536 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
@@ -21,7 +21,6 @@ package org.apache.cxf.transport.jms.util;
 import java.util.Enumeration;
 
 import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -32,90 +31,122 @@ import javax.jms.Queue;
 import javax.jms.QueueBrowser;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-import javax.jms.XAConnectionFactory;
 import javax.transaction.TransactionManager;
 import javax.transaction.xa.XAException;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQXAConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
 import org.apache.aries.transaction.internal.AriesTransactionManagerImpl;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class MessageListenerTest {
 
+    private static final String FAIL = "fail";
+    private static final String FAILFIRST = "failfirst";
     private static final String OK = "ok";
 
     @Test
-    @Ignore
     public void testWithJTA() throws JMSException, XAException, InterruptedException {
-        Connection connection = createConnection();
+        Connection connection = createXAConnection("brokerJTA");
         Queue dest = createQueue(connection, "test");
-        
+
         MessageListener listenerHandler = new TestMessageListener();
         MessageListenerContainer container = new MessageListenerContainer(connection, dest,
listenerHandler);
         container.setTransacted(false);
+        container.setAcknowledgeMode(Session.SESSION_TRANSACTED);
         TransactionManager transactionManager = new AriesTransactionManagerImpl();
         container.setTransactionManager(transactionManager);
         container.start();
-        assertNumMessagesInQueue("At the start the queue should be empty", connection, dest,
0);
-        synchronized (listenerHandler) {
-            sendMessage(connection, dest, OK);
-            listenerHandler.wait();
-        }
-        Thread.sleep(500);
-        assertNumMessagesInQueue("This message should be committed", connection, dest, 0);
-        synchronized (listenerHandler) {
-            sendMessage(connection, dest, "Fail");
-            listenerHandler.wait();
-        }
-        Thread.sleep(500);
-        assertNumMessagesInQueue("First try should do rollback", connection, dest, 1);
-        Thread.sleep(500);
-        assertNumMessagesInQueue("Second try should work", connection, dest, 0);
-        
+
+        testTransactionalBehaviour(connection, dest);
+
         container.stop();
         connection.close();
     }
-    
+
     @Test
     public void testNoTransaction() throws JMSException, XAException, InterruptedException
{
-        ConnectionFactory cf = new ActiveMQConnectionFactory("vm://broker1?broker.persistent=false");
-        Connection connection = cf.createConnection();
-        connection.start();
+        Connection connection = createConnection("brokerNoTransaction");
         Queue dest = createQueue(connection, "test");
-       
+
         MessageListener listenerHandler = new TestMessageListener();
         MessageListenerContainer container = new MessageListenerContainer(connection, dest,
listenerHandler);
         container.setTransacted(false);
         container.setAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
         container.start();
-        assertNumMessagesInQueue("At the start the queue should be empty", connection, dest,
0);
-        synchronized (listenerHandler) {
-            sendMessage(connection, dest, OK);
-            listenerHandler.wait();
-        }
-        Thread.sleep(500);
-        assertNumMessagesInQueue("This message should be committed", connection, dest, 0);
-        synchronized (listenerHandler) {
-            sendMessage(connection, dest, "Fail");
-            listenerHandler.wait();
-        }
-        Thread.sleep(500);
-        assertNumMessagesInQueue("Even when an exception occurs the message should be committed",
connection, dest, 0);
+
+        assertNumMessagesInQueue("At the start the queue should be empty", connection, dest,
0, 0);
+
+        sendMessage(connection, dest, OK);
+        assertNumMessagesInQueue("This message should be committed", connection, dest, 0,
1000);
+
+        sendMessage(connection, dest, FAIL);
+        assertNumMessagesInQueue("Even when an exception occurs the message should be committed",
connection,
+                                 dest, 0, 1000);
+
+        container.stop();
+        connection.close();
+    }
+
+    @Test
+    public void testLocalTransaction() throws JMSException, XAException, InterruptedException
{
+        Connection connection = createConnection("brokerLocalTransaction");
+        Queue dest = createQueue(connection, "test");
+        MessageListener listenerHandler = new TestMessageListener();
+        MessageListenerContainer container = new MessageListenerContainer(connection, dest,
listenerHandler);
+        container.setTransacted(true);
+        container.setAcknowledgeMode(Session.SESSION_TRANSACTED);
+        container.start();
+
+        testTransactionalBehaviour(connection, dest);
         container.stop();
         connection.close();
     }
 
-    private Connection createConnection() throws JMSException {
-        XAConnectionFactory cf = new ActiveMQXAConnectionFactory("vm://broker2?broker.persistent=false");
+    private void testTransactionalBehaviour(Connection connection, Queue dest) throws JMSException,
+        InterruptedException {
+        assertNumMessagesInQueue("At the start the queue should be empty", connection, dest,
0, 0);
+
+        sendMessage(connection, dest, OK);
+        assertNumMessagesInQueue("This message should be committed", connection, dest, 0,
1000);
+
+        sendMessage(connection, dest, FAILFIRST);
+        assertNumMessagesInQueue("Should be rolled back on first try", connection, dest,
1, 800);
+        assertNumMessagesInQueue("Should succeed on second try", connection, dest, 0, 2000);
+
+        sendMessage(connection, dest, "Fail");
+        assertNumMessagesInQueue("Should be rolled back", connection, dest, 1, 1000);
+    }
+
+    private Connection createConnection(String name) throws JMSException {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://" + name
+                                                                     + "?broker.persistent=false");
+        cf.setRedeliveryPolicy(redeliveryPolicy());
+        Connection connection = cf.createConnection();
+        connection.start();
+        return connection;
+    }
+
+    private Connection createXAConnection(String name) throws JMSException {
+        ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("vm://" + name
+                                                                         + "?broker.persistent=false");
+        cf.setRedeliveryPolicy(redeliveryPolicy());
         Connection connection = cf.createXAConnection();
         connection.start();
         return connection;
     }
 
-    protected void drainQueue(Connection connection, Queue dest) throws JMSException {
+    private RedeliveryPolicy redeliveryPolicy() {
+        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+        redeliveryPolicy.setRedeliveryDelay(1000);
+        redeliveryPolicy.setMaximumRedeliveries(3);
+        redeliveryPolicy.setUseExponentialBackOff(false);
+        return redeliveryPolicy;
+    }
+
+    protected void drainQueue(Connection connection, Queue dest) throws JMSException, InterruptedException
{
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer = session.createConsumer(dest);
         while (consumer.receiveNoWait() != null) {
@@ -123,13 +154,23 @@ public class MessageListenerTest {
         }
         consumer.close();
         session.close();
-        assertNumMessagesInQueue("", connection, dest, 0);
+        assertNumMessagesInQueue("", connection, dest, 0, 0);
+    }
+
+    private void assertNumMessagesInQueue(String message, Connection connection, Queue queue,
+                                          int expectedNum, int timeout) throws JMSException,
+        InterruptedException {
+        long startTime = System.currentTimeMillis();
+        int actualNum;
+        do {
+            actualNum = getNumMessages(connection, queue);
+            System.out.println("Messages in queue: " + actualNum + ", expecting: " + expectedNum);
+            Thread.sleep(100);
+        } while ((System.currentTimeMillis() - startTime < timeout) && expectedNum
!= actualNum);
+        Assert.assertEquals(message + " -> number of messages", expectedNum, actualNum);
     }
 
-    private void assertNumMessagesInQueue(String message, 
-                                          Connection connection, 
-                                          Queue queue, 
-                                          int expectedNum) throws JMSException {
+    private int getNumMessages(Connection connection, Queue queue) throws JMSException {
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         QueueBrowser browser = session.createBrowser(queue);
         @SuppressWarnings("unchecked")
@@ -141,16 +182,18 @@ public class MessageListenerTest {
         }
         browser.close();
         session.close();
-        Assert.assertEquals(message + " -> number of messages", expectedNum, actualNum);
+        return actualNum;
     }
 
-    private void sendMessage(Connection connection, Destination dest, String content) throws
JMSException {
+    private void sendMessage(Connection connection, Destination dest, String content) throws
JMSException,
+        InterruptedException {
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer prod = session.createProducer(dest);
         Message message = session.createTextMessage(content);
         prod.send(message);
         prod.close();
         session.close();
+        Thread.sleep(500); // Give receiver some time to process
     }
 
     private Queue createQueue(Connection connection, String name) throws JMSException {
@@ -162,29 +205,28 @@ public class MessageListenerTest {
             session.close();
         }
     }
-    
+
     private static final class TestMessageListener implements MessageListener {
         @Override
         public void onMessage(Message message) {
-            TextMessage textMessage = (TextMessage) message;
+            TextMessage textMessage = (TextMessage)message;
             try {
                 String text = textMessage.getText();
-                if (MessageListenerTest.OK.equals(text)) {
+                if (OK.equals(text)) {
                     System.out.println("Simulating Processing successful");
-                } else {
+                } else if (FAIL.equals(text)) {
+                    throw new RuntimeException("Simulating something went wrong. Expecting
rollback");
+                } else if (FAILFIRST.equals(text)) {
                     if (message.getJMSRedelivered()) {
                         System.out.println("Simulating processing worked on second try");
                     } else {
                         throw new RuntimeException("Simulating something went wrong. Expecting
rollback");
                     }
+                } else {
+                    throw new IllegalArgumentException("Invalid message type");
                 }
             } catch (JMSException e) {
                 // Ignore
-            } finally {
-                synchronized (this) {
-                    this.notifyAll();
-                }
-
             }
         }
     }


Mime
View raw message