cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject git commit: Bugfix for jms performance in async case, add support for using work queues
Date Thu, 27 Mar 2014 16:09:08 GMT
Repository: cxf
Updated Branches:
  refs/heads/master 8a49565e6 -> 2c8472de7


Bugfix for jms performance in async case, add support for using work queues


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/2c8472de
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/2c8472de
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/2c8472de

Branch: refs/heads/master
Commit: 2c8472de712ab3615bb3a9f8f568734c501d68de
Parents: 8a49565
Author: Christian Schneider <chris@die-schneider.net>
Authored: Thu Mar 27 17:09:00 2014 +0100
Committer: Christian Schneider <chris@die-schneider.net>
Committed: Thu Mar 27 17:09:00 2014 +0100

----------------------------------------------------------------------
 .../apache/cxf/transport/jms/JMSConduit.java    | 30 ++++++---
 .../cxf/transport/jms/JMSDestination.java       | 65 ++++++++++++++------
 .../apache/cxf/transport/jms/JMSFactory.java    | 60 ++++++------------
 .../jms/util/MessageListenerContainer.java      |  9 ++-
 4 files changed, 92 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/2c8472de/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
index 011c859..17907a3 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
@@ -25,6 +25,7 @@ import java.lang.ref.WeakReference;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -48,6 +49,7 @@ import org.apache.cxf.transport.AbstractConduit;
 import org.apache.cxf.transport.jms.util.JMSListenerContainer;
 import org.apache.cxf.transport.jms.util.JMSSender;
 import org.apache.cxf.transport.jms.util.JMSUtil;
+import org.apache.cxf.transport.jms.util.MessageListenerContainer;
 import org.apache.cxf.transport.jms.util.ResourceCloser;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 
@@ -102,11 +104,23 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender,
Me
 
     private synchronized void getJMSListener(Destination replyTo) {
         if (jmsListener == null) {
-            jmsListener = JMSFactory
-                .createListenerContainer(jmsConfig, connection, this, replyTo, conduitId);
+            jmsListener = createListenerContainer(replyTo);
             addBusListener();
         }
     }
