activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r389941 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/memory/ main/java/org/apach...
Date Thu, 30 Mar 2006 00:36:24 GMT
Author: chirino
Date: Wed Mar 29 16:36:21 2006
New Revision: 389941

URL: http://svn.apache.org/viewcvs?rev=389941&view=rev
Log:
- Since we now have per-destination usage managers, I updated the MessageStore API so that each destination's store can be made aware of the manager for that store.
  Some stores, like the journal, hold messages around and use the usage manager to know when they should flush to disk.
- Moved a lot of the message reference counting logic from the PrefetchSubscription up to its subclasses, since they all seem to do it in a slightly different way.
  I think it makes it easier to see now how the usage manager is affected by operations.
- I fixed and verified that the keepDurableSubsActive=true option actually works.  I think we should make this the default setting since it
  makes recovery safer.  Once we have a better spooling implementation we can turn it off again.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Wed Mar 29 16:36:21 2006
@@ -900,6 +900,7 @@
     protected Broker createRegionBroker() throws Exception {
         // we must start the persistence adaptor before we can create the region
         // broker
+        getPersistenceAdapter().setUsageManager(getMemoryManager());
         getPersistenceAdapter().start();
 		RegionBroker regionBroker = null;
         if (isUseJmx()) {
@@ -947,7 +948,6 @@
 
     protected DefaultPersistenceAdapterFactory createPersistenceFactory() {
         DefaultPersistenceAdapterFactory factory = new DefaultPersistenceAdapterFactory();
-        factory.setMemManager(getMemoryManager());
         factory.setDataDirectory(getDataDirectory());
         factory.setTaskRunnerFactory(getTaskRunnerFactory());
         return factory;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Wed Mar 29 16:36:21 2006
@@ -22,6 +22,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.Map.Entry;
+
 import javax.management.InstanceNotFoundException;
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
@@ -33,6 +34,7 @@
 import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
 import javax.management.openmbean.TabularType;
+
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
@@ -60,6 +62,7 @@
 import org.apache.activemq.util.SubscriptionKey;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
 
@@ -160,11 +163,11 @@
         String name="";
         SubscriptionKey key=new SubscriptionKey(context.getClientId(),sub.getConsumerInfo().getSubcriptionName());
         if(sub.getConsumerInfo().isDurable()){
-            name=key.toString();
-        }
-        if(sub.getConsumerInfo()!=null&&sub.getConsumerInfo().getConsumerId()!=null){
-            name+="."+sub.getConsumerInfo().getConsumerId();
+            name = key.toString();
+        } else {
+            name = sub.getConsumerInfo().getConsumerId().toString();
         }
+        
         try{
             ObjectName objectName=new ObjectName(brokerObjectName.getDomain()+":"+"BrokerName="+map.get("BrokerName")
                             +","+"Type=Subscription,"+"active=true,"+"name="+JMXSupport.encodeObjectNamePart(name)+"");

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java Wed Mar 29 16:36:21 2006
@@ -24,7 +24,6 @@
 import org.apache.activemq.broker.region.DestinationStatistics;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicRegion;
-import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.memory.UsageManager;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Wed Mar 29 16:36:21 2006
@@ -16,7 +16,10 @@
  */
 package org.apache.activemq.broker.region;
 
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import java.io.IOException;
+import java.util.Iterator;
+
+import javax.jms.InvalidSelectorException;
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
@@ -26,10 +29,7 @@
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.util.SubscriptionKey;
 
-import javax.jms.InvalidSelectorException;
-
-import java.io.IOException;
-import java.util.Iterator;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 
 public class DurableTopicSubscription extends PrefetchSubscription {
     
@@ -103,14 +103,21 @@
             } else {
                 redeliveredMessages.put(node.getMessageId(), new Integer(1));
             }
-            
-            iter.remove();
+            if( keepDurableSubsActive ) {
+                pending.addFirst(node);
+            } else {
+                node.decrementReferenceCount();
+                iter.remove();
+            }
+        }
+        
+        if( !keepDurableSubsActive ) {
+            for (Iterator iter = pending.iterator(); iter.hasNext();) {
+                MessageReference node = (MessageReference) iter.next();
+                node.decrementReferenceCount();
+                iter.remove();
+            }
         }
-        for (Iterator iter = pending.iterator(); iter.hasNext();) {
-            MessageReference node = (MessageReference) iter.next();
-            // node.decrementTargetCount();
-            iter.remove();
-        }        
         prefetchExtension=0;
     }
 
@@ -127,9 +134,8 @@
         if( !active && !keepDurableSubsActive ) {
             return;
         }
-        node = new IndirectMessageReference(node.getRegionDestination(), (Message) node);
+        node.incrementReferenceCount();
         super.add(node);
-        node.decrementReferenceCount();
     }
     
     public int getPendingQueueSize() {
@@ -148,14 +154,10 @@
         return active;
     }
     
-    public synchronized void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
-        super.acknowledge(context, ack);
-    }
-
     protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
         node.getRegionDestination().acknowledge(context, this, ack, node);
         redeliveredMessages.remove(node.getMessageId());
