cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject git commit: CXF-5543 Adding experimental XA transaction support
Date Wed, 02 Apr 2014 19:34:18 GMT
Repository: cxf
Updated Branches:
  refs/heads/master 0918a1bc9 -> 0739807fb


CXF-5543 Adding experimental XA transaction support


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/0739807f
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/0739807f
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/0739807f

Branch: refs/heads/master
Commit: 0739807fbc6f3b3e24aca2b517e34f1b9354d34e
Parents: 0918a1b
Author: Christian Schneider <chris@die-schneider.net>
Authored: Wed Apr 2 21:34:10 2014 +0200
Committer: Christian Schneider <chris@die-schneider.net>
Committed: Wed Apr 2 21:34:10 2014 +0200

----------------------------------------------------------------------
 rt/transports/jms/pom.xml                       |   6 +
 .../cxf/transport/jms/JMSDestination.java       |  10 +-
 .../jms/util/JMSListenerContainer.java          |   3 -
 .../jms/util/MessageListenerContainer.java      | 119 ++++++++++--
 .../transport/jms/util/MessageListenerTest.java | 191 +++++++++++++++++++
 5 files changed, 303 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/0739807f/rt/transports/jms/pom.xml
----------------------------------------------------------------------
diff --git a/rt/transports/jms/pom.xml b/rt/transports/jms/pom.xml
index e2ff1e0..e2224f6 100644
--- a/rt/transports/jms/pom.xml
+++ b/rt/transports/jms/pom.xml
@@ -119,6 +119,12 @@
             <artifactId>slf4j-jdk14</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+        	<groupId>org.apache.aries.transaction</groupId>
+        	<artifactId>org.apache.aries.transaction.manager</artifactId>
+        	<version>1.1.0</version>
+        	<scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/cxf/blob/0739807f/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
index 76f857d..7c529ef 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
@@ -21,6 +21,7 @@ package org.apache.cxf.transport.jms;
 
 import java.io.UnsupportedEncodingException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -62,6 +63,7 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess
     private JMSListenerContainer jmsListener;
     private ThrottlingCounter suspendedContinuations;
     private ClassLoader loader;
