Return-Path: X-Original-To: apmail-cxf-commits-archive@www.apache.org Delivered-To: apmail-cxf-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0BA7D10D7C for ; Mon, 19 Jan 2015 23:08:01 +0000 (UTC) Received: (qmail 54005 invoked by uid 500); 19 Jan 2015 23:08:03 -0000 Delivered-To: apmail-cxf-commits-archive@cxf.apache.org Received: (qmail 53932 invoked by uid 500); 19 Jan 2015 23:08:03 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 53922 invoked by uid 99); 19 Jan 2015 23:08:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 19 Jan 2015 23:08:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 07CC5E03A8; Mon, 19 Jan 2015 23:08:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cschneider@apache.org To: commits@cxf.apache.org Message-Id: <6fec30e1cc04430d92f542d06188b005@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: cxf git commit: CXF-6199 Separate XA and local transaction handling Date: Mon, 19 Jan 2015 23:08:03 +0000 (UTC) Repository: cxf Updated Branches: refs/heads/master a3fa00bcc -> 13d197463 CXF-6199 Separate XA and local transaction handling Project: http://git-wip-us.apache.org/repos/asf/cxf/repo Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/13d19746 Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/13d19746 Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/13d19746 Branch: refs/heads/master Commit: 13d19746384e4b0633e1552ae14674f396b0557a Parents: a3fa00b Author: Christian Schneider Authored: Tue Jan 20 00:07:57 2015 +0100 Committer: Christian Schneider Committed: Tue Jan 20 00:07:57 2015 +0100 ---------------------------------------------------------------------- .../util/PollingMessageListenerContainer.java | 113 +++++++++++++------ .../transport/jms/util/MessageListenerTest.java | 2 +- 2 files changed, 79 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cxf/blob/13d19746/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java index 9c2e29e..8c68338 100644 --- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java +++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java @@ -26,6 +26,7 @@ import java.util.logging.Logger; import javax.jms.Connection; import javax.jms.Destination; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; @@ -56,34 +57,25 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont MessageConsumer consumer = null; Session session = null; try { - if (transactionManager != null) { - transactionManager.begin(); - } - + // Create session early to optimize performance session = connection.createSession(transacted, acknowledgeMode); - if (durableSubscriptionName != null && destination instanceof Topic) { - consumer = session.createDurableSubscriber((Topic)destination, - durableSubscriptionName, - messageSelector, - pubSubNoLocal); - } else { - consumer = session.createConsumer(destination, messageSelector); - } - Message message = consumer.receive(1000); - try { - if (message != null) { - listenerHandler.onMessage(message); - } - if (transactionManager != null) { - transactionManager.commit(); - } else if (session.getTransacted()) { - session.commit(); + consumer = createConsumer(session); + while (running) { + Message message = consumer.receive(1000); + try { + if (message != null) { + listenerHandler.onMessage(message); + } + if (session.getTransacted()) { + session.commit(); + } + } catch (Exception e) { + LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back", e); + safeRollBack(session, e); } - } catch (Exception e) { - safeRollBack(session, e); } } catch (Exception e) { - LOG.log(Level.WARNING, "Unexpected exception", e); + LOG.log(Level.WARNING, "Unexpected exception. Restarting session and consumer", e); } finally { ResourceCloser.close(consumer); ResourceCloser.close(session); @@ -91,21 +83,71 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont } } + + private void safeRollBack(Session session, Exception e) { + try { + if (session.getTransacted()) { + session.rollback(); + } + } catch (Exception e1) { + LOG.log(Level.WARNING, "Rollback of Local transaction failed", e1); + } + } } + + private class XAPoller implements Runnable { - private void safeRollBack(Session session, Exception e) { - LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back", e); - try { - if (transactionManager != null) { - transactionManager.rollback(); - } else { - if (session.getTransacted()) { - session.rollback(); + @Override + public void run() { + while (running) { + MessageConsumer consumer = null; + Session session = null; + try { + transactionManager.begin(); + /* + * Create session inside transaction to give it the + * chance to enlist itself as a resource + */ + session = connection.createSession(transacted, acknowledgeMode); + consumer = createConsumer(session); + Message message = consumer.receive(1000); + try { + if (message != null) { + listenerHandler.onMessage(message); + } + transactionManager.commit(); + } catch (Exception e) { + LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back", e); + safeRollBack(session); + } finally { + ResourceCloser.close(consumer); + ResourceCloser.close(session); + } + } catch (Exception e) { + LOG.log(Level.WARNING, "Unexpected exception. Restarting session and consumer", e); } + + } + + } + + private void safeRollBack(Session session) { + try { + transactionManager.rollback(); + } catch (Exception e) { + LOG.log(Level.WARNING, "Rollback of XA transaction failed", e); } - } catch (Exception e1) { - LOG.log(Level.WARNING, "Rollback of Local transaction failed", e1); + } + + } + + private MessageConsumer createConsumer(Session session) throws JMSException { + if (durableSubscriptionName != null && destination instanceof Topic) { + return session.createDurableSubscriber((Topic)destination, durableSubscriptionName, + messageSelector, pubSubNoLocal); + } else { + return session.createConsumer(destination, messageSelector); } } @@ -117,7 +159,8 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont running = true; pollers = Executors.newFixedThreadPool(concurrentConsumers); for (int c = 0; c < concurrentConsumers; c++) { - pollers.execute(new Poller()); + Runnable poller = (transactionManager != null) ? new XAPoller() : new Poller(); + pollers.execute(poller); } } http://git-wip-us.apache.org/repos/asf/cxf/blob/13d19746/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java ---------------------------------------------------------------------- diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java index 2484277..8b635e8 100644 --- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java +++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java @@ -173,7 +173,7 @@ public class MessageListenerTest { // + ", expecting: " + expectedNum); Thread.sleep(100); } while ((System.currentTimeMillis() - startTime < timeout) && expectedNum != actualNum); - Assert.assertEquals(message + " -> number of messages", expectedNum, actualNum); + Assert.assertEquals(message + " -> number of messages on queue", expectedNum, actualNum); } private void sendMessage(Connection connection, Destination dest, String content) throws JMSException,