Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 18478 invoked from network); 16 Mar 2011 22:48:57 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 16 Mar 2011 22:48:57 -0000 Received: (qmail 87859 invoked by uid 500); 16 Mar 2011 22:48:57 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 87831 invoked by uid 500); 16 Mar 2011 22:48:57 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 87824 invoked by uid 99); 16 Mar 2011 22:48:57 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Mar 2011 22:48:57 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,NORMAL_HTTP_TO_IP,WEIRD_PORT X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Mar 2011 22:48:51 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 74233238899C; Wed, 16 Mar 2011 22:48:27 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1082333 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/network/ main/java/org/apache/activemq/thread/ main/java/org/apache/activemq/transport/discovery/ main/java/org/apache/activemq/transport/failover/ main/java/org... 
Date: Wed, 16 Mar 2011 22:48:27 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110316224827.74233238899C@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gtully Date: Wed Mar 16 22:48:26 2011 New Revision: 1082333 URL: http://svn.apache.org/viewvc?rev=1082333&view=rev Log: https://issues.apache.org/jira/browse/AMQ-3222 - Failover and SimpleDiscovery - query parameters getting dropped, resolve by leaving composite params in place and separating out params that need to be applied to discovered transports, new format 'discovered.x' for the multicast case. Revisit https://issues.apache.org/jira/browse/AMQ-2981,https://issues.apache.org/jira/browse/AMQ-2598,https://issues.apache.org/activemq/browse/AMQ-2939 Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryListener.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/xbean/XBeanBrokerFactory.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/URISupportTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=1082333&r1=1082332&r2=1082333&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Wed Mar 16 22:48:26 2011 @@ -100,8 +100,7 @@ public class DiscoveryNetworkConnector e } URI connectUri = uri; try { - connectUri = URISupport.removeQuery(connectUri); - connectUri = URISupport.applyParameters(connectUri, parameters); + connectUri = URISupport.applyParameters(connectUri, parameters, DISCOVERED_OPTION_PREFIX); } catch (URISyntaxException e) { LOG.warn("could not apply query parameters: " + parameters + " to: " + connectUri, e); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java?rev=1082333&r1=1082332&r2=1082333&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java Wed Mar 16 22:48:26 2011 @@ -22,12 +22,13 @@ import java.util.concurrent.SynchronousQ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** * Manages the thread pool for long running tasks. Long running tasks are not * always active but when they are active, they may need a few iterations of * processing for them to become idle. 
The manager ensures that each task is - * processes but that no one task overtakes the system. This is kina like + * processes but that no one task overtakes the system. This is kinda like * cooperative multitasking. * * @@ -39,6 +40,7 @@ public class TaskRunnerFactory implement private String name; private int priority; private boolean daemon; + private AtomicLong id = new AtomicLong(0); public TaskRunnerFactory() { this("ActiveMQ Task", Thread.NORM_PRIORITY, true, 1000); @@ -89,14 +91,14 @@ public class TaskRunnerFactory implement if (executor != null) { executor.execute(runnable); } else { - new Thread(runnable, name).start(); + new Thread(runnable, name + "-" + id.incrementAndGet()).start(); } } protected ExecutorService createDefaultExecutor() { ThreadPoolExecutor rc = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, name); + Thread thread = new Thread(runnable, name + "-" + id.incrementAndGet()); thread.setDaemon(daemon); thread.setPriority(priority); return thread; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryListener.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryListener.java?rev=1082333&r1=1082332&r2=1082333&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryListener.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryListener.java Wed Mar 16 22:48:26 2011 @@ -24,6 +24,7 @@ import org.apache.activemq.command.Disco * */ public interface DiscoveryListener { + public static final String DISCOVERED_OPTION_PREFIX = "discovered."; void onServiceAdd(DiscoveryEvent event); Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java?rev=1082333&r1=1082332&r2=1082333&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java Wed Mar 16 22:48:26 2011 @@ -75,7 +75,7 @@ public class DiscoveryTransport extends try { URI uri = new URI(url); LOG.info("Adding new broker connection URL: " + uri); - uri = URISupport.applyParameters(uri, parameters); + uri = URISupport.applyParameters(uri, parameters, DISCOVERED_OPTION_PREFIX); serviceURIs.put(event.getServiceName(), uri); next.add(false,new URI[] {uri}); } catch (URISyntaxException e) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=1082333&r1=1082332&r2=1082333&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Wed Mar 16 22:48:26 2011 @@ -141,7 +141,7 @@ public class FailoverTransport implement buildBackups(); } else { // build backups on the next iteration - result = true; + buildBackup = true; try { reconnectTask.wakeup(); } catch (InterruptedException e) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java?rev=1082333&r1=1082332&r2=1082333&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java Wed Mar 16 22:48:26 2011 @@ -131,24 +131,29 @@ public class URISupport { CompositeData data = URISupport.parseComposite(uri); Map parameters = new HashMap(); parameters.putAll(data.getParameters()); - for (URI component : data.getComponents()) { - parameters.putAll(component.getQuery() == null ? emptyMap() : parseQuery(stripPrefix(component.getQuery(), "?"))); - } - if (parameters.isEmpty()) + if (parameters.isEmpty()) { parameters = emptyMap(); + } return parameters; } } public static URI applyParameters(URI uri, Map queryParameters) throws URISyntaxException { + return applyParameters(uri, queryParameters, ""); + } + + public static URI applyParameters(URI uri, Map queryParameters, String optionPrefix) throws URISyntaxException { if (queryParameters != null && !queryParameters.isEmpty()) { StringBuffer newQuery = uri.getRawQuery() != null ? 
new StringBuffer(uri.getRawQuery()) : new StringBuffer() ; for ( Map.Entry param: queryParameters.entrySet()) { - if (newQuery.length()!=0) { - newQuery.append('&'); + if (param.getKey().startsWith(optionPrefix)) { + if (newQuery.length()!=0) { + newQuery.append('&'); + } + final String key = param.getKey().substring(optionPrefix.length()); + newQuery.append(key).append('=').append(param.getValue()); } - newQuery.append(param.getKey()).append('=').append(param.getValue()); } uri = createURIWithQuery(uri, newQuery.toString()); } @@ -219,7 +224,6 @@ public class URISupport { * @param uri * @param rc * @param ssp - * @param p * @throws URISyntaxException */ private static void parseComposite(URI uri, CompositeData rc, String ssp) throws URISyntaxException { @@ -269,7 +273,7 @@ public class URISupport { } /** - * @param componentString + * @param str * @return */ private static String[] splitComponents(String str) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/xbean/XBeanBrokerFactory.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/xbean/XBeanBrokerFactory.java?rev=1082333&r1=1082332&r2=1082333&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/xbean/XBeanBrokerFactory.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/xbean/XBeanBrokerFactory.java Wed Mar 16 22:48:26 2011 @@ -60,14 +60,14 @@ public class XBeanBrokerFactory implemen } public BrokerService createBroker(URI config) throws Exception { - - Map map = URISupport.parseParameters(config); - if (!map.isEmpty()) { - IntrospectionSupport.setProperties(this, map); - config = URISupport.removeQuery(config); - } String uri = config.getSchemeSpecificPart(); + Map parameters = URISupport.parseQuery(uri); + if (!parameters.isEmpty()) { + IntrospectionSupport.setProperties(this, parameters); + uri = 
uri.substring(0, uri.lastIndexOf('?')); + } + ApplicationContext context = createApplicationContext(uri); BrokerService broker = null; Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java?rev=1082333&r1=1082332&r2=1082333&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java Wed Mar 16 22:48:26 2011 @@ -16,6 +16,13 @@ */ package org.apache.activemq.network; +import java.net.URI; +import java.util.HashMap; +import java.util.Vector; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + + import static org.junit.Assert.assertTrue; import javax.jms.Connection; @@ -32,6 +39,7 @@ import org.apache.activemq.broker.SslCon import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.transport.tcp.SslBrokerServiceTest; +import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.Wait; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,14 +52,21 @@ public class FailoverStaticNetworkTest { private final static String DESTINATION_NAME = "testQ"; protected BrokerService brokerA; + protected BrokerService brokerA1; protected BrokerService brokerB; + protected BrokerService brokerC; private SslContext sslContext; - + protected BrokerService createBroker(String scheme, String listenPort, String[] networkToPorts) throws Exception { + return createBroker(scheme, listenPort, networkToPorts, null); + } + + protected BrokerService createBroker(String scheme, String listenPort, String[] 
networkToPorts, + HashMap networkProps) throws Exception { BrokerService broker = new BrokerService(); - broker.setUseJmx(true); + broker.setUseJmx(false); broker.getManagementContext().setCreateConnector(false); broker.setSslContext(sslContext); broker.setDeleteAllMessagesOnStartup(true); @@ -63,12 +78,30 @@ public class FailoverStaticNetworkTest { for (int i=1;i errors = new Vector(); + final String dataDir = "target/data/shared"; + brokerA = createBroker("61617", dataDir); + brokerA.start(); + + final BrokerService slave = createBroker("63617", dataDir); + brokerA1 = slave; + ExecutorService executor = Executors.newCachedThreadPool(); + executor.execute(new Runnable() { + public void run() { + try { + slave.start(); + } catch (Exception e) { + e.printStackTrace(); + errors.add(e); + } + } + }); + executor.shutdown(); + + HashMap networkConnectorProps = new HashMap(); + networkConnectorProps.put("duplex", "true"); + brokerB = createBroker("tcp", "62617", new String[]{"61617", "63617"}, networkConnectorProps); + brokerB.start(); + + doTestNetworkSendReceive(brokerA, brokerB); + doTestNetworkSendReceive(brokerB, brokerA); + + LOG.info("stopping brokerA (master shared_broker)"); + brokerA.stop(); + brokerA.waitUntilStopped(); + + // wait for slave to start + brokerA1.waitUntilStarted(); + + doTestNetworkSendReceive(brokerA1, brokerB); + doTestNetworkSendReceive(brokerB, brokerA1); + + assertTrue("No unexpected exceptions " + errors, errors.isEmpty()); + } + + @Test + // master slave piggy in the middle setup + public void testSendReceiveFailoverDuplexWithPIM() throws Exception { + final String dataDir = "target/data/shared/pim"; + brokerA = createBroker("61617", dataDir); + brokerA.start(); + + final BrokerService slave = createBroker("63617", dataDir); + brokerA1 = slave; + ExecutorService executor = Executors.newCachedThreadPool(); + executor.execute(new Runnable() { + public void run() { + try { + slave.start(); + } catch (Exception e) { + e.printStackTrace(); + 
} + } + }); + executor.shutdown(); + + HashMap networkConnectorProps = new HashMap(); + networkConnectorProps.put("duplex", "true"); + networkConnectorProps.put("networkTTL", "2"); + + brokerB = createBroker("tcp", "62617", new String[]{"61617", "63617"}, networkConnectorProps); + brokerB.start(); + + assertTrue("all props applied", networkConnectorProps.isEmpty()); + networkConnectorProps.put("duplex", "true"); + networkConnectorProps.put("networkTTL", "2"); + + brokerC = createBroker("tcp", "64617", new String[]{"61617", "63617"}, networkConnectorProps); + brokerC.start(); + assertTrue("all props applied a second time", networkConnectorProps.isEmpty()); + + //Thread.sleep(4000); + doTestNetworkSendReceive(brokerC, brokerB); + doTestNetworkSendReceive(brokerB, brokerC); + + LOG.info("stopping brokerA (master shared_broker)"); + brokerA.stop(); + brokerA.waitUntilStopped(); + + doTestNetworkSendReceive(brokerC, brokerB); + doTestNetworkSendReceive(brokerB, brokerC); + + brokerC.stop(); + brokerC.waitUntilStopped(); + } + /** * networked broker started after target so first connect attempt succeeds * start order is important @@ -150,20 +287,25 @@ public class FailoverStaticNetworkTest { } private void doTestNetworkSendReceive() throws Exception, JMSException { - LOG.info("Creating Consumer on the networked brokerA ..."); + doTestNetworkSendReceive(brokerB, brokerA); + } + + private void doTestNetworkSendReceive(BrokerService to, BrokerService from) throws Exception, JMSException { + + LOG.info("Creating Consumer on the networked broker ..." 
+ from); SslContext.setCurrentSslContext(sslContext); // Create a consumer on brokerA - ConnectionFactory consFactory = createConnectionFactory(brokerA); + ConnectionFactory consFactory = createConnectionFactory(from); Connection consConn = consFactory.createConnection(); consConn.start(); Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE); ActiveMQDestination destination = (ActiveMQDestination) consSession.createQueue(DESTINATION_NAME); final MessageConsumer consumer = consSession.createConsumer(destination); - LOG.info("publishing to brokerB"); + LOG.info("publishing to " + to); - sendMessageTo(destination, brokerB); + sendMessageTo(destination, to); boolean gotMessage = Wait.waitFor(new Wait.Condition() { public boolean isSatisified() throws Exception { Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java?rev=1082333&r1=1082332&r2=1082333&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java Wed Mar 16 22:48:26 2011 @@ -105,7 +105,7 @@ public class DiscoveryTransportNoBrokerT String groupId = "WillNotMatch" + startT; try { String urlStr = "discovery:(multicast://default?group=" + groupId + - ")?useExponentialBackOff=false&maxReconnectAttempts=2&reconnectDelay=" + initialReconnectDelay; + ")?useExponentialBackOff=false&maxReconnectAttempts=2&reconnectDelay=" + initialReconnectDelay; ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlStr); LOG.info("Connecting."); Connection connection = 
factory.createConnection(); @@ -121,7 +121,8 @@ public class DiscoveryTransportNoBrokerT public void testSetDiscoveredBrokerProperties() throws Exception { final String extraParameterName = "connectionTimeout"; final String extraParameterValue = "3000"; - final URI uri = new URI("discovery:(multicast://default)?initialReconnectDelay=100&" + extraParameterName + "=" + extraParameterValue); + final URI uri = new URI("discovery:(multicast://default)?initialReconnectDelay=100&" + + DiscoveryListener.DISCOVERED_OPTION_PREFIX + extraParameterName + "=" + extraParameterValue); CompositeData compositeData = URISupport.parseComposite(uri); StubCompositeTransport compositeTransport = new StubCompositeTransport(); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/URISupportTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/URISupportTest.java?rev=1082333&r1=1082332&r2=1082333&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/URISupportTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/URISupportTest.java Wed Mar 16 22:48:26 2011 @@ -18,6 +18,7 @@ package org.apache.activemq.util; import java.net.URI; import java.net.URISyntaxException; +import java.util.HashMap; import java.util.Map; import junit.framework.TestCase; @@ -103,7 +104,7 @@ public class URISupportTest extends Test } public void testParsingParams() throws Exception { - URI uri = new URI("static:(http://localhost:61617?proxyHost=localhost&proxyPort=80)"); + URI uri = new URI("static:(http://localhost:61617?proxyHost=jo&proxyPort=90)?proxyHost=localhost&proxyPort=80"); Mapparameters = URISupport.parseParameters(uri); verifyParams(parameters); uri = new URI("static://http://localhost:61617?proxyHost=localhost&proxyPort=80"); @@ -134,6 +135,28 @@ public class URISupportTest extends 
Test assertEquals(querylessURI, URISupport.createURIWithQuery(originalURI, "")); assertEquals(new URI(querylessURI + "?" + queryString), URISupport.createURIWithQuery(originalURI, queryString)); } + + public void testApplyParameters() throws Exception { + + URI uri = new URI("http://0.0.0.0:61616"); + Map parameters = new HashMap(); + parameters.put("t.proxyHost", "localhost"); + parameters.put("t.proxyPort", "80"); + + uri = URISupport.applyParameters(uri, parameters); + Map appliedParameters = URISupport.parseParameters(uri); + assertEquals("all params applied with no prefix", 2, appliedParameters.size()); + + // strip off params again + uri = URISupport.createURIWithQuery(uri, null); + + uri = URISupport.applyParameters(uri, parameters, "joe"); + appliedParameters = URISupport.parseParameters(uri); + assertTrue("no params applied as none match joe", appliedParameters.isEmpty()); + + uri = URISupport.applyParameters(uri, parameters, "t."); + verifyParams(URISupport.parseParameters(uri)); + } private void verifyParams(Map parameters) { assertEquals(parameters.get("proxyHost"), "localhost");