cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject cxf git commit: [CXF-6576] Handle exceptions in MessageListener container without using setExceptionListener
Date Fri, 21 Apr 2017 10:30:02 GMT
Repository: cxf
Updated Branches:
  refs/heads/master 1f430d4c7 -> 40cb28fbc


[CXF-6576] Handle exceptions in MessageListener container without using setExceptionListener


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/40cb28fb
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/40cb28fb
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/40cb28fb

Branch: refs/heads/master
Commit: 40cb28fbcc2e3d0805f0d3ea90797c1310c3cba5
Parents: 1f430d4
Author: Christian Schneider <chris@die-schneider.net>
Authored: Fri Apr 21 10:47:16 2017 +0200
Committer: Christian Schneider <chris@die-schneider.net>
Committed: Fri Apr 21 12:29:55 2017 +0200

----------------------------------------------------------------------
 rt/transports/jms/pom.xml                       |  8 +-
 .../cxf/transport/jms/JMSDestination.java       |  7 +-
 .../util/PollingMessageListenerContainer.java   | 93 +++++---------------
 .../transport/jms/util/MessageListenerTest.java | 72 ++++++++++++++-
 4 files changed, 103 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/40cb28fb/rt/transports/jms/pom.xml
----------------------------------------------------------------------
diff --git a/rt/transports/jms/pom.xml b/rt/transports/jms/pom.xml
index e13c5a6..e6fd9a4 100644
--- a/rt/transports/jms/pom.xml
+++ b/rt/transports/jms/pom.xml
@@ -45,7 +45,6 @@
         <dependency>
             <groupId>org.apache.geronimo.specs</groupId>
             <artifactId>geronimo-jta_1.1_spec</artifactId>
-            <version>1.1.1</version>
         </dependency>
         <dependency>
             <groupId>org.apache.geronimo.specs</groupId>
@@ -64,6 +63,13 @@
             <artifactId>easymock</artifactId>
             <scope>test</scope>
         </dependency>
+        
+        <dependency>
+        	<groupId>org.awaitility</groupId>
+        	<artifactId>awaitility</artifactId>
+        	<version>2.0.0</version>
+        	<scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.cxf</groupId>
             <artifactId>cxf-rt-management</artifactId>

http://git-wip-us.apache.org/repos/asf/cxf/blob/40cb28fb/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
index 8ec23cd..22a94de 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
@@ -118,19 +118,20 @@ public class JMSDestination extends AbstractMultiplexDestination implements
Mess
         Session session = null;
         try {
             connection = JMSFactory.createConnection(jmsConfig);
-            connection.setExceptionListener(new ExceptionListener() {
+            ExceptionListener exListener = new ExceptionListener() {
                 public void onException(JMSException exception) {
                     if (!shutdown) {
                         LOG.log(Level.WARNING, "Exception on JMS connection. Trying to reconnect",
exception);
                         restartConnection();
                     }
                 }
-            });
+            };
             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             Destination destination = jmsConfig.getTargetDestination(session);
 
             PollingMessageListenerContainer container = new PollingMessageListenerContainer(connection,
-                                                                                        
   destination, this);
+                                                                                        
   destination, 
+                                                                                        
   this, exListener);
             container.setConcurrentConsumers(jmsConfig.getConcurrentConsumers());
             container.setTransactionManager(jmsConfig.getTransactionManager());
             container.setMessageSelector(jmsConfig.getMessageSelector());

