Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EFE6A79A1 for ; Mon, 10 Oct 2011 15:31:31 +0000 (UTC) Received: (qmail 37666 invoked by uid 500); 10 Oct 2011 15:31:31 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 37621 invoked by uid 500); 10 Oct 2011 15:31:31 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 37614 invoked by uid 99); 10 Oct 2011 15:31:31 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 10 Oct 2011 15:31:31 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 10 Oct 2011 15:31:30 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 3AE38238889B for ; Mon, 10 Oct 2011 15:31:10 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1181022 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java Date: Mon, 10 Oct 2011 15:31:10 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111010153110.3AE38238889B@eris.apache.org> Author: tabish Date: Mon Oct 10 15:31:09 2011 New Revision: 1181022 URL: http://svn.apache.org/viewvc?rev=1181022&view=rev Log: Test for: https://issues.apache.org/jira/browse/AMQ-3324 Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java (with props) Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java?rev=1181022&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java Mon Oct 10 15:31:09 2011 @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.Topic; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.virtual.MirroredQueue; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ3324Test { + + private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3324Test.class); + + private static final String bindAddress = "tcp://0.0.0.0:0"; + private BrokerService broker; + private ActiveMQConnectionFactory cf; + + private static final int MESSAGE_COUNT = 100; + + @Before + public void setUp() throws Exception { + broker = this.createBroker(); + String address = broker.getTransportConnectors().get(0).getPublishableConnectString(); + broker.start(); + broker.waitUntilStarted(); + + cf = new ActiveMQConnectionFactory(address); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + @Test + public void testTempMessageConsumedAdvisoryConnectionClose() throws Exception { + + Connection connection = cf.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + final TemporaryQueue queue = session.createTemporaryQueue(); + MessageConsumer consumer = session.createConsumer(queue); + + final Topic advisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic((ActiveMQDestination) queue); + + MessageConsumer advisoryConsumer = session.createConsumer(advisoryTopic); + MessageProducer producer = session.createProducer(queue); + + // send lots of messages to the tempQueue + for (int i = 0; i < MESSAGE_COUNT; i++) { + BytesMessage m = session.createBytesMessage(); + m.writeBytes(new byte[1024]); + producer.send(m); + } + + // consume one message from tempQueue + Message msg = consumer.receive(5000); + assertNotNull(msg); + + // check one advisory message has produced on the advisoryTopic + Message advCmsg = advisoryConsumer.receive(5000); + assertNotNull(advCmsg); + + connection.close(); + LOG.debug("Connection closed, destinations should now become inactive."); + + assertTrue("The destination " + advisoryTopic + "was not removed. ", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return broker.getAdminView().getTopics().length == 0; + } + })); + + assertTrue("The destination " + queue + " was not removed. ", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return broker.getAdminView().getTemporaryQueues().length == 0; + } + })); + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + answer.setUseMirroredQueues(true); + answer.setPersistent(false); + answer.setSchedulePeriodForDestinationPurge(1000); + + PolicyEntry entry = new PolicyEntry(); + entry.setGcInactiveDestinations(true); + entry.setInactiveTimoutBeforeGC(2000); + entry.setProducerFlowControl(true); + entry.setAdvisoryForConsumed(true); + entry.setAdvisdoryForFastProducers(true); + entry.setAdvisoryForDelivery(true); + PolicyMap map = new PolicyMap(); + map.setDefaultEntry(entry); + + MirroredQueue mirrorQ = new MirroredQueue(); + mirrorQ.setCopyMessage(true); + DestinationInterceptor[] destinationInterceptors = new DestinationInterceptor[]{mirrorQ}; + answer.setDestinationInterceptors(destinationInterceptors); + + answer.setDestinationPolicy(map); + answer.addConnector(bindAddress); + + return answer; + } +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java ------------------------------------------------------------------------------ svn:eol-style = native