From activemq-commits-return-4272-apmail-geronimo-activemq-commits-archive=geronimo.apache.org@geronimo.apache.org Tue Dec 26 16:15:09 2006 Return-Path: Delivered-To: apmail-geronimo-activemq-commits-archive@www.apache.org Received: (qmail 84498 invoked from network); 26 Dec 2006 16:15:08 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 26 Dec 2006 16:15:08 -0000 Received: (qmail 58522 invoked by uid 500); 26 Dec 2006 16:15:15 -0000 Delivered-To: apmail-geronimo-activemq-commits-archive@geronimo.apache.org Received: (qmail 58500 invoked by uid 500); 26 Dec 2006 16:15:15 -0000 Mailing-List: contact activemq-commits-help@geronimo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: activemq-dev@geronimo.apache.org Delivered-To: mailing list activemq-commits@geronimo.apache.org Received: (qmail 58491 invoked by uid 99); 26 Dec 2006 16:15:15 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 26 Dec 2006 08:15:15 -0800 X-ASF-Spam-Status: No, hits=-9.4 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 26 Dec 2006 08:15:07 -0800 Received: by eris.apache.org (Postfix, from userid 65534) id 70A6D1A981C; Tue, 26 Dec 2006 08:14:15 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r490319 - in /incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases: AMQFailoverIssue.java AMQStackOverFlowTest.java Date: Tue, 26 Dec 2006 16:14:15 -0000 To: activemq-commits@geronimo.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20061226161415.70A6D1A981C@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajdavies Date: Tue Dec 26 08:14:14 2006 New Revision: 490319 URL: http://svn.apache.org/viewvc?view=rev&rev=490319 Log: uses cases submitted by Brian Diesenhaus Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java (with props) incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStackOverFlowTest.java (with props) Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java?view=auto&rev=490319 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java (added) +++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java Tue Dec 26 08:14:14 2006 @@ -0,0 +1,218 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.activemq.usecases; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.BytesMessage; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Session; +import junit.framework.Assert; +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.memory.UsageManager; +import org.apache.activemq.network.DiscoveryNetworkConnector; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.pool.PooledConnectionFactory; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.core.MessageCreator; +import org.springframework.jms.listener.DefaultMessageListenerContainer; + +public class AMQFailoverIssue extends TestCase{ + + private static final String URL1="tcp://localhost:61616"; + private static final String QUEUE1_NAME="test.queue.1"; + private static final int MAX_CONSUMERS=10; + private static final int MAX_PRODUCERS=5; + private static final int NUM_MESSAGE_TO_SEND=10000; + private static final int TOTAL_MESSAGES=MAX_PRODUCERS * NUM_MESSAGE_TO_SEND; + private static final boolean USE_FAILOVER=true; + private AtomicInteger messageCount=new AtomicInteger(); + private CountDownLatch doneLatch; + + @Override public void setUp() throws Exception{ + } + + @Override public void tearDown() throws Exception{ + } + + // This should fail with incubator-activemq-fuse-4.1.0.5 + public void testFailoverIssue() throws Exception{ + BrokerService brokerService1=null; + ActiveMQConnectionFactory acf=null; + PooledConnectionFactory pcf=null; + DefaultMessageListenerContainer container1=null; + try{ + brokerService1=createBrokerService("broker1",URL1,null); + brokerService1.start(); + acf=createConnectionFactory(URL1,USE_FAILOVER); + pcf=new PooledConnectionFactory(acf); + // Only listen on the first queue.. let the 2nd queue fill up. + doneLatch=new CountDownLatch(TOTAL_MESSAGES); + container1=createDefaultMessageListenerContainer(acf,new TestMessageListener1(0),QUEUE1_NAME); + container1.afterPropertiesSet(); + Thread.sleep(5000); + final ExecutorService executor=Executors.newCachedThreadPool(); + for(int i=0;i policyEntries=new ArrayList(); + final PolicyEntry entry=new PolicyEntry(); + entry.setQueue(">"); + // entry.setQueue(QUEUE1_NAME); + entry.setMemoryLimit(1); + policyEntries.add(entry); + final PolicyMap policyMap=new PolicyMap(); + policyMap.setPolicyEntries(policyEntries); + brokerService.setDestinationPolicy(policyMap); + final TransportConnector tConnector=new TransportConnector(); + tConnector.setUri(new URI(uri1)); + tConnector.setBrokerName(brokerName); + tConnector.setName(brokerName+".transportConnector"); + brokerService.addConnector(tConnector); + if(uri2!=null){ + final NetworkConnector nc=new DiscoveryNetworkConnector(new URI("static:"+uri2)); + nc.setBridgeTempDestinations(true); + nc.setBrokerName(brokerName); + nc.setName(brokerName+".nc"); + nc.setPrefetchSize(1); + brokerService.addNetworkConnector(nc); + } + return brokerService; + } + + public DefaultMessageListenerContainer createDefaultMessageListenerContainer(final ConnectionFactory acf, + final MessageListener listener,final String queue){ + final DefaultMessageListenerContainer container=new DefaultMessageListenerContainer(); + container.setConnectionFactory(acf); + container.setDestinationName(queue); + container.setMessageListener(listener); + container.setSessionTransacted(false); + container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); + container.setConcurrentConsumers(MAX_CONSUMERS); + return container; + } + + public ActiveMQConnectionFactory createConnectionFactory(final String url,final boolean useFailover){ + final String failoverUrl="failover:("+url+")"; + final ActiveMQConnectionFactory acf=new ActiveMQConnectionFactory(useFailover?failoverUrl:url); + acf.setCopyMessageOnSend(false); + acf.setUseAsyncSend(false); + acf.setDispatchAsync(true); + acf.setUseCompression(false); + acf.setOptimizeAcknowledge(false); + acf.setOptimizedMessageDispatch(true); + acf.setUseAsyncSend(false); + return acf; + } + + private class TestMessageListener1 implements MessageListener{ + + private final long waitTime; + + public TestMessageListener1(long waitTime){ + this.waitTime=waitTime; + } + + public void onMessage(Message msg){ + try{ + messageCount.incrementAndGet(); + doneLatch.countDown(); + Thread.sleep(waitTime); + }catch(InterruptedException e){ + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } + + private class PooledProducerTask implements Runnable{ + + private final String queueName; + private final PooledConnectionFactory pcf; + + public PooledProducerTask(final PooledConnectionFactory pcf,final String queueName){ + this.pcf=pcf; + this.queueName=queueName; + } + + public void run(){ + try{ + final JmsTemplate jmsTemplate=new JmsTemplate(pcf); + jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + jmsTemplate.setExplicitQosEnabled(true); + jmsTemplate.setMessageIdEnabled(false); + jmsTemplate.setMessageTimestampEnabled(false); + jmsTemplate.afterPropertiesSet(); + final byte[] bytes=new byte[2048]; + final Random r=new Random(); + r.nextBytes(bytes); + Thread.sleep(2000); + final AtomicInteger count=new AtomicInteger(); + for(int i=0;i policyEntries = new ArrayList(); + + final PolicyEntry entry = new PolicyEntry(); + entry.setQueue(">"); + entry.setMemoryLimit(1); + policyEntries.add(entry); + + final PolicyMap policyMap = new PolicyMap(); + policyMap.setPolicyEntries(policyEntries); + brokerService.setDestinationPolicy(policyMap); + + final TransportConnector tConnector = new TransportConnector(); + tConnector.setUri(new URI(uri1)); + tConnector.setBrokerName(brokerName); + tConnector.setName(brokerName + ".transportConnector"); + brokerService.addConnector(tConnector); + + if (uri2 != null) { + final NetworkConnector nc = new DiscoveryNetworkConnector(new URI( + "static:" + uri2)); + nc.setBridgeTempDestinations(true); + nc.setBrokerName(brokerName); + nc.setName(brokerName + ".nc"); + nc.setPrefetchSize(1); + brokerService.addNetworkConnector(nc); + } + + return brokerService; + + } +} Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStackOverFlowTest.java ------------------------------------------------------------------------------ svn:eol-style = native