-        ((IndirectMessageReference)node).drop();
+        node.decrementReferenceCount();
     }
     
     public String getSubscriptionName() {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Mar 29 16:36:21 2006
@@ -79,7 +79,6 @@
                     try{
                         MessageDispatch md=createMessageDispatch(node,node.getMessage());
                         dispatched.addLast(node);
-                        node.decrementReferenceCount();
                     }catch(Exception e){
                         log.error("Problem processing MessageDispatchNotification: "+mdn,e);
                     }
@@ -166,22 +165,7 @@
                     inAckRange=true;
                 }
                 if(inAckRange){
-                    // Send the message to the DLQ
-                    node.incrementReferenceCount();
-                    try{
-                        Message message=node.getMessage();
-                        if(message!=null){
-                            // The original destination and transaction id do not get filled when the message is first
-                            // sent,
-                            // it is only populated if the message is routed to another destination like the DLQ
-                            DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy();
-                            ActiveMQDestination deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor(message.getDestination());
-                            BrokerSupport.resend(context, message, deadLetterDestination);
-
-                        }
-                    }finally{
-                        node.decrementReferenceCount();
-                    }
+                    sendToDLQ(context, node);
                     iter.remove();
                     dequeueCounter++;
                     index++;
@@ -200,6 +184,26 @@
         throw new JMSException("Invalid acknowledgment: "+ack);
     }
 
+    /**
+     * @param context
+     * @param node
+     * @throws IOException
+     * @throws Exception
+     */
+    protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception {
+        // Send the message to the DLQ
+        Message message=node.getMessage();
+        if(message!=null){
+            // The original destination and transaction id do not get filled when the message is first
+            // sent,
+            // it is only populated if the message is routed to another destination like the DLQ
+            DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy();
+            ActiveMQDestination deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor(message.getDestination());
+            BrokerSupport.resend(context, message, deadLetterDestination);
+
+        }
+    }
+
     protected boolean isFull(){
         return dispatched.size()-prefetchExtension>=info.getPrefetchSize();
     }
@@ -240,11 +244,10 @@
         }
     }
 
-    private void dispatch(final MessageReference node) throws IOException{
-        node.incrementReferenceCount();
+    protected boolean dispatch(final MessageReference node) throws IOException{
         final Message message=node.getMessage();
         if(message==null){
-            return;
+            return false;
         }
         // Make sure we can dispatch a message.
         if(canDispatch(node)&&!isSlaveBroker()){
@@ -264,16 +267,14 @@
                 context.getConnection().dispatchSync(md);
                 onDispatch(node,message);
             }
-            // The onDispatch() does the node.decrementReferenceCount();
-        }else{
-            // We were not allowed to dispatch that message (an other consumer grabbed it before we did)
-            node.decrementReferenceCount();
+            return true;
+        } else {
+            return false;
         }
     }
 
