Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A118310AE7 for ; Mon, 11 Nov 2013 16:49:14 +0000 (UTC) Received: (qmail 76790 invoked by uid 500); 11 Nov 2013 16:49:12 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 76692 invoked by uid 500); 11 Nov 2013 16:49:09 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 76670 invoked by uid 99); 11 Nov 2013 16:49:06 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 11 Nov 2013 16:49:06 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 59B298A24C3; Mon, 11 Nov 2013 16:49:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@activemq.apache.org Message-Id: <99f96b26d4094ddebcae1b79a4b1e933@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: https://issues.apache.org/jira/browse/AMQ-4853 Date: Mon, 11 Nov 2013 16:49:06 +0000 (UTC) Updated Branches: refs/heads/trunk 38ab4b10a -> 655228722 https://issues.apache.org/jira/browse/AMQ-4853 adds a little test and preserves some performance tests if needed later. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/65522872 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/65522872 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/65522872 Branch: refs/heads/trunk Commit: 6552287221c1706684fb2babe5d9777aabd09531 Parents: 38ab4b1 Author: Timothy Bish Authored: Mon Nov 11 11:49:03 2013 -0500 Committer: Timothy Bish Committed: Mon Nov 11 11:49:03 2013 -0500 ---------------------------------------------------------------------- .../org/apache/activemq/bugs/AMQ4853Test.java | 300 +++++++++++++++++++ 1 file changed, 300 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/65522872/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4853Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4853Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4853Test.java new file mode 100644 index 0000000..a347279 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4853Test.java @@ -0,0 +1,300 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.advisory.AdvisoryBroker; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ConnectionId; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.SessionId; +import org.apache.activemq.command.SessionInfo; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ4853Test { + + private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4853Test.class); + private static BrokerService brokerService; + private static final String BROKER_ADDRESS = "tcp://localhost:0"; + private static final ActiveMQQueue DESTINATION = new ActiveMQQueue("TEST.QUEUE"); + private CountDownLatch cycleDoneLatch; + + private String connectionUri; + + @Before + public void setUp() throws Exception { + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.setUseJmx(false); + brokerService.setAdvisorySupport(true); + brokerService.setDeleteAllMessagesOnStartup(true); + connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString(); + + brokerService.start(); + brokerService.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + + /** + * Test to shows the performance of the removing consumers while other stay active. + * @throws Exception + */ + @Ignore + @Test + public void test() throws Exception { + + // Create a stable set of consumers to fill in the advisory broker's consumer list. + ArrayList fixedConsumers = new ArrayList(100); + for (int i = 0; i < 200; ++i) { + fixedConsumers.add(new Consumer()); + } + + // Create a set of consumers that comes online for a short time and then + // goes offline again. Cycles will repeat as each batch completes + final int fixedDelayConsumers = 300; + final int fixedDelayCycles = 25; + + final CountDownLatch fixedDelayCycleLatch = new CountDownLatch(fixedDelayCycles); + + // Update so done method can track state. + cycleDoneLatch = fixedDelayCycleLatch; + + CyclicBarrier barrier = new CyclicBarrier(fixedDelayConsumers, new Runnable() { + @Override + public void run() { + LOG.info("Fixed delay consumers cycle {} completed.", fixedDelayCycleLatch.getCount()); + fixedDelayCycleLatch.countDown(); + } + }); + + for (int i = 0; i < fixedDelayConsumers; ++i) { + new Thread(new FixedDelyConsumer(barrier)).start(); + } + + fixedDelayCycleLatch.await(10, TimeUnit.MINUTES); + + // Clean up. + + for (Consumer consumer : fixedConsumers) { + consumer.close(); + } + fixedConsumers.clear(); + } + + private ConnectionInfo createConnectionInfo() { + ConnectionId id = new ConnectionId(); + id.setValue("ID:123456789:0:1"); + + ConnectionInfo info = new ConnectionInfo(); + info.setConnectionId(id); + return info; + } + + private SessionInfo createSessionInfo(ConnectionInfo connection) { + SessionId id = new SessionId(connection.getConnectionId(), 1); + + SessionInfo info = new SessionInfo(); + info.setSessionId(id); + + return info; + } + + public ConsumerInfo createConsumerInfo(SessionInfo session, int value, ActiveMQDestination destination) { + ConsumerId id = new ConsumerId(); + id.setConnectionId(session.getSessionId().getConnectionId()); + id.setSessionId(1); + id.setValue(value); + + ConsumerInfo info = new ConsumerInfo(); + info.setConsumerId(id); + info.setDestination(destination); + return info; + } + + /** + * Test to shows the performance impact of removing consumers in various scenarios. + * @throws Exception + */ + @Ignore + @Test + public void testPerformanceOfRemovals() throws Exception { + // setup + AdvisoryBroker testObj = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class); + ActiveMQDestination destination = new ActiveMQQueue("foo"); + ConnectionInfo connectionInfo = createConnectionInfo(); + ConnectionContext connectionContext = new ConnectionContext(connectionInfo); + connectionContext.setBroker(brokerService.getBroker()); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + + long start = System.currentTimeMillis(); + + for (int i = 0; i < 200; ++i) { + + for (int j = 1; j <= 500; j++) { + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination); + testObj.addConsumer(connectionContext, consumerInfo); + } + + for (int j = 500; j > 0; j--) { + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination); + testObj.removeConsumer(connectionContext, consumerInfo); + } + + for (int j = 1; j <= 500; j++) { + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination); + testObj.addConsumer(connectionContext, consumerInfo); + } + + for (int j = 1; j <= 500; j++) { + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination); + testObj.removeConsumer(connectionContext, consumerInfo); + } + } + + long finish = System.currentTimeMillis(); + + long totalTime = finish - start; + + LOG.info("Total test time: {} seconds", TimeUnit.MILLISECONDS.toSeconds(totalTime)); + + assertEquals(0, testObj.getAdvisoryConsumers().size()); + } + + @Test + public void testEqualsNeeded() throws Exception { + // setup + AdvisoryBroker testObj = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class); + ActiveMQDestination destination = new ActiveMQQueue("foo"); + ConnectionInfo connectionInfo = createConnectionInfo(); + ConnectionContext connectionContext = new ConnectionContext(connectionInfo); + connectionContext.setBroker(brokerService.getBroker()); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + + for (int j = 1; j <= 5; j++) { + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination); + testObj.addConsumer(connectionContext, consumerInfo); + } + + for (int j = 1; j <= 5; j++) { + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination); + testObj.removeConsumer(connectionContext, consumerInfo); + } + + assertEquals(0, testObj.getAdvisoryConsumers().size()); + } + + private boolean done() { + if (cycleDoneLatch == null) { + return true; + } + return cycleDoneLatch.getCount() == 0; + } + + class Consumer implements MessageListener { + + Connection connection; + Session session; + Destination destination; + MessageConsumer consumer; + + Consumer() throws JMSException { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + connection = factory.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumer = session.createConsumer(DESTINATION); + consumer.setMessageListener(this); + connection.start(); + } + + @Override + public void onMessage(Message message) { + } + + public void close() { + try { + connection.close(); + } catch(Exception e) { + } + + connection = null; + session = null; + consumer = null; + } + } + + class FixedDelyConsumer implements Runnable { + + private final CyclicBarrier barrier; + private final int sleepInterval; + + public FixedDelyConsumer(CyclicBarrier barrier) { + this.barrier = barrier; + this.sleepInterval = 1000; + } + + public FixedDelyConsumer(CyclicBarrier barrier, int sleepInterval) { + this.barrier = barrier; + this.sleepInterval = sleepInterval; + } + + @Override + public void run() { + while (!done()) { + + try { + Consumer consumer = new Consumer(); + TimeUnit.MILLISECONDS.sleep(sleepInterval); + consumer.close(); + barrier.await(); + } catch (Exception ex) { + return; + } + } + } + } + +}