http://git-wip-us.apache.org/repos/asf/cxf/blob/40cb28fb/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
index c4276eb..461a2b1 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
@@ -23,6 +23,7 @@ import java.util.logging.Logger;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
+import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -36,24 +37,25 @@ import org.apache.cxf.common.logging.LogUtils;
 
 public class PollingMessageListenerContainer extends AbstractMessageListenerContainer {
     private static final Logger LOG = LogUtils.getL7dLogger(PollingMessageListenerContainer.class);
+    private ExceptionListener exceptionListener;
 
     public PollingMessageListenerContainer(Connection connection, Destination destination,
-                                           MessageListener listenerHandler) {
+                                           MessageListener listenerHandler, ExceptionListener
exceptionListener) {
         this.connection = connection;
         this.destination = destination;
         this.listenerHandler = listenerHandler;
+        this.exceptionListener = exceptionListener;
     }
 
-    private class Poller extends AbstractPoller implements Runnable {
+    private class Poller implements Runnable {
 
         @Override
         public void run() {
             Session session = null;
-            init();
             while (running) {
                 try (ResourceCloser closer = new ResourceCloser()) {
                     closer.register(createInitialContext());
-                    // Create session early to optimize performance
+                    // Create session early to optimize performance                // In
                     session = closer.register(connection.createSession(transacted, acknowledgeMode));
                     MessageConsumer consumer = closer.register(createConsumer(session));
                     while (running) {
@@ -70,14 +72,12 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
                             safeRollBack(session);
                         }
                     }
-                } catch (Throwable e) {
-                    catchUnexpectedExceptionDuringPolling(null, e);
+                } catch (Exception e) {
+                    handleException(e);
                 }
             }
-
         }
 
-        @Override
         protected void safeRollBack(Session session) {
             try {
                 if (session != null && session.getTransacted()) {
@@ -90,11 +90,10 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
 
     }
 
-    private class XAPoller extends AbstractPoller implements Runnable {
+    private class XAPoller implements Runnable {
 
         @Override
         public void run() {
-            init();
             while (running) {
                 try (ResourceCloser closer = new ResourceCloser()) {
                     closer.register(createInitialContext());
@@ -121,14 +120,12 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
                         safeRollBack(session);
                     }
                 } catch (Exception e) {
-                    catchUnexpectedExceptionDuringPolling(null, e);
+                    handleException(e);
                 }
-
             }
 
         }
 
-        @Override
         protected void safeRollBack(Session session) {
             try {
                 transactionManager.rollback();
@@ -139,64 +136,6 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
 
     }
 
-    private abstract class AbstractPoller {
-        private static final String RETRY_COUNTER_ON_EXCEPTION = "jms.polling.retrycounteronexception";
-        private static final String SLEEPING_TIME_BEFORE_RETRY = "jms.polling.sleepingtimebeforeretry";
-        protected int retryCounter = -1;
-        protected int counter;
-        protected int sleepingTime = 5000;
-
-        protected void init() {
-            if (jndiEnvironment != null) {
-                if (jndiEnvironment.containsKey(RETRY_COUNTER_ON_EXCEPTION)) {
-                    retryCounter = Integer.valueOf(jndiEnvironment.getProperty(RETRY_COUNTER_ON_EXCEPTION));
-                }
-                if (jndiEnvironment.containsKey(SLEEPING_TIME_BEFORE_RETRY)) {
-                    sleepingTime = Integer.valueOf(jndiEnvironment.getProperty(SLEEPING_TIME_BEFORE_RETRY));
-                }
-            }
-        }
-
-        protected boolean hasToCount() {
-            return retryCounter > -1;
-        }
-
-        protected boolean hasToStop() {
-            return counter > retryCounter;
-        }
-
-        protected void catchUnexpectedExceptionDuringPolling(Session session, Throwable e)
{
-            LOG.log(Level.WARNING, "Unexpected exception.", e);
-            if (hasToCount()) {
-                counter++;
-                if (hasToStop()) {
-                    stop(session, e);
-                }
-            }
-            if (running) {
-                try {
-                    String log = "Now sleeping for " + sleepingTime / 1000 + " seconds";
-                    log += hasToCount()
-                        ? ". Then restarting session and consumer: attempt " + counter +
"/" + retryCounter
-                        : "";
-                    LOG.log(Level.WARNING, log);
-                    Thread.sleep(sleepingTime);
-                } catch (InterruptedException e1) {
-                    LOG.log(Level.WARNING, e1.getMessage());
-                }
-            }
-        }
-
-        protected void stop(Session session, Throwable e) {
-            LOG.log(Level.WARNING, "Stopping the jms message polling thread in cxf", e);
-            safeRollBack(session);
-            running = false;
-        }
-
-        protected abstract void safeRollBack(Session session);
-
-    }
-    
     private MessageConsumer createConsumer(Session session) throws JMSException {
         if (durableSubscriptionName != null && destination instanceof Topic) {
             return session.createDurableSubscriber((Topic)destination, durableSubscriptionName,
@@ -205,6 +144,18 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
             return session.createConsumer(destination, messageSelector);
         }
     }
+    
+    protected void handleException(Exception e) {
+        running = false;
+        JMSException wrapped;
+        if (e  instanceof JMSException) {
+            wrapped = (JMSException) e;
+        } else {
+            wrapped = new JMSException("Wrapped exception. " + e.getMessage());
+            wrapped.addSuppressed(e);
+        }
+        this.exceptionListener.onException(wrapped);
+    }
 
     @Override
     public void start() {

http://git-wip-us.apache.org/repos/asf/cxf/blob/40cb28fb/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
index 82cc37a..228ffa7 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
@@ -20,6 +20,7 @@ package org.apache.cxf.transport.jms.util;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
+import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -36,14 +37,76 @@ import org.apache.activemq.ActiveMQXAConnectionFactory;
 import org.apache.activemq.RedeliveryPolicy;
 import org.apache.activemq.pool.XaPooledConnectionFactory;
 import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
+import org.awaitility.Awaitility;
+import org.easymock.Capture;
 import org.junit.Assert;
 import org.junit.Test;
 
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.newCapture;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
 public class MessageListenerTest {
 
     private static final String FAIL = "fail";
     private static final String FAILFIRST = "failfirst";
     private static final String OK = "ok";
+    
+    @Test
+    public void testConnectionProblem() throws JMSException {
+        Connection connection = createConnection("broker");
+        Queue dest = JMSUtil.createQueue(connection, "test");
+
+        MessageListener listenerHandler = new TestMessageListener();
+        ExceptionListener exListener = createMock(ExceptionListener.class);
+        
+        Capture<JMSException> captured = newCapture();
+        exListener.onException(capture(captured));
+        expectLastCall();
+        replay(exListener);
+
+        PollingMessageListenerContainer container = //
+            new PollingMessageListenerContainer(connection, dest, listenerHandler, exListener);
+        connection.close(); // Simulate connection problem
+        container.start();
+        Awaitility.await().until(() -> !container.isRunning());
+        verify(exListener);
+        JMSException ex = captured.getValue();
+        Assert.assertEquals("The connection is already closed", ex.getMessage());
+    }
+    
+    @Test
+    public void testConnectionProblemXA() throws JMSException, XAException, InterruptedException
{
+        TransactionManager transactionManager = new GeronimoTransactionManager();
+        Connection connection = createXAConnection("brokerJTA", transactionManager);
+        Queue dest = JMSUtil.createQueue(connection, "test");
+
+        MessageListener listenerHandler = new TestMessageListener();
+        ExceptionListener exListener = createMock(ExceptionListener.class);
+        
+        Capture<JMSException> captured = newCapture();
+        exListener.onException(capture(captured));
+        expectLastCall();
+        replay(exListener);
+
+        PollingMessageListenerContainer container = //
+            new PollingMessageListenerContainer(connection, dest, listenerHandler, exListener);
+        container.setTransacted(false);
+        container.setAcknowledgeMode(Session.SESSION_TRANSACTED);
+        container.setTransactionManager(transactionManager);
+
+        connection.close(); // Simulate connection problem
+        container.start();
+        Awaitility.await().until(() -> !container.isRunning());
+        verify(exListener);
+        JMSException ex = captured.getValue();
+        // Closing the pooled connection will result in a NPE when using it
+        Assert.assertEquals("Wrapped exception. null", ex.getMessage());
+    }
 
     @Test
     public void testWithJTA() throws JMSException, XAException, InterruptedException {
@@ -52,11 +115,16 @@ public class MessageListenerTest {
         Queue dest = JMSUtil.createQueue(connection, "test");
 
         MessageListener listenerHandler = new TestMessageListener();
+        ExceptionListener exListener = new ExceptionListener() {
+            
+            @Override
+            public void onException(JMSException exception) {
+            }
+        };
         PollingMessageListenerContainer container = new PollingMessageListenerContainer(connection,
dest,
-                                                                                        listenerHandler);
+                                                                                        listenerHandler,
exListener);
         container.setTransacted(false);
         container.setAcknowledgeMode(Session.SESSION_TRANSACTED);
-
         container.setTransactionManager(transactionManager);
         container.start();
 


Mime
View raw message