-    synchronized private void onDispatch(final MessageReference node,final Message message){
+    synchronized protected void onDispatch(final MessageReference node,final Message message){
         boolean wasFull=isFull();
-        node.decrementReferenceCount();
         if(node.getRegionDestination()!=null){
             node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
             context.getConnection().getStatistics().onMessageDequeue(message);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Wed Mar 29 16:36:21 2006
@@ -81,9 +81,14 @@
         this.destination = destination;
         this.usageManager = new UsageManager(memoryManager);
         this.usageManager.setLimit(Long.MAX_VALUE);
-        
         this.store = store;
 
+        // Let the store know what usage manager we are using so that he can flush messages to disk
+        // when usage gets high.
+        if( store!=null ) {
+            store.setUsageManager(usageManager);
+        }
+        
         destinationStatistics.setParent(parentStats);
         this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName());
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java Wed Mar 29 16:36:21 2006
@@ -24,10 +24,11 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.filter.MessageEvaluationContext;
 
-public class QueueBrowserSubscription extends PrefetchSubscription {
+public class QueueBrowserSubscription extends QueueSubscription {
         
     boolean browseDone;
     
@@ -65,7 +66,15 @@
             return super.createMessageDispatch(node, message);
         }
     }
+    
     public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
         return !browseDone && super.matches(node, context);
     }
+
+    /**
+     * Since we are a browser we don't really remove the message from the queue.
+     */
+    protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException {
+    }
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Wed Mar 29 16:36:21 2006
@@ -21,6 +21,7 @@
 import org.apache.activemq.broker.region.group.MessageGroupMap;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.transaction.Synchronization;
 
@@ -33,11 +34,7 @@
     public QueueSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
         super(broker,context, info);
     }
-    
-    public void add(MessageReference node) throws Exception {
-        super.add(node);
-    }
-    
+        
     /**
      * In the queue case, mark the node as dropped and then a gc cycle will remove it from 
      * the queue.
@@ -138,4 +135,53 @@
         return info.isExclusive();
     }
 
+    /**
+     * Override so that the message ref count is > 0 only when the message is being dispatched
+     * to a client.  Keeping it at 0 when it is in the pending list allows the message to be swapped out
+     * to disk.
+     * 
+     * @return true if the message was dispatched.
+     */
+    protected boolean dispatch(MessageReference node) throws IOException {
+        boolean rc = false;
+        // This brings the message into memory if it was swapped out.
+        node.incrementReferenceCount();
+        try {
+            rc = super.dispatch(node);
+        } finally {
+            // If the message was dispatched, it could be getting dispatched async, so we
+            // can only drop the reference count when that completes @see onDispatch
+            if( !rc ) {
+                node.incrementReferenceCount();
+            }
+        }
+        return rc;
+    }
+
+    /**
+     * OK Message was transmitted, we can now drop the reference count.
+     * 
+     * @see org.apache.activemq.broker.region.PrefetchSubscription#onDispatch(org.apache.activemq.broker.region.MessageReference, org.apache.activemq.command.Message)
+     */
+    protected void onDispatch(MessageReference node, Message message) {
+        // Now that the message has been sent over the wire to the client, 
+        // we can let it get swapped out.
+        node.decrementReferenceCount();
+        super.onDispatch(node, message);
+    }
+    
+    /**
+     * Sending a message to the DQL will require us to increment the ref count so we can get at the content.
+     */
+    protected void sendToDLQ(ConnectionContext context, MessageReference node) throws IOException, Exception {
+        // This brings the message into memory if it was swapped out.
+        node.incrementReferenceCount();
+        try{
+            super.sendToDLQ(context, node);
+        } finally {
+            // This let's the message be swapped out of needed.
+            node.decrementReferenceCount();
+        }
+    }
+    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Wed Mar 29 16:36:21 2006
@@ -75,6 +75,13 @@
         this.store = store;
         this.usageManager = new UsageManager(memoryManager);
         this.usageManager.setLimit(Long.MAX_VALUE);
