cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject cxf git commit: CXF-6199 Adding concurrentConsumers
Date Fri, 16 Jan 2015 15:48:03 GMT
Repository: cxf
Updated Branches:
  refs/heads/master 88a658195 -> 41f3f6beb


CXF-6199 Adding concurrentConsumers


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/41f3f6be
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/41f3f6be
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/41f3f6be

Branch: refs/heads/master
Commit: 41f3f6beb5b090ff72a9cf19b0b1d1d23986f5cd
Parents: 88a6581
Author: Christian Schneider <chris@die-schneider.net>
Authored: Fri Jan 16 16:47:36 2015 +0100
Committer: Christian Schneider <chris@die-schneider.net>
Committed: Fri Jan 16 16:47:53 2015 +0100

----------------------------------------------------------------------
 .../cxf/transport/jms/JMSConfiguration.java     |  9 +++
 .../cxf/transport/jms/JMSDestination.java       | 17 +++--
 .../cxf/transport/jms/uri/JMSEndpoint.java      | 14 ++++
 .../util/AbstractMessageListenerContainer.java  | 22 -------
 .../jms/util/MessageListenerContainer.java      | 68 +-------------------
 .../util/PollingMessageListenerContainer.java   | 42 ++++++++----
 .../cxf/transport/jms/uri/JMSEndpointTest.java  |  3 +-
 7 files changed, 67 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/41f3f6be/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
