cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject git commit: Fix for jms performance
Date Fri, 28 Mar 2014 12:51:58 GMT
Repository: cxf
Updated Branches:
  refs/heads/master 10897281f -> 3d701b59b


Fix for jms performance


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/3d701b59
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/3d701b59
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/3d701b59

Branch: refs/heads/master
Commit: 3d701b59b57942fa0800a54cd46e3f049208da71
Parents: 1089728
Author: Christian Schneider <chris@die-schneider.net>
Authored: Fri Mar 28 13:51:24 2014 +0100
Committer: Christian Schneider <chris@die-schneider.net>
Committed: Fri Mar 28 13:51:45 2014 +0100

----------------------------------------------------------------------
 .../apache/cxf/transport/jms/JMSConduit.java    | 177 ++++++++++---------
 .../cxf/transport/jms/JMSConfiguration.java     |   5 +-
 .../apache/cxf/jms/testsuite/util/testcases.xml |   3 +-
 3 files changed, 95 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/3d701b59/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
index 17907a3..ddcc271 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
@@ -139,81 +139,102 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender,
Me
 
         jmsConfig.ensureProperlyConfigured();        
         assertIsNotTextMessageAndMtom(outMessage);
-        //assertIsNotSyncAndTopicReply(exchange);
-        
-        JMSMessageHeadersType headers = getOrCreateJmsHeaders(outMessage);
-        String userCID = headers.getJMSCorrelationID();
-        assertIsNotAsyncAndUserCID(exchange, userCID);
 
         ResourceCloser closer = new ResourceCloser();
         try {
             if (connection == null) {
                 connection = JMSFactory.createConnection(jmsConfig);
+                connection.start();
             }
             Session session = closer.register(connection.createSession(jmsConfig.isSessionTransacted(),

                                                                        Session.AUTO_ACKNOWLEDGE));
-            Destination targetDest = jmsConfig.getTargetDestination(session);
             
-            Destination replyToDestination = null;
-            if (!exchange.isOneWay()) {
-                if (!exchange.isSynchronous() && staticReplyDestination == null)
{
-                    staticReplyDestination = jmsConfig.getReplyDestination(session);
-                    getJMSListener(staticReplyDestination);
-                }
-                replyToDestination = jmsConfig.getReplyToDestination(session, headers.getJMSReplyTo());
+            if (exchange.isOneWay()) {
+                sendMessage(request, outMessage, null, null, closer, session);
+            } else {
+                sendAndReceiveMessage(exchange, request, outMessage, closer, session);
             }
-            connection.start();
+        } catch (JMSException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        } finally {
+            closer.close();
+        }
+    }
 
