cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject cxf git commit: [CXF-6454] Handle InvalidClientIdException and allow to set retryInterval
Date Thu, 21 Apr 2016 13:31:51 GMT
Repository: cxf
Updated Branches:
  refs/heads/CXF-6454 [created] 5e3ac2b25


[CXF-6454] Handle InvalidClientIdException and allow to set retryInterval


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/5e3ac2b2
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/5e3ac2b2
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/5e3ac2b2

Branch: refs/heads/CXF-6454
Commit: 5e3ac2b252412b90d6c91dea855773a294c3a565
Parents: ea6a524
Author: Christian Schneider <chris@die-schneider.net>
Authored: Thu Apr 21 15:25:52 2016 +0200
Committer: Christian Schneider <chris@die-schneider.net>
Committed: Thu Apr 21 15:25:52 2016 +0200

----------------------------------------------------------------------
 .../cxf/transport/jms/JMSConfigFactory.java     |  2 ++
 .../cxf/transport/jms/JMSConfiguration.java     |  9 +++++++
 .../cxf/transport/jms/JMSDestination.java       | 10 ++++++--
 .../cxf/transport/jms/uri/JMSEndpoint.java      | 11 +++++++++
 .../util/PollingMessageListenerContainer.java   |  5 ++++
 .../cxf/transport/jms/AbstractJMSTester.java    |  6 ++---
 .../cxf/transport/jms/JMSDestinationTest.java   | 26 ++++++++++++++++++++
 7 files changed, 64 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/5e3ac2b2/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
index 464fc7a..972c4a0 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfigFactory.java
@@ -129,6 +129,8 @@ public final class JMSConfigFactory {
         
         String targetService = endpoint.getTargetService();
         jmsConfig.setTargetService(targetService);
+        int retryInterval = endpoint.getRetryInterval();
+        jmsConfig.setRetryInterval(retryInterval);
         return jmsConfig;
     }
 

