Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A821CD810 for ; Tue, 2 Oct 2012 20:51:50 +0000 (UTC) Received: (qmail 52315 invoked by uid 500); 2 Oct 2012 20:51:50 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 52282 invoked by uid 500); 2 Oct 2012 20:51:50 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 52274 invoked by uid 99); 2 Oct 2012 20:51:50 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 Oct 2012 20:51:50 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 Oct 2012 20:51:46 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 0EF2A23888CD for ; Tue, 2 Oct 2012 20:51:02 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1393174 - in /activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/broker/jmx/ src/main/java/org/apache/activemq/broker/region/ src/test/java/org/apache/activemq/usecases/ Date: Tue, 02 Oct 2012 20:51:01 -0000 To: commits@activemq.apache.org From: gtully@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121002205102.0EF2A23888CD@eris.apache.org> Author: gtully Date: Tue Oct 2 20:51:00 2012 New Revision: 1393174 URL: http://svn.apache.org/viewvc?rev=1393174&view=rev Log: https://issues.apache.org/jira/browse/AMQ-4091 - apply destination policy to temp topics, allows a memory limit to be specified to partition temp dest usage and enable producer flow control Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TempTopicProducerFlowControlTest.java (with props) Modified: activemq/trunk/activemq-core/pom.xml activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java Modified: activemq/trunk/activemq-core/pom.xml URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=1393174&r1=1393173&r2=1393174&view=diff ============================================================================== --- activemq/trunk/activemq-core/pom.xml (original) +++ activemq/trunk/activemq-core/pom.xml Tue Oct 2 20:51:00 2012 @@ -607,6 +607,7 @@ org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.* org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.* org/apache/activemq/usecases/TopicProducerFlowControlTest.* + org/apache/activemq/usecases/TempTopicProducerFlowControlTest.* org/apache/activemq/store/jdbc/JDBCNegativeQueueTest.* org/apache/activemq/bugs/AMQ2314Test.* org/apache/activemq/kaha/LoadTest.* Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=1393174&r1=1393173&r2=1393174&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Tue Oct 2 20:51:00 2012 @@ -147,7 +147,7 @@ public class ManagedRegionBroker extends @Override protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { - return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); + return new ManagedTempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); } @Override Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java?rev=1393174&r1=1393173&r2=1393174&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java Tue Oct 2 20:51:00 2012 @@ -35,9 +35,9 @@ public class ManagedTempQueueRegion exte private final ManagedRegionBroker regionBroker; - public ManagedTempQueueRegion(ManagedRegionBroker broker, BrokerService brokerService, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, + public ManagedTempQueueRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { - super(broker, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); + super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); this.regionBroker = broker; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java?rev=1393174&r1=1393173&r2=1393174&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java Tue Oct 2 20:51:00 2012 @@ -78,17 +78,13 @@ public abstract class AbstractTempRegion } } - protected abstract Destination doCreateDestination( - ConnectionContext context, ActiveMQDestination destination) - throws Exception; - protected synchronized Destination createDestination( ConnectionContext context, ActiveMQDestination destination) throws Exception { Destination result = cachedDestinations.remove(new CachedDestination( destination)); if (result == null) { - result = doCreateDestination(context, destination); + result = destinationFactory.createDestination(context, destination, destinationStatistics); } return result; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?rev=1393174&r1=1393173&r2=1393174&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java Tue Oct 2 20:51:00 2012 @@ -77,6 +77,7 @@ public class DestinationFactoryImpl exte if (destination.isTemporary()) { final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination; Queue queue = new TempQueue(brokerService, destination, null, destinationStatistics, taskRunnerFactory); + configureQueue(queue, destination); queue.initialize(); return queue; } else { @@ -89,6 +90,7 @@ public class DestinationFactoryImpl exte } else if (destination.isTemporary()) { Topic topic = new Topic(brokerService, destination, null, destinationStatistics, taskRunnerFactory); + configureTopic(topic, destination); topic.initialize(); return topic; } else { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1393174&r1=1393173&r2=1393174&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Tue Oct 2 20:51:00 2012 @@ -169,7 +169,7 @@ public class RegionBroker extends EmptyB } protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { - return new TempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); + return new TempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); } protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?rev=1393174&r1=1393173&r2=1393174&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java Tue Oct 2 20:51:00 2012 @@ -33,24 +33,10 @@ import org.slf4j.LoggerFactory; * */ public class TempQueueRegion extends AbstractTempRegion { - private static final Logger LOG = LoggerFactory.getLogger(TempQueueRegion.class); - private final BrokerService brokerService; - - public TempQueueRegion(RegionBroker broker, BrokerService brokerService, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, + + public TempQueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); - // We should allow the following to be configurable via a Destination - // Policy - // setAutoCreateDestinations(false); - this.brokerService = brokerService; - } - - protected Destination doCreateDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { - TempQueue result = new TempQueue(brokerService, destination, null, destinationStatistics, taskRunnerFactory); - brokerService.getDestinationPolicy(); - configureQueue(result, destination); - result.initialize(); - return result; } protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException { @@ -87,17 +73,4 @@ public class TempQueueRegion extends Abs public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { processDispatchNotificationViaDestination(messageDispatchNotification); } - - protected void configureQueue(Queue queue, ActiveMQDestination destination) { - if (broker == null) { - throw new IllegalStateException("broker property is not set"); - } - if (broker.getDestinationPolicy() != null) { - PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); - if (entry != null) { - entry.configure(broker,queue); - } - } - } - } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java?rev=1393174&r1=1393173&r2=1393174&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java Tue Oct 2 20:51:00 2012 @@ -21,6 +21,7 @@ import javax.jms.JMSException; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQTempTopic; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.usage.SystemUsage; @@ -37,9 +38,6 @@ public class TempTopicRegion extends Abs public TempTopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); - // We should allow the following to be configurable via a Destination - // Policy - // setAutoCreateDestinations(false); } protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException { @@ -82,9 +80,4 @@ public class TempTopicRegion extends Abs super.removeDestination(context, destination, timeout); } - - protected Destination doCreateDestination(ConnectionContext context, - ActiveMQDestination destination) throws Exception { - return destinationFactory.createDestination(context, destination, destinationStatistics); - } } Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TempTopicProducerFlowControlTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TempTopicProducerFlowControlTest.java?rev=1393174&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TempTopicProducerFlowControlTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TempTopicProducerFlowControlTest.java Tue Oct 2 20:51:00 2012 @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import javax.jms.Destination; +import javax.jms.Session; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; + +public class TempTopicProducerFlowControlTest extends TopicProducerFlowControlTest { + + @Override + protected void setDestinationPolicy(BrokerService broker, PolicyMap pm) { + PolicyEntry tpe = new PolicyEntry(); + tpe.setTempTopic(true); + tpe.setMemoryLimit(destinationMemLimit); + tpe.setProducerFlowControl(true); + tpe.setAdvisoryWhenFull(true); + pm.setDefaultEntry(tpe); + + broker.setDestinationPolicy(pm); + } + + @Override + protected Destination createDestination(Session session) throws Exception { + return session.createTemporaryTopic(); + } +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TempTopicProducerFlowControlTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TempTopicProducerFlowControlTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java?rev=1393174&r1=1393173&r2=1393174&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java Tue Oct 2 20:51:00 2012 @@ -30,6 +30,8 @@ import javax.jms.Session; import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.RedeliveryPolicy; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.policy.PolicyEntry; @@ -103,6 +105,9 @@ public class TopicProducerFlowControlTes connectionFactory.setAlwaysSyncSend(true); connectionFactory.setProducerWindowSize(1024); + ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); + prefetchPolicy.setAll(5000); + connectionFactory.setPrefetchPolicy(prefetchPolicy); // Start the test destination listener Connection c = connectionFactory.createConnection(); c.start(); @@ -158,10 +163,10 @@ public class TopicProducerFlowControlTes assertEquals("Didn't consume all messages", numMessagesToSend, consumed.get()); assertTrue("Producer got blocked", Wait.waitFor(new Wait.Condition() { - public boolean isSatisified() throws Exception { - return blockedCounter.get() > 0; - } - }, 5 * 1000)); + public boolean isSatisified() throws Exception { + return blockedCounter.get() > 0; + } + }, 5 * 1000)); } protected Destination createDestination(Session listenerSession) throws Exception {