Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 34153 invoked from network); 21 Jan 2008 10:31:48 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 21 Jan 2008 10:31:48 -0000 Received: (qmail 39137 invoked by uid 500); 21 Jan 2008 10:31:38 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 39079 invoked by uid 500); 21 Jan 2008 10:31:38 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 39069 invoked by uid 99); 21 Jan 2008 10:31:38 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 21 Jan 2008 02:31:38 -0800 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 21 Jan 2008 10:31:20 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 0ECE11A9832; Mon, 21 Jan 2008 02:31:27 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r613830 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/jmx/ broker/region/ broker/region/policy/ command/ Date: Mon, 21 Jan 2008 10:31:25 -0000 To: commits@activemq.apache.org From: rajdavies@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080121103127.0ECE11A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: rajdavies Date: Mon Jan 21 02:31:22 2008 New Revision: 613830 URL: http://svn.apache.org/viewvc?rev=613830&view=rev Log: Fix for https://issues.apache.org/activemq/browse/AMQ-1510 Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=613830&r1=613829&r2=613830&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Mon Jan 21 02:31:22 2008 @@ -104,6 +104,32 @@ public void setMemoryLimit(long limit) { brokerService.getSystemUsage().getMemoryUsage().setLimit(limit); } + + public long getStoreLimit() { + return brokerService.getSystemUsage().getStoreUsage().getLimit(); + } + + public int getStorePercentageUsed() { + return brokerService.getSystemUsage().getStoreUsage().getPercentUsage(); + } + + + public long getTmpLimit() { + return brokerService.getSystemUsage().getTempUsage().getLimit(); + } + + public int getTmpPercentageUsed() { + return brokerService.getSystemUsage().getTempUsage().getPercentUsage(); + } + + public void setStoreLimit(long limit) { + brokerService.getSystemUsage().getStoreUsage().setLimit(limit); + } + + public void setTmpLimit(long limit) { + brokerService.getSystemUsage().getTempUsage().setLimit(limit); + } + public void resetStatistics() { broker.getDestinationStatistics().reset(); @@ -289,5 +315,4 @@ throw e.getTargetException(); } } - } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=613830&r1=613829&r2=613830&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java Mon Jan 21 02:31:22 2008 @@ -20,6 +20,7 @@ import org.apache.activemq.Service; + /** * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (for the reloadLog4jProperties method) * @version $Revision$ @@ -65,6 +66,18 @@ long getMemoryLimit(); void setMemoryLimit(long limit); + + int getStorePercentageUsed(); + + long getStoreLimit(); + + void setStoreLimit(long limit); + + int getTmpPercentageUsed(); + + long getTmpLimit(); + + void setTmpLimit(long limit); boolean isPersistent(); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=613830&r1=613829&r2=613830&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Mon Jan 21 02:31:22 2008 @@ -94,15 +94,15 @@ } public int getMemoryPercentageUsed() { - return destination.getBrokerMemoryUsage().getPercentUsage(); + return destination.getMemoryUsage().getPercentUsage(); } public long getMemoryLimit() { - return destination.getBrokerMemoryUsage().getLimit(); + return destination.getMemoryUsage().getLimit(); } public void setMemoryLimit(long limit) { - destination.getBrokerMemoryUsage().setLimit(limit); + destination.getMemoryUsage().setLimit(limit); } public double getAverageEnqueueTime() { @@ -267,4 +267,51 @@ } } + + public int getMaxAuditDepth() { + return destination.getMaxAuditDepth(); + } + + public int getMaxProducersToAudit() { + return destination.getMaxProducersToAudit(); + } + + public boolean isEnableAudit() { + return destination.isEnableAudit(); + } + + + public void setEnableAudit(boolean enableAudit) { + destination.setEnableAudit(enableAudit); + } + + public void setMaxAuditDepth(int maxAuditDepth) { + destination.setMaxAuditDepth(maxAuditDepth); + } + + public void setMaxProducersToAudit(int maxProducersToAudit) { + destination.setMaxProducersToAudit(maxProducersToAudit); + } + + + public float getMemoryLimitPortion() { + return destination.getMemoryUsage().getUsagePortion(); + } + + public long getProducerCount() { + return destination.getDestinationStatistics().getProducers().getCount(); + } + + public boolean isProducerFlowControl() { + return destination.isProducerFlowControl(); + } + + public void setMemoryLimitPortion(float value) { + destination.getMemoryUsage().setUsagePortion(value); + } + + public void setProducerFlowControl(boolean producerFlowControl) { + destination.setProducerFlowControl(producerFlowControl); + } + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?rev=613830&r1=613829&r2=613830&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java Mon Jan 21 02:31:22 2008 @@ -67,6 +67,11 @@ * @return The number of consumers subscribed this destination. */ long getConsumerCount(); + + /** + * @return the number of producers publishing to the destination + */ + long getProducerCount(); /** * Returns the number of messages in this destination which are yet to be @@ -119,11 +124,32 @@ */ String sendTextMessage(Map headers, String body) throws Exception; + /** + * @return the percentage of amount of memory used + */ int getMemoryPercentageUsed(); + /** + * @return the amount of memory allocated to this destination + */ long getMemoryLimit(); + /** + * set the amount of memory allocated to this destination + * @param limit + */ void setMemoryLimit(long limit); + + /** + * @return the portion of memory from the broker memory limit for this destination + */ + float getMemoryLimitPortion(); + + /** + * set the portion of memory from the broker memory limit for this destination + * @param value + */ + void setMemoryLimitPortion(float value); /** * Browses the current destination returning a list of messages @@ -150,5 +176,34 @@ * @return average time a message is held by a destination */ double getAverageEnqueueTime(); + + /** + * @return the producerFlowControl + */ + boolean isProducerFlowControl(); + /** + * @param producerFlowControl the producerFlowControl to set + */ + public void setProducerFlowControl(boolean producerFlowControl); + + /** + * @return the maxProducersToAudit + */ + public int getMaxProducersToAudit(); + + /** + * @param maxProducersToAudit the maxProducersToAudit to set + */ + public void setMaxProducersToAudit(int maxProducersToAudit); + + /** + * @return the maxAuditDepth + */ + public int getMaxAuditDepth(); + + /** + * @param maxAuditDepth the maxAuditDepth to set + */ + public void setMaxAuditDepth(int maxAuditDepth); } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=613830&r1=613829&r2=613830&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Mon Jan 21 02:31:22 2008 @@ -64,6 +64,12 @@ this.destinationStatistics.setEnabled(parentStats.isEnabled()); this.destinationStatistics.setParent(parentStats); } + + /** + * initialize the destination + * @throws Exception + */ + public abstract void initialize() throws Exception; /** * @return the producerFlowControl */ @@ -121,7 +127,7 @@ destinationStatistics.getProducers().decrement(); } - public final MemoryUsage getBrokerMemoryUsage() { + public final MemoryUsage getMemoryUsage() { return memoryUsage; } @@ -143,6 +149,11 @@ public final MessageStore getMessageStore() { return store; + } + + public final boolean isActive() { + return destinationStatistics.getConsumers().getCount() != 0 || + destinationStatistics.getProducers().getCount() != 0; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=613830&r1=613829&r2=613830&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Mon Jan 21 02:31:22 2008 @@ -53,7 +53,7 @@ ActiveMQDestination getActiveMQDestination(); - MemoryUsage getBrokerMemoryUsage(); + MemoryUsage getMemoryUsage(); void dispose(ConnectionContext context) throws IOException; @@ -70,4 +70,20 @@ boolean isProducerFlowControl(); void setProducerFlowControl(boolean value); + + int getMaxProducersToAudit(); + + void setMaxProducersToAudit(int maxProducersToAudit); + + int getMaxAuditDepth(); + + void setMaxAuditDepth(int maxAuditDepth); + + boolean isEnableAudit(); + + void setEnableAudit(boolean enableAudit); + + boolean isActive(); + + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?rev=613830&r1=613829&r2=613830&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java Mon Jan 21 02:31:22 2008 @@ -115,6 +115,7 @@ } Topic topic = new Topic(broker.getRoot(), destination, store, memoryManager, destinationStatistics, taskRunnerFactory); configureTopic(topic, destination); + topic.initialize(); return topic; } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=613830&r1=613829&r2=613830&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Mon Jan 21 02:31:22 2008 @@ -81,8 +81,8 @@ return next.getName(); } - public MemoryUsage getBrokerMemoryUsage() { - return next.getBrokerMemoryUsage(); + public MemoryUsage getMemoryUsage() { + return next.getMemoryUsage(); } public boolean lock(MessageReference node, LockOwner lockOwner) { @@ -141,6 +141,34 @@ throws Exception { next.removeProducer(context, info); } + + public int getMaxAuditDepth() { + return next.getMaxAuditDepth(); + } + + public int getMaxProducersToAudit() { + return next.getMaxProducersToAudit(); + } + + public boolean isEnableAudit() { + return next.isEnableAudit(); + } + + public void setEnableAudit(boolean enableAudit) { + next.setEnableAudit(enableAudit); + } + + public void setMaxAuditDepth(int maxAuditDepth) { + next.setMaxAuditDepth(maxAuditDepth); + } + + public void setMaxProducersToAudit(int maxProducersToAudit) { + next.setMaxProducersToAudit(maxProducersToAudit); + } + + public boolean isActive() { + return next.isActive(); + } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=613830&r1=613829&r2=613830&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Mon Jan 21 02:31:22 2008 @@ -17,11 +17,12 @@ package org.apache.activemq.broker.region; import java.io.IOException; -import java.util.ArrayList; import java.util.Iterator; -import java.util.List; import java.util.concurrent.ConcurrentHashMap; + import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; + import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor; @@ -31,9 +32,10 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageId; +import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.Usage; import org.apache.activemq.usage.UsageListener; -import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.SubscriptionKey; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -47,13 +49,23 @@ private final boolean keepDurableSubsActive; private boolean active; - public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) - throws InvalidSelectorException { + public DurableTopicSubscription(Broker broker, Destination dest,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) + throws JMSException { super(broker,usageManager, context, info); this.pending = new StoreDurableSubscriberCursor(context.getClientId(), info.getSubscriptionName(), broker.getTempDataStore(), info.getPrefetchSize(), this); this.pending.setSystemUsage(usageManager); this.keepDurableSubsActive = keepDurableSubsActive; subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); + if (dest != null && dest.getMessageStore() != null) { + TopicMessageStore store = (TopicMessageStore)dest.getMessageStore(); + try { + this.enqueueCounter=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName()); + } catch (IOException e) { + JMSException jmsEx = new JMSException("Failed to retrieve eunqueueCount from store "+ e); + jmsEx.setLinkedException(e); + throw jmsEx; + } + } } public boolean isActive() { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=613830&r1=613829&r2=613830&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Jan 21 02:31:22 2008 @@ -150,6 +150,9 @@ return true; } }); + }else { + int messageCount = store.getMessageCount(); + destinationStatistics.getMessages().setCount(messageCount); } } } @@ -320,7 +323,8 @@ final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 && !context.isInRecoveryMode(); if (message.isExpired()) { broker.messageExpired(context, message); - destinationStatistics.getMessages().decrement(); + //message not added to stats yet + //destinationStatistics.getMessages().decrement(); if (sendProducerAck) { ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); context.getConnection().dispatchAsync(ack); @@ -346,7 +350,8 @@ // message may have expired. if (broker.isExpired(message)) { broker.messageExpired(context, message); - destinationStatistics.getMessages().decrement(); + //message not added to stats yet + //destinationStatistics.getMessages().decrement(); } else { doMessageSend(producerExchange, message); } @@ -436,7 +441,8 @@ // op, by that time the message could have expired.. if (broker.isExpired(message)) { broker.messageExpired(context, message); - destinationStatistics.getMessages().decrement(); + //message not added to stats yet + //destinationStatistics.getMessages().decrement(); return; } sendMessage(context, message); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=613830&r1=613829&r2=613830&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Mon Jan 21 02:31:22 2008 @@ -56,8 +56,6 @@ import org.apache.activemq.command.TransactionId; import org.apache.activemq.kaha.Store; import org.apache.activemq.state.ConnectionState; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.memory.MemoryPersistenceAdapter; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.BrokerSupport; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=613830&r1=613829&r2=613830&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Jan 21 02:31:22 2008 @@ -99,6 +99,13 @@ } this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName()); } + + public void initialize() throws Exception{ + if (store != null) { + int messageCount = store.getMessageCount(); + destinationStatistics.getMessages().setCount(messageCount); + } + } public boolean lock(MessageReference node, LockOwner sub) { return true; @@ -288,7 +295,8 @@ // message may have expired. if (broker.isExpired(message)) { broker.messageExpired(context, message); - destinationStatistics.getMessages().decrement(); + //destinationStatistics.getEnqueues().increment(); + //destinationStatistics.getMessages().decrement(); } else { doMessageSend(producerExchange, message); } @@ -394,7 +402,8 @@ if (broker.isExpired(message)) { broker.messageExpired(context, message); message.decrementReferenceCount(); - destinationStatistics.getMessages().decrement(); + //destinationStatistics.getEnqueues().increment(); + //destinationStatistics.getMessages().decrement(); return; } try { @@ -543,6 +552,7 @@ // Implementation methods // ------------------------------------------------------------------------- protected void dispatch(final ConnectionContext context, Message message) throws Exception { + destinationStatistics.getMessages().increment(); destinationStatistics.getEnqueues().increment(); dispatchValve.increment(); MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=613830&r1=613829&r2=613830&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Mon Jan 21 02:31:22 2008 @@ -229,9 +229,17 @@ } SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); DurableTopicSubscription sub = durableSubscriptions.get(key); + ActiveMQDestination destination = info.getDestination(); if (sub == null) { - sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive); - ActiveMQDestination destination = info.getDestination(); + Destination dest=null; + try { + dest = lookup(context, destination); + } catch (Exception e) { + JMSException jmsEx = new JMSException("Failed to retrieve destination from region "+ e); + jmsEx.setLinkedException(e); + throw jmsEx; + } + sub = new DurableTopicSubscription(broker,dest, usageManager, context, info, keepDurableSubsActive); if (destination != null && broker.getDestinationPolicy() != null) { PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); if (entry != null) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=613830&r1=613829&r2=613830&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Mon Jan 21 02:31:22 2008 @@ -67,7 +67,7 @@ } queue.setMessageGroupMapFactory(getMessageGroupMapFactory()); if (memoryLimit > 0) { - queue.getBrokerMemoryUsage().setLimit(memoryLimit); + queue.getMemoryUsage().setLimit(memoryLimit); } if (pendingQueuePolicy != null) { PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(queue, tmpStore); @@ -91,7 +91,7 @@ } topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers); if (memoryLimit > 0) { - topic.getBrokerMemoryUsage().setLimit(memoryLimit); + topic.getMemoryUsage().setLimit(memoryLimit); } topic.setProducerFlowControl(isProducerFlowControl()); topic.setEnableAudit(isEnableAudit()); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=613830&r1=613829&r2=613830&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Mon Jan 21 02:31:22 2008 @@ -571,7 +571,7 @@ public void setRegionDestination(org.apache.activemq.broker.region.Destination destination) { this.regionDestination = destination; if(this.memoryUsage==null) { - this.memoryUsage=regionDestination.getBrokerMemoryUsage(); + this.memoryUsage=regionDestination.getMemoryUsage(); } }