-            String messageType = jmsConfig.getMessageType();
-            String correlationId = createCorrelationId(exchange, userCID);
-            if (correlationId != null) {
-                correlationMap.put(correlationId, exchange);
-            }
-            
-            javax.jms.Message message = JMSMessageUtils.asJMSMessage(jmsConfig, 
-                                                                     outMessage,
-                                                                     request, 
-                                                                     messageType,
-                                                                     session,  
-                                                                     correlationId, 
-                                                                     JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
-            if (replyToDestination != null) {
-                message.setJMSReplyTo(replyToDestination);
-            }
+    private void sendAndReceiveMessage(final Exchange exchange, final Object request, final
Message outMessage,
+                                ResourceCloser closer,
+                                Session session) throws JMSException {
+        if (staticReplyDestination == null) {
+            staticReplyDestination = jmsConfig.getReplyDestination(session);
+            getJMSListener(staticReplyDestination);
+        }
+        JMSMessageHeadersType headers = getOrCreateJmsHeaders(outMessage);
+        String userCID = headers.getJMSCorrelationID();
+        assertIsNotAsyncAndUserCID(exchange, userCID);
+        String correlationId = createCorrelationId(exchange, userCID);
+        if (correlationId != null) {
+            correlationMap.put(correlationId, exchange);
+        }
+        
+        Destination replyToDestination = jmsConfig.getReplyToDestination(session, headers.getJMSReplyTo());
+        String jmsMessageID = sendMessage(request, outMessage, replyToDestination, correlationId,
closer, session);
+        
+        boolean perMessageCorrelationId = correlationId == null || userCID != null;
+        if (correlationId == null) {
+            correlationId = jmsMessageID;
+            correlationMap.put(correlationId, exchange);
+        }
 
-            JMSSender sender = JMSFactory.createJmsSender(jmsConfig, headers);
-
-            synchronized (exchange) {
-                sender.sendMessage(closer, session, targetDest, message);
-                LOG.log(Level.FINE, "client sending request message " 
-                    + message.getJMSMessageID() + " to " + targetDest);
-                headers.setJMSMessageID(message.getJMSMessageID());
-                if (correlationId == null) {
-                    // Warning: We might loose the reply if it already arrived at this point

-                    correlationId = message.getJMSMessageID();
-                    correlationMap.put(correlationId, exchange);
-                }
-            }
-            
-            /**
-             * If the message is not oneWay we will expect to receive a reply on the listener.
-             */
-            if (!exchange.isOneWay() && (exchange.isSynchronous())) {
-                Destination replyDestination = staticReplyDestination != null 
-                    ? staticReplyDestination : replyToDestination;
-                javax.jms.Message replyMessage = JMSUtil.receive(session, replyDestination,
correlationId,
+        if (exchange.isSynchronous()) {
+            if (perMessageCorrelationId) {
+                // TODO Not sure if replyToDestination is correct here
+                javax.jms.Message replyMessage = JMSUtil.receive(session, replyToDestination,
correlationId,
                                                                  jmsConfig.getReceiveTimeout(),
                                                                  jmsConfig.isPubSubNoLocal());
                 correlationMap.remove(correlationId);
-                doReplyMessage(exchange, replyMessage);
+                processReplyMessage(exchange, replyMessage);
+            } else {
+                synchronized (exchange) {
+                    try {
+                        exchange.wait(jmsConfig.getReceiveTimeout());
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException("Interrupted while correlating", e);
+                    }
+                    if (exchange.get(CORRELATED) != Boolean.TRUE) {
+                        throw new RuntimeException("Timeout receiving message with correlationId
"
+                                                   + correlationId);
+                    }
+                }
             }
-        } catch (JMSException e) {
-            throw new RuntimeException(e.getMessage(), e);
-        } finally {
-            closer.close();
         }
     }
 
+    private String sendMessage(final Object request, final Message outMessage,
+                               Destination replyToDestination, String correlationId,
+                               ResourceCloser closer, Session session) throws JMSException
{
+        JMSMessageHeadersType headers = getOrCreateJmsHeaders(outMessage);
+        javax.jms.Message message = JMSMessageUtils.asJMSMessage(jmsConfig, 
+                                                                 outMessage,
+                                                                 request, 
+                                                                 jmsConfig.getMessageType(),
+                                                                 session,  
+                                                                 correlationId, 
+                                                                 JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
+        if (replyToDestination != null) {
+            message.setJMSReplyTo(replyToDestination);
+        }
+
+        JMSSender sender = JMSFactory.createJmsSender(jmsConfig, headers);
+        
+        Destination targetDest = jmsConfig.getTargetDestination(session);
+        sender.sendMessage(closer, session, targetDest, message);
+        String jmsMessageID = message.getJMSMessageID();
+        LOG.log(Level.FINE, "client sending request message " 
+            + jmsMessageID + " to " + targetDest);
+        headers.setJMSMessageID(jmsMessageID);
+        return jmsMessageID;
+    }
+
     private void assertIsNotAsyncAndUserCID(Exchange exchange, String userCID) {
         if (!exchange.isSynchronous() && userCID != null) {
             throw new IllegalArgumentException("User CID can not be used for asynchronous
exchanges");
@@ -230,35 +251,21 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender,
Me
             throw new ConfigurationException(msg);
         }
     }
-    
-    @SuppressWarnings("unused")
-    private void assertIsNotSyncAndTopicReply(Exchange exchange) {
-        if (exchange.isSynchronous() && jmsConfig.isReplyPubSubDomain()) {
-            throw new IllegalArgumentException("Synchronous calls can not be combined with
a response on a Topic");
-        }
-    }
 
     private String createCorrelationId(final Exchange exchange, String userCID) {
-        String correlationId = null;
-        if (!exchange.isOneWay()) {
-            if (userCID != null) {
-                correlationId = userCID;
-            } else if (!jmsConfig.isSetConduitSelectorPrefix()
-                       && !jmsConfig.isReplyPubSubDomain()
-                       && (exchange.isSynchronous() || exchange.isOneWay())
-                       && (!jmsConfig.isSetUseConduitIdSelector() 
-                           || !jmsConfig.isUseConduitIdSelector())) {
-                // in this case the correlation id will be set to
-                // the message id later
-                correlationId = null;
-            } else { 
-                String prefix = (jmsConfig.isUseConduitIdSelector()) 
-                    ? jmsConfig.getConduitSelectorPrefix() + conduitId 
-                    : jmsConfig.getConduitSelectorPrefix();
-                correlationId = JMSUtil.createCorrelationId(prefix, messageCount.incrementAndGet());
-            }
+        if (userCID != null) {
+            return userCID;
+        } else if (!jmsConfig.isSetConduitSelectorPrefix() && !jmsConfig.isReplyPubSubDomain()
+                   && exchange.isSynchronous()
+                   && (!jmsConfig.isSetUseConduitIdSelector() || !jmsConfig.isUseConduitIdSelector()))
{
+            // in this case the correlation id will be set to
+            // the message id later
+            return null;
+        } else {
+            String prefix = (jmsConfig.isUseConduitIdSelector()) ? jmsConfig.getConduitSelectorPrefix()
+                                                                   + conduitId : jmsConfig.getConduitSelectorPrefix();
+            return JMSUtil.createCorrelationId(prefix, messageCount.incrementAndGet());
         }
-        return correlationId;
     }
 
     private JMSMessageHeadersType getOrCreateJmsHeaders(final Message outMessage) {
@@ -336,7 +343,7 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender,
Me
                 LOG.log(Level.WARNING, "Could not correlate message with correlationId "
+ correlationId);
                 return;
             }
-            doReplyMessage(exchange, jmsMessage);
+            processReplyMessage(exchange, jmsMessage);
         } catch (JMSException e) {
             throw JMSUtil.convertJmsException(e);
         } catch (InterruptedException e) {
@@ -349,7 +356,7 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender,
Me
      * Process the reply message
      * @throws JMSException 
      */
-    public void doReplyMessage(Exchange exchange, javax.jms.Message jmsMessage) throws JMSException
{
+    protected void processReplyMessage(Exchange exchange, javax.jms.Message jmsMessage) throws
JMSException {
         
         LOG.log(Level.FINE, "client received reply: ", jmsMessage);
         try {
@@ -376,7 +383,7 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender,
Me
         }
     }
 
-    public synchronized void shutdownListeners() {
+    private synchronized void shutdownListeners() {
         if (listener != null) {
             listener.unreg();
             listener = null;

http://git-wip-us.apache.org/repos/asf/cxf/blob/3d701b59/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
index 15e27b6..af49c64 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
@@ -391,11 +391,10 @@ public class JMSConfiguration {
     }
     
     public Destination getReplyToDestination(Session session, String userDestination) throws
JMSException {
-        String replyTo = userDestination;
-        if (replyTo == null) {
+        if (userDestination == null) {
             return getReplyDestination(session);
         }
-        return destinationResolver.resolveDestinationName(session, replyTo, replyPubSubDomain);
+        return destinationResolver.resolveDestinationName(session, userDestination, replyPubSubDomain);
     }
     
     public Destination getReplyDestination(Session session) throws JMSException {

http://git-wip-us.apache.org/repos/asf/cxf/blob/3d701b59/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/util/testcases.xml
----------------------------------------------------------------------
diff --git a/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/util/testcases.xml
b/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/util/testcases.xml
index 01236e6..6c9f382 100644
--- a/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/util/testcases.xml
+++ b/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/util/testcases.xml
@@ -51,8 +51,7 @@
             <targetService/>
             <contentType>text/xml; charset=UTF-8</contentType>
             <soapAction/>
-            <requestURI>jms:jndi:dynamicQueues/testqueue0001
-                        </requestURI>
+            <requestURI>jms:jndi:dynamicQueues/testqueue0001</requestURI>
             <messageBody/>
         </requestMessage>
         <responseMessage>


Mime
View raw message