+    
+    private JMSListenerContainer createListenerContainer(Destination destination) {
+        MessageListenerContainer container = new MessageListenerContainer(connection, destination,
+                                                                          this);
+        String messageSelector = JMSFactory.getMessageSelector(jmsConfig, conduitId);
+        container.setMessageSelector(messageSelector);
+        Executor executor = JMSFactory.createExecutor(bus, "jms-conduit");
+        container.setExecutor(executor);
+        container.start();
+        return container;
+    }
+
+    
     /**
      * Send the JMS message and if the MEP is not oneway receive the response.
      * 
@@ -129,7 +143,7 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender,
Me
         
         JMSMessageHeadersType headers = getOrCreateJmsHeaders(outMessage);
         String userCID = headers.getJMSCorrelationID();
-        assertIsNotAsyncSyncAndUserCID(exchange, userCID);
+        assertIsNotAsyncAndUserCID(exchange, userCID);
 
         ResourceCloser closer = new ResourceCloser();
         try {
@@ -171,7 +185,7 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender,
Me
 
             synchronized (exchange) {
                 sender.sendMessage(closer, session, targetDest, message);
-                LOG.log(Level.INFO, "client sending request message " 
+                LOG.log(Level.FINE, "client sending request message " 
                     + message.getJMSMessageID() + " to " + targetDest);
                 headers.setJMSMessageID(message.getJMSMessageID());
                 if (correlationId == null) {
@@ -200,7 +214,7 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender,
Me
         }
     }
 
-    private void assertIsNotAsyncSyncAndUserCID(Exchange exchange, String userCID) {
+    private void assertIsNotAsyncAndUserCID(Exchange exchange, String userCID) {
         if (!exchange.isSynchronous() && userCID != null) {
             throw new IllegalArgumentException("User CID can not be used for asynchronous
exchanges");
         }
@@ -305,7 +319,7 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender,
Me
     public void onMessage(javax.jms.Message jmsMessage) {
         try {
             String correlationId = jmsMessage.getJMSCorrelationID();
-            LOG.log(Level.INFO, "Received reply message with correlation id " + correlationId);
+            LOG.log(Level.FINE, "Received reply message with correlation id " + correlationId);
 
             // Try to correlate the incoming message with some timeout as it may have been
             // added to the map after the message was sent
@@ -313,7 +327,9 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender,
Me
             Exchange exchange = null;
             while (exchange == null && count < 100) {
                 exchange = correlationMap.remove(correlationId);
-                Thread.sleep(100);
+                if (exchange == null) {
+                    Thread.sleep(1);
+                }
                 count++;
             }
             if (exchange == null) {

http://git-wip-us.apache.org/repos/asf/cxf/blob/2c8472de/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
index 1e349a0..76f857d 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
@@ -20,11 +20,15 @@
 package org.apache.cxf.transport.jms;
 
 import java.io.UnsupportedEncodingException;
+import java.util.concurrent.Executor;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import javax.jms.Connection;
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageListener;
+import javax.jms.Session;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
@@ -43,14 +47,17 @@ import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.jms.continuations.JMSContinuationProvider;
 import org.apache.cxf.transport.jms.util.JMSListenerContainer;
 import org.apache.cxf.transport.jms.util.JMSUtil;
+import org.apache.cxf.transport.jms.util.MessageListenerContainer;
+import org.apache.cxf.transport.jms.util.ResourceCloser;
 
-public class JMSDestination extends AbstractMultiplexDestination 
-    implements MessageListener {
+public class JMSDestination extends AbstractMultiplexDestination implements MessageListener
{
 
     private static final Logger LOG = LogUtils.getL7dLogger(JMSDestination.class);
 
     private JMSConfiguration jmsConfig;
     private Bus bus;
+    
+    @SuppressWarnings("unused")
     private EndpointInfo ei;
     private JMSListenerContainer jmsListener;
     private ThrottlingCounter suspendedContinuations;
@@ -80,12 +87,33 @@ public class JMSDestination extends AbstractMultiplexDestination
         getLogger().log(Level.FINE, "JMSDestination activate().... ");
         jmsConfig.ensureProperlyConfigured();
 
-        jmsListener = JMSFactory.createTargetDestinationListener(ei, jmsConfig, this);
-        int restartLimit = jmsConfig.getMaxSuspendedContinuations() * jmsConfig.getReconnectPercentOfMax()
/ 100;
-        this.suspendedContinuations = new ThrottlingCounter(this.jmsListener, 
-                                                            restartLimit,
+        jmsListener = createTargetDestinationListener();
+        int restartLimit = jmsConfig.getMaxSuspendedContinuations() * jmsConfig.getReconnectPercentOfMax()
+                           / 100;
+        this.suspendedContinuations = new ThrottlingCounter(this.jmsListener, restartLimit,
                                                             jmsConfig.getMaxSuspendedContinuations());
     }
+    
+    
+    private JMSListenerContainer createTargetDestinationListener() {
+        Session session = null;
+        try {
+            Connection connection = JMSFactory.createConnection(jmsConfig);
+            connection.start();
+            session = connection.createSession(jmsConfig.isSessionTransacted(), Session.AUTO_ACKNOWLEDGE);
+            Destination destination = jmsConfig.getTargetDestination(session);
+            MessageListenerContainer container = new MessageListenerContainer(connection,
destination, this);
+            container.setMessageSelector(jmsConfig.getMessageSelector());
+            Executor executor = JMSFactory.createExecutor(bus, "jms-destination");
+            container.setExecutor(executor);
+            container.start();
+            return container;
+        } catch (JMSException e) {
+            throw JMSUtil.convertJmsException(e);
+        } finally {
+            ResourceCloser.close(session);
+        }
+    }
 
     public void deactivate() {
         if (jmsListener != null) {
@@ -113,32 +141,31 @@ public class JMSDestination extends AbstractMultiplexDestination
             if (loader != null) {
                 origLoader = ClassLoaderUtils.setThreadContextClassloader(loader);
             }
-            getLogger().log(Level.INFO, "JMS destination received message " + message + "
on " 
-                + jmsConfig.getTargetDestination());
-            Message inMessage = JMSMessageUtils.asCXFMessage(message, JMSConstants.JMS_SERVER_REQUEST_HEADERS);
+            getLogger().log(Level.FINE,
+                            "JMS destination received message " + message + " on "
+                                + jmsConfig.getTargetDestination());
+            Message inMessage = JMSMessageUtils
+                .asCXFMessage(message, JMSConstants.JMS_SERVER_REQUEST_HEADERS);
             SecurityContext securityContext = JMSMessageUtils.buildSecurityContext(message,
jmsConfig);
             inMessage.put(SecurityContext.class, securityContext);
             inMessage.put(JMSConstants.JMS_SERVER_RESPONSE_HEADERS, new JMSMessageHeadersType());
             inMessage.put(JMSConstants.JMS_REQUEST_MESSAGE, message);
             ((MessageImpl)inMessage).setDestination(this);
             if (jmsConfig.getMaxSuspendedContinuations() != 0) {
-                JMSContinuationProvider cp = new JMSContinuationProvider(bus, 
-                                                                         inMessage, 
-                                                                         incomingObserver,

+                JMSContinuationProvider cp = new JMSContinuationProvider(bus, inMessage,
incomingObserver,
                                                                          suspendedContinuations);
                 inMessage.put(ContinuationProvider.class.getName(), cp);
             }
-            
+
             origBus = BusFactory.getAndSetThreadDefaultBus(bus);
 
             // FIXME
-            //JCATransactionalMessageListenerContainer.setMessageEndpoint(inMessage);
+            // JCATransactionalMessageListenerContainer.setMessageEndpoint(inMessage);
 
             // handle the incoming message
             incomingObserver.onMessage(inMessage);
-            
-            if (inMessage.getExchange() != null 
-                && inMessage.getExchange().getInMessage() != null) {
+
+            if (inMessage.getExchange() != null && inMessage.getExchange().getInMessage()
!= null) {
                 inMessage = inMessage.getExchange().getInMessage();
             }
 
@@ -153,7 +180,7 @@ public class JMSDestination extends AbstractMultiplexDestination
                     }
                 }
             }
-            
+
         } catch (SuspendedInvocationException ex) {
             getLogger().log(Level.FINE, "Request message has been suspended");
         } catch (UnsupportedEncodingException ex) {
@@ -164,7 +191,7 @@ public class JMSDestination extends AbstractMultiplexDestination
             if (origBus != bus) {
                 BusFactory.setThreadDefaultBus(origBus);
             }
-            if (origLoader != null) { 
+            if (origLoader != null) {
                 origLoader.reset();
             }
         }

http://git-wip-us.apache.org/repos/asf/cxf/blob/2c8472de/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
index 2d9f407..6ad7b55 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
@@ -18,21 +18,18 @@
  */
 package org.apache.cxf.transport.jms;
 
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
 import javax.jms.JMSException;
