Return-Path: X-Original-To: apmail-cxf-commits-archive@www.apache.org Delivered-To: apmail-cxf-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 601C010466 for ; Fri, 4 Apr 2014 07:15:42 +0000 (UTC) Received: (qmail 20507 invoked by uid 500); 4 Apr 2014 07:15:36 -0000 Delivered-To: apmail-cxf-commits-archive@cxf.apache.org Received: (qmail 20326 invoked by uid 500); 4 Apr 2014 07:15:34 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 20289 invoked by uid 99); 4 Apr 2014 07:15:29 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Apr 2014 07:15:29 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 9D79694A0FB; Fri, 4 Apr 2014 07:15:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cschneider@apache.org To: commits@cxf.apache.org Date: Fri, 04 Apr 2014 07:15:29 -0000 Message-Id: <8c0e5c1c592b42908696021bc96af07f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] git commit: CXF-5543 Fixing transactional tests and adding Polling Repository: cxf Updated Branches: refs/heads/master c67b0119c -> 7c7fff780 CXF-5543 Fixing transactional tests and adding Polling Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/a96ef8a0 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/a96ef8a0 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/a96ef8a0 Branch: refs/heads/master Commit: a96ef8a0f5d3f311a2530b9f13a2a8424c1b665c Parents: c62ac16 Author: Christian Schneider Authored: Thu Apr 3 13:55:34 2014 +0200 Committer: Christian Schneider Committed: Thu Apr 3 13:55:34 2014 +0200 ---------------------------------------------------------------------- .../jms/util/MessageListenerContainer.java | 253 ------------------- .../util/PollingMessageListenerContainer.java | 253 +++++++++++++++++++ 2 files changed, 253 insertions(+), 253 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/a96ef8a0/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java deleted file mode 100644 index 01d9ae7..0000000 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java +++ /dev/null @@ -1,253 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.cxf.transport.jms.util; - -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.logging.Level; -import java.util.logging.Logger; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; -import javax.jms.Topic; -import javax.jms.XASession; -import javax.naming.InitialContext; -import javax.naming.NamingException; -import javax.transaction.TransactionManager; - -import org.apache.cxf.common.logging.LogUtils; - -public class MessageListenerContainer implements JMSListenerContainer { - private static final Logger LOG = LogUtils.getL7dLogger(MessageListenerContainer.class); - - private Connection connection; - private Destination destination; - private MessageListener listenerHandler; - private boolean transacted; - private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE; - private String messageSelector; - private boolean running; - private MessageConsumer consumer; - private Session session; - private Executor executor; - private String durableSubscriptionName; - private boolean pubSubNoLocal; - private TransactionManager transactionManager; - - public MessageListenerContainer(Connection connection, Destination destination, - MessageListener listenerHandler) { - this.connection = connection; - this.destination = destination; - this.listenerHandler = listenerHandler; - } - - public Connection getConnection() { - return connection; - } - - public void setTransacted(boolean transacted) { - this.transacted = transacted; - } - - public void setAcknowledgeMode(int acknowledgeMode) { - this.acknowledgeMode = acknowledgeMode; - } - - public void setMessageSelector(String messageSelector) { - this.messageSelector = messageSelector; - } - - protected Executor getExecutor() { - if (executor == null) { - executor = Executors.newFixedThreadPool(10); - } - return executor; - } - - public void setExecutor(Executor executor) { - this.executor = executor; - } - - public void setDurableSubscriptionName(String durableSubscriptionName) { - this.durableSubscriptionName = durableSubscriptionName; - } - - public void setPubSubNoLocal(boolean pubSubNoLocal) { - this.pubSubNoLocal = pubSubNoLocal; - } - - @Override - public boolean isRunning() { - return running; - } - - public void setTransactionManager(TransactionManager transactionManager) { - this.transactionManager = transactionManager; - } - - @Override - public void start() { - try { - session = connection.createSession(transacted, acknowledgeMode); - if (durableSubscriptionName != null) { - consumer = session.createDurableSubscriber((Topic)destination, durableSubscriptionName, - messageSelector, pubSubNoLocal); - } else { - consumer = session.createConsumer(destination, messageSelector); - } - - MessageListener intListener = (transactionManager != null) - ? new XATransactionalMessageListener(transactionManager, session, listenerHandler) - : new LocalTransactionalMessageListener(session, listenerHandler); - // new DispachingListener(getExecutor(), listenerHandler); - consumer.setMessageListener(intListener); - - running = true; - } catch (JMSException e) { - throw JMSUtil.convertJmsException(e); - } - } - - @Override - public void stop() { - running = false; - ResourceCloser.close(consumer); - ResourceCloser.close(session); - consumer = null; - session = null; - } - - @Override - public void shutdown() { - stop(); - ResourceCloser.close(connection); - } - - protected TransactionManager getTransactionManager() { - if (this.transactionManager == null) { - try { - InitialContext ctx = new InitialContext(); - this.transactionManager = (TransactionManager)ctx - .lookup("javax.transaction.TransactionManager"); - } catch (NamingException e) { - // Ignore - } - } - return this.transactionManager; - } - - static class DispachingListener implements MessageListener { - private Executor executor; - private MessageListener listenerHandler; - - public DispachingListener(Executor executor, MessageListener listenerHandler) { - this.executor = executor; - this.listenerHandler = listenerHandler; - } - - @Override - public void onMessage(final Message message) { - executor.execute(new Runnable() { - - @Override - public void run() { - listenerHandler.onMessage(message); - } - - }); - } - - } - - static class LocalTransactionalMessageListener implements MessageListener { - private MessageListener listenerHandler; - private Session session; - - public LocalTransactionalMessageListener(Session session, MessageListener listenerHandler) { - this.session = session; - this.listenerHandler = listenerHandler; - } - - @Override - public void onMessage(Message message) { - try { - listenerHandler.onMessage(message); - session.commit(); - } catch (Throwable e) { - safeRollback(e); - } - } - - private void safeRollback(Throwable t) { - LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back" , t); - try { - session.rollback(); - } catch (Exception e) { - LOG.log(Level.WARNING, "Rollback of Local transaction failed", e); - } - } - - } - - @SuppressWarnings("PMD") - static class XATransactionalMessageListener implements MessageListener { - private TransactionManager tm; - private MessageListener listenerHandler; - private XASession session; - - public XATransactionalMessageListener(TransactionManager tm, Session session, MessageListener listenerHandler) { - if (tm == null) { - throw new IllegalArgumentException("Must supply a transaction manager"); - } - if (session == null || !(session instanceof XASession)) { - throw new IllegalArgumentException("Must supply an XASession"); - } - this.tm = tm; - this.session = (XASession)session; - this.listenerHandler = listenerHandler; - } - - @Override - public void onMessage(Message message) { - try { - tm.begin(); - tm.getTransaction().enlistResource(session.getXAResource()); - listenerHandler.onMessage(message); - tm.commit(); - } catch (Throwable e) { - safeRollback(e); - } - } - - private void safeRollback(Throwable t) { - LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back" , t); - try { - tm.rollback(); - } catch (Exception e) { - LOG.log(Level.WARNING, "Rollback of JTA transaction failed", e); - } - } - - } -} http://git-wip-us.apache.org/repos/asf/cxf/blob/a96ef8a0/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java new file mode 100644 index 0000000..01d9ae7 --- /dev/null +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.transport.jms.util; + +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.logging.Level; +import java.util.logging.Logger; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.XASession; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import javax.transaction.TransactionManager; + +import org.apache.cxf.common.logging.LogUtils; + +public class MessageListenerContainer implements JMSListenerContainer { + private static final Logger LOG = LogUtils.getL7dLogger(MessageListenerContainer.class); + + private Connection connection; + private Destination destination; + private MessageListener listenerHandler; + private boolean transacted; + private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE; + private String messageSelector; + private boolean running; + private MessageConsumer consumer; + private Session session; + private Executor executor; + private String durableSubscriptionName; + private boolean pubSubNoLocal; + private TransactionManager transactionManager; + + public MessageListenerContainer(Connection connection, Destination destination, + MessageListener listenerHandler) { + this.connection = connection; + this.destination = destination; + this.listenerHandler = listenerHandler; + } + + public Connection getConnection() { + return connection; + } + + public void setTransacted(boolean transacted) { + this.transacted = transacted; + } + + public void setAcknowledgeMode(int acknowledgeMode) { + this.acknowledgeMode = acknowledgeMode; + } + + public void setMessageSelector(String messageSelector) { + this.messageSelector = messageSelector; + } + + protected Executor getExecutor() { + if (executor == null) { + executor = Executors.newFixedThreadPool(10); + } + return executor; + } + + public void setExecutor(Executor executor) { + this.executor = executor; + } + + public void setDurableSubscriptionName(String durableSubscriptionName) { + this.durableSubscriptionName = durableSubscriptionName; + } + + public void setPubSubNoLocal(boolean pubSubNoLocal) { + this.pubSubNoLocal = pubSubNoLocal; + } + + @Override + public boolean isRunning() { + return running; + } + + public void setTransactionManager(TransactionManager transactionManager) { + this.transactionManager = transactionManager; + } + + @Override + public void start() { + try { + session = connection.createSession(transacted, acknowledgeMode); + if (durableSubscriptionName != null) { + consumer = session.createDurableSubscriber((Topic)destination, durableSubscriptionName, + messageSelector, pubSubNoLocal); + } else { + consumer = session.createConsumer(destination, messageSelector); + } + + MessageListener intListener = (transactionManager != null) + ? new XATransactionalMessageListener(transactionManager, session, listenerHandler) + : new LocalTransactionalMessageListener(session, listenerHandler); + // new DispachingListener(getExecutor(), listenerHandler); + consumer.setMessageListener(intListener); + + running = true; + } catch (JMSException e) { + throw JMSUtil.convertJmsException(e); + } + } + + @Override + public void stop() { + running = false; + ResourceCloser.close(consumer); + ResourceCloser.close(session); + consumer = null; + session = null; + } + + @Override + public void shutdown() { + stop(); + ResourceCloser.close(connection); + } + + protected TransactionManager getTransactionManager() { + if (this.transactionManager == null) { + try { + InitialContext ctx = new InitialContext(); + this.transactionManager = (TransactionManager)ctx + .lookup("javax.transaction.TransactionManager"); + } catch (NamingException e) { + // Ignore + } + } + return this.transactionManager; + } + + static class DispachingListener implements MessageListener { + private Executor executor; + private MessageListener listenerHandler; + + public DispachingListener(Executor executor, MessageListener listenerHandler) { + this.executor = executor; + this.listenerHandler = listenerHandler; + } + + @Override + public void onMessage(final Message message) { + executor.execute(new Runnable() { + + @Override + public void run() { + listenerHandler.onMessage(message); + } + + }); + } + + } + + static class LocalTransactionalMessageListener implements MessageListener { + private MessageListener listenerHandler; + private Session session; + + public LocalTransactionalMessageListener(Session session, MessageListener listenerHandler) { + this.session = session; + this.listenerHandler = listenerHandler; + } + + @Override + public void onMessage(Message message) { + try { + listenerHandler.onMessage(message); + session.commit(); + } catch (Throwable e) { + safeRollback(e); + } + } + + private void safeRollback(Throwable t) { + LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back" , t); + try { + session.rollback(); + } catch (Exception e) { + LOG.log(Level.WARNING, "Rollback of Local transaction failed", e); + } + } + + } + + @SuppressWarnings("PMD") + static class XATransactionalMessageListener implements MessageListener { + private TransactionManager tm; + private MessageListener listenerHandler; + private XASession session; + + public XATransactionalMessageListener(TransactionManager tm, Session session, MessageListener listenerHandler) { + if (tm == null) { + throw new IllegalArgumentException("Must supply a transaction manager"); + } + if (session == null || !(session instanceof XASession)) { + throw new IllegalArgumentException("Must supply an XASession"); + } + this.tm = tm; + this.session = (XASession)session; + this.listenerHandler = listenerHandler; + } + + @Override + public void onMessage(Message message) { + try { + tm.begin(); + tm.getTransaction().enlistResource(session.getXAResource()); + listenerHandler.onMessage(message); + tm.commit(); + } catch (Throwable e) { + safeRollback(e); + } + } + + private void safeRollback(Throwable t) { + LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back" , t); + try { + tm.rollback(); + } catch (Exception e) { + LOG.log(Level.WARNING, "Rollback of JTA transaction failed", e); + } + } + + } +}