http://git-wip-us.apache.org/repos/asf/cxf/blob/5e3ac2b2/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
index 4ab0c89..04b67ef 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
@@ -95,6 +95,7 @@ public class JMSConfiguration {
     // For jms spec. Do not configure manually
     private String targetService;
     private String requestURI;
+    private int retryInterval;
 
 
 
@@ -481,4 +482,12 @@ public class JMSConfiguration {
         this.transactionManager = transactionManager;
     }
 
+    public int getRetryInterval() {
+        return this.retryInterval;
+    }
+    
+    public void setRetryInterval(int retryInterval) {
+        this.retryInterval = retryInterval;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/5e3ac2b2/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
index 2b5d8cd..24b6ec1 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
@@ -27,6 +27,7 @@ import java.util.logging.Logger;
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
+import javax.jms.InvalidClientIDException;
 import javax.jms.JMSException;
 import javax.jms.MessageListener;
 import javax.jms.Session;
@@ -105,6 +106,9 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess
         try {
             this.jmsListener = createTargetDestinationListener();
         } catch (Exception e) {
+            if (e.getCause() != null && InvalidClientIDException.class.isInstance(e.getCause())) {
+                throw e;
+            }
             // If first connect fails we will try to establish the connection in the background

             new Thread(new Runnable() {
                 
@@ -150,6 +154,8 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess
             connection.start();
             return container;
         } catch (JMSException e) {
+            ResourceCloser.close(connection);
+            this.connection = null;
             throw JMSUtil.convertJmsException(e);
         } finally {
             ResourceCloser.close(session);
@@ -173,9 +179,9 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess
                     LOG.log(Level.WARNING, message);
                 }
                 try {
-                    Thread.sleep(5000);
+                    Thread.sleep(jmsConfig.getRetryInterval());
                 } catch (InterruptedException e2) {
-                    // Ignore
+                    shutdown = true;
                 }
             }
         } while (jmsListener == null && !shutdown);

http://git-wip-us.apache.org/repos/asf/cxf/blob/5e3ac2b2/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
index f5e9d03..17a5705 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/uri/JMSEndpoint.java
@@ -85,6 +85,7 @@ public class JMSEndpoint {
     private boolean useConduitIdSelector = true;
     private String username;
     private int concurrentConsumers = 1;
+    private int retryInterval = 5000;
 
     /**
      * @param uri
@@ -475,5 +476,15 @@ public class JMSEndpoint {
             throw new IllegalArgumentException(v);
         }
     }
+
+    public int getRetryInterval() {
+        return retryInterval;
+    }
+    public void setRetryInterval(int retryInterval) {
+        this.retryInterval = retryInterval;
+    }
+    public void setRetryInterval(String retryInterval) {
+        this.retryInterval = Integer.valueOf(retryInterval);
+    }
     
 }

http://git-wip-us.apache.org/repos/asf/cxf/blob/5e3ac2b2/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
index 9f8fcb2..830f2ba 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
@@ -70,6 +70,11 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
                     }
                 } catch (Exception e) {
                     LOG.log(Level.WARNING, "Unexpected exception. Restarting session and consumer", e);
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e1) {
+                        // Ignore
+                    }
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/cxf/blob/5e3ac2b2/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
index b34f53d..3030f9e 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
@@ -32,7 +32,6 @@ import javax.xml.namespace.QName;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.pool.PooledConnectionFactory;
 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
 import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
@@ -57,6 +56,7 @@ public abstract class AbstractJMSTester extends Assert {
     protected static final int MAX_RECEIVE_TIME = 10;
     protected static final String MESSAGE_CONTENT = "HelloWorld";
     protected static Bus bus;
+    protected static ActiveMQConnectionFactory cf1;
     protected static ConnectionFactory cf;
     protected static BrokerService broker;
 
@@ -78,8 +78,8 @@ public abstract class AbstractJMSTester extends Assert {
         broker.addConnector(brokerUri);
         broker.start();
         bus = BusFactory.getDefaultBus();
-        ActiveMQConnectionFactory cf1 = new ActiveMQConnectionFactory(brokerUri);
-        cf = new PooledConnectionFactory(cf1);
+        cf1 = new ActiveMQConnectionFactory(brokerUri);
+        cf = cf1;
     }
 
     @AfterClass

http://git-wip-us.apache.org/repos/asf/cxf/blob/5e3ac2b2/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
index 690c721..a4e3980 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
@@ -25,8 +25,10 @@ import java.io.InputStream;
 import java.io.Reader;
 import java.io.StringReader;
 
+import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
+import javax.jms.InvalidClientIDException;
 import javax.jms.JMSException;
 import javax.jms.Queue;
 import javax.jms.Topic;
@@ -41,6 +43,7 @@ import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.transport.MultiplexDestination;
+import org.apache.cxf.transport.jms.util.ResourceCloser;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -76,6 +79,29 @@ public class JMSDestinationTest extends AbstractJMSTester {
         conduit.close();
         destination.shutdown();
     }
+    
+    @Test(expected = InvalidClientIDException.class)
+    public void testDurableInvalidClientId() throws Throwable {
+        Connection con = cf1.createConnection();
+        JMSDestination destination = null;
+        try {
+            con.setClientID("testClient");
+            con.start();
+            destMessage = null;
+            EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort");
+            JMSConfiguration jmsConfig = JMSConfigFactory.createFromEndpointInfo(bus, ei, null);
+            jmsConfig.setDurableSubscriptionClientId("testClient");
+            jmsConfig.setDurableSubscriptionName("testsub");
+            jmsConfig.setConnectionFactory(cf);
+            destination = new JMSDestination(bus, ei, jmsConfig);
+            destination.setMessageObserver(createMessageObserver());
+        } catch (RuntimeException e) {
+            throw e.getCause();
+        } finally {
+            ResourceCloser.close(con);
+            destination.shutdown();
+        }
+    }
 
     @Test
     public void testOneWayDestination() throws Exception {


Mime
View raw message