activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r805361 - in /activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/network/ src/main/java/org/apache/activemq/transport/discovery/ src/main/java/org/apache/activemq/transport/discovery/multicast/ src/main/java/org/apache/activ...
Date Tue, 18 Aug 2009 11:13:09 GMT
Author: gtully
Date: Tue Aug 18 11:13:08 2009
New Revision: 805361

URL: http://svn.apache.org/viewvc?rev=805361&view=rev
Log:
apply parameters from discoveryURI to subsequent network connections so that options like
inactivityTimeout can be configured on all discovered uris

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/pom.xml
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java

Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=805361&r1=805360&r2=805361&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Tue Aug 18 11:13:08 2009
@@ -246,8 +246,12 @@
     </dependency>
     <dependency>
       <groupId>org.jmock</groupId>
-      <artifactId>jmock</artifactId>
-      <version>${jmock-version}</version>
+      <artifactId>jmock-junit4</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.jmock</groupId>
+      <artifactId>jmock-legacy</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=805361&r1=805360&r2=805361&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
Tue Aug 18 11:13:08 2009
@@ -20,6 +20,7 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.activemq.broker.SslContext;
@@ -29,8 +30,10 @@
 import org.apache.activemq.transport.discovery.DiscoveryAgent;
 import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
 import org.apache.activemq.transport.discovery.DiscoveryListener;
+import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.util.ServiceSupport;
+import org.apache.activemq.util.URISupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -46,7 +49,8 @@
 
     private DiscoveryAgent discoveryAgent;
     private ConcurrentHashMap<URI, NetworkBridge> bridges = new ConcurrentHashMap<URI,
NetworkBridge>();
-
+    private Map<String, String> parameters;
+    
     public DiscoveryNetworkConnector() {
     }
 
@@ -56,6 +60,14 @@
 
     public void setUri(URI discoveryURI) throws IOException {
         setDiscoveryAgent(DiscoveryAgentFactory.createDiscoveryAgent(discoveryURI));
+        try {
+            parameters = URISupport.parseParamters(discoveryURI);
+            // allow discovery agent to grab it's parameters
+            IntrospectionSupport.setProperties(getDiscoveryAgent(), parameters);
+        } catch (URISyntaxException e) {
+            LOG.warn("failed to parse query parameters from discoveryURI: " + discoveryURI,
e);
+        }  
+        
     }
 
     public void onServiceAdd(DiscoveryEvent event) {
@@ -83,6 +95,11 @@
                 return;
             }
             URI connectUri = uri;
+            try {
+                connectUri = URISupport.applyParameters(connectUri, parameters);
+            } catch (URISyntaxException e) {
+                LOG.warn("could not apply query parameters: " + parameters + " to: " + connectUri,
e);
+            }
             LOG.info("Establishing network connection from " + localURIName + " to " + connectUri);
 
             Transport remoteTransport;
@@ -93,7 +110,7 @@
                 try {
                     remoteTransport = TransportFactory.connect(connectUri);
                 } catch (Exception e) {
-                    LOG.warn("Could not connect to remote URI: " + localURIName + ": " +
e.getMessage());
+                    LOG.warn("Could not connect to remote URI: " + connectUri + ": " + e.getMessage());
                     LOG.debug("Connection failure exception: " + e, e);
                     return;
                 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java?rev=805361&r1=805360&r2=805361&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java
Tue Aug 18 11:13:08 2009
@@ -25,6 +25,7 @@
 import org.apache.activemq.transport.CompositeTransport;
 import org.apache.activemq.transport.TransportFilter;
 import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.URISupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -74,27 +75,13 @@
                 URI uri = new URI(url);
                 serviceURIs.put(event.getServiceName(), uri);
                 LOG.info("Adding new broker connection URL: " + uri);
-                next.add(new URI[] {applyParameters(uri)});
+                next.add(new URI[] {URISupport.applyParameters(uri, parameters)});
             } catch (URISyntaxException e) {
                 LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax:
" + e, e);
             }
         }
     }
 
-    private URI applyParameters(URI uri) throws URISyntaxException {
-        if (parameters != null && !parameters.isEmpty()) {
-            StringBuffer newQuery = uri.getRawQuery() != null ? new StringBuffer(uri.getRawQuery())
: new StringBuffer() ;
-            for ( Map.Entry<String, String> param: parameters.entrySet()) {
-                if (newQuery.length()!=0) {
-                    newQuery.append(';');
-                }
-                newQuery.append(param.getKey()).append('=').append(param.getValue());
-            }
-            uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), newQuery.toString(),
uri.getFragment());
-        }
-        return uri;
-}
-
     public void onServiceRemove(DiscoveryEvent event) {
         URI uri = serviceURIs.get(event.getServiceName());
         if (uri != null) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java?rev=805361&r1=805360&r2=805361&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
Tue Aug 18 11:13:08 2009
@@ -27,7 +27,6 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java?rev=805361&r1=805360&r2=805361&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java Tue
Aug 18 11:13:08 2009
@@ -127,6 +127,20 @@
         return uri.getQuery() == null ? emptyMap() : parseQuery(stripPrefix(uri.getQuery(),
"?"));
     }
 