+    private Connection connection;
 
     public JMSDestination(Bus b, EndpointInfo info, JMSConfiguration jmsConfig) {
         super(b, getTargetReference(info, b), info);
@@ -77,7 +79,7 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess
      * @return the inbuilt backchannel
      */
     protected Conduit getInbuiltBackChannel(Message inMessage) {
-        return new BackChannelConduit(inMessage, jmsConfig, jmsListener.getConnection());
+        return new BackChannelConduit(inMessage, jmsConfig, connection);
     }
 
     /**
@@ -98,13 +100,15 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess
     private JMSListenerContainer createTargetDestinationListener() {
         Session session = null;
         try {
-            Connection connection = JMSFactory.createConnection(jmsConfig);
+            connection = JMSFactory.createConnection(jmsConfig);
             connection.start();
             session = connection.createSession(jmsConfig.isSessionTransacted(), Session.AUTO_ACKNOWLEDGE);
             Destination destination = jmsConfig.getTargetDestination(session);
             MessageListenerContainer container = new MessageListenerContainer(connection, destination, this);
             container.setMessageSelector(jmsConfig.getMessageSelector());
-            Executor executor = JMSFactory.createExecutor(bus, "jms-destination");
+            
+            Executor executor = Executors.newFixedThreadPool(20); 
+                //JMSFactory.createExecutor(bus, "jms-destination");
             container.setExecutor(executor);
             container.start();
             return container;

http://git-wip-us.apache.org/repos/asf/cxf/blob/0739807f/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSListenerContainer.java
index 7adf956..4398dd8 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSListenerContainer.java
@@ -19,13 +19,10 @@
 
 package org.apache.cxf.transport.jms.util;
 
-import javax.jms.Connection;
 
 public interface JMSListenerContainer {
     boolean isRunning();
     void stop();
     void start();
     void shutdown();
-    
-    Connection getConnection();
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/0739807f/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
index bd9a2b0..dfc641d 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
@@ -20,6 +20,8 @@ package org.apache.cxf.transport.jms.util;
 
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -29,11 +31,20 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.Session;
 import javax.jms.Topic;
+import javax.jms.XASession;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+import org.apache.cxf.common.logging.LogUtils;
 
 public class MessageListenerContainer implements JMSListenerContainer {
-    
+    private static final Logger LOG = LogUtils.getL7dLogger(MessageListenerContainer.class);
+
     private Connection connection;
-    private Destination replyTo;
+    private Destination destination;
     private MessageListener listenerHandler;
     private boolean transacted;
     private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
@@ -44,19 +55,19 @@ public class MessageListenerContainer implements JMSListenerContainer {
     private Executor executor;
     private String durableSubscriptionName;
     private boolean pubSubNoLocal;
+    private TransactionManager transactionManager;
 
-    public MessageListenerContainer(Connection connection, 
-                                    Destination replyTo,
+    public MessageListenerContainer(Connection connection, Destination destination,
                                     MessageListener listenerHandler) {
         this.connection = connection;
-        this.replyTo = replyTo;
+        this.destination = destination;
         this.listenerHandler = listenerHandler;
     }
-    
+
     public Connection getConnection() {
         return connection;
     }
-    
+
     public void setTransacted(boolean transacted) {
         this.transacted = transacted;
     }
@@ -68,7 +79,7 @@ public class MessageListenerContainer implements JMSListenerContainer {
     public void setMessageSelector(String messageSelector) {
         this.messageSelector = messageSelector;
     }
-    
+
     private Executor getExecutor() {
         if (executor == null) {
             executor = Executors.newFixedThreadPool(10);
@@ -92,18 +103,27 @@ public class MessageListenerContainer implements JMSListenerContainer {
     public boolean isRunning() {
         return running;
     }
-    
+
+    public void setTransactionManager(TransactionManager transactionManager) {
+        this.transactionManager = transactionManager;
+    }
+
     @Override
     public void start() {
         try {
             session = connection.createSession(transacted, acknowledgeMode);
             if (durableSubscriptionName != null) {
-                consumer = session.createDurableSubscriber((Topic)replyTo, durableSubscriptionName,
+                consumer = session.createDurableSubscriber((Topic)destination, durableSubscriptionName,
                                                            messageSelector, pubSubNoLocal);
             } else {
-                consumer = session.createConsumer(replyTo, messageSelector);
+                consumer = session.createConsumer(destination, messageSelector);
             }
-            consumer.setMessageListener(listenerHandler);
+            
+            MessageListener intListener = (transactionManager != null)
+                ? new TransactionalMessageListener(transactionManager, session, listenerHandler)
+                : new DispachingListener(getExecutor(), listenerHandler);
+            consumer.setMessageListener(intListener);
+            
             running = true;
         } catch (JMSException e) {
             throw JMSUtil.convertJmsException(e);
@@ -125,22 +145,81 @@ public class MessageListenerContainer implements JMSListenerContainer {
         ResourceCloser.close(connection);
     }
 
-    class DispachingListener implements MessageListener {
+    protected TransactionManager getTransactionManager() {
+        if (this.transactionManager == null) {
+            try {
+                InitialContext ctx = new InitialContext();
+                this.transactionManager = (TransactionManager)ctx
+                    .lookup("javax.transaction.TransactionManager");
+            } catch (NamingException e) {
+                // Ignore
+            }
+        }
+        return this.transactionManager;
+    }
+
+    static class DispachingListener implements MessageListener {
+        private Executor executor;
+        private MessageListener listenerHandler;
+
+        public DispachingListener(Executor executor, MessageListener listenerHandler) {
+            this.executor = executor;
+            this.listenerHandler = listenerHandler;
+
+        }
 
         @Override
         public void onMessage(final Message message) {
-            getExecutor().execute(new Runnable() {
-                
+            executor.execute(new Runnable() {
+
                 @Override
                 public void run() {
-                    try {
-                        listenerHandler.onMessage(message);
-                    } catch (Exception e) {
-                        // Ignore
-                    }
+                    listenerHandler.onMessage(message);
                 }
+
             });
         }
+
+    }
+    
+    static class TransactionalMessageListener implements MessageListener {
+        private TransactionManager tm;
+        private MessageListener listenerHandler;
+        private Session session;
+        
+        public TransactionalMessageListener(TransactionManager tm, Session session, MessageListener listenerHandler) {
+            this.tm = tm;
+            this.session = session;
+            this.listenerHandler = listenerHandler;
+        }
+
+        @Override
+        public void onMessage(Message message) {
+            if (tm == null || !(session instanceof XASession)) {
+                listenerHandler.onMessage(message);
+                return;
+            }
+            try {
+                XASession xaSession = (XASession)session; // TODO check cast
+                tm.begin();
+                Transaction tr = tm.getTransaction();
+                XAResource res = xaSession.getXAResource();
+                tr.enlistResource(res);
+                listenerHandler.onMessage(message);
+                tm.commit();
+            } catch (Throwable e) {
+                safeRollback(e);
+            }
+        }
+        
+        private void safeRollback(Throwable t) {
+            LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back");
+            try {
+                tm.rollback();
+            } catch (Exception e) {
+                LOG.log(Level.WARNING, "Rollback of JTA transaction failed", e);
+            }
+        }
         
     }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/0739807f/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
new file mode 100644
index 0000000..a800e1c
--- /dev/null
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms.util;
+
+import java.util.Enumeration;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.XAConnectionFactory;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAException;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQXAConnectionFactory;
+import org.apache.aries.transaction.internal.AriesTransactionManagerImpl;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class MessageListenerTest {
+
+    private static final String OK = "ok";
+
+    @Test
+    @Ignore
+    public void testWithJTA() throws JMSException, XAException, InterruptedException {
+        Connection connection = createConnection();
+        Queue dest = createQueue(connection, "test");
+        
+        MessageListener listenerHandler = new TestMessageListener();
+        MessageListenerContainer container = new MessageListenerContainer(connection, dest, listenerHandler);
+        container.setTransacted(false);
+        TransactionManager transactionManager = new AriesTransactionManagerImpl();
+        container.setTransactionManager(transactionManager);
+        container.start();
+        assertNumMessagesInQueue("At the start the queue should be empty", connection, dest, 0);
+        synchronized (listenerHandler) {
+            sendMessage(connection, dest, OK);
+            listenerHandler.wait();
+        }
+        Thread.sleep(500);
+        assertNumMessagesInQueue("This message should be committed", connection, dest, 0);
+        synchronized (listenerHandler) {
+            sendMessage(connection, dest, "Fail");
+            listenerHandler.wait();
+        }
+        Thread.sleep(500);
+        assertNumMessagesInQueue("First try should do rollback", connection, dest, 1);
+        Thread.sleep(500);
+        assertNumMessagesInQueue("Second try should work", connection, dest, 0);
+        
+        container.stop();
+        connection.close();
+    }
+    
+    @Test
+    public void testNoTransaction() throws JMSException, XAException, InterruptedException {
+        ConnectionFactory cf = new ActiveMQConnectionFactory("vm://broker1?broker.persistent=false");
+        Connection connection = cf.createConnection();
+        connection.start();
+        Queue dest = createQueue(connection, "test");
+       
+        MessageListener listenerHandler = new TestMessageListener();
+        MessageListenerContainer container = new MessageListenerContainer(connection, dest, listenerHandler);
+        container.setTransacted(false);
+        container.setAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
+        container.start();
+        assertNumMessagesInQueue("At the start the queue should be empty", connection, dest, 0);
+        synchronized (listenerHandler) {
+            sendMessage(connection, dest, OK);
+            listenerHandler.wait();
+        }
+        Thread.sleep(500);
+        assertNumMessagesInQueue("This message should be committed", connection, dest, 0);
+        synchronized (listenerHandler) {
+            sendMessage(connection, dest, "Fail");
+            listenerHandler.wait();
+        }
+        Thread.sleep(500);
+        assertNumMessagesInQueue("Even when an exception occurs the message should be committed", connection, dest, 0);
+        container.stop();
+        connection.close();
+    }
+
+    private Connection createConnection() throws JMSException {
+        XAConnectionFactory cf = new ActiveMQXAConnectionFactory("vm://broker2?broker.persistent=false");
+        Connection connection = cf.createXAConnection();
+        connection.start();
+        return connection;
+    }
+
+    protected void drainQueue(Connection connection, Queue dest) throws JMSException {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(dest);
+        while (consumer.receiveNoWait() != null) {
+            System.out.println("Consuming old message");
+        }
+        consumer.close();
+        session.close();
+        assertNumMessagesInQueue("", connection, dest, 0);
+    }
+
+    private void assertNumMessagesInQueue(String message, 
+                                          Connection connection, 
+                                          Queue queue, 
+                                          int expectedNum) throws JMSException {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        QueueBrowser browser = session.createBrowser(queue);
+        @SuppressWarnings("unchecked")
+        Enumeration<Message> messages = browser.getEnumeration();
+        int actualNum = 0;
+        while (messages.hasMoreElements()) {
+            actualNum++;
+            messages.nextElement();
+        }
+        browser.close();
+        session.close();
+        Assert.assertEquals(message + " -> number of messages", expectedNum, actualNum);
+    }
+
+    private void sendMessage(Connection connection, Destination dest, String content) throws JMSException {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer prod = session.createProducer(dest);
+        Message message = session.createTextMessage(content);
+        prod.send(message);
+        prod.close();
+        session.close();
+    }
+
+    private Queue createQueue(Connection connection, String name) throws JMSException {
+        Session session = null;
+        try {
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            return session.createQueue(name);
+        } finally {
+            session.close();
+        }
+    }
+    
+    private static final class TestMessageListener implements MessageListener {
+        @Override
+        public void onMessage(Message message) {
+            TextMessage textMessage = (TextMessage) message;
+            try {
+                String text = textMessage.getText();
+                if (MessageListenerTest.OK.equals(text)) {
+                    System.out.println("Simulating Processing successful");
+                } else {
+                    if (message.getJMSRedelivered()) {
+                        System.out.println("Simulating processing worked on second try");
+                    } else {
+                        throw new RuntimeException("Simulating something went wrong. Expecting rollback");
+                    }
+                }
+            } catch (JMSException e) {
+                // Ignore
+            } finally {
+                synchronized (this) {
+                    this.notifyAll();
+                }
+
+            }
+        }
+    }
+}


Mime
View raw message