cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject git commit: CXF-5543 Fix problem with workqueue rejecting messages. Some simplification in MessageListenerContainer
Date Thu, 03 Apr 2014 08:20:36 GMT
Repository: cxf
Updated Branches:
  refs/heads/master c51282ea7 -> 12e8613a9


CXF-5543 Fix problem with workqueue rejecting messages. Some simplification in MessageListenerContainer


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/12e8613a
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/12e8613a
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/12e8613a

Branch: refs/heads/master
Commit: 12e8613a90c00b8d34cf624003e7549423639e2b
Parents: c51282e
Author: Christian Schneider <chris@die-schneider.net>
Authored: Thu Apr 3 10:20:26 2014 +0200
Committer: Christian Schneider <chris@die-schneider.net>
Committed: Thu Apr 3 10:20:26 2014 +0200

----------------------------------------------------------------------
 .../cxf/transport/jms/JMSDestination.java       |  5 +---
 .../apache/cxf/transport/jms/JMSFactory.java    | 27 +++++++++++++++-----
 .../jms/util/MessageListenerContainer.java      | 22 ++++++++--------
 3 files changed, 31 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/12e8613a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
index 7c529ef..cda7c5e 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
@@ -21,7 +21,6 @@ package org.apache.cxf.transport.jms;
 
 import java.io.UnsupportedEncodingException;
 import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -106,9 +105,7 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess
             Destination destination = jmsConfig.getTargetDestination(session);
             MessageListenerContainer container = new MessageListenerContainer(connection, destination, this);
             container.setMessageSelector(jmsConfig.getMessageSelector());
-            
-            Executor executor = Executors.newFixedThreadPool(20); 
-                //JMSFactory.createExecutor(bus, "jms-destination");
+            Executor executor = JMSFactory.createExecutor(bus, "jms-destination");
             container.setExecutor(executor);
             container.start();
             return container;

http://git-wip-us.apache.org/repos/asf/cxf/blob/12e8613a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
index 6ad7b55..f3ae29e 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
@@ -29,6 +29,8 @@ import javax.naming.NamingException;
 import org.apache.cxf.Bus;
 import org.apache.cxf.transport.jms.util.JMSSender;
 import org.apache.cxf.transport.jms.util.JndiHelper;
+import org.apache.cxf.workqueue.AutomaticWorkQueue;
+import org.apache.cxf.workqueue.WorkQueue;
 import org.apache.cxf.workqueue.WorkQueueManager;
 
 /**
@@ -103,17 +105,28 @@ public final class JMSFactory {
         return connection;
     }
     
+    /**
+     * Get workqueue from workqueue manager. Return an executor that will never reject messages and
+     * instead block when all threads are used.
+     * 
+     * @param bus
+     * @param name
+     * @return
+     */
     public static Executor createExecutor(Bus bus, String name) {
         WorkQueueManager manager = bus.getExtension(WorkQueueManager.class);
-        Executor workQueue;
         if (manager != null) {
-            workQueue = manager.getNamedWorkQueue(name);
-            if (workQueue == null) {
-                workQueue = manager.getAutomaticWorkQueue();
-            }
+            AutomaticWorkQueue workQueue1 = manager.getNamedWorkQueue(name);
+            final WorkQueue workQueue = (workQueue1 == null) ? manager.getAutomaticWorkQueue() : workQueue1;
+            return new Executor() {
+                
+                @Override
+                public void execute(Runnable command) {
+                    workQueue.execute(command, 0);
+                }
+            };
         } else {
-            workQueue = Executors.newFixedThreadPool(20);
+            return Executors.newFixedThreadPool(20);
         }
-        return workQueue;
     }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/12e8613a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
index dfc641d..fd49d4a 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
@@ -34,9 +34,7 @@ import javax.jms.Topic;
 import javax.jms.XASession;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
-import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
-import javax.transaction.xa.XAResource;
 
 import org.apache.cxf.common.logging.LogUtils;
 
@@ -182,29 +180,29 @@ public class MessageListenerContainer implements JMSListenerContainer {
 
     }
     
+    @SuppressWarnings("PMD")
     static class TransactionalMessageListener implements MessageListener {
         private TransactionManager tm;
         private MessageListener listenerHandler;
-        private Session session;
+        private XASession session;
         
         public TransactionalMessageListener(TransactionManager tm, Session session, MessageListener listenerHandler) {
+            if (tm == null) {
+                throw new IllegalArgumentException("Must supply a transaction manager");
+            }
+            if (session == null || !(session instanceof XASession)) {
+                throw new IllegalArgumentException("Must supply an XASession");
+            }
             this.tm = tm;
-            this.session = session;
+            this.session = (XASession)session;
             this.listenerHandler = listenerHandler;
         }
 
         @Override
         public void onMessage(Message message) {
-            if (tm == null || !(session instanceof XASession)) {
-                listenerHandler.onMessage(message);
-                return;
-            }
             try {
-                XASession xaSession = (XASession)session; // TODO check cast
                 tm.begin();
-                Transaction tr = tm.getTransaction();
-                XAResource res = xaSession.getXAResource();
-                tr.enlistResource(res);
+                tm.getTransaction().enlistResource(session.getXAResource());
                 listenerHandler.onMessage(message);
                 tm.commit();
             } catch (Throwable e) {


Mime
View raw message