activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r950515 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/network/ activemq-spring/src/test/java/org/apache/bugs/ activemq-spring/src/test/resources/ activemq-spring/src/test/resources/org/ activemq-spring/src/test/resou...
Date Wed, 02 Jun 2010 12:08:59 GMT
Author: gtully
Date: Wed Jun  2 12:08:58 2010
New Revision: 950515

URL: http://svn.apache.org/viewvc?rev=950515&view=rev
Log:
add test to verify load balancing across the network and the effect of prefetch on a vm client vs a network consumer;
allow prefetch to be configured via an xml property (i.e. a string in the schema) for xml config

Added:
    activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java   (with
props)
    activemq/trunk/activemq-spring/src/test/resources/log4j.properties   (with props)
    activemq/trunk/activemq-spring/src/test/resources/org/
    activemq/trunk/activemq-spring/src/test/resources/org/apache/
    activemq/trunk/activemq-spring/src/test/resources/org/apache/bugs/
    activemq/trunk/activemq-spring/src/test/resources/org/apache/bugs/loadbalancetest.xml
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java?rev=950515&r1=950514&r2=950515&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
Wed Jun  2 12:08:58 2010
@@ -185,6 +185,7 @@ public class NetworkBridgeConfiguration 
 
     /**
      * @param prefetchSize the prefetchSize to set
+     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
      */
     public void setPrefetchSize(int prefetchSize) {
         this.prefetchSize = prefetchSize;

Added: activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java?rev=950515&view=auto
==============================================================================
--- activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java (added)
+++ activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java Wed
Jun  2 12:08:58 2010
@@ -0,0 +1,304 @@
+package org.apache.bugs;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerRegistry;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+import org.springframework.jms.connection.CachingConnectionFactory;
+import org.springframework.jms.connection.SingleConnectionFactory;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
+public class LoadBalanceTest {
+    private static final Log LOG = LogFactory.getLog(LoadBalanceTest.class);
+    private static final String TESTING_QUEUE = "testingqueue";
+    private static int networkBridgePrefetch = 1000;
+
+    @Test
+    public void does_load_balance_between_consumers() throws Exception { // two statically networked brokers, one listener each, producer on broker two
+        BrokerService brokerService1 = null;
+        BrokerService brokerService2 = null;
+        final int total = 100; // number of messages the producer sends
+        final AtomicInteger broker1Count = new AtomicInteger(0); // messages seen by the listener attached to broker one
+        final AtomicInteger broker2Count = new AtomicInteger(0); // messages seen by the listener attached to broker two
+        try {
+            { // broker one: listens on port 61616 and bridges to port 51515
+                brokerService1 = new BrokerService();
+                brokerService1.setBrokerName("one");
+                brokerService1.setUseJmx(false);
+                brokerService1
+                        .setPersistenceAdapter(new MemoryPersistenceAdapter());
+                brokerService1.addConnector("nio://0.0.0.0:61616");
+                final NetworkConnector network1 = brokerService1
+                        .addNetworkConnector("static:(tcp://localhost:51515)");
+                network1.setName("network1");
+                network1.setDynamicOnly(true);
+                network1.setNetworkTTL(3);
+                network1.setPrefetchSize(networkBridgePrefetch);
+                network1.setConduitSubscriptions(false);
+                network1.setDecreaseNetworkConsumerPriority(false);
+                brokerService1.start();
+            }
+            { // broker two: mirror image, listens on port 51515 and bridges to port 61616
+                brokerService2 = new BrokerService();
+                brokerService2.setBrokerName("two");
+                brokerService2.setUseJmx(false);
+                brokerService2
+                        .setPersistenceAdapter(new MemoryPersistenceAdapter());
+                brokerService2.addConnector("nio://0.0.0.0:51515");
+                final NetworkConnector network2 = brokerService2
+                        .addNetworkConnector("static:(tcp://localhost:61616)");
+                network2.setName("network1");
+                network2.setDynamicOnly(true);
+                network2.setNetworkTTL(3);
+                network2.setPrefetchSize(networkBridgePrefetch);
+                network2.setConduitSubscriptions(false);
+                network2.setDecreaseNetworkConsumerPriority(false);
+                brokerService2.start();
+            }
+            final ExecutorService pool = Executors.newSingleThreadExecutor(); // runs the broker-two consumer and producer off the test thread
+            final ActiveMQConnectionFactory connectionFactory1 = new ActiveMQConnectionFactory(
+                    "vm://one");
+            final SingleConnectionFactory singleConnectionFactory1 = new SingleConnectionFactory(
+                    connectionFactory1);
+            singleConnectionFactory1.setReconnectOnException(true);
+            final DefaultMessageListenerContainer container1 = new DefaultMessageListenerContainer();
+            container1.setConnectionFactory(singleConnectionFactory1);
+            container1.setMaxConcurrentConsumers(1);
+            container1.setDestination(new ActiveMQQueue("testingqueue"));
+            container1.setMessageListener(new MessageListener() {
+
+                public void onMessage(final Message message) {
+                    broker1Count.incrementAndGet();
+                }
+            });
+            container1.afterPropertiesSet();
+            container1.start();
+            pool.submit(new Callable<Object>() { // the returned Future is discarded

+                public Object call() throws Exception {
+                    try {
+                        final ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory(
+                                "vm://two");
+                        final SingleConnectionFactory singleConnectionFactory2 = new SingleConnectionFactory(
+                                connectionFactory2);
+                        singleConnectionFactory2.setReconnectOnException(true);
+                        final DefaultMessageListenerContainer container2 = new DefaultMessageListenerContainer();
+                        container2
+                                .setConnectionFactory(singleConnectionFactory2);
+                        container2.setMaxConcurrentConsumers(1);
+                        container2.setDestination(new ActiveMQQueue(
+                                "testingqueue"));
+                        container2.setMessageListener(new MessageListener() {
+
+                            public void onMessage(final Message message) {
+                                broker2Count.incrementAndGet();
+                            }
+                        });
+                        container2.afterPropertiesSet();
+                        container2.start();
+                        final CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(
+                                singleConnectionFactory2);
+                        final JmsTemplate template = new JmsTemplate(
+                                cachingConnectionFactory);
+                        final ActiveMQQueue queue = new ActiveMQQueue(
+                                "testingqueue");
+                        for (int i = 0; i < total; i++) { // all messages are produced through the broker-two connection
+                            template.send(queue, new MessageCreator() {
+
+                                public Message createMessage(
+                                        final Session session)
+                                        throws JMSException {
+                                    final TextMessage message = session
+                                            .createTextMessage();
+                                    message.setText("Hello World!");
+                                    return message;
+                                }
+                            });
+                        }
+                        // give spring time to scale back to a single active consumer before tearing down
+                        while (container2.getActiveConsumerCount() > 1) {
+                            System.out.println("active consumer count: "
+                                    + container2.getActiveConsumerCount());
+                            System.out.println("concurrent consumer count: "
+                                    + container2.getConcurrentConsumers());
+                            Thread.sleep(1000);
+                        }
+                        cachingConnectionFactory.destroy();
+                        container2.destroy();
+                    } catch (final Throwable t) { // failures in this task are only printed; the test thread never sees them
+                        t.printStackTrace();
+                    }
+                    return null;
+                }
+            });
+            pool.shutdown();
+            pool.awaitTermination(10, TimeUnit.SECONDS); // result ignored: the test continues even if the task has not finished
+            LOG.info("broker1Count " + broker1Count.get() + ", broker2Count " + broker2Count.get());
+
+            int count = 0;
+            // poll up to 10 times, one second apart, until both counters add up to total
+            while (count++ < 10
+                    && broker1Count.get() + broker2Count.get() != total) {
+                LOG.info("broker1Count " + broker1Count.get() + ", broker2Count " + broker2Count.get());
+                Thread.sleep(1000);
+            }
+            container1.destroy();
+        } finally { // always attempt to stop both brokers; stop failures are printed, not rethrown
+            try {
+                if (brokerService1 != null) {
+                    brokerService1.stop();
+                }
+            } catch (final Throwable t) {
+                t.printStackTrace();
+            }
+            try {
+                if (brokerService2 != null) {
+                    brokerService2.stop();
+                }
+            } catch (final Throwable t) {
+                t.printStackTrace();
+            }
+        }
+        
+        if (broker1Count.get() < 25 || broker2Count.get() < 25) { // pass criterion: each listener received at least 25 messages
+            fail("Each broker should have gotten at least 25 messages but instead broker1
got "
+                    + broker1Count.get()
+                    + " and broker2 got "
+                    + broker2Count.get());
+        }
+    }
+
+    @Test
+    public void does_xml_multicast_load_balance_between_consumers() throws Exception { // same scenario, but both brokers are created from loadbalancetest.xml via vm:// URIs
+        final int total = 100; // number of messages the producer sends
+        final AtomicInteger broker1Count = new AtomicInteger(0); // messages seen by the listener attached to broker one
+        final AtomicInteger broker2Count = new AtomicInteger(0); // messages seen by the listener attached to broker two
+        final ExecutorService pool = Executors.newSingleThreadExecutor();
+        final CountDownLatch startProducer = new CountDownLatch(1); // released by the test thread after the bridge wait below
+        final String xmlConfig = getClass().getPackage().getName().replace('.','/') + "/loadbalancetest.xml";
+        System.setProperty("lbt.networkBridgePrefetch", String.valueOf(networkBridgePrefetch)); // both lbt.* properties are referenced as placeholders in the xml config
+        System.setProperty("lbt.brokerName", "one");
+        final ActiveMQConnectionFactory connectionFactory1 = new ActiveMQConnectionFactory(
+                "vm://one?brokerConfig=xbean:" + xmlConfig);
+        final SingleConnectionFactory singleConnectionFactory1 = new SingleConnectionFactory(
+                connectionFactory1);
+        singleConnectionFactory1.setReconnectOnException(true);
+        final DefaultMessageListenerContainer container1 = new DefaultMessageListenerContainer();
+        container1.setConnectionFactory(singleConnectionFactory1);
+        container1.setMaxConcurrentConsumers(1);
+        container1.setDestination(new ActiveMQQueue(TESTING_QUEUE));
+        container1.setMessageListener(new MessageListener() {
+
+            public void onMessage(final Message message) {
+                broker1Count.incrementAndGet();
+            }
+        });
+        container1.afterPropertiesSet();
+        container1.start();
+        pool.submit(new Callable<Object>() { // the returned Future is discarded
+
+            public Object call() throws Exception {
+                System.setProperty("lbt.brokerName", "two"); // NOTE(review): JVM-wide property; assumes broker one has already read its name by now - confirm
+                final ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory(
+                        "vm://two?brokerConfig=xbean:" + xmlConfig);
+                final SingleConnectionFactory singleConnectionFactory2 = new SingleConnectionFactory(
+                        connectionFactory2);
+                singleConnectionFactory2.setReconnectOnException(true);
+                final DefaultMessageListenerContainer container2 = new DefaultMessageListenerContainer();
+                container2.setConnectionFactory(singleConnectionFactory2);
+                container2.setMaxConcurrentConsumers(1);
+                container2.setDestination(new ActiveMQQueue(TESTING_QUEUE));
+                container2.setMessageListener(new MessageListener() {
+
+                    public void onMessage(final Message message) {
+                        broker2Count.incrementAndGet();
+                    }
+                });
+                container2.afterPropertiesSet();
+                container2.start();
+                
+                
+                assertTrue("wait for start signal", startProducer.await(20, TimeUnit.SECONDS)); // runs on the pool thread; since the Future is discarded a failure here is not reported to the test
+                
+                final CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(
+                        singleConnectionFactory2);
+                final JmsTemplate template = new JmsTemplate(
+                        cachingConnectionFactory);
+                final ActiveMQQueue queue = new ActiveMQQueue(TESTING_QUEUE);
+                for (int i = 0; i < total; i++) { // all messages are produced through the broker-two connection
+                    template.send(queue, new MessageCreator() {
+
+                        public Message createMessage(final Session session)
+                                throws JMSException {
+                            final TextMessage message = session
+                                    .createTextMessage();
+                            message.setText("Hello World!");
+                            return message;
+                        }
+                    });
+                }
+                return null;
+            }
+        });
+        
+        // wait up to 10s for the network bridge to form before releasing the producer; needs advisories
+        waitForBridgeFormation(10000);
+        startProducer.countDown();
+        
+        pool.shutdown();
+        pool.awaitTermination(10, TimeUnit.SECONDS); // result ignored: the test continues even if the task has not finished
+        
+        LOG.info("broker1Count " + broker1Count.get() + ", broker2Count " + broker2Count.get());
+
+        int count = 0;
+        // poll up to 10 times, one second apart, until both counters add up to total
+        while (count++ < 10 && broker1Count.get() + broker2Count.get() != total)
{
+            LOG.info("broker1Count " + broker1Count.get() + ", broker2Count " + broker2Count.get());
+            Thread.sleep(1000);
+        } // NOTE(review): unlike the first test, nothing here stops the brokers or destroys the listener containers
+        if (broker1Count.get() < 25 || broker2Count.get() < 25) { // pass criterion: each listener received at least 25 messages
+            fail("Each broker should have gotten at least 25 messages but instead broker1
got "
+                    + broker1Count.get()
+                    + " and broker2 got "
+                    + broker2Count.get());
+        }
+    }
+
+    // Blocks until broker "two" is registered and its first network connector reports an active bridge, peeking at broker internals once a second.
+    // Gives up after delay milliseconds and returns normally in that case too, so the caller cannot tell whether the bridge actually formed.
+    private void waitForBridgeFormation(long delay) throws Exception {
+        long done = System.currentTimeMillis() + delay; // deadline in epoch milliseconds
+        while (done > System.currentTimeMillis()) {
+            BrokerService broker = BrokerRegistry.getInstance().lookup("two"); // null until the broker has been registered
+            if (broker != null && !broker.getNetworkConnectors().isEmpty()) {
+                 if (!broker.getNetworkConnectors().get(0).activeBridges().isEmpty()) { // only the first connector is inspected
+                     return;
+                 }
+            }
+            Thread.sleep(1000);
+        }
+    }
+}

Propchange: activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-spring/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/resources/log4j.properties?rev=950515&view=auto
==============================================================================
--- activemq/trunk/activemq-spring/src/test/resources/log4j.properties (added)
+++ activemq/trunk/activemq-spring/src/test/resources/log4j.properties Wed Jun  2 12:08:58
2010
@@ -0,0 +1,36 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+## 
+## http://www.apache.org/licenses/LICENSE-2.0
+## 
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# The logging properties used during tests..
+#
+log4j.rootLogger=INFO, out, stdout
+
+#log4j.logger.org.apache.activemq=DEBUG
+#log4j.logger.org.apache.activemq.broker.region=TRACE
+
+# CONSOLE appender (attached to the root logger above as "stdout")
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# File appender
+log4j.appender.out=org.apache.log4j.FileAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.out.file=target/activemq-test.log
+log4j.appender.out.append=true

Propchange: activemq/trunk/activemq-spring/src/test/resources/log4j.properties
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-spring/src/test/resources/log4j.properties
------------------------------------------------------------------------------
    svn:executable = *

Propchange: activemq/trunk/activemq-spring/src/test/resources/log4j.properties
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: activemq/trunk/activemq-spring/src/test/resources/log4j.properties
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/trunk/activemq-spring/src/test/resources/org/apache/bugs/loadbalancetest.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/resources/org/apache/bugs/loadbalancetest.xml?rev=950515&view=auto
==============================================================================
--- activemq/trunk/activemq-spring/src/test/resources/org/apache/bugs/loadbalancetest.xml
(added)
+++ activemq/trunk/activemq-spring/src/test/resources/org/apache/bugs/loadbalancetest.xml
Wed Jun  2 12:08:58 2010
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+    xmlns:amq="http://activemq.apache.org/schema/core"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
+    http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+	<!-- Allows us to use system properties as variables in this configuration file -->
+    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+       <property name="systemPropertiesModeName">
+         <value>SYSTEM_PROPERTIES_MODE_OVERRIDE</value>
+       </property>
+    </bean>   
+	
+	<!-- Configures a broker named by the lbt.brokerName system property, with advisory support
but without jmx or persistence -->
+	<amq:broker brokerName="${lbt.brokerName}" useJmx="false" persistent="false" advisorySupport="true"
useLocalHostBrokerName="false">
+
+	    <amq:networkConnectors>
+            <amq:networkConnector conduitSubscriptions="false" decreaseNetworkConsumerPriority="false"
networkTTL="3" 
+                dynamicOnly="true" uri="multicast://239.255.2.25:6155" name="network" prefetchSize="${lbt.networkBridgePrefetch}">
+                <amq:excludedDestinations>
+	                <amq:topic physicalName=">"/>
+	            </amq:excludedDestinations>
+                <amq:staticallyIncludedDestinations>
+                    <amq:queue physicalName=">"/>
+                </amq:staticallyIncludedDestinations>
+            </amq:networkConnector>
+	    </amq:networkConnectors>
+
+	    <amq:transportConnectors>
+	       <amq:transportConnector name="transport" uri="nio://0.0.0.0:0" discoveryUri="multicast://239.255.2.25:6155"/>
+        </amq:transportConnectors>
+	</amq:broker>
+</beans>

Propchange: activemq/trunk/activemq-spring/src/test/resources/org/apache/bugs/loadbalancetest.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-spring/src/test/resources/org/apache/bugs/loadbalancetest.xml
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: activemq/trunk/activemq-spring/src/test/resources/org/apache/bugs/loadbalancetest.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml



Mime
View raw message