Repository: cxf
Updated Branches:
refs/heads/master 1f430d4c7 -> 40cb28fbc
[CXF-6576] Handle exceptions in MessageListener container without using setExceptionListener
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/40cb28fb
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/40cb28fb
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/40cb28fb
Branch: refs/heads/master
Commit: 40cb28fbcc2e3d0805f0d3ea90797c1310c3cba5
Parents: 1f430d4
Author: Christian Schneider <chris@die-schneider.net>
Authored: Fri Apr 21 10:47:16 2017 +0200
Committer: Christian Schneider <chris@die-schneider.net>
Committed: Fri Apr 21 12:29:55 2017 +0200
----------------------------------------------------------------------
rt/transports/jms/pom.xml | 8 +-
.../cxf/transport/jms/JMSDestination.java | 7 +-
.../util/PollingMessageListenerContainer.java | 93 +++++---------------
.../transport/jms/util/MessageListenerTest.java | 72 ++++++++++++++-
4 files changed, 103 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/40cb28fb/rt/transports/jms/pom.xml
----------------------------------------------------------------------
diff --git a/rt/transports/jms/pom.xml b/rt/transports/jms/pom.xml
index e13c5a6..e6fd9a4 100644
--- a/rt/transports/jms/pom.xml
+++ b/rt/transports/jms/pom.xml
@@ -45,7 +45,6 @@
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jta_1.1_spec</artifactId>
- <version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
@@ -64,6 +63,13 @@
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <version>2.0.0</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-rt-management</artifactId>
http://git-wip-us.apache.org/repos/asf/cxf/blob/40cb28fb/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
index 8ec23cd..22a94de 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
@@ -118,19 +118,20 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess
Session session = null;
try {
connection = JMSFactory.createConnection(jmsConfig);
- connection.setExceptionListener(new ExceptionListener() {
+ ExceptionListener exListener = new ExceptionListener() {
public void onException(JMSException exception) {
if (!shutdown) {
LOG.log(Level.WARNING, "Exception on JMS connection. Trying to reconnect",
exception);
restartConnection();
}
}
- });
+ };
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = jmsConfig.getTargetDestination(session);
PollingMessageListenerContainer container = new PollingMessageListenerContainer(connection,
-
destination, this);
+
destination,
+
this, exListener);
container.setConcurrentConsumers(jmsConfig.getConcurrentConsumers());
container.setTransactionManager(jmsConfig.getTransactionManager());
container.setMessageSelector(jmsConfig.getMessageSelector());
http://git-wip-us.apache.org/repos/asf/cxf/blob/40cb28fb/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
index c4276eb..461a2b1 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
@@ -23,6 +23,7 @@ import java.util.logging.Logger;
import javax.jms.Connection;
import javax.jms.Destination;
+import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -36,24 +37,25 @@ import org.apache.cxf.common.logging.LogUtils;
public class PollingMessageListenerContainer extends AbstractMessageListenerContainer {
private static final Logger LOG = LogUtils.getL7dLogger(PollingMessageListenerContainer.class);
+ private ExceptionListener exceptionListener;
public PollingMessageListenerContainer(Connection connection, Destination destination,
- MessageListener listenerHandler) {
+ MessageListener listenerHandler, ExceptionListener
exceptionListener) {
this.connection = connection;
this.destination = destination;
this.listenerHandler = listenerHandler;
+ this.exceptionListener = exceptionListener;
}
- private class Poller extends AbstractPoller implements Runnable {
+ private class Poller implements Runnable {
@Override
public void run() {
Session session = null;
- init();
while (running) {
try (ResourceCloser closer = new ResourceCloser()) {
closer.register(createInitialContext());
- // Create session early to optimize performance
+ // Create session early to optimize performance // In
session = closer.register(connection.createSession(transacted, acknowledgeMode));
MessageConsumer consumer = closer.register(createConsumer(session));
while (running) {
@@ -70,14 +72,12 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
safeRollBack(session);
}
}
- } catch (Throwable e) {
- catchUnexpectedExceptionDuringPolling(null, e);
+ } catch (Exception e) {
+ handleException(e);
}
}
-
}
- @Override
protected void safeRollBack(Session session) {
try {
if (session != null && session.getTransacted()) {
@@ -90,11 +90,10 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
}
- private class XAPoller extends AbstractPoller implements Runnable {
+ private class XAPoller implements Runnable {
@Override
public void run() {
- init();
while (running) {
try (ResourceCloser closer = new ResourceCloser()) {
closer.register(createInitialContext());
@@ -121,14 +120,12 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
safeRollBack(session);
}
} catch (Exception e) {
- catchUnexpectedExceptionDuringPolling(null, e);
+ handleException(e);
}
-
}
}
- @Override
protected void safeRollBack(Session session) {
try {
transactionManager.rollback();
@@ -139,64 +136,6 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
}
- private abstract class AbstractPoller {
- private static final String RETRY_COUNTER_ON_EXCEPTION = "jms.polling.retrycounteronexception";
- private static final String SLEEPING_TIME_BEFORE_RETRY = "jms.polling.sleepingtimebeforeretry";
- protected int retryCounter = -1;
- protected int counter;
- protected int sleepingTime = 5000;
-
- protected void init() {
- if (jndiEnvironment != null) {
- if (jndiEnvironment.containsKey(RETRY_COUNTER_ON_EXCEPTION)) {
- retryCounter = Integer.valueOf(jndiEnvironment.getProperty(RETRY_COUNTER_ON_EXCEPTION));
- }
- if (jndiEnvironment.containsKey(SLEEPING_TIME_BEFORE_RETRY)) {
- sleepingTime = Integer.valueOf(jndiEnvironment.getProperty(SLEEPING_TIME_BEFORE_RETRY));
- }
- }
- }
-
- protected boolean hasToCount() {
- return retryCounter > -1;
- }
-
- protected boolean hasToStop() {
- return counter > retryCounter;
- }
-
- protected void catchUnexpectedExceptionDuringPolling(Session session, Throwable e)
{
- LOG.log(Level.WARNING, "Unexpected exception.", e);
- if (hasToCount()) {
- counter++;
- if (hasToStop()) {
- stop(session, e);
- }
- }
- if (running) {
- try {
- String log = "Now sleeping for " + sleepingTime / 1000 + " seconds";
- log += hasToCount()
- ? ". Then restarting session and consumer: attempt " + counter +
"/" + retryCounter
- : "";
- LOG.log(Level.WARNING, log);
- Thread.sleep(sleepingTime);
- } catch (InterruptedException e1) {
- LOG.log(Level.WARNING, e1.getMessage());
- }
- }
- }
-
- protected void stop(Session session, Throwable e) {
- LOG.log(Level.WARNING, "Stopping the jms message polling thread in cxf", e);
- safeRollBack(session);
- running = false;
- }
-
- protected abstract void safeRollBack(Session session);
-
- }
-
private MessageConsumer createConsumer(Session session) throws JMSException {
if (durableSubscriptionName != null && destination instanceof Topic) {
return session.createDurableSubscriber((Topic)destination, durableSubscriptionName,
@@ -205,6 +144,18 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
return session.createConsumer(destination, messageSelector);
}
}
+
+ protected void handleException(Exception e) {
+ running = false;
+ JMSException wrapped;
+ if (e instanceof JMSException) {
+ wrapped = (JMSException) e;
+ } else {
+ wrapped = new JMSException("Wrapped exception. " + e.getMessage());
+ wrapped.addSuppressed(e);
+ }
+ this.exceptionListener.onException(wrapped);
+ }
@Override
public void start() {
http://git-wip-us.apache.org/repos/asf/cxf/blob/40cb28fb/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
index 82cc37a..228ffa7 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
@@ -20,6 +20,7 @@ package org.apache.cxf.transport.jms.util;
import javax.jms.Connection;
import javax.jms.Destination;
+import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -36,14 +37,76 @@ import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.pool.XaPooledConnectionFactory;
import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
+import org.awaitility.Awaitility;
+import org.easymock.Capture;
import org.junit.Assert;
import org.junit.Test;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.newCapture;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
public class MessageListenerTest {
private static final String FAIL = "fail";
private static final String FAILFIRST = "failfirst";
private static final String OK = "ok";
+
+ @Test
+ public void testConnectionProblem() throws JMSException {
+ Connection connection = createConnection("broker");
+ Queue dest = JMSUtil.createQueue(connection, "test");
+
+ MessageListener listenerHandler = new TestMessageListener();
+ ExceptionListener exListener = createMock(ExceptionListener.class);
+
+ Capture<JMSException> captured = newCapture();
+ exListener.onException(capture(captured));
+ expectLastCall();
+ replay(exListener);
+
+ PollingMessageListenerContainer container = //
+ new PollingMessageListenerContainer(connection, dest, listenerHandler, exListener);
+ connection.close(); // Simulate connection problem
+ container.start();
+ Awaitility.await().until(() -> !container.isRunning());
+ verify(exListener);
+ JMSException ex = captured.getValue();
+ Assert.assertEquals("The connection is already closed", ex.getMessage());
+ }
+
+ @Test
+ public void testConnectionProblemXA() throws JMSException, XAException, InterruptedException
{
+ TransactionManager transactionManager = new GeronimoTransactionManager();
+ Connection connection = createXAConnection("brokerJTA", transactionManager);
+ Queue dest = JMSUtil.createQueue(connection, "test");
+
+ MessageListener listenerHandler = new TestMessageListener();
+ ExceptionListener exListener = createMock(ExceptionListener.class);
+
+ Capture<JMSException> captured = newCapture();
+ exListener.onException(capture(captured));
+ expectLastCall();
+ replay(exListener);
+
+ PollingMessageListenerContainer container = //
+ new PollingMessageListenerContainer(connection, dest, listenerHandler, exListener);
+ container.setTransacted(false);
+ container.setAcknowledgeMode(Session.SESSION_TRANSACTED);
+ container.setTransactionManager(transactionManager);
+
+ connection.close(); // Simulate connection problem
+ container.start();
+ Awaitility.await().until(() -> !container.isRunning());
+ verify(exListener);
+ JMSException ex = captured.getValue();
+ // Closing the pooled connection will result in a NPE when using it
+ Assert.assertEquals("Wrapped exception. null", ex.getMessage());
+ }
@Test
public void testWithJTA() throws JMSException, XAException, InterruptedException {
@@ -52,11 +115,16 @@ public class MessageListenerTest {
Queue dest = JMSUtil.createQueue(connection, "test");
MessageListener listenerHandler = new TestMessageListener();
+ ExceptionListener exListener = new ExceptionListener() {
+
+ @Override
+ public void onException(JMSException exception) {
+ }
+ };
PollingMessageListenerContainer container = new PollingMessageListenerContainer(connection,
dest,
- listenerHandler);
+ listenerHandler,
exListener);
container.setTransacted(false);
container.setAcknowledgeMode(Session.SESSION_TRANSACTED);
-
container.setTransactionManager(transactionManager);
container.start();
|