Return-Path: X-Original-To: apmail-cxf-commits-archive@www.apache.org Delivered-To: apmail-cxf-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5F083104EE for ; Fri, 4 Apr 2014 07:27:23 +0000 (UTC) Received: (qmail 41745 invoked by uid 500); 4 Apr 2014 07:27:22 -0000 Delivered-To: apmail-cxf-commits-archive@cxf.apache.org Received: (qmail 41693 invoked by uid 500); 4 Apr 2014 07:27:21 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 41649 invoked by uid 99); 4 Apr 2014 07:27:17 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Apr 2014 07:27:17 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 1DAA794A11E; Fri, 4 Apr 2014 07:27:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cschneider@apache.org To: commits@cxf.apache.org Message-Id: <1c1c8996be724711969d19c81dd874c1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: CXF-5543 Fixing error from merge Date: Fri, 4 Apr 2014 07:27:17 +0000 (UTC) Repository: cxf Updated Branches: refs/heads/master 7c7fff780 -> 1ef40fca2 CXF-5543 Fixing error from merge Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/1ef40fca Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/1ef40fca Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/1ef40fca Branch: refs/heads/master Commit: 1ef40fca273d8d2ba3a37b91dbcf66d1f7e2dfed Parents: 7c7fff7 Author: Christian Schneider Authored: Fri Apr 4 09:27:08 2014 +0200 Committer: Christian Schneider Committed: Fri Apr 4 09:27:08 2014 +0200 ---------------------------------------------------------------------- .../util/PollingMessageListenerContainer.java | 217 +++++++------------ 1 file changed, 76 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/1ef40fca/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java index 6aa217d..b7c725d 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java @@ -19,27 +19,24 @@ package org.apache.cxf.transport.jms.util; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import javax.jms.Connection; import javax.jms.Destination; -import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; -import javax.jms.Topic; -import javax.jms.XASession; -import javax.naming.InitialContext; -import javax.naming.NamingException; import javax.transaction.TransactionManager; import org.apache.cxf.common.logging.LogUtils; -public class MessageListenerContainer implements JMSListenerContainer { - private static final Logger LOG = LogUtils.getL7dLogger(MessageListenerContainer.class); +public class PollingMessageListenerContainer implements JMSListenerContainer { + private static final Logger LOG = LogUtils.getL7dLogger(PollingMessageListenerContainer.class); private Connection connection; private Destination destination; @@ -48,15 +45,19 @@ public class MessageListenerContainer implements JMSListenerContainer { private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE; private String messageSelector; private boolean running; - private MessageConsumer consumer; - private Session session; private Executor executor; + @SuppressWarnings("unused") private String durableSubscriptionName; + @SuppressWarnings("unused") private boolean pubSubNoLocal; private TransactionManager transactionManager; - public MessageListenerContainer(Connection connection, Destination destination, - MessageListener listenerHandler) { + private ExecutorService pollers; + + private int numListenerThreads = 1; + + public PollingMessageListenerContainer(Connection connection, Destination destination, + MessageListener listenerHandler) { this.connection = connection; this.destination = destination; this.listenerHandler = listenerHandler; @@ -106,152 +107,86 @@ public class MessageListenerContainer implements JMSListenerContainer { this.transactionManager = transactionManager; } - @Override - public void start() { - try { - session = connection.createSession(transacted, acknowledgeMode); - if (durableSubscriptionName != null) { - consumer = session.createDurableSubscriber((Topic)destination, durableSubscriptionName, - messageSelector, pubSubNoLocal); - } else { - consumer = session.createConsumer(destination, messageSelector); + class Poller implements Runnable { + + @Override + public void run() { + ResourceCloser closer = new ResourceCloser(); + while (running) { + try { + if (transactionManager != null) { + transactionManager.begin(); + } + Session session = closer.register(connection.createSession(transacted, acknowledgeMode)); + MessageConsumer consumer = closer.register(session.createConsumer(destination, + messageSelector)); + Message message = consumer.receive(1000); + try { + if (message != null) { + listenerHandler.onMessage(message); + } + if (transactionManager != null) { + transactionManager.commit(); + } else { + session.commit(); + } + } catch (Exception e) { + safeRollBack(session, e); + } + } catch (Exception e) { + LOG.log(Level.WARNING, "Unexpected exception", e); + } finally { + closer.close(); + } } - - MessageListener intListener = (transactionManager != null) - ? new XATransactionalMessageListener(transactionManager, session, listenerHandler) - : new LocalTransactionalMessageListener(session, listenerHandler); - // new DispachingListener(getExecutor(), listenerHandler); - consumer.setMessageListener(intListener); - - running = true; - } catch (JMSException e) { - throw JMSUtil.convertJmsException(e); - } - } - @Override - public void stop() { - running = false; - ResourceCloser.close(consumer); - ResourceCloser.close(session); - consumer = null; - session = null; - } + } - @Override - public void shutdown() { - stop(); - ResourceCloser.close(connection); } - protected TransactionManager getTransactionManager() { - if (this.transactionManager == null) { - try { - InitialContext ctx = new InitialContext(); - this.transactionManager = (TransactionManager)ctx - .lookup("javax.transaction.TransactionManager"); - } catch (NamingException e) { - // Ignore + private void safeRollBack(Session session, Exception e) { + LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back", e); + try { + if (transactionManager != null) { + transactionManager.rollback(); + } else { + session.rollback(); } + } catch (Exception e1) { + LOG.log(Level.WARNING, "Rollback of Local transaction failed", e1); } - return this.transactionManager; } - static class DispachingListener implements MessageListener { - private Executor executor; - private MessageListener listenerHandler; - - public DispachingListener(Executor executor, MessageListener listenerHandler) { - this.executor = executor; - this.listenerHandler = listenerHandler; + @Override + public void start() { + if (running) { + return; } - - @Override - public void onMessage(final Message message) { - executor.execute(new Runnable() { - - @Override - public void run() { - listenerHandler.onMessage(message); - } - - }); + running = true; + pollers = Executors.newFixedThreadPool(numListenerThreads); + for (int c = 0; c < numListenerThreads; c++) { + pollers.execute(new Poller()); } - } - - static class LocalTransactionalMessageListener implements MessageListener { - private MessageListener listenerHandler; - private Session session; - - public LocalTransactionalMessageListener(Session session, MessageListener listenerHandler) { - this.session = session; - this.listenerHandler = listenerHandler; - } - @Override - public void onMessage(Message message) { - try { - listenerHandler.onMessage(message); - session.commit(); - } catch (Throwable e) { - safeRollback(e); - } + @Override + public void stop() { + if (!running) { + return; } - - private void safeRollback(Throwable t) { - LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back" , t); - try { - session.rollback(); - } catch (Exception e) { - LOG.log(Level.WARNING, "Rollback of Local transaction failed", e); - } + running = false; + pollers.shutdown(); + try { + pollers.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // Ignore } - + pollers = null; } - - static class XATransactionalMessageListener implements MessageListener { - private TransactionManager tm; - private MessageListener listenerHandler; - private XASession session; - - public XATransactionalMessageListener(TransactionManager tm, Session session, MessageListener listenerHandler) { - if (tm == null) { - throw new IllegalArgumentException("Must supply a transaction manager"); - } - if (session == null || !(session instanceof XASession)) { - throw new IllegalArgumentException("Must supply an XASession"); - } - this.tm = tm; - this.session = (XASession)session; - this.listenerHandler = listenerHandler; - } - @Override - public void onMessage(Message message) { - try { - tm.begin(); - tm.getTransaction().enlistResource(session.getXAResource()); - listenerHandler.onMessage(message); - tm.commit(); - } catch (Throwable e) { - safeRollback(e); - if (e instanceof RuntimeException) { - throw (RuntimeException)e; - } else { - throw new RuntimeException(e); - } - } - } - - private void safeRollback(Throwable t) { - LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back" , t); - try { - tm.rollback(); - } catch (Exception e) { - LOG.log(Level.WARNING, "Rollback of JTA transaction failed", e); - } - } - + @Override + public void shutdown() { + stop(); + ResourceCloser.close(connection); } }