cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ff...@apache.org
Subject [cxf] branch master updated: [CXF-6454] Handle InvalidClientIdException and allow to set retryInterval
Date Fri, 27 Oct 2017 07:13:04 GMT
This is an automated email from the ASF dual-hosted git repository.

ffang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a617b4  [CXF-6454] Handle InvalidClientIdException and allow to set retryInterval
4a617b4 is described below

commit 4a617b492a2185285372594570b6706c9c9e8f50
Author: Christian Schneider <chris@die-schneider.net>
AuthorDate: Thu Apr 21 15:25:52 2016 +0200

    [CXF-6454] Handle InvalidClientIdException and allow to set retryInterval
    
    (cherry picked from commit 5e3ac2b252412b90d6c91dea855773a294c3a565)
    
    Conflicts:
    	rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
    	rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
    	rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
    	rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
---
 .../apache/cxf/transport/jms/JMSConfigFactory.java |  2 ++
 .../apache/cxf/transport/jms/JMSConfiguration.java |  9 ++++++++
 .../apache/cxf/transport/jms/JMSDestination.java   | 12 +++++++---
 .../apache/cxf/transport/jms/uri/JMSEndpoint.java  | 11 +++++++++
 .../jms/util/PollingMessageListenerContainer.java  |  5 +++++
 .../cxf/transport/jms/AbstractJMSTester.java       |  6 ++---
 .../cxf/transport/jms/JMSDestinationTest.java      | 26 ++++++++++++++++++++++
 7 files changed, 65 insertions(+), 6 deletions(-)

diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
index d8c588a..48953f5 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
@@ -130,6 +130,8 @@ public final class JMSConfigFactory {
         String targetService = endpoint.getTargetService();
         jmsConfig.setTargetService(targetService);
         jmsConfig.setMessageSelector(endpoint.getMessageSelector());
+        int retryInterval = endpoint.getRetryInterval();
+        jmsConfig.setRetryInterval(retryInterval);
         return jmsConfig;
     }
 
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
index dc91253..64b21ad 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
@@ -97,6 +97,7 @@ public class JMSConfiguration {
     // For jms spec. Do not configure manually
     private String targetService;
     private String requestURI;
+    private int retryInterval;
 
     public void ensureProperlyConfigured() {
         ConnectionFactory cf = getConnectionFactory();
@@ -504,4 +505,12 @@ public class JMSConfiguration {
         this.transactionManager = transactionManager;
     }
 
+    public int getRetryInterval() {
+        return this.retryInterval;
+    }
+    
+    public void setRetryInterval(int retryInterval) {
+        this.retryInterval = retryInterval;
+    }
+
 }
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
index ef2be92..b85411e 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
@@ -27,6 +27,7 @@ import java.util.logging.Logger;
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
+import javax.jms.InvalidClientIDException;
 import javax.jms.JMSException;
 import javax.jms.MessageListener;
 import javax.jms.Session;
@@ -101,7 +102,10 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess
         try {
             this.jmsListener = createTargetDestinationListener();
         } catch (Exception e) {
-            // If first connect fails we will try to establish the connection in the background
+            if (e.getCause() != null && InvalidClientIDException.class.isInstance(e.getCause())) {
+                throw e;
+            }
+            // If first connect fails we will try to establish the connection in the background

             new Thread(new Runnable() {
 
                 @Override
@@ -148,6 +152,8 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess
             connection.start();
             return container;
         } catch (JMSException e) {
+            ResourceCloser.close(connection);
+            this.connection = null;
             throw JMSUtil.convertJmsException(e);
         } finally {
             ResourceCloser.close(session);
@@ -171,9 +177,9 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess
                     LOG.log(Level.WARNING, message);
                 }
                 try {
-                    Thread.sleep(5000);
+                    Thread.sleep(jmsConfig.getRetryInterval());
                 } catch (InterruptedException e2) {
-                    // Ignore
+                    shutdown = true;
                 }
             }
         } while (jmsListener == null && !shutdown);
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
index ad4bbf4..89c12fb 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
@@ -87,6 +87,7 @@ public class JMSEndpoint {
     private String username;
     private int concurrentConsumers = 1;
     private String messageSelector;
+    private int retryInterval = 5000;
 
     /**
      * @param uri
@@ -488,4 +489,14 @@ public class JMSEndpoint {
 
 
 
+    public int getRetryInterval() {
+        return retryInterval;
+    }
+    public void setRetryInterval(int retryInterval) {
+        this.retryInterval = retryInterval;
+    }
+    public void setRetryInterval(String retryInterval) {
+        this.retryInterval = Integer.valueOf(retryInterval);
+    }
+    
 }
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
index 80d5f89..9edd0da 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
@@ -74,6 +74,11 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
                     }
                 } catch (Throwable e) {
                     handleException(e);
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e1) {
+                        // Ignore
+                    }
                 }
             }
         }
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
index 8746628..c373665 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
@@ -32,7 +32,6 @@ import javax.xml.namespace.QName;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.pool.PooledConnectionFactory;
 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
 import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
@@ -57,6 +56,7 @@ public abstract class AbstractJMSTester extends Assert {
     protected static final int MAX_RECEIVE_TIME = 10;
     protected static final String MESSAGE_CONTENT = "HelloWorld";
     protected static Bus bus;
+    protected static ActiveMQConnectionFactory cf1;
     protected static ConnectionFactory cf;
     protected static BrokerService broker;
 
@@ -78,8 +78,8 @@ public abstract class AbstractJMSTester extends Assert {
         broker.addConnector(brokerUri);
         broker.start();
         bus = BusFactory.getDefaultBus();
-        ActiveMQConnectionFactory cf1 = new ActiveMQConnectionFactory(brokerUri);
-        cf = new PooledConnectionFactory(cf1);
+        cf1 = new ActiveMQConnectionFactory(brokerUri);
+        cf = cf1;
     }
 
     @AfterClass
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
index 3f8f154..52e3f54 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
@@ -25,8 +25,10 @@ import java.io.InputStream;
 import java.io.Reader;
 import java.io.StringReader;
 
+import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
+import javax.jms.InvalidClientIDException;
 import javax.jms.JMSException;
 import javax.jms.Queue;
 import javax.jms.Topic;
@@ -41,6 +43,7 @@ import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.transport.MultiplexDestination;
+import org.apache.cxf.transport.jms.util.ResourceCloser;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -76,6 +79,29 @@ public class JMSDestinationTest extends AbstractJMSTester {
         conduit.close();
         destination.shutdown();
     }
+    
+    @Test(expected = InvalidClientIDException.class)
+    public void testDurableInvalidClientId() throws Throwable {
+        Connection con = cf1.createConnection();
+        JMSDestination destination = null;
+        try {
+            con.setClientID("testClient");
+            con.start();
+            destMessage = null;
+            EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort");
+            JMSConfiguration jmsConfig = JMSConfigFactory.createFromEndpointInfo(bus, ei, null);
+            jmsConfig.setDurableSubscriptionClientId("testClient");
+            jmsConfig.setDurableSubscriptionName("testsub");
+            jmsConfig.setConnectionFactory(cf);
+            destination = new JMSDestination(bus, ei, jmsConfig);
+            destination.setMessageObserver(createMessageObserver());
+        } catch (RuntimeException e) {
+            throw e.getCause();
+        } finally {
+            ResourceCloser.close(con);
+            destination.shutdown();
+        }
+    }
 
     @Test
     public void testOneWayDestination() throws Exception {

-- 
To stop receiving notification emails like this one, please contact
"commits@cxf.apache.org" <commits@cxf.apache.org>.

Mime
View raw message