cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject [3/3] git commit: CXF-5543 Fixing tests
Date Fri, 04 Apr 2014 07:15:31 GMT
CXF-5543 Fixing tests


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/7c7fff78
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/7c7fff78
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/7c7fff78

Branch: refs/heads/master
Commit: 7c7fff780a00adffbc1c67be4457932d8028c62f
Parents: 5c2c2c7
Author: Christian Schneider <chris@die-schneider.net>
Authored: Fri Apr 4 09:15:16 2014 +0200
Committer: Christian Schneider <chris@die-schneider.net>
Committed: Fri Apr 4 09:15:16 2014 +0200

----------------------------------------------------------------------
 .../jms/util/MessageListenerContainer.java      | 255 +++++++++++++++++++
 .../transport/jms/util/MessageListenerTest.java |  32 ++-
 2 files changed, 275 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/7c7fff78/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
new file mode 100644
index 0000000..6521289
--- /dev/null
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms.util;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.XASession;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.transaction.TransactionManager;
+
+import org.apache.cxf.common.logging.LogUtils;
+
+public class MessageListenerContainer implements JMSListenerContainer {
+    private static final Logger LOG = LogUtils.getL7dLogger(MessageListenerContainer.class);
+
+    private Connection connection;
+    private Destination destination;
+    private MessageListener listenerHandler;
+    private boolean transacted;
+    private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+    private String messageSelector;
+    private boolean running;
+    private MessageConsumer consumer;
+    private Session session;
+    private Executor executor;
+    private String durableSubscriptionName;
+    private boolean pubSubNoLocal;
+    private TransactionManager transactionManager;
+
+    public MessageListenerContainer(Connection connection, Destination destination,
+                                    MessageListener listenerHandler) {
+        this.connection = connection;
+        this.destination = destination;
+        this.listenerHandler = listenerHandler;
+    }
+
+    public Connection getConnection() {
+        return connection;
+    }
+
+    public void setTransacted(boolean transacted) {
+        this.transacted = transacted;
+    }
+
+    public void setAcknowledgeMode(int acknowledgeMode) {
+        this.acknowledgeMode = acknowledgeMode;
+    }
+
+    public void setMessageSelector(String messageSelector) {
+        this.messageSelector = messageSelector;
+    }
+
+    protected Executor getExecutor() {
+        if (executor == null) {
+            executor = Executors.newFixedThreadPool(10);
+        }
+        return executor;
+    }
+
+    public void setExecutor(Executor executor) {
+        this.executor = executor;
+    }
+
+    public void setDurableSubscriptionName(String durableSubscriptionName) {
+        this.durableSubscriptionName = durableSubscriptionName;
+    }
+
+    public void setPubSubNoLocal(boolean pubSubNoLocal) {
+        this.pubSubNoLocal = pubSubNoLocal;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return running;
+    }
+
+    public void setTransactionManager(TransactionManager transactionManager) {
+        this.transactionManager = transactionManager;
+    }
+
+    @Override
+    public void start() {
+        try {
+            session = connection.createSession(transacted, acknowledgeMode);
+            if (durableSubscriptionName != null && destination instanceof Topic) {
+                consumer = session.createDurableSubscriber((Topic)destination, durableSubscriptionName,
+                                                           messageSelector, pubSubNoLocal);
+            } else {
+                consumer = session.createConsumer(destination, messageSelector);
+            }
+            
+            MessageListener intListener = (transactionManager != null)
+                ? new XATransactionalMessageListener(transactionManager, session, listenerHandler)
+                : new LocalTransactionalMessageListener(session, listenerHandler); 
+            // new DispachingListener(getExecutor(), listenerHandler);
+            consumer.setMessageListener(intListener);
+            
+            running = true;
+        } catch (JMSException e) {
+            throw JMSUtil.convertJmsException(e);
+        }
+    }
+
+    @Override
+    public void stop() {
+        running = false;
+        ResourceCloser.close(consumer);
+        ResourceCloser.close(session);
+        consumer = null;
+        session = null;
+    }
+
+    @Override
+    public void shutdown() {
+        stop();
+        ResourceCloser.close(connection);
+    }
+
+    protected TransactionManager getTransactionManager() {
+        if (this.transactionManager == null) {
+            try {
+                InitialContext ctx = new InitialContext();
+                this.transactionManager = (TransactionManager)ctx
+                    .lookup("javax.transaction.TransactionManager");
+            } catch (NamingException e) {
+                // Ignore
+            }
+        }
+        return this.transactionManager;
+    }
+
+    static class DispachingListener implements MessageListener {
+        private Executor executor;
+        private MessageListener listenerHandler;
+
+        public DispachingListener(Executor executor, MessageListener listenerHandler) {
+            this.executor = executor;
+            this.listenerHandler = listenerHandler;
+        }
+
+        @Override
+        public void onMessage(final Message message) {
+            executor.execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    listenerHandler.onMessage(message);
+                }
+
+            });
+        }
+
+    }
+    
+    static class LocalTransactionalMessageListener implements MessageListener {
+        private MessageListener listenerHandler;
+        private Session session;
+        
+        public LocalTransactionalMessageListener(Session session, MessageListener listenerHandler) {
+            this.session = session;
+            this.listenerHandler = listenerHandler;
+        }
+
+        @Override
+        public void onMessage(Message message) {
+            try {
+                listenerHandler.onMessage(message);
+                if (session.getTransacted()) {
+                    session.commit();
+                }
+            } catch (Throwable e) {
+                safeRollback(e);
+            }
+        }
+        
+        private void safeRollback(Throwable t) {
+            LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back" , t);
+            try {
+                session.rollback();
+            } catch (Exception e) {
+                LOG.log(Level.WARNING, "Rollback of Local transaction failed", e);
+            }
+        }
+        
+    }
+    
+    @SuppressWarnings("PMD")
+    static class XATransactionalMessageListener implements MessageListener {
+        private TransactionManager tm;
+        private MessageListener listenerHandler;
+        private XASession session;
+        
+        public XATransactionalMessageListener(TransactionManager tm, Session session, MessageListener listenerHandler) {
+            if (tm == null) {
+                throw new IllegalArgumentException("Must supply a transaction manager");
+            }
+            if (session == null || !(session instanceof XASession)) {
+                throw new IllegalArgumentException("Must supply an XASession");
+            }
+            this.tm = tm;
+            this.session = (XASession)session;
+            this.listenerHandler = listenerHandler;
+        }
+
+        @Override
+        public void onMessage(Message message) {
+            try {
+                tm.begin();
+                tm.getTransaction().enlistResource(session.getXAResource());
+                listenerHandler.onMessage(message);
+                tm.commit();
+            } catch (Throwable e) {
+                safeRollback(e);
+            }
+        }
+        
+        private void safeRollback(Throwable t) {
+            LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back" , t);
+            try {
+                tm.rollback();
+            } catch (Exception e) {
+                LOG.log(Level.WARNING, "Rollback of JTA transaction failed", e);
+            }
+        }
+        
+    }
+}

