From commits-return-11574-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Tue Aug 18 11:14:20 2009 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 52808 invoked from network); 18 Aug 2009 11:14:20 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 18 Aug 2009 11:14:20 -0000 Received: (qmail 89849 invoked by uid 500); 18 Aug 2009 11:14:39 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 89793 invoked by uid 500); 18 Aug 2009 11:14:38 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 89784 invoked by uid 99); 18 Aug 2009 11:14:38 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 Aug 2009 11:14:38 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 Aug 2009 11:13:30 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 78D3E2388867; Tue, 18 Aug 2009 11:13:09 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r805361 - in /activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/network/ src/main/java/org/apache/activemq/transport/discovery/ src/main/java/org/apache/activemq/transport/discovery/multicast/ src/main/java/org/apache/activ... 
Date: Tue, 18 Aug 2009 11:13:09 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090818111309.78D3E2388867@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gtully Date: Tue Aug 18 11:13:08 2009 New Revision: 805361 URL: http://svn.apache.org/viewvc?rev=805361&view=rev Log: apply parameters from discoveryURI to subsequent network connections so that options like inactivityTimeout can be configured on all discovered uris Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java (with props) Modified: activemq/trunk/activemq-core/pom.xml activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java Modified: activemq/trunk/activemq-core/pom.xml URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=805361&r1=805360&r2=805361&view=diff ============================================================================== --- activemq/trunk/activemq-core/pom.xml (original) +++ activemq/trunk/activemq-core/pom.xml Tue Aug 18 11:13:08 2009 @@ -246,8 +246,12 @@ org.jmock - jmock - ${jmock-version} + jmock-junit4 + test + + + org.jmock + jmock-legacy test Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=805361&r1=805360&r2=805361&view=diff ============================================================================== --- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Tue Aug 18 11:13:08 2009 @@ -20,6 +20,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.Iterator; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.broker.SslContext; @@ -29,8 +30,10 @@ import org.apache.activemq.transport.discovery.DiscoveryAgent; import org.apache.activemq.transport.discovery.DiscoveryAgentFactory; import org.apache.activemq.transport.discovery.DiscoveryListener; +import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceSupport; +import org.apache.activemq.util.URISupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -46,7 +49,8 @@ private DiscoveryAgent discoveryAgent; private ConcurrentHashMap bridges = new ConcurrentHashMap(); - + private Map parameters; + public DiscoveryNetworkConnector() { } @@ -56,6 +60,14 @@ public void setUri(URI discoveryURI) throws IOException { setDiscoveryAgent(DiscoveryAgentFactory.createDiscoveryAgent(discoveryURI)); + try { + parameters = URISupport.parseParamters(discoveryURI); + // allow discovery agent to grab its parameters + IntrospectionSupport.setProperties(getDiscoveryAgent(), parameters); + } catch (URISyntaxException e) { + LOG.warn("failed to parse query parameters from discoveryURI: " + discoveryURI, e); + } + } public void onServiceAdd(DiscoveryEvent event) { @@ -83,6 +95,11 @@ return; } URI connectUri = uri; + try { + connectUri = URISupport.applyParameters(connectUri, parameters); + } catch (URISyntaxException e) { + LOG.warn("could not apply query parameters: " + parameters + " to: " + connectUri, e); + } LOG.info("Establishing network connection from " + localURIName + " to " + 
connectUri); Transport remoteTransport; @@ -93,7 +110,7 @@ try { remoteTransport = TransportFactory.connect(connectUri); } catch (Exception e) { - LOG.warn("Could not connect to remote URI: " + localURIName + ": " + e.getMessage()); + LOG.warn("Could not connect to remote URI: " + connectUri + ": " + e.getMessage()); LOG.debug("Connection failure exception: " + e, e); return; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java?rev=805361&r1=805360&r2=805361&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java Tue Aug 18 11:13:08 2009 @@ -25,6 +25,7 @@ import org.apache.activemq.transport.CompositeTransport; import org.apache.activemq.transport.TransportFilter; import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.URISupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -74,27 +75,13 @@ URI uri = new URI(url); serviceURIs.put(event.getServiceName(), uri); LOG.info("Adding new broker connection URL: " + uri); - next.add(new URI[] {applyParameters(uri)}); + next.add(new URI[] {URISupport.applyParameters(uri, parameters)}); } catch (URISyntaxException e) { LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e); } } } - private URI applyParameters(URI uri) throws URISyntaxException { - if (parameters != null && !parameters.isEmpty()) { - StringBuffer newQuery = uri.getRawQuery() != null ? 
new StringBuffer(uri.getRawQuery()) : new StringBuffer() ; - for ( Map.Entry param: parameters.entrySet()) { - if (newQuery.length()!=0) { - newQuery.append(';'); - } - newQuery.append(param.getKey()).append('=').append(param.getValue()); - } - uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), newQuery.toString(), uri.getFragment()); - } - return uri; -} - public void onServiceRemove(DiscoveryEvent event) { URI uri = serviceURIs.get(event.getServiceName()); if (uri != null) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java?rev=805361&r1=805360&r2=805361&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java Tue Aug 18 11:13:08 2009 @@ -27,7 +27,6 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java?rev=805361&r1=805360&r2=805361&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java (original) +++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java Tue Aug 18 11:13:08 2009 @@ -127,6 +127,20 @@ return uri.getQuery() == null ? emptyMap() : parseQuery(stripPrefix(uri.getQuery(), "?")); } + public static URI applyParameters(URI uri, Map queryParameters) throws URISyntaxException { + if (queryParameters != null && !queryParameters.isEmpty()) { + StringBuffer newQuery = uri.getRawQuery() != null ? new StringBuffer(uri.getRawQuery()) : new StringBuffer() ; + for ( Map.Entry param: queryParameters.entrySet()) { + if (newQuery.length()!=0) { + newQuery.append('&'); + } + newQuery.append(param.getKey()).append('=').append(param.getValue()); + } + uri = createURIWithQuery(uri, newQuery.toString()); + } + return uri; + } + @SuppressWarnings("unchecked") private static Map emptyMap() { return Collections.EMPTY_MAP; Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java?rev=805361&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java Tue Aug 18 11:13:08 2009 @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.discovery; + +import static org.junit.Assert.*; + +import java.net.URI; + +import javax.management.ObjectName; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.ManagementContext; +import org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgentFactory; +import org.apache.activemq.util.SocketProxy; +import org.apache.activemq.util.Wait; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jmock.Expectations; +import org.jmock.Mockery; +import org.jmock.integration.junit4.JMock; +import org.jmock.integration.junit4.JUnit4Mockery; +import org.jmock.lib.legacy.ClassImposteriser; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + + +@RunWith(JMock.class) +public class DiscoveryNetworkReconnectTest { + + private static final Log LOG = LogFactory.getLog(DiscoveryNetworkReconnectTest.class); + + BrokerService brokerA, brokerB; + Mockery context; + ManagementContext managementContext; + + final String groupName = "GroupID-" + "DiscoveryNetworkReconnectTest"; + final String discoveryAddress = "multicast://default?group=" + groupName + "&initialReconnectDelay=600"; + + private DiscoveryAgent agent; + + @Before + public void setUp() throws Exception { + context = new JUnit4Mockery() {{ + setImposteriser(ClassImposteriser.INSTANCE); + }}; + + brokerA = new BrokerService(); + brokerA.setBrokerName("BrokerA"); + configure(brokerA); + brokerA.addConnector("tcp://localhost:0"); + 
brokerA.start(); + } + + private void configure(BrokerService broker) { + broker.setPersistent(false); + broker.setUseJmx(true); + } + + @Test + public void testReconnect() throws Exception { + final SocketProxy proxy = new SocketProxy(brokerA.getTransportConnectors().get(0).getConnectUri()); + + // control multicast publish advertise agent to inject proxy + agent = MulticastDiscoveryAgentFactory.createDiscoveryAgent(new URI(discoveryAddress)); + agent.registerService(proxy.getUrl().toString()); + agent.start(); + + managementContext = context.mock(ManagementContext.class); + + context.checking(new Expectations(){{ + allowing (managementContext).getJmxDomainName(); will (returnValue("Test")); + allowing (managementContext).start(); + allowing (managementContext).stop(); + allowing (managementContext).unregisterMBean(with(any(ObjectName.class))); + + // expected MBeans + allowing (managementContext).registerMBean(with(any(Object.class)), with(equal( + new ObjectName("Test:BrokerName=BrokerNC,Type=Broker")))); + allowing (managementContext).registerMBean(with(any(Object.class)), with(equal( + new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkConnector,NetworkConnectorName=localhost")))); + allowing (managementContext).registerMBean(with(any(Object.class)), with(equal( + new ObjectName("Test:BrokerName=BrokerNC,Type=Topic,Destination=ActiveMQ.Advisory.Connection")))); + + // due to reconnect we get two registrations + atLeast(2).of (managementContext).registerMBean(with(any(Object.class)), with(equal( + new ObjectName("Test:BrokerName=BrokerNC,Type=NetworkBridge,NetworkConnectorName=localhost,Name=localhost/127.0.0.1_" + + proxy.getUrl().getPort())))); + }}); + + brokerB = new BrokerService(); + brokerB.setManagementContext(managementContext); + brokerB.setBrokerName("BrokerNC"); + configure(brokerB); + brokerB.addNetworkConnector(discoveryAddress + "&wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000&trace=true"); + 
brokerB.start(); + + Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + return proxy.connections.size() == 1; + } + }); + + // force an inactivity timeout + proxy.pause(); + + // wait for the inactivity timeout + Thread.sleep(2000); + + // let a reconnect succeed + proxy.goOn(); + + assertTrue("got a reconnect", Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + return proxy.connections.size() == 1; + } + })); + + brokerB.stop(); + // let mockery validate minimal duplicate mbean registrations + } +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java ------------------------------------------------------------------------------ svn:executable = * Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date