cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject git commit: CXF-5680 Support reconnect in destination and conduit
Date Wed, 09 Apr 2014 11:13:25 GMT
Repository: cxf
Updated Branches:
  refs/heads/master 2e6ce3106 -> 811e10d4d


CXF-5680 Support reconnect in destination and conduit


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/811e10d4
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/811e10d4
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/811e10d4

Branch: refs/heads/master
Commit: 811e10d4d556f01377b11df3fe204c7f1dd32c02
Parents: 2e6ce31
Author: Christian Schneider <chris@die-schneider.net>
Authored: Wed Apr 9 13:13:15 2014 +0200
Committer: Christian Schneider <chris@die-schneider.net>
Committed: Wed Apr 9 13:13:15 2014 +0200

----------------------------------------------------------------------
 .../apache/cxf/transport/jms/JMSConduit.java    | 15 ++++++-
 .../cxf/transport/jms/JMSDestination.java       | 44 ++++++++++++++++++--
 .../jms/util/MessageListenerContainer.java      |  7 ++++
 .../cxf/transport/jms/util/ResourceCloser.java  |  6 +--
 4 files changed, 62 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/811e10d4/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
index e77efd7..93771a2 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
@@ -157,7 +157,20 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender,
Me
                 sendAndReceiveMessage(exchange, request, outMessage, closer, session);
             }
         } catch (JMSException e) {
-            throw new RuntimeException(e.getMessage(), e);
+            // Close connection so it will be refreshed on next try
+            ResourceCloser.close(connection);
+            this.connection = null;
+            this.staticReplyDestination = null;
+            if (this.jmsListener != null) {
+                this.jmsListener.shutdown();
+            }
+            this.jmsListener = null;
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e1) {
+                // Ignore
+            }
+            throw JMSUtil.convertJmsException(e);
         } finally {
             closer.close();
         }

http://git-wip-us.apache.org/repos/asf/cxf/blob/811e10d4/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
index cda7c5e..7e5ac35 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
@@ -26,6 +26,7 @@ import java.util.logging.Logger;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
+import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.MessageListener;
 import javax.jms.Session;
@@ -63,6 +64,7 @@ public class JMSDestination extends AbstractMultiplexDestination implements
Mess
     private ThrottlingCounter suspendedContinuations;
     private ClassLoader loader;
     private Connection connection;
+    private boolean shutdown;
 
     public JMSDestination(Bus b, EndpointInfo info, JMSConfiguration jmsConfig) {
         super(b, getTargetReference(info, b), info);
@@ -87,12 +89,10 @@ public class JMSDestination extends AbstractMultiplexDestination implements
Mess
     public void activate() {
         getLogger().log(Level.FINE, "JMSDestination activate().... ");
         jmsConfig.ensureProperlyConfigured();
-
         jmsListener = createTargetDestinationListener();
-        int restartLimit = jmsConfig.getMaxSuspendedContinuations() * jmsConfig.getReconnectPercentOfMax()
-                           / 100;
+        int restartLimit = jmsConfig.getMaxSuspendedContinuations() * jmsConfig.getReconnectPercentOfMax()
/ 100;
         this.suspendedContinuations = new ThrottlingCounter(this.jmsListener, restartLimit,
-                                                            jmsConfig.getMaxSuspendedContinuations());
+                                             jmsConfig.getMaxSuspendedContinuations());
     }
     
     
@@ -100,6 +100,13 @@ public class JMSDestination extends AbstractMultiplexDestination implements
Mess
         Session session = null;
         try {
             connection = JMSFactory.createConnection(jmsConfig);
+            connection.setExceptionListener(new ExceptionListener() {
+                
+                @Override
+                public void onException(JMSException exception) {
+                    restartConnection(exception);
+                }
+            });
             connection.start();
             session = connection.createSession(jmsConfig.isSessionTransacted(), Session.AUTO_ACKNOWLEDGE);
             Destination destination = jmsConfig.getTargetDestination(session);
@@ -116,13 +123,42 @@ public class JMSDestination extends AbstractMultiplexDestination implements
Mess
         }
     }
 
+    protected void restartConnection(JMSException e) {
+        LOG.log(Level.WARNING, "Exception on JMS connection. Trying to reconnect", e);
+        int tries = 0;
+        do {
+            tries++;
+            try {
+                deactivate();
+                activate();
+                LOG.log(Level.INFO, "Reestablished JMS connection");
+            } catch (Exception e1) {
+                jmsListener = null;
+                String message = "Exception on reconnect. Trying again, attempt num " + tries;
+                if (LOG.isLoggable(Level.FINE)) {
+                    LOG.log(Level.WARNING, message, e);
+                } else {
+                    LOG.log(Level.WARNING, message);
+                }
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException e2) {
+                    // Ignore
+                }
+            }
+        } while (jmsListener == null && !shutdown);
+    }
+
     public void deactivate() {
         if (jmsListener != null) {
             jmsListener.shutdown();
         }
+        this.suspendedContinuations = null;
+        connection = null;
     }
 
     public void shutdown() {
+        this.shutdown = true;
         getLogger().log(Level.FINE, "JMSDestination shutdown()");
         this.deactivate();
     }

http://git-wip-us.apache.org/repos/asf/cxf/blob/811e10d4/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
index 828da94..106934c 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
@@ -38,6 +38,13 @@ import javax.transaction.TransactionManager;
 
 import org.apache.cxf.common.logging.LogUtils;
 
+/**
+ * Listen for messages on a queue or topic asynchronously by registering a
+ * MessageListener.
+ * 
+ * Warning: This class does not refresh connections when the server goes away
+ * This has to be handled outside.
+ */
 public class MessageListenerContainer implements JMSListenerContainer {
     private static final Logger LOG = LogUtils.getL7dLogger(MessageListenerContainer.class);
 

http://git-wip-us.apache.org/repos/asf/cxf/blob/811e10d4/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/ResourceCloser.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/ResourceCloser.java
b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/ResourceCloser.java
index 0314255..fa70cf5 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/ResourceCloser.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/ResourceCloser.java
@@ -23,12 +23,10 @@ import java.util.AbstractSequentialList;
 import java.util.LinkedList;
 
 import javax.jms.Connection;
-import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.naming.Context;
-import javax.naming.NamingException;
 
 public class ResourceCloser implements Closeable {
     private AbstractSequentialList<Object> resources;
@@ -73,9 +71,7 @@ public class ResourceCloser implements Closeable {
             } else {
                 throw new IllegalArgumentException("Can not handle resource " + resource.getClass());
             }
-        } catch (JMSException e) {
-            // Ignore
-        } catch (NamingException e) {
+        } catch (Exception e) {
             // Ignore
         }
     }


Mime
View raw message