-import javax.jms.MessageListener;
-import javax.jms.Session;
 import javax.naming.NamingException;
 
-import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.transport.jms.util.JMSListenerContainer;
+import org.apache.cxf.Bus;
 import org.apache.cxf.transport.jms.util.JMSSender;
-import org.apache.cxf.transport.jms.util.JMSUtil;
 import org.apache.cxf.transport.jms.util.JndiHelper;
-import org.apache.cxf.transport.jms.util.MessageListenerContainer;
-import org.apache.cxf.transport.jms.util.ResourceCloser;
+import org.apache.cxf.workqueue.WorkQueueManager;
 
 /**
  * Factory to create jms helper objects from configuration and context information
@@ -90,44 +87,12 @@ public final class JMSFactory {
         return sender;
     }
 
-    private static String getMessageSelector(JMSConfiguration jmsConfig, String conduitId)
{
+    static String getMessageSelector(JMSConfiguration jmsConfig, String conduitId) {
         String staticSelectorPrefix = jmsConfig.getConduitSelectorPrefix();
         String conduitIdSt = jmsConfig.isUseConduitIdSelector() && conduitId != null
? conduitId : "";
         String correlationIdPrefix = staticSelectorPrefix + conduitIdSt;
         return correlationIdPrefix.isEmpty() ? null : "JMSCorrelationID LIKE '" + correlationIdPrefix
+ "%'";
     }
-    
-    public static JMSListenerContainer createTargetDestinationListener(EndpointInfo ei, 
-                                                                       JMSConfiguration jmsConfig,
-                                                                       MessageListener listenerHandler)
{
-        Session session = null;
-        try {
-            Connection connection = createConnection(jmsConfig);
-            connection.start();
-            session = connection.createSession(jmsConfig.isSessionTransacted(), Session.AUTO_ACKNOWLEDGE);
-            Destination destination = jmsConfig.getTargetDestination(session);
-            MessageListenerContainer container = new MessageListenerContainer(connection,
destination, listenerHandler);
-            container.setMessageSelector(jmsConfig.getMessageSelector());
-            container.start();
-            return container;
-        } catch (JMSException e) {
-            throw JMSUtil.convertJmsException(e);
-        } finally {
-            ResourceCloser.close(session);
-        }
-    }
-
-    public static JMSListenerContainer createListenerContainer(JMSConfiguration jmsConfig,
-                                                               Connection connection,
-                                                               MessageListener listenerHandler,

-                                                               Destination destination,
-                                                               String conduitId) {
-        MessageListenerContainer container = new MessageListenerContainer(connection, destination,
listenerHandler);
-        String messageSelector = getMessageSelector(jmsConfig, conduitId);
-        container.setMessageSelector(messageSelector);
-        container.start();
-        return container;
-    }
 
     public static Connection createConnection(JMSConfiguration jmsConfig) throws JMSException
{
         Connection connection = jmsConfig.getConnectionFactory().createConnection(jmsConfig.getUserName(),
@@ -138,4 +103,17 @@ public final class JMSFactory {
         return connection;
     }
     
+    public static Executor createExecutor(Bus bus, String name) {
+        WorkQueueManager manager = bus.getExtension(WorkQueueManager.class);
+        Executor workQueue;
+        if (manager != null) {
+            workQueue = manager.getNamedWorkQueue(name);
+            if (workQueue == null) {
+                workQueue = manager.getAutomaticWorkQueue();
+            }
+        } else {
+            workQueue = Executors.newFixedThreadPool(20);
+        }
+        return workQueue;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/2c8472de/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
index 204fa51..bd9a2b0 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
@@ -18,7 +18,7 @@
  */
 package org.apache.cxf.transport.jms.util;
 
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 
 import javax.jms.Connection;
@@ -41,7 +41,7 @@ public class MessageListenerContainer implements JMSListenerContainer {
     private boolean running;
     private MessageConsumer consumer;
     private Session session;
-    private ExecutorService executor;
+    private Executor executor;
     private String durableSubscriptionName;
     private boolean pubSubNoLocal;
 
@@ -51,7 +51,6 @@ public class MessageListenerContainer implements JMSListenerContainer {
         this.connection = connection;
         this.replyTo = replyTo;
         this.listenerHandler = listenerHandler;
-        executor = Executors.newFixedThreadPool(20);
     }
     
     public Connection getConnection() {
@@ -70,14 +69,14 @@ public class MessageListenerContainer implements JMSListenerContainer
{
         this.messageSelector = messageSelector;
     }
     
-    private ExecutorService getExecutor() {
+    private Executor getExecutor() {
         if (executor == null) {
             executor = Executors.newFixedThreadPool(10);
         }
         return executor;
     }
 
-    public void setExecutor(ExecutorService executor) {
+    public void setExecutor(Executor executor) {
         this.executor = executor;
     }
 


Mime
View raw message