activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r613830 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/jmx/ broker/region/ broker/region/policy/ command/
Date Mon, 21 Jan 2008 10:31:25 GMT
Author: rajdavies
Date: Mon Jan 21 02:31:22 2008
New Revision: 613830

URL: http://svn.apache.org/viewvc?rev=613830&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1510

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
Mon Jan 21 02:31:22 2008
@@ -104,6 +104,32 @@
     public void setMemoryLimit(long limit) {
         brokerService.getSystemUsage().getMemoryUsage().setLimit(limit);
     }
+    
+    public long getStoreLimit() {
+        return brokerService.getSystemUsage().getStoreUsage().getLimit();
+    }
+
+    public int getStorePercentageUsed() {
+        return brokerService.getSystemUsage().getStoreUsage().getPercentUsage();
+    }
+
+ 
+    public long getTmpLimit() {
+       return brokerService.getSystemUsage().getTempUsage().getLimit();
+    }
+
+    public int getTmpPercentageUsed() {
+       return brokerService.getSystemUsage().getTempUsage().getPercentUsage();
+    }
+
+    public void setStoreLimit(long limit) {
+        brokerService.getSystemUsage().getStoreUsage().setLimit(limit);
+    }
+
+    public void setTmpLimit(long limit) {
+        brokerService.getSystemUsage().getTempUsage().setLimit(limit);
+    }
+    
 
     public void resetStatistics() {
         broker.getDestinationStatistics().reset();
@@ -289,5 +315,4 @@
             throw e.getTargetException();
         }
     }
-    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
Mon Jan 21 02:31:22 2008
@@ -20,6 +20,7 @@
 
 import org.apache.activemq.Service;
 
+
 /**
  * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (for the reloadLog4jProperties method)
  * @version $Revision$
@@ -65,6 +66,18 @@
     long getMemoryLimit();
 
     void setMemoryLimit(long limit);
+        
+    int getStorePercentageUsed();
+
+    long getStoreLimit();
+
+    void setStoreLimit(long limit);
+    
+    int getTmpPercentageUsed();
+
+    long getTmpLimit();
+
+    void setTmpLimit(long limit);
     
     boolean isPersistent();
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
Mon Jan 21 02:31:22 2008
@@ -94,15 +94,15 @@
     }
 
     public int getMemoryPercentageUsed() {
-        return destination.getBrokerMemoryUsage().getPercentUsage();
+        return destination.getMemoryUsage().getPercentUsage();
     }
 
     public long getMemoryLimit() {
-        return destination.getBrokerMemoryUsage().getLimit();
+        return destination.getMemoryUsage().getLimit();
     }
 
     public void setMemoryLimit(long limit) {
-        destination.getBrokerMemoryUsage().setLimit(limit);
+        destination.getMemoryUsage().setLimit(limit);
     }
 
     public double getAverageEnqueueTime() {
@@ -267,4 +267,51 @@
         }
 
     }
+
+    public int getMaxAuditDepth() {
+        return destination.getMaxAuditDepth();
+     }
+
+     public int getMaxProducersToAudit() {
+         return destination.getMaxProducersToAudit();
+     }
+
+     public boolean isEnableAudit() {
+         return destination.isEnableAudit();
+     }
+
+     
+     public void setEnableAudit(boolean enableAudit) {
+         destination.setEnableAudit(enableAudit);
+     }
+
+     public void setMaxAuditDepth(int maxAuditDepth) {
+         destination.setMaxAuditDepth(maxAuditDepth);
+     }
+ 
+     public void setMaxProducersToAudit(int maxProducersToAudit) {
+         destination.setMaxProducersToAudit(maxProducersToAudit);
+     }
+
+    
+    public float getMemoryLimitPortion() {
+        return destination.getMemoryUsage().getUsagePortion();
+    }
+
+    public long getProducerCount() {
+        return destination.getDestinationStatistics().getProducers().getCount();
+    }
+
+    public boolean isProducerFlowControl() {
+       return destination.isProducerFlowControl();
+    }
+    
+    public void setMemoryLimitPortion(float value) {
+        destination.getMemoryUsage().setUsagePortion(value);
+    }
+
+    public void setProducerFlowControl(boolean producerFlowControl) {
+      destination.setProducerFlowControl(producerFlowControl);      
+    }
+  
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
Mon Jan 21 02:31:22 2008
@@ -67,6 +67,11 @@
      * @return The number of consumers subscribed this destination.
      */
     long getConsumerCount();