http://git-wip-us.apache.org/repos/asf/cxf/blob/7c7fff78/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
index fec9536..c1bf86a 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
@@ -37,6 +37,7 @@ import javax.transaction.xa.XAException;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQXAConnectionFactory;
 import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.pool.XaPooledConnectionFactory;
 import org.apache.aries.transaction.internal.AriesTransactionManagerImpl;
 import org.junit.Assert;
 import org.junit.Test;
@@ -49,14 +50,16 @@ public class MessageListenerTest {
 
     @Test
     public void testWithJTA() throws JMSException, XAException, InterruptedException {
-        Connection connection = createXAConnection("brokerJTA");
+        TransactionManager transactionManager = new AriesTransactionManagerImpl();
+        Connection connection = createXAConnection("brokerJTA", transactionManager);
         Queue dest = createQueue(connection, "test");
 
         MessageListener listenerHandler = new TestMessageListener();
-        MessageListenerContainer container = new MessageListenerContainer(connection, dest, listenerHandler);
+        PollingMessageListenerContainer container = new PollingMessageListenerContainer(connection, dest,
+                                                                                        listenerHandler);
         container.setTransacted(false);
         container.setAcknowledgeMode(Session.SESSION_TRANSACTED);
-        TransactionManager transactionManager = new AriesTransactionManagerImpl();
+
         container.setTransactionManager(transactionManager);
         container.start();
 
@@ -72,7 +75,8 @@ public class MessageListenerTest {
         Queue dest = createQueue(connection, "test");
 
         MessageListener listenerHandler = new TestMessageListener();
-        MessageListenerContainer container = new MessageListenerContainer(connection, dest, listenerHandler);
+        PollingMessageListenerContainer container = new PollingMessageListenerContainer(connection, dest,
+                                                                                        listenerHandler);
         container.setTransacted(false);
         container.setAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
         container.start();
@@ -107,17 +111,18 @@ public class MessageListenerTest {
 
     private void testTransactionalBehaviour(Connection connection, Queue dest) throws JMSException,
         InterruptedException {
+        Queue dlq = createQueue(connection, "ActiveMQ.DLQ");
         assertNumMessagesInQueue("At the start the queue should be empty", connection, dest, 0, 0);
+        assertNumMessagesInQueue("At the start the DLQ should be empty", connection, dlq, 0, 0);
 
         sendMessage(connection, dest, OK);
         assertNumMessagesInQueue("This message should be committed", connection, dest, 0, 1000);
 
         sendMessage(connection, dest, FAILFIRST);
-        assertNumMessagesInQueue("Should be rolled back on first try", connection, dest, 1, 800);
         assertNumMessagesInQueue("Should succeed on second try", connection, dest, 0, 2000);
 
-        sendMessage(connection, dest, "Fail");
-        assertNumMessagesInQueue("Should be rolled back", connection, dest, 1, 1000);
+        sendMessage(connection, dest, FAIL);
+        assertNumMessagesInQueue("Should be rolled back", connection, dlq, 1, 1000);
     }
 
     private Connection createConnection(String name) throws JMSException {
@@ -129,11 +134,14 @@ public class MessageListenerTest {
         return connection;
     }
 
-    private Connection createXAConnection(String name) throws JMSException {
+    private Connection createXAConnection(String name, TransactionManager tm) throws JMSException {
         ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("vm://" + name
                                                                          + "?broker.persistent=false");
         cf.setRedeliveryPolicy(redeliveryPolicy());
-        Connection connection = cf.createXAConnection();
+        XaPooledConnectionFactory cfp = new XaPooledConnectionFactory(cf);
+        cfp.setTransactionManager(tm);
+        cfp.setConnectionFactory(cf);
+        Connection connection = cfp.createConnection();
         connection.start();
         return connection;
     }
@@ -141,8 +149,7 @@ public class MessageListenerTest {
     private RedeliveryPolicy redeliveryPolicy() {
         RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
         redeliveryPolicy.setRedeliveryDelay(1000);
-        redeliveryPolicy.setMaximumRedeliveries(3);
-        redeliveryPolicy.setUseExponentialBackOff(false);
+        redeliveryPolicy.setMaximumRedeliveries(1);
         return redeliveryPolicy;
     }
 
@@ -164,7 +171,8 @@ public class MessageListenerTest {
         int actualNum;
         do {
             actualNum = getNumMessages(connection, queue);
-            System.out.println("Messages in queue: " + actualNum + ", expecting: " + expectedNum);
+            System.out.println("Messages in queue " + queue.getQueueName() + ": " + actualNum
+                               + ", expecting: " + expectedNum);
             Thread.sleep(100);
         } while ((System.currentTimeMillis() - startTime < timeout) && expectedNum != actualNum);
         Assert.assertEquals(message + " -> number of messages", expectedNum, actualNum);


Mime
View raw message