Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 99350 invoked from network); 14 Jul 2010 13:57:18 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 14 Jul 2010 13:57:18 -0000 Received: (qmail 48442 invoked by uid 500); 14 Jul 2010 13:57:18 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 48376 invoked by uid 500); 14 Jul 2010 13:57:15 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 48369 invoked by uid 99); 14 Jul 2010 13:57:14 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 14 Jul 2010 13:57:14 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 14 Jul 2010 13:57:11 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id DD63623889BB; Wed, 14 Jul 2010 13:56:17 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r964051 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java Date: Wed, 14 Jul 2010 13:56:17 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100714135617.DD63623889BB@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gtully Date: Wed Jul 14 13:56:17 2010 New Revision: 964051 URL: http://svn.apache.org/viewvc?rev=964051&view=rev Log: add test that validates https://issues.apache.org/activemq/browse/AMQ-1893 is resolved on trunk Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java (with props) Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java?rev=964051&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java Wed Jul 14 13:56:17 2010 @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class AMQ1893Test extends TestCase { + + private static final Log log = LogFactory.getLog(AMQ1893Test.class); + + static final String QUEUE_NAME = "TEST"; + + static final int MESSAGE_COUNT_OF_ONE_GROUP = 10000; + + static final int[] PRIORITIES = new int[]{0, 5, 10}; + + static final boolean debug = false; + + private BrokerService brokerService; + + private ActiveMQQueue destination; + + @Override + protected void setUp() throws Exception { + brokerService = new BrokerService(); + brokerService.setDeleteAllMessagesOnStartup(true); + brokerService.addConnector("tcp://localhost:0"); + brokerService.start(); + destination = new ActiveMQQueue(QUEUE_NAME); + } + + @Override + protected void tearDown() throws Exception { + // Stop any running threads. + brokerService.stop(); + } + + + public void testProduceConsumeWithSelector() throws Exception { + new TestProducer().produceMessages(); + new TestConsumer().consume(); + } + + + class TestProducer { + + public void produceMessages() throws Exception { + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( + brokerService.getTransportConnectors().get(0).getConnectUri().toString() + ); + Connection connection = connectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(QUEUE_NAME); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + long start = System.currentTimeMillis(); + + for (int priority : PRIORITIES) { + + String name = null; + if (priority == 10) { + name = "high"; + } else if (priority == 5) { + name = "mid"; + } else { + name = "low"; + } + + for (int i = 1; i <= MESSAGE_COUNT_OF_ONE_GROUP; i++) { + + TextMessage message = session.createTextMessage(name + "_" + i); + message.setIntProperty("priority", priority); + + producer.send(message); + } + } + + long end = System.currentTimeMillis(); + + log.info("sent " + (MESSAGE_COUNT_OF_ONE_GROUP * 3) + " messages in " + (end - start) + " ms"); + + producer.close(); + session.close(); + connection.close(); + } + } + + class TestConsumer { + + private CountDownLatch finishLatch = new CountDownLatch(1); + + + + public void consume() throws Exception { + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( + brokerService.getTransportConnectors().get(0).getConnectUri().toString() + ); + + + final int totalMessageCount = MESSAGE_COUNT_OF_ONE_GROUP * PRIORITIES.length; + final AtomicInteger counter = new AtomicInteger(); + final MessageListener listener = new MessageListener() { + public void onMessage(Message message) { + + if (debug) { + try { + log.info(((TextMessage) message).getText()); + } catch (JMSException e) { + e.printStackTrace(); + } + } + + if (counter.incrementAndGet() == totalMessageCount) { + + finishLatch.countDown(); + + } + } + }; + + int consumerCount = PRIORITIES.length; + Connection[] connections = new Connection[consumerCount]; + Session[] sessions = new Session[consumerCount]; + MessageConsumer[] consumers = new MessageConsumer[consumerCount]; + + for (int i = 0; i < consumerCount; i++) { + String selector = "priority = " + PRIORITIES[i]; + + connections[i] = connectionFactory.createConnection(); + sessions[i] = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE); + + consumers[i] = sessions[i].createConsumer(destination, selector); + consumers[i].setMessageListener(listener); + } + + for (Connection connection : connections) { + connection.start(); + } + + log.info("received " + counter.get() + " messages"); + + assertTrue("got all messages in time", finishLatch.await(60, TimeUnit.SECONDS)); + + log.info("received " + counter.get() + " messages"); + + for (MessageConsumer consumer : consumers) { + consumer.close(); + } + + for (Session session : sessions) { + session.close(); + } + + for (Connection connection : connections) { + connection.close(); + } + } + + } + +} \ No newline at end of file Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java ------------------------------------------------------------------------------ svn:keywords = Rev Date