+    
+    /**
+     * @return the number of producers publishing to the destination
+     */
+    long getProducerCount();
 
     /**
      * Returns the number of messages in this destination which are yet to be
@@ -119,11 +124,32 @@
      */
     String sendTextMessage(Map headers, String body) throws Exception;
 
+    /**
+     * @return the percentage of amount of memory used
+     */
     int getMemoryPercentageUsed();
 
+    /**
+     * @return the amount of memory allocated to this destination
+     */
     long getMemoryLimit();
 
+    /**
+     * set the amount of memory allocated to this destination
+     * @param limit
+     */
     void setMemoryLimit(long limit);
+    
+    /**
+     * @return the portion of memory from the broker memory limit for this destination
+     */
+    float getMemoryLimitPortion();
+    
+    /**
+     * set the portion of memory from the broker memory limit for this destination
+     * @param value
+     */
+    void setMemoryLimitPortion(float value);
 
     /**
      * Browses the current destination returning a list of messages
@@ -150,5 +176,34 @@
      * @return average time a message is held by a destination
      */
     double getAverageEnqueueTime();
+    
+    /**
+     * @return the producerFlowControl
+     */
+    boolean isProducerFlowControl();
+    /**
+     * @param producerFlowControl the producerFlowControl to set
+     */
+    public void setProducerFlowControl(boolean producerFlowControl);
+    
+    /**
+     * @return the maxProducersToAudit
+     */
+    public int getMaxProducersToAudit();
+    
+    /**
+     * @param maxProducersToAudit the maxProducersToAudit to set
+     */
+    public void setMaxProducersToAudit(int maxProducersToAudit);
+    
+    /**
+     * @return the maxAuditDepth
+     */
+    public int getMaxAuditDepth();
+    
+    /**
+     * @param maxAuditDepth the maxAuditDepth to set
+     */
+    public void setMaxAuditDepth(int maxAuditDepth);
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Mon Jan 21 02:31:22 2008
@@ -64,6 +64,12 @@
         this.destinationStatistics.setEnabled(parentStats.isEnabled());
         this.destinationStatistics.setParent(parentStats);        
     }
+    
+    /**
+     * initialize the destination
+     * @throws Exception
+     */
+    public abstract void initialize() throws Exception;
     /**
      * @return the producerFlowControl
      */
@@ -121,7 +127,7 @@
         destinationStatistics.getProducers().decrement();
     }
     
-    public final MemoryUsage getBrokerMemoryUsage() {
+    public final MemoryUsage getMemoryUsage() {
         return memoryUsage;
     }
 
@@ -143,6 +149,11 @@
     
     public final MessageStore getMessageStore() {
         return store;
+    }
+    
+    public final boolean isActive() {
+        return destinationStatistics.getConsumers().getCount() != 0 ||
+            destinationStatistics.getProducers().getCount() != 0;
     }
 
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
Mon Jan 21 02:31:22 2008
@@ -53,7 +53,7 @@
 
     ActiveMQDestination getActiveMQDestination();
 
-    MemoryUsage getBrokerMemoryUsage();
+    MemoryUsage getMemoryUsage();
 
     void dispose(ConnectionContext context) throws IOException;
 
@@ -70,4 +70,20 @@
     boolean isProducerFlowControl();
     
     void setProducerFlowControl(boolean value);
+    
+    int getMaxProducersToAudit();
+    
+    void setMaxProducersToAudit(int maxProducersToAudit);
+   
+    int getMaxAuditDepth();
+   
+    void setMaxAuditDepth(int maxAuditDepth);
+  
+    boolean isEnableAudit();
+    
+    void setEnableAudit(boolean enableAudit);
+    
+    boolean isActive();
+    
+    
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
Mon Jan 21 02:31:22 2008
@@ -115,6 +115,7 @@
             }
             Topic topic = new Topic(broker.getRoot(), destination, store, memoryManager, destinationStatistics, taskRunnerFactory);
             configureTopic(topic, destination);
