cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject git commit: CXF-5543 Fixing error from merge
Date Fri, 04 Apr 2014 07:27:17 GMT
Repository: cxf
Updated Branches:
  refs/heads/master 7c7fff780 -> 1ef40fca2


CXF-5543 Fixing error from merge


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/1ef40fca
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/1ef40fca
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/1ef40fca

Branch: refs/heads/master
Commit: 1ef40fca273d8d2ba3a37b91dbcf66d1f7e2dfed
Parents: 7c7fff7
Author: Christian Schneider <chris@die-schneider.net>
Authored: Fri Apr 4 09:27:08 2014 +0200
Committer: Christian Schneider <chris@die-schneider.net>
Committed: Fri Apr 4 09:27:08 2014 +0200

----------------------------------------------------------------------
 .../util/PollingMessageListenerContainer.java   | 217 +++++++------------
 1 file changed, 76 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/1ef40fca/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
index 6aa217d..b7c725d 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
@@ -19,27 +19,24 @@
 package org.apache.cxf.transport.jms.util;
 
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
-import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.XASession;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
 import javax.transaction.TransactionManager;
 
 import org.apache.cxf.common.logging.LogUtils;
 
-public class MessageListenerContainer implements JMSListenerContainer {
-    private static final Logger LOG = LogUtils.getL7dLogger(MessageListenerContainer.class);
+public class PollingMessageListenerContainer implements JMSListenerContainer {
+    private static final Logger LOG = LogUtils.getL7dLogger(PollingMessageListenerContainer.class);
 
     private Connection connection;
     private Destination destination;
@@ -48,15 +45,19 @@ public class MessageListenerContainer implements JMSListenerContainer
{
     private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
     private String messageSelector;
     private boolean running;
-    private MessageConsumer consumer;
-    private Session session;
     private Executor executor;
+    @SuppressWarnings("unused")
     private String durableSubscriptionName;
+    @SuppressWarnings("unused")
     private boolean pubSubNoLocal;
     private TransactionManager transactionManager;
 
-    public MessageListenerContainer(Connection connection, Destination destination,
-                                    MessageListener listenerHandler) {
+    private ExecutorService pollers;
+
+    private int numListenerThreads = 1;
+
+    public PollingMessageListenerContainer(Connection connection, Destination destination,
+                                           MessageListener listenerHandler) {
         this.connection = connection;
         this.destination = destination;
         this.listenerHandler = listenerHandler;
@@ -106,152 +107,86 @@ public class MessageListenerContainer implements JMSListenerContainer
{
         this.transactionManager = transactionManager;
     }
 
-    @Override
-    public void start() {
-        try {
-            session = connection.createSession(transacted, acknowledgeMode);
-            if (durableSubscriptionName != null) {
-                consumer = session.createDurableSubscriber((Topic)destination, durableSubscriptionName,
-                                                           messageSelector, pubSubNoLocal);
-            } else {
-                consumer = session.createConsumer(destination, messageSelector);
+    class Poller implements Runnable {
+
+        @Override
+        public void run() {
+            ResourceCloser closer = new ResourceCloser();
+            while (running) {
+                try {
+                    if (transactionManager != null) {
+                        transactionManager.begin();
+                    }
+                    Session session = closer.register(connection.createSession(transacted,
acknowledgeMode));
+                    MessageConsumer consumer = closer.register(session.createConsumer(destination,
+                                                                                      messageSelector));
+                    Message message = consumer.receive(1000);
+                    try {
+                        if (message != null) {
+                            listenerHandler.onMessage(message);
+                        }
+                        if (transactionManager != null) {
+                            transactionManager.commit();
+                        } else {
+                            session.commit();
+                        }
+                    } catch (Exception e) {
+                        safeRollBack(session, e);
+                    }
+                } catch (Exception e) {
+                    LOG.log(Level.WARNING, "Unexpected exception", e);
+                } finally {
+                    closer.close();
+                }
             }
-            
-            MessageListener intListener = (transactionManager != null)
-                ? new XATransactionalMessageListener(transactionManager, session, listenerHandler)
-                : new LocalTransactionalMessageListener(session, listenerHandler); 
-            // new DispachingListener(getExecutor(), listenerHandler);
-            consumer.setMessageListener(intListener);
-            
-            running = true;
-        } catch (JMSException e) {
-            throw JMSUtil.convertJmsException(e);
-        }
-    }
 
-    @Override
-    public void stop() {
-        running = false;
-        ResourceCloser.close(consumer);
-        ResourceCloser.close(session);
-        consumer = null;
-        session = null;
-    }
+        }
 
-    @Override
-    public void shutdown() {
-        stop();
-        ResourceCloser.close(connection);
     }
 
-    protected TransactionManager getTransactionManager() {
-        if (this.transactionManager == null) {
-            try {
-                InitialContext ctx = new InitialContext();
-                this.transactionManager = (TransactionManager)ctx
-                    .lookup("javax.transaction.TransactionManager");
-            } catch (NamingException e) {
-                // Ignore
+    private void safeRollBack(Session session, Exception e) {
+        LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back",
e);
+        try {
+            if (transactionManager != null) {
+                transactionManager.rollback();
+            } else {
+                session.rollback();
             }
+        } catch (Exception e1) {
+            LOG.log(Level.WARNING, "Rollback of Local transaction failed", e1);
         }
-        return this.transactionManager;
     }
 
-    static class DispachingListener implements MessageListener {
-        private Executor executor;
-        private MessageListener listenerHandler;
-
-        public DispachingListener(Executor executor, MessageListener listenerHandler) {
-            this.executor = executor;
-            this.listenerHandler = listenerHandler;
+    @Override
+    public void start() {
+        if (running) {
+            return;
         }
-
-        @Override
-        public void onMessage(final Message message) {
-            executor.execute(new Runnable() {
-
-                @Override
-                public void run() {
-                    listenerHandler.onMessage(message);
-                }
-
-            });
+        running = true;
+        pollers = Executors.newFixedThreadPool(numListenerThreads);
+        for (int c = 0; c < numListenerThreads; c++) {
+            pollers.execute(new Poller());
         }
-
     }
-    
-    static class LocalTransactionalMessageListener implements MessageListener {
-        private MessageListener listenerHandler;
-        private Session session;
-        
-        public LocalTransactionalMessageListener(Session session, MessageListener listenerHandler)
{
-            this.session = session;
-            this.listenerHandler = listenerHandler;
-        }
 
-        @Override
-        public void onMessage(Message message) {
-            try {
-                listenerHandler.onMessage(message);
-                session.commit();
-            } catch (Throwable e) {
-                safeRollback(e);
-            }
+    @Override
+    public void stop() {
+        if (!running) {
+            return;
         }
-        
-        private void safeRollback(Throwable t) {
-            LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling
back" , t);
-            try {
-                session.rollback();
-            } catch (Exception e) {
-                LOG.log(Level.WARNING, "Rollback of Local transaction failed", e);
-            }
+        running = false;
+        pollers.shutdown();
+        try {
+            pollers.awaitTermination(10, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            // Ignore
         }
-        
+        pollers = null;
     }
-    
-    static class XATransactionalMessageListener implements MessageListener {
-        private TransactionManager tm;
-        private MessageListener listenerHandler;
-        private XASession session;
-        
-        public XATransactionalMessageListener(TransactionManager tm, Session session, MessageListener
listenerHandler) {
-            if (tm == null) {
-                throw new IllegalArgumentException("Must supply a transaction manager");
-            }
-            if (session == null || !(session instanceof XASession)) {
-                throw new IllegalArgumentException("Must supply an XASession");
-            }
-            this.tm = tm;
-            this.session = (XASession)session;
-            this.listenerHandler = listenerHandler;
-        }
 
-        @Override
-        public void onMessage(Message message) {
-            try {
-                tm.begin();
-                tm.getTransaction().enlistResource(session.getXAResource());
-                listenerHandler.onMessage(message);
-                tm.commit();
-            } catch (Throwable e) {
-                safeRollback(e);
-                if (e instanceof RuntimeException) {
-                    throw (RuntimeException)e;
-                } else {
-                    throw new RuntimeException(e);
-                }
-            }
-        }
-        
-        private void safeRollback(Throwable t) {
-            LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling
back" , t);
-            try {
-                tm.rollback();
-            } catch (Exception e) {
-                LOG.log(Level.WARNING, "Rollback of JTA transaction failed", e);
-            }
-        }
-        
+    @Override
+    public void shutdown() {
+        stop();
+        ResourceCloser.close(connection);
     }
 }


Mime
View raw message