cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject [3/5] cxf git commit: CXF-6199 Separate XA and local transaction handling
Date Mon, 26 Jan 2015 10:23:19 GMT
CXF-6199 Separate XA and local transaction handling


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/592cd43b
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/592cd43b
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/592cd43b

Branch: refs/heads/3.0.x-fixes
Commit: 592cd43b2bf0a1ae4483a1a72798e4ab238ffa9e
Parents: 15fa235
Author: Christian Schneider <chris@die-schneider.net>
Authored: Tue Jan 20 00:07:57 2015 +0100
Committer: Christian Schneider <chris@die-schneider.net>
Committed: Mon Jan 26 10:41:47 2015 +0100

----------------------------------------------------------------------
 .../util/PollingMessageListenerContainer.java   | 113 +++++++++++++------
 .../transport/jms/util/MessageListenerTest.java |   2 +-
 2 files changed, 79 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/592cd43b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
index 9c2e29e..8c68338 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
@@ -26,6 +26,7 @@ import java.util.logging.Logger;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
@@ -56,34 +57,25 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
                 MessageConsumer consumer = null;
                 Session session = null;
                 try {
-                    if (transactionManager != null) {
-                        transactionManager.begin();
-                    }
-                    
+                    // Create session early to optimize performance
                     session = connection.createSession(transacted, acknowledgeMode);
-                    if (durableSubscriptionName != null && destination instanceof
Topic) {
-                        consumer = session.createDurableSubscriber((Topic)destination, 
-                                                                   durableSubscriptionName,
-                                                                   messageSelector,
-                                                                   pubSubNoLocal);
-                    } else {
-                        consumer = session.createConsumer(destination, messageSelector);
-                    }
-                    Message message = consumer.receive(1000);
-                    try {
-                        if (message != null) {
-                            listenerHandler.onMessage(message);
-                        }
-                        if (transactionManager != null) {
-                            transactionManager.commit();
-                        } else if (session.getTransacted()) {
-                            session.commit();
+                    consumer = createConsumer(session);
+                    while (running) {
+                        Message message = consumer.receive(1000);
+                        try {
+                            if (message != null) {
+                                listenerHandler.onMessage(message);
+                            }
+                            if (session.getTransacted()) {
+                                session.commit();
+                            }
+                        } catch (Exception e) {
+                            LOG.log(Level.WARNING, "Exception while processing jms message
in cxf. Rolling back", e);
+                            safeRollBack(session, e);
                         }
-                    } catch (Exception e) {
-                        safeRollBack(session, e);
                     }
                 } catch (Exception e) {
-                    LOG.log(Level.WARNING, "Unexpected exception", e);
+                    LOG.log(Level.WARNING, "Unexpected exception. Restarting session and
consumer", e);
                 } finally {
                     ResourceCloser.close(consumer);
                     ResourceCloser.close(session);
@@ -91,21 +83,71 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
             }
 
         }
+        
+        private void safeRollBack(Session session, Exception e) {
+            try {
+                if (session.getTransacted()) {
+                    session.rollback();
+                }
+            } catch (Exception e1) {
+                LOG.log(Level.WARNING, "Rollback of Local transaction failed", e1);
+            }
+        }
 
     }
+    
+    private class XAPoller implements Runnable {
 
-    private void safeRollBack(Session session, Exception e) {
-        LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back",
e);
-        try {
-            if (transactionManager != null) {
-                transactionManager.rollback();
-            } else {
-                if (session.getTransacted()) {
-                    session.rollback();
+        @Override
+        public void run() {
+            while (running) {
+                MessageConsumer consumer = null;
+                Session session = null;
+                try {
+                    transactionManager.begin();
+                    /*
+                     * Create session inside transaction to give it the 
+                     * chance to enlist itself as a resource
+                     */
+                    session = connection.createSession(transacted, acknowledgeMode);
+                    consumer = createConsumer(session);
+                    Message message = consumer.receive(1000);
+                    try {
+                        if (message != null) {
+                            listenerHandler.onMessage(message);
+                        }
+                        transactionManager.commit();
+                    } catch (Exception e) {
+                        LOG.log(Level.WARNING, "Exception while processing jms message in
cxf. Rolling back", e);
+                        safeRollBack(session);
+                    } finally {
+                        ResourceCloser.close(consumer);
+                        ResourceCloser.close(session);
+                    }
+                } catch (Exception e) {
+                    LOG.log(Level.WARNING, "Unexpected exception. Restarting session and
consumer", e);
                 }
+
+            }
+
+        }
+        
+        private void safeRollBack(Session session) {
+            try {
+                transactionManager.rollback();
+            } catch (Exception e) {
+                LOG.log(Level.WARNING, "Rollback of XA transaction failed", e);
             }
-        } catch (Exception e1) {
-            LOG.log(Level.WARNING, "Rollback of Local transaction failed", e1);
+        }
+
+    }
+    
+    private MessageConsumer createConsumer(Session session) throws JMSException {
+        if (durableSubscriptionName != null && destination instanceof Topic) {
+            return session.createDurableSubscriber((Topic)destination, durableSubscriptionName,
+                                                   messageSelector, pubSubNoLocal);
+        } else {
+            return session.createConsumer(destination, messageSelector);
         }
     }
 
@@ -117,7 +159,8 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
         running = true;
         pollers = Executors.newFixedThreadPool(concurrentConsumers);
         for (int c = 0; c < concurrentConsumers; c++) {
-            pollers.execute(new Poller());
+            Runnable poller = (transactionManager != null) ? new XAPoller() : new Poller();

+            pollers.execute(poller);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cxf/blob/592cd43b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
index 2484277..8b635e8 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
@@ -173,7 +173,7 @@ public class MessageListenerTest {
             //                   + ", expecting: " + expectedNum);
             Thread.sleep(100);
         } while ((System.currentTimeMillis() - startTime < timeout) && expectedNum
!= actualNum);
-        Assert.assertEquals(message + " -> number of messages", expectedNum, actualNum);
+        Assert.assertEquals(message + " -> number of messages on queue", expectedNum,
actualNum);
     }
 
     private void sendMessage(Connection connection, Destination dest, String content) throws
JMSException,


Mime
View raw message