+            topic.initialize();
             return topic;
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
Mon Jan 21 02:31:22 2008
@@ -81,8 +81,8 @@
         return next.getName();
     }
 
-    public MemoryUsage getBrokerMemoryUsage() {
-        return next.getBrokerMemoryUsage();
+    public MemoryUsage getMemoryUsage() {
+        return next.getMemoryUsage();
     }
 
     public boolean lock(MessageReference node, LockOwner lockOwner) {
@@ -141,6 +141,34 @@
             throws Exception {
        next.removeProducer(context, info);
     }
+
+    public int getMaxAuditDepth() {
+       return next.getMaxAuditDepth();
+    }
+
+    public int getMaxProducersToAudit() {
+        return next.getMaxProducersToAudit();
+    }
+
+    public boolean isEnableAudit() {
+        return next.isEnableAudit();
+    }
+
     
+    public void setEnableAudit(boolean enableAudit) {
+        next.setEnableAudit(enableAudit);
+    }
+
+    public void setMaxAuditDepth(int maxAuditDepth) {
+       next.setMaxAuditDepth(maxAuditDepth);
+    }
+
     
+    public void setMaxProducersToAudit(int maxProducersToAudit) {
+       next.setMaxProducersToAudit(maxProducersToAudit);
+    }
+    
+    public boolean isActive() {
+        return next.isActive();
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
Mon Jan 21 02:31:22 2008
@@ -17,11 +17,12 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+
 import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
@@ -31,9 +32,10 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.usage.Usage;
 import org.apache.activemq.usage.UsageListener;
-import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.SubscriptionKey;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,13 +49,23 @@
     private final boolean keepDurableSubsActive;
     private boolean active;
 
-    public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
-        throws InvalidSelectorException {
+    public DurableTopicSubscription(Broker broker, Destination dest,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
+        throws JMSException {
         super(broker,usageManager, context, info);
         this.pending = new StoreDurableSubscriberCursor(context.getClientId(), info.getSubscriptionName(), broker.getTempDataStore(), info.getPrefetchSize(), this);
         this.pending.setSystemUsage(usageManager);
         this.keepDurableSubsActive = keepDurableSubsActive;
         subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
+        if (dest != null && dest.getMessageStore() != null) {
+            TopicMessageStore store = (TopicMessageStore)dest.getMessageStore();
+            try {
+                this.enqueueCounter=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
+            } catch (IOException e) {
+                JMSException jmsEx = new JMSException("Failed to retrieve eunqueueCount from store "+ e);
+                jmsEx.setLinkedException(e);
+                throw jmsEx;
+            }
+        }
     }
 
     public boolean isActive() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Mon Jan 21 02:31:22 2008
@@ -150,6 +150,9 @@
                         return true;
                     }
                 });
+            }else {
+                int messageCount = store.getMessageCount();
+                destinationStatistics.getMessages().setCount(messageCount);
             }
         }
     }
@@ -320,7 +323,8 @@
         final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 && !context.isInRecoveryMode();
         if (message.isExpired()) {
             broker.messageExpired(context, message);
-            destinationStatistics.getMessages().decrement();
+            //message not added to stats yet
+            //destinationStatistics.getMessages().decrement();
             if (sendProducerAck) {
                 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
                 context.getConnection().dispatchAsync(ack);
@@ -346,7 +350,8 @@
                                 // message may have expired.
                                 if (broker.isExpired(message)) {
                                     broker.messageExpired(context, message);
-                                    destinationStatistics.getMessages().decrement();
+                                    //message not added to stats yet
+                                    //destinationStatistics.getMessages().decrement();
                                 } else {
                                     doMessageSend(producerExchange, message);
                                 }
@@ -436,7 +441,8 @@
                         // op, by that time the message could have expired..
                         if (broker.isExpired(message)) {
                             broker.messageExpired(context, message);
-                            destinationStatistics.getMessages().decrement();
+                            //message not added to stats yet
+                            //destinationStatistics.getMessages().decrement();
                             return;
                         }
                         sendMessage(context, message);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Mon Jan 21 02:31:22 2008
@@ -56,8 +56,6 @@
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.state.ConnectionState;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.BrokerSupport;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Mon Jan 21 02:31:22 2008
@@ -99,6 +99,13 @@
         } 
         this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
     }