+    public static URI applyParameters(URI uri, Map<String, String> queryParameters)
throws URISyntaxException {
+        if (queryParameters != null && !queryParameters.isEmpty()) {
+            StringBuffer newQuery = uri.getRawQuery() != null ? new StringBuffer(uri.getRawQuery())
: new StringBuffer() ;
+            for ( Map.Entry<String, String> param: queryParameters.entrySet()) {
+                if (newQuery.length()!=0) {
+                    newQuery.append('&');
+                }
+                newQuery.append(param.getKey()).append('=').append(param.getValue());
+            }
+            uri = createURIWithQuery(uri, newQuery.toString());
+        }
+        return uri;
+    }
+    
     @SuppressWarnings("unchecked")
     private static Map<String, String> emptyMap() {
         return Collections.EMPTY_MAP;

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java?rev=805361&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
Tue Aug 18 11:13:08 2009
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.discovery;
+
+import static org.junit.Assert.*;
+
+import java.net.URI;
+
+import javax.management.ObjectName;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.ManagementContext;
+import org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgentFactory;
+import org.apache.activemq.util.SocketProxy;
+import org.apache.activemq.util.Wait;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jmock.Expectations;
+import org.jmock.Mockery;
+import org.jmock.integration.junit4.JMock;
+import org.jmock.integration.junit4.JUnit4Mockery;
+import org.jmock.lib.legacy.ClassImposteriser;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+
+@RunWith(JMock.class)
+public class DiscoveryNetworkReconnectTest {
+
+    private static final Log LOG = LogFactory.getLog(DiscoveryNetworkReconnectTest.class);
+
+    BrokerService brokerA, brokerB;
+    Mockery context;
+    ManagementContext managementContext;
+    
+    final String groupName = "GroupID-" + "DiscoveryNetworkReconnectTest";
+    final String discoveryAddress = "multicast://default?group=" + groupName + "&initialReconnectDelay=600";
+
+    private DiscoveryAgent agent;
+
+    @Before
+    public void setUp() throws Exception {
+        context = new JUnit4Mockery() {{
+            setImposteriser(ClassImposteriser.INSTANCE);
+        }};
+            
+        brokerA = new BrokerService();
+        brokerA.setBrokerName("BrokerA");
+        configure(brokerA);
+        brokerA.addConnector("tcp://localhost:0");
+        brokerA.start();
+    }
+
+    private void configure(BrokerService broker) {
+        broker.setPersistent(false);
+        broker.setUseJmx(true);      
+    }
+    
+    @Test
+    public void testReconnect() throws Exception {
+        final SocketProxy proxy = new SocketProxy(brokerA.getTransportConnectors().get(0).getConnectUri());
+        
+        // control multicast publish advertise agent to inject proxy
+        agent = MulticastDiscoveryAgentFactory.createDiscoveryAgent(new URI(discoveryAddress));
+        agent.registerService(proxy.getUrl().toString());
+        agent.start();
+        
+        managementContext = context.mock(ManagementContext.class);
+        
+        context.checking(new Expectations(){{
+            allowing (managementContext).getJmxDomainName(); will (returnValue("Test"));
+            allowing (managementContext).start();
+            allowing (managementContext).stop();            
+            allowing (managementContext).unregisterMBean(with(any(ObjectName.class)));
+            
+            // expected MBeans
+            allowing (managementContext).registerMBean(with(any(Object.class)), with(equal(
+                    new ObjectName("Test:BrokerName=BrokerNC,Type=Broker"))));
+            allowing (managementContext).registerMBean(with(any(Object.class)), with(equal(
+                    new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkConnector,NetworkConnectorName=localhost"))));
+            allowing (managementContext).registerMBean(with(any(Object.class)), with(equal(
           
+                    new ObjectName("Test:BrokerName=BrokerNC,Type=Topic,Destination=ActiveMQ.Advisory.Connection"))));
+            
+            // due to reconnect we get two registrations
+            atLeast(2).of (managementContext).registerMBean(with(any(Object.class)), with(equal(
+                    new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkBridge,NetworkConnectorName=localhost,Name=localhost/127.0.0.1_"

+                            + proxy.getUrl().getPort()))));
+        }});
+
+        brokerB = new BrokerService();
+        brokerB.setManagementContext(managementContext);
+        brokerB.setBrokerName("BrokerNC");
+        configure(brokerB);       
+        brokerB.addNetworkConnector(discoveryAddress + "&wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000&trace=true");
+        brokerB.start();
+
+        Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+               return proxy.connections.size() == 1;
+            }
+        });
+       
+        // force an inactivity timeout timeout
+        proxy.pause();
+        
+        // wait for the inactivity timeout
+        Thread.sleep(2000);
+        
+        // let a reconnect succeed
+        proxy.goOn();
+        
+        assertTrue("got a reconnect", Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+               return proxy.connections.size() == 1;
+            }
+        }));
+        
+        brokerB.stop();
+        // let mockery validate minimal duplicate mbean registrations
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message