+        
+        // Let the store know what usage manager we are using so that he can flush messages to disk
+        // when usage gets high.
+        if( store!=null ) {
+            store.setUsageManager(usageManager);
+        }
+
         this.destinationStatistics.setParent(parentStats);
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Wed Mar 29 16:36:21 2006
@@ -171,10 +171,13 @@
                 DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
                 ConsumerInfo consumerInfo = createInactiveConsumerInfo(info);
                 if( sub == null ) { 
-                    sub = (DurableTopicSubscription) createSubscription(context, consumerInfo );
+                    ConnectionContext c = new ConnectionContext();
+                    c.setBroker(context.getBroker());
+                    c.setClientId(key.getClientId());
+                    c.setConnectionId(consumerInfo.getConsumerId().getParentId().getParentId());
+                    sub = (DurableTopicSubscription) createSubscription(c, consumerInfo );
                 }
                 
-                subscriptions.put(consumerInfo.getConsumerId(), sub);
                 topic.addSubscription(context, sub);
             }            
         }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java Wed Mar 29 16:36:21 2006
@@ -43,7 +43,7 @@
     private long usage;
     
     private int percentUsage;
-    private int percentUsageMinDelta=10;
+    private int percentUsageMinDelta=1;
     
     private final Object usageMutex = new Object();
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java Wed Mar 29 16:36:21 2006
@@ -23,7 +23,6 @@
 
 import org.apache.activeio.journal.Journal;
 import org.apache.activeio.journal.active.JournalImpl;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.jdbc.JDBCAdapter;
 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
 import org.apache.activemq.store.jdbc.Statements;
@@ -42,7 +41,6 @@
     private int journalLogFileSize = 1024*1024*20;
     private int journalLogFiles = 2;
     private File dataDirectory;
-    private UsageManager memManager;
     private DataSource dataSource;
     private TaskRunnerFactory taskRunnerFactory;
     private Journal journal;
@@ -60,9 +58,9 @@
         
         // Setup the Journal
         if( useQuickJournal ) {
-            return new QuickJournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getMemManager(), getTaskRunnerFactory());
+            return new QuickJournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
         }  else {
-            return new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getMemManager(), getTaskRunnerFactory());
+            return new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
         }
     }
 