+    
+    public void initialize() throws Exception{
+        if (store != null) {
+            int messageCount = store.getMessageCount();
+            destinationStatistics.getMessages().setCount(messageCount);
+        }
+    }
 
     public boolean lock(MessageReference node, LockOwner sub) {
         return true;
@@ -288,7 +295,8 @@
                                 // message may have expired.
                                 if (broker.isExpired(message)) {
                                     broker.messageExpired(context, message);
-                                    destinationStatistics.getMessages().decrement();
+                                    //destinationStatistics.getEnqueues().increment();
+                                    //destinationStatistics.getMessages().decrement();
                                 } else {
                                     doMessageSend(producerExchange, message);
                                 }
@@ -394,7 +402,8 @@
                     if (broker.isExpired(message)) {
                         broker.messageExpired(context, message);
                         message.decrementReferenceCount();
-                        destinationStatistics.getMessages().decrement();
+                        //destinationStatistics.getEnqueues().increment();
+                        //destinationStatistics.getMessages().decrement();
                         return;
                     }
                     try {
@@ -543,6 +552,7 @@
     // Implementation methods
     // -------------------------------------------------------------------------
     protected void dispatch(final ConnectionContext context, Message message) throws Exception {
+        destinationStatistics.getMessages().increment();
         destinationStatistics.getEnqueues().increment();
         dispatchValve.increment();
         MessageEvaluationContext msgContext = context.getMessageEvaluationContext();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
Mon Jan 21 02:31:22 2008
@@ -229,9 +229,17 @@
             }
             SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
             DurableTopicSubscription sub = durableSubscriptions.get(key);
+            ActiveMQDestination destination = info.getDestination();
             if (sub == null) {
-                sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
-                ActiveMQDestination destination = info.getDestination();
+                Destination dest=null;
+                try {
+                    dest = lookup(context, destination);
+                } catch (Exception e) {
+                    JMSException jmsEx = new JMSException("Failed to retrieve destination from region "+ e);
+                    jmsEx.setLinkedException(e);
+                    throw jmsEx;
+                }
+                sub = new DurableTopicSubscription(broker,dest, usageManager, context, info, keepDurableSubsActive);
                 if (destination != null && broker.getDestinationPolicy() != null) {
                     PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
                     if (entry != null) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Mon Jan 21 02:31:22 2008
@@ -67,7 +67,7 @@
         }
         queue.setMessageGroupMapFactory(getMessageGroupMapFactory());
         if (memoryLimit > 0) {
-            queue.getBrokerMemoryUsage().setLimit(memoryLimit);
+            queue.getMemoryUsage().setLimit(memoryLimit);
         }
         if (pendingQueuePolicy != null) {
             PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(queue, tmpStore);
@@ -91,7 +91,7 @@
         }
         topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
         if (memoryLimit > 0) {
-            topic.getBrokerMemoryUsage().setLimit(memoryLimit);
+            topic.getMemoryUsage().setLimit(memoryLimit);
         }
         topic.setProducerFlowControl(isProducerFlowControl());
         topic.setEnableAudit(isEnableAudit());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=613830&r1=613829&r2=613830&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Mon
Jan 21 02:31:22 2008
@@ -571,7 +571,7 @@
     public void setRegionDestination(org.apache.activemq.broker.region.Destination destination) {
         this.regionDestination = destination;
         if(this.memoryUsage==null) {
-            this.memoryUsage=regionDestination.getBrokerMemoryUsage();
+            this.memoryUsage=regionDestination.getMemoryUsage();
         }
     }
     



Mime
View raw message