index b4ec3f0..5dc0e31 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
@@ -54,6 +54,7 @@ public class JMSConfiguration {
     private boolean sessionTransacted;
     private boolean createSecurityContext = true;
 
+    private int concurrentConsumers = 1;
     private int maxSuspendedContinuations = DEFAULT_VALUE;
     private int reconnectPercentOfMax = 70;
 
@@ -320,6 +321,14 @@ public class JMSConfiguration {
     public void setTransactionManager(Object transactionManager) {
     }
 
+    public int getConcurrentConsumers() {
+        return concurrentConsumers;
+    }
+
+    public void setConcurrentConsumers(int concurrentConsumers) {
+        this.concurrentConsumers = concurrentConsumers;
+    }
+
     public int getMaxSuspendedContinuations() {
         return maxSuspendedContinuations;
     }

http://git-wip-us.apache.org/repos/asf/cxf/blob/41f3f6be/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
index e4127d3..9f0fcbc 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
@@ -48,10 +48,8 @@ import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.AbstractMultiplexDestination;
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.jms.continuations.JMSContinuationProvider;
-import org.apache.cxf.transport.jms.util.AbstractMessageListenerContainer;
 import org.apache.cxf.transport.jms.util.JMSListenerContainer;
 import org.apache.cxf.transport.jms.util.JMSUtil;
-import org.apache.cxf.transport.jms.util.MessageListenerContainer;
 import org.apache.cxf.transport.jms.util.PollingMessageListenerContainer;
 import org.apache.cxf.transport.jms.util.ResourceCloser;
 
@@ -125,15 +123,18 @@ public class JMSDestination extends AbstractMultiplexDestination implements
Mess
             connection = JMSFactory.createConnection(jmsConfig);
             connection.setExceptionListener(new ExceptionListener() {
                 public void onException(JMSException exception) {
-                    LOG.log(Level.WARNING, "Exception on JMS connection. Trying to reconnect",
exception);
-                    restartConnection();
+                    if (!shutdown) {
+                        LOG.log(Level.WARNING, "Exception on JMS connection. Trying to reconnect",
exception);
+                        restartConnection();
+                    }
                 }
             });
             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             Destination destination = jmsConfig.getTargetDestination(session);
-            AbstractMessageListenerContainer container = jmsConfig.getTransactionManager()
!= null
-                ? new PollingMessageListenerContainer(connection, destination, this)
-                : new MessageListenerContainer(connection, destination, this);
+
+            PollingMessageListenerContainer container = new PollingMessageListenerContainer(connection,

+                                                                                        
   destination, this);
+            container.setConcurrentConsumers(jmsConfig.getConcurrentConsumers());
             container.setTransactionManager(jmsConfig.getTransactionManager());
             container.setMessageSelector(jmsConfig.getMessageSelector());
             container.setTransacted(jmsConfig.isSessionTransacted());
@@ -191,6 +192,8 @@ public class JMSDestination extends AbstractMultiplexDestination implements
Mess
         getLogger().log(Level.FINE, "JMSDestination shutdown()");
         this.deactivate();
     }
+    
+    
 
     /**
      * Convert JMS message received by ListenerThread to CXF message and inform incomingObserver
that a

http://git-wip-us.apache.org/repos/asf/cxf/blob/41f3f6be/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
index 885c1f8..9d104ac 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
@@ -82,6 +82,7 @@ public class JMSEndpoint {
     private String topicReplyToName;
     private boolean useConduitIdSelector = true;
     private String username;
+    private int concurrentConsumers = 1;
 
     /**
      * @param uri
@@ -342,6 +343,19 @@ public class JMSEndpoint {
     public void setUsername(String username) {
         this.username = username;
     }
+    
+    public int getConcurrentConsumers() {
+        return concurrentConsumers;
+    }
+    
+    public void setConcurrentConsumers(int concurrentConsumers) {
+        this.concurrentConsumers = concurrentConsumers;
+    }
+    
+    public void setConcurrentConsumers(String concurrentConsumers) {
+        this.concurrentConsumers = new Integer(concurrentConsumers);
+    }
+    
     public String getPassword() {
         return password;
     }

http://git-wip-us.apache.org/repos/asf/cxf/blob/41f3f6be/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java
index f33b3c8..f5affe4 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/AbstractMessageListenerContainer.java
@@ -24,7 +24,6 @@ import java.util.logging.Logger;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
-import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.Session;
 import javax.transaction.TransactionManager;
@@ -41,8 +40,6 @@ public abstract class AbstractMessageListenerContainer implements JMSListenerCon
     protected int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
     protected String messageSelector;
     protected boolean running;
-    protected MessageConsumer consumer;
-    protected Session session;
     protected Executor executor;
     protected String durableSubscriptionName;
     protected boolean pubSubNoLocal;
@@ -99,23 +96,4 @@ public abstract class AbstractMessageListenerContainer implements JMSListenerCon
         this.transactionManager = transactionManager;
     }
 
-
-    
-    
-
-    /*
-    protected TransactionManager getTransactionManager() {
-        if (this.transactionManager == null) {
-            try {
-                InitialContext ctx = new InitialContext();
-                this.transactionManager = (TransactionManager)ctx
-                    .lookup("javax.transaction.TransactionManager");
-            } catch (NamingException e) {
-                // Ignore
-            }
-        }
-        return this.transactionManager;
-    }
-    */
-
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/41f3f6be/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
index 3a7b2e3..605c43f 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
@@ -18,18 +18,16 @@
  */
 package org.apache.cxf.transport.jms.util;
 
-import java.util.concurrent.Executor;
 import java.util.logging.Level;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.Session;
 import javax.jms.Topic;
-import javax.jms.XASession;
-import javax.transaction.TransactionManager;
 
 /**
  * Listen for messages on a queue or topic asynchronously by registering a
@@ -39,6 +37,8 @@ import javax.transaction.TransactionManager;
  * This has to be handled outside.
  */
 public class MessageListenerContainer extends AbstractMessageListenerContainer {
+    private MessageConsumer consumer;
+    private Session session;
     public MessageListenerContainer(Connection connection, Destination destination,
                                     MessageListener listenerHandler) {
         this.connection = connection;
@@ -82,29 +82,6 @@ public class MessageListenerContainer extends AbstractMessageListenerContainer
{
         ResourceCloser.close(connection);
     }
 
-    static class DispachingListener implements MessageListener {
-        private Executor executor;
-        private MessageListener listenerHandler;
-
-        public DispachingListener(Executor executor, MessageListener listenerHandler) {
-            this.executor = executor;
-            this.listenerHandler = listenerHandler;
-        }
-
-        @Override
-        public void onMessage(final Message message) {
-            executor.execute(new Runnable() {
-
-                @Override
-                public void run() {
-                    listenerHandler.onMessage(message);
-                }
-
-            });
-        }
-
-    }
-    
     static class LocalTransactionalMessageListener implements MessageListener {
         private MessageListener listenerHandler;
         private Session session;
@@ -137,43 +114,4 @@ public class MessageListenerContainer extends AbstractMessageListenerContainer
{
         
     }
     
-    static class XATransactionalMessageListener implements MessageListener {
-        private TransactionManager tm;
-        private MessageListener listenerHandler;
-        private XASession session;
-        
-        public XATransactionalMessageListener(TransactionManager tm, Session session, MessageListener
listenerHandler) {
-            if (tm == null) {
-                throw new IllegalArgumentException("Must supply a transaction manager");
-            }
-            if (!(session instanceof XASession)) {
-                throw new IllegalArgumentException("Must supply an XASession");
-            }
-            this.tm = tm;
-            this.session = (XASession)session;
-            this.listenerHandler = listenerHandler;
-        }
-
-        @Override
-        public void onMessage(Message message) {
-            try {
-                tm.begin();
-                tm.getTransaction().enlistResource(session.getXAResource());
-                listenerHandler.onMessage(message);
-                tm.commit();
-            } catch (Throwable e) {
-                safeRollback(e);
-            }
-        }
-        
-        private void safeRollback(Throwable t) {
-            LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling
back" , t);
-            try {
-                tm.rollback();
-            } catch (Exception e) {
-                LOG.log(Level.WARNING, "Rollback of JTA transaction failed", e);
-            }
-        }
-        
-    }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/41f3f6be/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
index 2a6ca01..9c2e29e 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
@@ -30,6 +30,7 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.Session;
+import javax.jms.Topic;
 
 import org.apache.cxf.common.logging.LogUtils;
 
@@ -38,7 +39,7 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
 
     private ExecutorService pollers;
 
-    private int numListenerThreads = 1;
+    private int concurrentConsumers = 1;
 
     public PollingMessageListenerContainer(Connection connection, Destination destination,
                                            MessageListener listenerHandler) {
@@ -47,20 +48,27 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
         this.listenerHandler = listenerHandler;
     }
 
-    class Poller implements Runnable {
+    private class Poller implements Runnable {
 
         @Override
         public void run() {
-            ResourceCloser closer = new ResourceCloser();
             while (running) {
+                MessageConsumer consumer = null;
+                Session session = null;
                 try {
                     if (transactionManager != null) {
                         transactionManager.begin();
                     }
-                    Session session = closer.register(connection.createSession(transacted,
acknowledgeMode));
-
-                    MessageConsumer consumer = closer.register(session.createConsumer(destination,
-                                                                                      messageSelector));
+                    
+                    session = connection.createSession(transacted, acknowledgeMode);
+                    if (durableSubscriptionName != null && destination instanceof
Topic) {
+                        consumer = session.createDurableSubscriber((Topic)destination, 
+                                                                   durableSubscriptionName,
+                                                                   messageSelector,
+                                                                   pubSubNoLocal);
+                    } else {
+                        consumer = session.createConsumer(destination, messageSelector);
+                    }
                     Message message = consumer.receive(1000);
                     try {
                         if (message != null) {
@@ -68,7 +76,7 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
                         }
                         if (transactionManager != null) {
                             transactionManager.commit();
-                        } else {
+                        } else if (session.getTransacted()) {
                             session.commit();
                         }
                     } catch (Exception e) {
@@ -77,7 +85,8 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
                 } catch (Exception e) {
                     LOG.log(Level.WARNING, "Unexpected exception", e);
                 } finally {
-                    closer.close();
+                    ResourceCloser.close(consumer);
+                    ResourceCloser.close(session);
                 }
             }
 
@@ -91,7 +100,9 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
             if (transactionManager != null) {
                 transactionManager.rollback();
             } else {
-                session.rollback();
+                if (session.getTransacted()) {
+                    session.rollback();
+                }
             }
         } catch (Exception e1) {
             LOG.log(Level.WARNING, "Rollback of Local transaction failed", e1);
@@ -104,14 +115,15 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
             return;
         }
         running = true;
-        pollers = Executors.newFixedThreadPool(numListenerThreads);
-        for (int c = 0; c < numListenerThreads; c++) {
+        pollers = Executors.newFixedThreadPool(concurrentConsumers);
+        for (int c = 0; c < concurrentConsumers; c++) {
             pollers.execute(new Poller());
         }
     }
 
     @Override
     public void stop() {
+        LOG.fine("Shuttting down " + this.getClass().getSimpleName());
         if (!running) {
             return;
         }
@@ -122,12 +134,16 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
         } catch (InterruptedException e) {
             // Ignore
         }
+        pollers.shutdownNow();
         pollers = null;
     }
 
     @Override
     public void shutdown() {
         stop();
-        ResourceCloser.close(connection);
+    }
+
+    public void setConcurrentConsumers(int concurrentConsumers) {
+        this.concurrentConsumers = concurrentConsumers;
     }
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/41f3f6be/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/uri/JMSEndpointTest.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/uri/JMSEndpointTest.java
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/uri/JMSEndpointTest.java
index b7f5041..f52f28e 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/uri/JMSEndpointTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/uri/JMSEndpointTest.java
@@ -29,10 +29,11 @@ public class JMSEndpointTest extends Assert {
 
     @Test
     public void testBasicQueue() throws Exception {
-        JMSEndpoint endpoint = new JMSEndpoint("jms:queue:Foo.Bar");
+        JMSEndpoint endpoint = new JMSEndpoint("jms:queue:Foo.Bar?concurrentConsumers=21");
         assertEquals(JMSEndpoint.QUEUE, endpoint.getJmsVariant());
         assertEquals("Foo.Bar", endpoint.getDestinationName());
         assertEquals(JMSEndpoint.QUEUE, endpoint.getJmsVariant());
+        assertEquals(21, endpoint.getConcurrentConsumers());
     }
 
     @Test


Mime
View raw message