@@ -91,17 +89,6 @@
 
     public void setJournalLogFileSize(int journalLogFileSize) {
         this.journalLogFileSize = journalLogFileSize;
-    }
-
-    public UsageManager getMemManager() {
-        if( memManager==null ) {
-            memManager = new UsageManager();
-        }
-        return memManager;
-    }
-
-    public void setMemManager(UsageManager memManager) {
-        this.memManager = memManager;
     }
     
     public DataSource getDataSource() throws IOException {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java Wed Mar 29 16:36:21 2006
@@ -24,6 +24,7 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.memory.UsageManager;
 
 /**
  * Represents a message store which is used by the persistent {@link org.apache.activemq.service.MessageContainer}
@@ -92,5 +93,10 @@
      * @return
      */
     public ActiveMQDestination getDestination();
-        
+
+    /**
+     * @param usageManager The UsageManager that is controlling the destination's memory usage.
+     */
+    public void setUsageManager(UsageManager usageManager);
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java Wed Mar 29 16:36:21 2006
@@ -20,6 +20,7 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.memory.UsageManager;
 
 import java.io.IOException;
 import java.util.Set;
@@ -97,5 +98,11 @@
     
     public boolean isUseExternalMessageReferences();
     public void setUseExternalMessageReferences(boolean enable);
+    
+    /**
+     * @param usageManager The UsageManager that is controlling the broker's memory usage.
+     */
+    public void setUsageManager(UsageManager usageManager);
+
    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java Wed Mar 29 16:36:21 2006
@@ -23,6 +23,7 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.memory.UsageManager;
 
 /**
  * A simple proxy that delegates to another MessageStore.
@@ -70,5 +71,9 @@
 
     public String getMessageReference(MessageId identity) throws IOException {
         return delegate.getMessageReference(identity);
+    }
+
+    public void setUsageManager(UsageManager usageManager) {
+        delegate.setUsageManager(usageManager);
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java Wed Mar 29 16:36:21 2006
@@ -24,6 +24,7 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.memory.UsageManager;
 
 /**
  * A simple proxy that delegates to another MessageStore.
@@ -93,5 +94,9 @@
 
     public SubscriptionInfo[] getAllSubscriptions() throws IOException {
         return delegate.getAllSubscriptions();
+    }
+    
+    public void setUsageManager(UsageManager usageManager) {
+        delegate.setUsageManager(usageManager);
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Wed Mar 29 16:36:21 2006
@@ -27,6 +27,7 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.util.IOExceptionSupport;
@@ -196,4 +197,7 @@
         return destination;
     }
 
+    public void setUsageManager(UsageManager usageManager) {
+        // we can ignore since we don't buffer up messages.
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Wed Mar 29 16:36:21 2006
@@ -28,6 +28,7 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
@@ -355,6 +356,12 @@
 
     public void setStatements(Statements statements) {
         this.statements = statements;
+    }
+
+    /**
+     * @param usageManager The UsageManager that is controlling the broker's memory usage.
+     */
+    public void setUsageManager(UsageManager usageManager) {
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java Wed Mar 29 16:36:21 2006
@@ -30,6 +30,7 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
@@ -62,6 +63,8 @@
     
     protected RecordLocation lastLocation;
     protected HashSet inFlightTxLocations = new HashSet();
+
+    private UsageManager usageManager;
     
     public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) {
         this.peristenceAdapter = adapter;
@@ -70,6 +73,12 @@
         this.destination = destination;
         this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
     }
+    
+    public void setUsageManager(UsageManager usageManager) {
+        this.usageManager = usageManager;
+        longTermStore.setUsageManager(usageManager);
+    }
+
 
     /**
      * Not synchronized since the Journal has better throughput if you increase
@@ -334,11 +343,15 @@
     }
 
     public void start() throws Exception {
+        if( this.usageManager != null )
+            this.usageManager.addUsageListener(peristenceAdapter);
         longTermStore.start();
     }
 
     public void stop() throws Exception {
         longTermStore.stop();
+        if( this.usageManager != null )
+            this.usageManager.removeUsageListener(peristenceAdapter);
     }
 
     /**

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Wed Mar 29 16:36:21 2006
@@ -81,13 +81,13 @@
 
     private final Journal journal;
     private final PersistenceAdapter longTermPersistence;
-    final UsageManager usageManager;
 
     private final WireFormat wireFormat = new OpenWireFormat();
 
     private final ConcurrentHashMap queues = new ConcurrentHashMap();
     private final ConcurrentHashMap topics = new ConcurrentHashMap();
     
+    private UsageManager usageManager;
     private long checkpointInterval = 1000 * 60 * 5;
     private long lastCheckpointRequest = System.currentTimeMillis();
     private long lastCleanup = System.currentTimeMillis();
@@ -111,7 +111,7 @@
         }
     };
     
-    public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, UsageManager memManager, TaskRunnerFactory taskRunnerFactory) throws IOException {
+    public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
 
         this.journal = journal;
         journal.setJournalEventListener(this);
@@ -123,7 +123,14 @@
         });
 
         this.longTermPersistence = longTermPersistence;
-        this.usageManager = memManager;
+    }
+
+    /**
+     * @param usageManager The UsageManager that is controlling the broker's memory usage.
+     */
+    public void setUsageManager(UsageManager usageManager) {
+        this.usageManager = usageManager;
+        longTermPersistence.setUsageManager(usageManager);
     }
 
     public Set getDestinations() {
@@ -216,6 +223,7 @@
 
     public void stop() throws Exception {
         
+        this.usageManager.removeUsageListener(this);
         if( !started.compareAndSet(true, false) )
             return;
         

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java Wed Mar 29 16:36:21 2006
@@ -31,6 +31,7 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
@@ -63,6 +64,8 @@
     
     protected RecordLocation lastLocation;
     protected HashSet inFlightTxLocations = new HashSet();
+
+    private UsageManager usageManager;
     
     public QuickJournalMessageStore(QuickJournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) {
         this.peristenceAdapter = adapter;
@@ -71,6 +74,11 @@
         this.destination = destination;
         this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
     }
+    
+    public void setUsageManager(UsageManager usageManager) {
+        this.usageManager = usageManager;
+        longTermStore.setUsageManager(usageManager);
+    }
 
     /**
      * Not synchronized since the Journal has better throughput if you increase
@@ -368,11 +376,15 @@
     }
 
     public void start() throws Exception {
+        if( this.usageManager != null )
+            this.usageManager.addUsageListener(peristenceAdapter);
         longTermStore.start();
     }
 
     public void stop() throws Exception {
         longTermStore.stop();
+        if( this.usageManager != null )
+            this.usageManager.removeUsageListener(peristenceAdapter);
     }
 
     /**

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java Wed Mar 29 16:36:21 2006
@@ -81,13 +81,13 @@
 
     private final Journal journal;
     private final PersistenceAdapter longTermPersistence;
-    final UsageManager usageManager;
 
     private final WireFormat wireFormat = new OpenWireFormat();
 
     private final ConcurrentHashMap queues = new ConcurrentHashMap();
     private final ConcurrentHashMap topics = new ConcurrentHashMap();
     
+    private UsageManager usageManager;
     private long checkpointInterval = 1000 * 60 * 5;
     private long lastCheckpointRequest = System.currentTimeMillis();
     private long lastCleanup = System.currentTimeMillis();
@@ -111,7 +111,7 @@
         }
     };
     
-    public QuickJournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, UsageManager memManager, TaskRunnerFactory taskRunnerFactory) throws IOException {
+    public QuickJournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
 
         this.journal = journal;
         journal.setJournalEventListener(this);
@@ -123,7 +123,14 @@
         });
 
         this.longTermPersistence = longTermPersistence;
-        this.usageManager = memManager;
+    }
+    
+    /**
+     * @param usageManager The UsageManager that is controlling the broker's memory usage.
+     */
+    public void setUsageManager(UsageManager usageManager) {
+        this.usageManager = usageManager;
+        longTermPersistence.setUsageManager(usageManager);
     }
 
     public Set getDestinations() {
@@ -216,6 +223,7 @@
 
     public void stop() throws Exception {
         
+        this.usageManager.removeUsageListener(this);
         if( !started.compareAndSet(true, false) )
             return;
         

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Wed Mar 29 16:36:21 2006
@@ -22,6 +22,7 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 /**
@@ -90,4 +91,11 @@
     public void delete(){
         messageContainer.clear();
     }
+    
+    /**
+     * @param usageManager The UsageManager that is controlling the destination's memory usage.
+     */
+    public void setUsageManager(UsageManager usageManager) {
+    }
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java Wed Mar 29 16:36:21 2006
@@ -28,6 +28,7 @@
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.kaha.StoreFactory;
 import org.apache.activemq.kaha.StringMarshaller;
+import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
@@ -149,5 +150,11 @@
         }
         container.load();
         return container;
+    }
+
+    /**
+     * @param usageManager The UsageManager that is controlling the broker's memory usage.
+     */
+    public void setUsageManager(UsageManager usageManager) {
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java Wed Mar 29 16:36:21 2006
@@ -27,6 +27,7 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 
@@ -102,6 +103,12 @@
 
     public void delete() {
         messageTable.clear();
+    }
+    
+    /**
+     * @param usageManager The UsageManager that is controlling the destination's memory usage.
+     */
+    public void setUsageManager(UsageManager usageManager) {
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java Wed Mar 29 16:36:21 2006
@@ -25,6 +25,7 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
@@ -147,4 +148,9 @@
         return null;
     }
 
+    /**
+     * @param usageManager The UsageManager that is controlling the broker's memory usage.
+     */
+    public void setUsageManager(UsageManager usageManager) {
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java?rev=389941&r1=389940&r2=389941&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java Wed Mar 29 16:36:21 2006
@@ -16,15 +16,14 @@
  */
 package org.apache.activemq.broker.policy;
 
+import java.util.Iterator;
+
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TopicSubscriptionTest;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy;
 import org.apache.activemq.util.MessageIdList;
-
-import java.util.List;
-import java.util.Iterator;
 
 public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
 



Mime
View raw message