Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 20527 invoked from network); 2 Jun 2010 12:09:23 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 2 Jun 2010 12:09:23 -0000 Received: (qmail 3563 invoked by uid 500); 2 Jun 2010 12:09:23 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 3497 invoked by uid 500); 2 Jun 2010 12:09:21 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 3490 invoked by uid 99); 2 Jun 2010 12:09:21 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Jun 2010 12:09:21 +0000 X-ASF-Spam-Status: No, hits=-1412.5 required=10.0 tests=ALL_TRUSTED,AWL X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Jun 2010 12:09:19 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 7A3412388978; Wed, 2 Jun 2010 12:08:59 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r950515 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/network/ activemq-spring/src/test/java/org/apache/bugs/ activemq-spring/src/test/resources/ activemq-spring/src/test/resources/org/ activemq-spring/src/test/resou... 
Date: Wed, 02 Jun 2010 12:08:59 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100602120859.7A3412388978@eris.apache.org> Author: gtully Date: Wed Jun 2 12:08:58 2010 New Revision: 950515 URL: http://svn.apache.org/viewvc?rev=950515&view=rev Log: add test to verify loadbalance across network, effect of prefetch on vm client vs network consumer, allowed prefetch to be configured via xml property (ie string in schema) for xml config Added: activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java (with props) activemq/trunk/activemq-spring/src/test/resources/log4j.properties (with props) activemq/trunk/activemq-spring/src/test/resources/org/ activemq/trunk/activemq-spring/src/test/resources/org/apache/ activemq/trunk/activemq-spring/src/test/resources/org/apache/bugs/ activemq/trunk/activemq-spring/src/test/resources/org/apache/bugs/loadbalancetest.xml (with props) Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java?rev=950515&r1=950514&r2=950515&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java Wed Jun 2 12:08:58 2010 @@ -185,6 +185,7 @@ public class NetworkBridgeConfiguration /** * @param prefetchSize the prefetchSize to set + * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor" */ public void setPrefetchSize(int prefetchSize) { this.prefetchSize = prefetchSize; Added: 
activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java?rev=950515&view=auto ============================================================================== --- activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java (added) +++ activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java Wed Jun 2 12:08:58 2010 @@ -0,0 +1,304 @@ +package org.apache.bugs; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerRegistry; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.store.memory.MemoryPersistenceAdapter; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.Test; +import org.springframework.jms.connection.CachingConnectionFactory; +import org.springframework.jms.connection.SingleConnectionFactory; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.core.MessageCreator; +import org.springframework.jms.listener.DefaultMessageListenerContainer; + +public class LoadBalanceTest { + private static final Log LOG = LogFactory.getLog(LoadBalanceTest.class); + private static final String TESTING_QUEUE = "testingqueue"; + private static int 
networkBridgePrefetch = 1000; + + @Test + public void does_load_balance_between_consumers() throws Exception { + BrokerService brokerService1 = null; + BrokerService brokerService2 = null; + final int total = 100; + final AtomicInteger broker1Count = new AtomicInteger(0); + final AtomicInteger broker2Count = new AtomicInteger(0); + try { + { + brokerService1 = new BrokerService(); + brokerService1.setBrokerName("one"); + brokerService1.setUseJmx(false); + brokerService1 + .setPersistenceAdapter(new MemoryPersistenceAdapter()); + brokerService1.addConnector("nio://0.0.0.0:61616"); + final NetworkConnector network1 = brokerService1 + .addNetworkConnector("static:(tcp://localhost:51515)"); + network1.setName("network1"); + network1.setDynamicOnly(true); + network1.setNetworkTTL(3); + network1.setPrefetchSize(networkBridgePrefetch); + network1.setConduitSubscriptions(false); + network1.setDecreaseNetworkConsumerPriority(false); + brokerService1.start(); + } + { + brokerService2 = new BrokerService(); + brokerService2.setBrokerName("two"); + brokerService2.setUseJmx(false); + brokerService2 + .setPersistenceAdapter(new MemoryPersistenceAdapter()); + brokerService2.addConnector("nio://0.0.0.0:51515"); + final NetworkConnector network2 = brokerService2 + .addNetworkConnector("static:(tcp://localhost:61616)"); + network2.setName("network1"); + network2.setDynamicOnly(true); + network2.setNetworkTTL(3); + network2.setPrefetchSize(networkBridgePrefetch); + network2.setConduitSubscriptions(false); + network2.setDecreaseNetworkConsumerPriority(false); + brokerService2.start(); + } + final ExecutorService pool = Executors.newSingleThreadExecutor(); + final ActiveMQConnectionFactory connectionFactory1 = new ActiveMQConnectionFactory( + "vm://one"); + final SingleConnectionFactory singleConnectionFactory1 = new SingleConnectionFactory( + connectionFactory1); + singleConnectionFactory1.setReconnectOnException(true); + final DefaultMessageListenerContainer container1 = new 
DefaultMessageListenerContainer(); + container1.setConnectionFactory(singleConnectionFactory1); + container1.setMaxConcurrentConsumers(1); + container1.setDestination(new ActiveMQQueue("testingqueue")); + container1.setMessageListener(new MessageListener() { + + public void onMessage(final Message message) { + broker1Count.incrementAndGet(); + } + }); + container1.afterPropertiesSet(); + container1.start(); + pool.submit(new Callable() { + + public Object call() throws Exception { + try { + final ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory( + "vm://two"); + final SingleConnectionFactory singleConnectionFactory2 = new SingleConnectionFactory( + connectionFactory2); + singleConnectionFactory2.setReconnectOnException(true); + final DefaultMessageListenerContainer container2 = new DefaultMessageListenerContainer(); + container2 + .setConnectionFactory(singleConnectionFactory2); + container2.setMaxConcurrentConsumers(1); + container2.setDestination(new ActiveMQQueue( + "testingqueue")); + container2.setMessageListener(new MessageListener() { + + public void onMessage(final Message message) { + broker2Count.incrementAndGet(); + } + }); + container2.afterPropertiesSet(); + container2.start(); + final CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory( + singleConnectionFactory2); + final JmsTemplate template = new JmsTemplate( + cachingConnectionFactory); + final ActiveMQQueue queue = new ActiveMQQueue( + "testingqueue"); + for (int i = 0; i < total; i++) { + template.send(queue, new MessageCreator() { + + public Message createMessage( + final Session session) + throws JMSException { + final TextMessage message = session + .createTextMessage(); + message.setText("Hello World!"); + return message; + } + }); + } + // give spring time to scale back again + while (container2.getActiveConsumerCount() > 1) { + System.out.println("active consumer count: " + + container2.getActiveConsumerCount()); + 
System.out.println("concurrent consumer count: " + + container2.getConcurrentConsumers()); + Thread.sleep(1000); + } + cachingConnectionFactory.destroy(); + container2.destroy(); + } catch (final Throwable t) { + t.printStackTrace(); + } + return null; + } + }); + pool.shutdown(); + pool.awaitTermination(10, TimeUnit.SECONDS); + LOG.info("broker1Count " + broker1Count.get() + ", broker2Count " + broker2Count.get()); + + int count = 0; + // give it 10 seconds + while (count++ < 10 + && broker1Count.get() + broker2Count.get() != total) { + LOG.info("broker1Count " + broker1Count.get() + ", broker2Count " + broker2Count.get()); + Thread.sleep(1000); + } + container1.destroy(); + } finally { + try { + if (brokerService1 != null) { + brokerService1.stop(); + } + } catch (final Throwable t) { + t.printStackTrace(); + } + try { + if (brokerService2 != null) { + brokerService2.stop(); + } + } catch (final Throwable t) { + t.printStackTrace(); + } + } + + if (broker1Count.get() < 25 || broker2Count.get() < 25) { + fail("Each broker should have gotten at least 25 messages but instead broker1 got " + + broker1Count.get() + + " and broker2 got " + + broker2Count.get()); + } + } + + @Test + public void does_xml_multicast_load_balance_between_consumers() throws Exception { + final int total = 100; + final AtomicInteger broker1Count = new AtomicInteger(0); + final AtomicInteger broker2Count = new AtomicInteger(0); + final ExecutorService pool = Executors.newSingleThreadExecutor(); + final CountDownLatch startProducer = new CountDownLatch(1); + final String xmlConfig = getClass().getPackage().getName().replace('.','/') + "/loadbalancetest.xml"; + System.setProperty("lbt.networkBridgePrefetch", String.valueOf(networkBridgePrefetch)); + System.setProperty("lbt.brokerName", "one"); + final ActiveMQConnectionFactory connectionFactory1 = new ActiveMQConnectionFactory( + "vm://one?brokerConfig=xbean:" + xmlConfig); + final SingleConnectionFactory singleConnectionFactory1 = new 
SingleConnectionFactory( + connectionFactory1); + singleConnectionFactory1.setReconnectOnException(true); + final DefaultMessageListenerContainer container1 = new DefaultMessageListenerContainer(); + container1.setConnectionFactory(singleConnectionFactory1); + container1.setMaxConcurrentConsumers(1); + container1.setDestination(new ActiveMQQueue(TESTING_QUEUE)); + container1.setMessageListener(new MessageListener() { + + public void onMessage(final Message message) { + broker1Count.incrementAndGet(); + } + }); + container1.afterPropertiesSet(); + container1.start(); + pool.submit(new Callable() { + + public Object call() throws Exception { + System.setProperty("lbt.brokerName", "two"); + final ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory( + "vm://two?brokerConfig=xbean:" + xmlConfig); + final SingleConnectionFactory singleConnectionFactory2 = new SingleConnectionFactory( + connectionFactory2); + singleConnectionFactory2.setReconnectOnException(true); + final DefaultMessageListenerContainer container2 = new DefaultMessageListenerContainer(); + container2.setConnectionFactory(singleConnectionFactory2); + container2.setMaxConcurrentConsumers(1); + container2.setDestination(new ActiveMQQueue(TESTING_QUEUE)); + container2.setMessageListener(new MessageListener() { + + public void onMessage(final Message message) { + broker2Count.incrementAndGet(); + } + }); + container2.afterPropertiesSet(); + container2.start(); + + + assertTrue("wait for start signal", startProducer.await(20, TimeUnit.SECONDS)); + + final CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory( + singleConnectionFactory2); + final JmsTemplate template = new JmsTemplate( + cachingConnectionFactory); + final ActiveMQQueue queue = new ActiveMQQueue(TESTING_QUEUE); + for (int i = 0; i < total; i++) { + template.send(queue, new MessageCreator() { + + public Message createMessage(final Session session) + throws JMSException { + final TextMessage 
message = session + .createTextMessage(); + message.setText("Hello World!"); + return message; + } + }); + } + return null; + } + }); + + // give network a chance to build, needs advisories + waitForBridgeFormation(10000); + startProducer.countDown(); + + pool.shutdown(); + pool.awaitTermination(10, TimeUnit.SECONDS); + + LOG.info("broker1Count " + broker1Count.get() + ", broker2Count " + broker2Count.get()); + + int count = 0; + // give it 10 seconds + while (count++ < 10 && broker1Count.get() + broker2Count.get() != total) { + LOG.info("broker1Count " + broker1Count.get() + ", broker2Count " + broker2Count.get()); + Thread.sleep(1000); + } + if (broker1Count.get() < 25 || broker2Count.get() < 25) { + fail("Each broker should have gotten at least 25 messages but instead broker1 got " + + broker1Count.get() + + " and broker2 got " + + broker2Count.get()); + } + } + + // need to ensure broker bridge is alive before starting the consumer + // peeking at the internals will give us this info + private void waitForBridgeFormation(long delay) throws Exception { + long done = System.currentTimeMillis() + delay; + while (done > System.currentTimeMillis()) { + BrokerService broker = BrokerRegistry.getInstance().lookup("two"); + if (broker != null && !broker.getNetworkConnectors().isEmpty()) { + if (!broker.getNetworkConnectors().get(0).activeBridges().isEmpty()) { + return; + } + } + Thread.sleep(1000); + } + } +} Propchange: activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-spring/src/test/java/org/apache/bugs/LoadBalanceTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: activemq/trunk/activemq-spring/src/test/resources/log4j.properties URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/resources/log4j.properties?rev=950515&view=auto ============================================================================== --- activemq/trunk/activemq-spring/src/test/resources/log4j.properties (added) +++ activemq/trunk/activemq-spring/src/test/resources/log4j.properties Wed Jun 2 12:08:58 2010 @@ -0,0 +1,36 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +# +# The logging properties used during tests.. 
+# +log4j.rootLogger=INFO, out, stdout + +#log4j.logger.org.apache.activemq=DEBUG +#log4j.logger.org.apache.activemq.broker.region=TRACE + +# CONSOLE appender not used by default +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n + +# File appender +log4j.appender.out=org.apache.log4j.FileAppender +log4j.appender.out.layout=org.apache.log4j.PatternLayout +log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n +log4j.appender.out.file=target/activemq-test.log +log4j.appender.out.append=true Propchange: activemq/trunk/activemq-spring/src/test/resources/log4j.properties ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-spring/src/test/resources/log4j.properties ------------------------------------------------------------------------------ svn:executable = * Propchange: activemq/trunk/activemq-spring/src/test/resources/log4j.properties ------------------------------------------------------------------------------ svn:keywords = Rev Date Propchange: activemq/trunk/activemq-spring/src/test/resources/log4j.properties ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: activemq/trunk/activemq-spring/src/test/resources/org/apache/bugs/loadbalancetest.xml URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/resources/org/apache/bugs/loadbalancetest.xml?rev=950515&view=auto ============================================================================== --- activemq/trunk/activemq-spring/src/test/resources/org/apache/bugs/loadbalancetest.xml (added) +++ activemq/trunk/activemq-spring/src/test/resources/org/apache/bugs/loadbalancetest.xml Wed Jun 2 12:08:58 2010 @@ -0,0 +1,34 @@ + + + + + + + SYSTEM_PROPERTIES_MODE_OVERRIDE + + + + + + 
+ + + + + + + + + + + + + + + + Propchange: activemq/trunk/activemq-spring/src/test/resources/org/apache/bugs/loadbalancetest.xml ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-spring/src/test/resources/org/apache/bugs/loadbalancetest.xml ------------------------------------------------------------------------------ svn:keywords = Rev Date Propchange: activemq/trunk/activemq-spring/src/test/resources/org/apache/bugs/loadbalancetest.xml ------------------------------------------------------------------------------ svn:mime-type = text/xml