activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r567647 [1/3] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jmx/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/...
Date Mon, 20 Aug 2007 10:37:34 GMT
Author: rajdavies
Date: Mon Aug 20 03:37:29 2007
New Revision: 567647

URL: http://svn.apache.org/viewvc?rev=567647&view=rev
Log:
UpdateManager changed to account for Store and Temp data usage as well as memory usage

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/Cache.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntry.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEntryList.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictionUsageListener.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheEvictor.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/CacheFilter.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/LRUMap.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/MapCache.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManagerCacheFilter.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/DefaultUsageCapacity.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/StoreUsage.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/SystemUsage.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/TempUsage.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/UsageCapacity.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/UsageListener.java
      - copied, changed from r565381, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageListener.java
Removed:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageListener.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/config/ConfigTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/hash/HashTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/tree/TreeTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/memory/buffer/MemoryBufferTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/memory/buffer/OrderBasedMemoryBufferTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/memory/buffer/SizeBasedMessageBufferTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/proxy/ProxyTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQDeadlockTestW4Brokers.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQFailoverIssue.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/AMQStackOverFlowTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java Mon Aug 20 03:37:29 2007
@@ -33,7 +33,7 @@
 import org.apache.activemq.management.JMSProducerStatsImpl;
 import org.apache.activemq.management.StatsCapable;
 import org.apache.activemq.management.StatsImpl;
-import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.util.IntrospectionSupport;
 
 /**
@@ -77,7 +77,7 @@
     private AtomicLong messageSequence;
     private long startTime;
     private MessageTransformer transformer;
-    private UsageManager producerWindow;
+    private MemoryUsage producerWindow;
 
     protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination) throws JMSException {
         super(session);
@@ -92,7 +92,7 @@
         // Enable producer window flow control if protocol > 3 and the window
         // size > 0
         if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) {
-            producerWindow = new UsageManager("Producer Window: " + producerId);
+            producerWindow = new MemoryUsage("Producer Window: " + producerId);
             producerWindow.setLimit(this.info.getWindowSize());
         }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Mon Aug 20 03:37:29 2007
@@ -79,9 +79,11 @@
 import org.apache.activemq.management.JMSSessionStatsImpl;
 import org.apache.activemq.management.StatsCapable;
 import org.apache.activemq.management.StatsImpl;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.Usage;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.Callback;
 import org.apache.activemq.util.LongSequenceGenerator;
 import org.apache.commons.logging.Log;
@@ -1532,7 +1534,7 @@
      * @throws JMSException
      */
     protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
-                        UsageManager producerWindow) throws JMSException {
+                        MemoryUsage producerWindow) throws JMSException {
 
         checkClosed();
         if (destination.isTemporary() && connection.isDeleted(destination)) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Mon Aug 20 03:37:29 2007
@@ -62,7 +62,6 @@
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.kaha.StoreFactory;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.network.ConnectionFilter;
 import org.apache.activemq.network.DiscoveryNetworkConnector;
 import org.apache.activemq.network.NetworkConnector;
@@ -78,6 +77,7 @@
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.vm.VMTransportFactory;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.JMXSupport;
@@ -118,9 +118,9 @@
     private ObjectName brokerObjectName;
     private TaskRunnerFactory taskRunnerFactory;
     private TaskRunnerFactory persistenceTaskRunnerFactory;
-    private UsageManager usageManager;
-    private UsageManager producerUsageManager;
-    private UsageManager consumerUsageManager;
+    private SystemUsage usageManager;
+    private SystemUsage producerSystemUsage;
+    private SystemUsage consumerSystemUsage;
     private PersistenceAdapter persistenceAdapter;
     private PersistenceAdapterFactory persistenceFactory;
     private DestinationFactory destinationFactory;
@@ -646,51 +646,61 @@
         this.populateJMSXUserID = populateJMSXUserID;
     }
 
-    public UsageManager getMemoryManager() {
+    public SystemUsage getUsageManager() {
+        try {
         if (usageManager == null) {
-            usageManager = new UsageManager("Main");
-            usageManager.setLimit(1024 * 1024 * 64); // Default to 64 Meg
-            // limit
+            usageManager = new SystemUsage("Main",getPersistenceAdapter(),getTempDataStore());
+            usageManager.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default to 64 Meg
+            usageManager.getTempDiskUsage().setLimit(1024 * 1024 * 1024 * 100);//10 Gb
+            usageManager.getStoreUsage().setLimit(1024 * 1024 * 1024 * 100); //100 GB
         }
         return usageManager;
+        }catch(IOException e) {
+            LOG.fatal("Cannot create SystemUsage",e);
+            throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage());
+        }
     }
 
-    public void setMemoryManager(UsageManager memoryManager) {
+    public void setUsageManager(SystemUsage memoryManager) {
         this.usageManager = memoryManager;
     }
 
     /**
      * @return the consumerUsageManager
+     * @throws IOException 
      */
-    public UsageManager getConsumerUsageManager() {
-        if (consumerUsageManager == null) {
-            consumerUsageManager = new UsageManager(getMemoryManager(), "Consumer", 0.5f);
+    public SystemUsage getConsumerSystemUsage() throws IOException {
+        if (consumerSystemUsage == null) {
+            consumerSystemUsage = new SystemUsage(getUsageManager(), "Consumer");
+            consumerSystemUsage.getMemoryUsage().setUsagePortion(0.5f);
         }
-        return consumerUsageManager;
+        return consumerSystemUsage;
     }
 
     /**
      * @param consumerUsageManager the consumerUsageManager to set
      */
-    public void setConsumerUsageManager(UsageManager consumerUsageManager) {
-        this.consumerUsageManager = consumerUsageManager;
+    public void setConsumerSystemUsage(SystemUsage consumerUsageManager) {
+        this.consumerSystemUsage = consumerUsageManager;
     }
 
     /**
      * @return the producerUsageManager
+     * @throws IOException 
      */
-    public UsageManager getProducerUsageManager() {
-        if (producerUsageManager == null) {
-            producerUsageManager = new UsageManager(getMemoryManager(), "Producer", 0.45f);
+    public SystemUsage getProducerSystemUsage() throws IOException {
+        if (producerSystemUsage == null) {
+            producerSystemUsage = new SystemUsage(getUsageManager(), "Producer");
+            producerSystemUsage.getMemoryUsage().setUsagePortion(0.45f);
         }
-        return producerUsageManager;
+        return producerSystemUsage;
     }
 
     /**
      * @param producerUsageManager the producerUsageManager to set
      */
-    public void setProducerUsageManager(UsageManager producerUsageManager) {
-        this.producerUsageManager = producerUsageManager;
+    public void setProducerSystemUsage(SystemUsage producerUsageManager) {
+        this.producerSystemUsage = producerUsageManager;
     }
 
     public PersistenceAdapter getPersistenceAdapter() throws IOException {
@@ -1377,7 +1387,7 @@
     protected Broker createRegionBroker() throws Exception {
         // we must start the persistence adaptor before we can create the region
         // broker
-        getPersistenceAdapter().setUsageManager(getProducerUsageManager());
+        getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
         getPersistenceAdapter().setBrokerName(getBrokerName());
         if (this.deleteAllMessagesOnStartup) {
             getPersistenceAdapter().deleteAllMessages();
@@ -1392,14 +1402,14 @@
         }
         RegionBroker regionBroker = null;
         if (destinationFactory == null) {
-            destinationFactory = new DestinationFactoryImpl(getProducerUsageManager(), getTaskRunnerFactory(), getPersistenceAdapter());
+            destinationFactory = new DestinationFactoryImpl(getProducerSystemUsage(), getTaskRunnerFactory(), getPersistenceAdapter());
         }
         if (isUseJmx()) {
             MBeanServer mbeanServer = getManagementContext().getMBeanServer();
-            regionBroker = new ManagedRegionBroker(this, mbeanServer, getBrokerObjectName(), getTaskRunnerFactory(), getConsumerUsageManager(), destinationFactory,
+            regionBroker = new ManagedRegionBroker(this, mbeanServer, getBrokerObjectName(), getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
                                                    destinationInterceptor);
         } else {
-            regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerUsageManager(), destinationFactory, destinationInterceptor);
+            regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor);
         }
         destinationFactory.setRegionBroker(regionBroker);
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Mon Aug 20 03:37:29 2007
@@ -85,15 +85,15 @@
     }
 
     public int getMemoryPercentageUsed() {
-        return brokerService.getMemoryManager().getPercentUsage();
+        return brokerService.getUsageManager().getMemoryUsage().getPercentUsage();
     }
 
     public long getMemoryLimit() {
-        return brokerService.getMemoryManager().getLimit();
+        return brokerService.getUsageManager().getMemoryUsage().getLimit();
     }
 
     public void setMemoryLimit(long limit) {
-        brokerService.getMemoryManager().setLimit(limit);
+        brokerService.getUsageManager().getMemoryUsage().setLimit(limit);
     }
 
     public void resetStatistics() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Mon Aug 20 03:37:29 2007
@@ -94,15 +94,15 @@
     }
 
     public int getMemoryPercentageUsed() {
-        return destination.getUsageManager().getPercentUsage();
+        return destination.getBrokerMemoryUsage().getPercentUsage();
     }
 
     public long getMemoryLimit() {
-        return destination.getUsageManager().getLimit();
+        return destination.getBrokerMemoryUsage().getLimit();
     }
 
     public void setMemoryLimit(long limit) {
-        destination.getUsageManager().setLimit(limit);
+        destination.getBrokerMemoryUsage().setLimit(limit);
     }
 
     public double getAverageEnqueueTime() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedQueueRegion.java Mon Aug 20 03:37:29 2007
@@ -27,14 +27,14 @@
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
 
 public class ManagedQueueRegion extends QueueRegion {
 
     private final ManagedRegionBroker regionBroker;
 
-    public ManagedQueueRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
+    public ManagedQueueRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
                               DestinationFactory destinationFactory) {
         super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
         regionBroker = broker;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Mon Aug 20 03:37:29 2007
@@ -61,11 +61,11 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.JMXSupport;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.util.SubscriptionKey;
@@ -92,7 +92,7 @@
     /* This is the first broker in the broker interceptor chain. */
     private Broker contextBroker;
 
-    public ManagedRegionBroker(BrokerService brokerService, MBeanServer mbeanServer, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager,
+    public ManagedRegionBroker(BrokerService brokerService, MBeanServer mbeanServer, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
                                DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException {
         super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor);
         this.mbeanServer = mbeanServer;
@@ -121,19 +121,19 @@
         registeredMBeans.clear();
     }
 
-    protected Region createQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+    protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
         return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
     }
 
-    protected Region createTempQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+    protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
         return new ManagedTempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
     }
 
-    protected Region createTempTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+    protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
         return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
     }
 
-    protected Region createTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+    protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
         return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java Mon Aug 20 03:37:29 2007
@@ -27,14 +27,14 @@
 import org.apache.activemq.broker.region.TempQueueRegion;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
 
 public class ManagedTempQueueRegion extends TempQueueRegion {
 
     private final ManagedRegionBroker regionBroker;
 
-    public ManagedTempQueueRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
+    public ManagedTempQueueRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
                                   DestinationFactory destinationFactory) {
         super(regionBroker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
         this.regionBroker = regionBroker;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempTopicRegion.java Mon Aug 20 03:37:29 2007
@@ -27,14 +27,14 @@
 import org.apache.activemq.broker.region.TempTopicRegion;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
 
 public class ManagedTempTopicRegion extends TempTopicRegion {
 
     private final ManagedRegionBroker regionBroker;
 
-    public ManagedTempTopicRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
+    public ManagedTempTopicRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
                                   DestinationFactory destinationFactory) {
         super(regionBroker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
         this.regionBroker = regionBroker;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java Mon Aug 20 03:37:29 2007
@@ -27,14 +27,14 @@
 import org.apache.activemq.broker.region.TopicRegion;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
 
 public class ManagedTopicRegion extends TopicRegion {
 
     private final ManagedRegionBroker regionBroker;
 
-    public ManagedTopicRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
+    public ManagedTopicRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
                               DestinationFactory destinationFactory) {
         super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
         regionBroker = broker;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Mon Aug 20 03:37:29 2007
@@ -41,8 +41,8 @@
 import org.apache.activemq.command.Response;
 import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.filter.DestinationMap;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -56,7 +56,7 @@
     protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
     protected final DestinationMap destinationMap = new DestinationMap();
     protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<ConsumerId, Subscription>();
-    protected final UsageManager memoryManager;
+    protected final SystemUsage memoryManager;
     protected final DestinationFactory destinationFactory;
     protected final DestinationStatistics destinationStatistics;
     protected final RegionBroker broker;
@@ -66,7 +66,7 @@
     protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>();
     protected boolean started;
 
-    public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
+    public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
                           DestinationFactory destinationFactory) {
         if (broker == null) {
             throw new IllegalArgumentException("null broker");

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Mon Aug 20 03:37:29 2007
@@ -25,8 +25,9 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
 
 /**
  * @version $Revision: 1.12 $
@@ -47,7 +48,7 @@
 
     ActiveMQDestination getActiveMQDestination();
 
-    UsageManager getUsageManager();
+    MemoryUsage getBrokerMemoryUsage();
 
     void dispose(ConnectionContext context) throws IOException;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java Mon Aug 20 03:37:29 2007
@@ -29,11 +29,11 @@
 import org.apache.activemq.command.ActiveMQTempDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.SubscriptionInfo;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
 
 /**
  * Creates standard ActiveMQ implementations of
@@ -44,12 +44,12 @@
  */
 public class DestinationFactoryImpl extends DestinationFactory {
 
-    protected final UsageManager memoryManager;
+    protected final SystemUsage memoryManager;
     protected final TaskRunnerFactory taskRunnerFactory;
     protected final PersistenceAdapter persistenceAdapter;
     protected RegionBroker broker;
 
-    public DestinationFactoryImpl(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
+    public DestinationFactoryImpl(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
         this.memoryManager = memoryManager;
         this.taskRunnerFactory = taskRunnerFactory;
         if (persistenceAdapter == null) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Mon Aug 20 03:37:29 2007
@@ -27,8 +27,9 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
 
 /**
  * 
@@ -79,8 +80,8 @@
         return next.getName();
     }
 
-    public UsageManager getUsageManager() {
-        return next.getUsageManager();
+    public MemoryUsage getBrokerMemoryUsage() {
+        return next.getBrokerMemoryUsage();
     }
 
     public boolean lock(MessageReference node, LockOwner lockOwner) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Mon Aug 20 03:37:29 2007
@@ -29,8 +29,9 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageId;
-import org.apache.activemq.memory.UsageListener;
-import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.usage.Usage;
+import org.apache.activemq.usage.UsageListener;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.SubscriptionKey;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,10 +43,10 @@
     private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
     private final SubscriptionKey subscriptionKey;
     private final boolean keepDurableSubsActive;
-    private final UsageManager usageManager;
+    private final SystemUsage usageManager;
     private boolean active;
 
-    public DurableTopicSubscription(Broker broker, UsageManager usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
+    public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
         throws InvalidSelectorException {
         super(broker, context, info, new StoreDurableSubscriberCursor(context.getClientId(), info.getSubscriptionName(), broker.getTempDataStore(), info.getPrefetchSize()));
         this.usageManager = usageManager;
@@ -77,7 +78,7 @@
         dispatchMatched();
     }
 
-    public synchronized void activate(UsageManager memoryManager, ConnectionContext context, ConsumerInfo info) throws Exception {
+    public synchronized void activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info) throws Exception {
         LOG.debug("Activating " + this);
         if (!active) {
             this.active = true;
@@ -89,7 +90,7 @@
                     topic.activate(context, this);
                 }
             }
-            pending.setUsageManager(memoryManager);
+            pending.setSystemUsage(memoryManager);
             pending.start();
 
             // If nothing was in the persistent store, then try to use the
@@ -101,13 +102,13 @@
                 }
             }
             dispatchMatched();
-            this.usageManager.addUsageListener(this);
+            this.usageManager.getMemoryUsage().addUsageListener(this);
         }
     }
 
     public synchronized void deactivate(boolean keepDurableSubsActive) throws Exception {
         active = false;
-        this.usageManager.removeUsageListener(this);
+        this.usageManager.getMemoryUsage().removeUsageListener(this);
         synchronized (pending) {
             pending.stop();
         }
@@ -239,10 +240,10 @@
      * @param memoryManager
      * @param oldPercentUsage
      * @param newPercentUsage
-     * @see org.apache.activemq.memory.UsageListener#onMemoryUseChanged(org.apache.activemq.memory.UsageManager,
+     * @see org.apache.activemq.usage.UsageListener#onMemoryUseChanged(org.apache.activemq.usage.SystemUsage,
      *      int, int)
      */
-    public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
+    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
         if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
             try {
                 dispatchMatched();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Aug 20 03:37:29 2007
@@ -17,6 +17,7 @@
 package org.apache.activemq.broker.region;
 
 import java.io.IOException;
+
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -51,7 +52,6 @@
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.kaha.Store;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
@@ -60,6 +60,8 @@
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.thread.Valve;
 import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.BrokerSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -78,7 +80,8 @@
     private final ActiveMQDestination destination;
     private final List<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
     private final Valve dispatchValve = new Valve(true);
-    private final UsageManager usageManager;
+    private final SystemUsage systemUsage;
+    private final MemoryUsage memoryUsage;
     private final DestinationStatistics destinationStatistics = new DestinationStatistics();
     private PendingMessageCursor messages;
     private final LinkedList<MessageReference> pagedInMessages = new LinkedList<MessageReference>();
@@ -107,12 +110,13 @@
         };
     };
 
-    public Queue(Broker broker, ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats,
+    public Queue(Broker broker, ActiveMQDestination destination, final SystemUsage systemUsage, MessageStore store, DestinationStatistics parentStats,
                  TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
         this.broker = broker;
         this.destination = destination;
-        this.usageManager = new UsageManager(memoryManager, destination.toString());
-        this.usageManager.setUsagePortion(1.0f);
+        this.systemUsage=systemUsage;
+        this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString());
+        this.memoryUsage.setUsagePortion(1.0f);
         this.store = store;
         if (destination.isTemporary()) {
             this.messages = new VMPendingMessageCursor();
@@ -126,7 +130,7 @@
         // flush messages to disk
         // when usage gets high.
         if (store != null) {
-            store.setUsageManager(usageManager);
+            store.setMemoryUsage(memoryUsage);
         }
 
         // let's copy the enabled property from the parent DestinationStatistics
@@ -139,7 +143,7 @@
     public void initialize() throws Exception {
         if (store != null) {
             // Restore the persistent messages.
-            messages.setUsageManager(getUsageManager());
+            messages.setSystemUsage(systemUsage);
             if (messages.isRecoveryRequired()) {
                 store.recover(new MessageRecoveryListener() {
 
@@ -359,9 +363,9 @@
             }
             return;
         }
-        if (context.isProducerFlowControl() && usageManager.isFull()) {
-            if (usageManager.isSendFailIfNoSpace()) {
-                throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
+        if (context.isProducerFlowControl() && memoryUsage.isFull()) {
+            if (systemUsage.isSendFailIfNoSpace()) {
+                throw new javax.jms.ResourceAllocationException("SystemUsage memory limit reached");
             }
 
             // We can avoid blocking due to low usage if the producer is sending
@@ -404,7 +408,7 @@
 
                     // If the user manager is not full, then the task will not
                     // get called..
-                    if (!usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
+                    if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
                         // so call it directly here.
                         sendMessagesWaitingForSpaceTask.run();
                     }
@@ -417,7 +421,7 @@
                 // Producer flow control cannot be used, so we have do the flow
                 // control at the broker
                 // by blocking this thread until there is space available.
-                while (!usageManager.waitForSpace(1000)) {
+                while (!memoryUsage.waitForSpace(1000)) {
                     if (context.getStopping().get()) {
                         throw new IOException("Connection closed, send aborted.");
                     }
@@ -444,6 +448,7 @@
         final ConnectionContext context = producerExchange.getConnectionContext();
         message.setRegionDestination(this);
         if (store != null && message.isPersistent()) {
+            systemUsage.getStoreUsage().waitForSpace();
             store.addMessage(context, message);
         }
         if (context.isInTransaction()) {
@@ -552,13 +557,13 @@
         synchronized (messages) {
             size = messages.size();
         }
-        return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size() + ", memory=" + usageManager.getPercentUsage() + "%, size=" + size
+        return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size() + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size
                + ", in flight groups=" + messageGroupOwners;
     }
 
     public void start() throws Exception {
-        if (usageManager != null) {
-            usageManager.start();
+        if (memoryUsage != null) {
+            memoryUsage.start();
         }
         messages.start();
         doPageIn(false);
@@ -571,8 +576,8 @@
         if (messages != null) {
             messages.stop();
         }
-        if (usageManager != null) {
-            usageManager.stop();
+        if (memoryUsage != null) {
+            memoryUsage.stop();
         }
     }
 
@@ -586,8 +591,8 @@
         return destination.getPhysicalName();
     }
 
-    public UsageManager getUsageManager() {
-        return usageManager;
+    public MemoryUsage getBrokerMemoryUsage() {
+        return memoryUsage;
     }
 
     public DestinationStatistics getDestinationStatistics() {
@@ -926,7 +931,7 @@
      */
     public boolean iterate() {
 
-        while (!usageManager.isFull() && !messagesWaitingForSpace.isEmpty()) {
+        while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
             Runnable op = messagesWaitingForSpace.removeFirst();
             op.run();
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java Mon Aug 20 03:37:29 2007
@@ -24,8 +24,8 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
 
 /**
  * 
@@ -34,14 +34,14 @@
 public class QueueRegion extends AbstractRegion {
 
     public QueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics,
-                       UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
+                       SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
                        DestinationFactory destinationFactory) {
         super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
     }
 
     public String toString() {
         return "QueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size()
-               + ", memory=" + memoryManager.getPercentUsage() + "%";
+               + ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + "%";
     }
 
     protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info)

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Mon Aug 20 03:37:29 2007
@@ -55,11 +55,11 @@
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.kaha.Store;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.state.ConnectionState;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.BrokerSupport;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.LongSequenceGenerator;
@@ -99,7 +99,7 @@
     private final DestinationInterceptor destinationInterceptor;
     private ConnectionContext adminConnectionContext;
 
-    public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, DestinationFactory destinationFactory,
+    public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory,
                         DestinationInterceptor destinationInterceptor) throws IOException {
         this.brokerService = brokerService;
         if (destinationFactory == null) {
@@ -158,19 +158,19 @@
         return topicRegion;
     }
 
-    protected Region createTempTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+    protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
         return new TempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
     }
 
-    protected Region createTempQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+    protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
         return new TempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
     }
 
-    protected Region createTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+    protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
         return new TopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
     }
 
-    protected Region createQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
+    protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
         return new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java Mon Aug 20 03:37:29 2007
@@ -23,15 +23,15 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTempDestination;
 import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
 
 /**
  * @version $Revision: 1.7 $
  */
 public class TempQueueRegion extends AbstractRegion {
 
-    public TempQueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
+    public TempQueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
                            DestinationFactory destinationFactory) {
         super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
         // We should allow the following to be configurable via a Destination
@@ -65,7 +65,7 @@
     }
 
     public String toString() {
-        return "TempQueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getPercentUsage() + "%";
+        return "TempQueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + "%";
     }
 
     public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java Mon Aug 20 03:37:29 2007
@@ -22,8 +22,8 @@
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -34,7 +34,7 @@
 
     private static final Log LOG = LogFactory.getLog(TempTopicRegion.class);
 
-    public TempTopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
+    public TempTopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
                            DestinationFactory destinationFactory) {
         super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
         // We should allow the following to be configurable via a Destination
@@ -67,7 +67,7 @@
     }
 
     public String toString() {
-        return "TempTopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getPercentUsage() + "%";
+        return "TempTopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + "%";
     }
 
     public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Aug 20 03:37:29 2007
@@ -43,13 +43,14 @@
 import org.apache.activemq.command.ProducerAck;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.filter.MessageEvaluationContext;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.thread.Valve;
 import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.SubscriptionKey;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -67,7 +68,8 @@
     protected final Valve dispatchValve = new Valve(true);
     // this could be NULL! (If an advisory)
     protected final TopicMessageStore store;
-    protected final UsageManager usageManager;
+    private final SystemUsage systemUsage;
+    private final MemoryUsage memoryUsage;
     protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
 
     private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
@@ -85,7 +87,7 @@
             // that the UsageManager is holding.
 
             synchronized (messagesWaitingForSpace) {
-                while (!usageManager.isFull() && !messagesWaitingForSpace.isEmpty()) {
+                while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
                     Runnable op = messagesWaitingForSpace.removeFirst();
                     op.run();
                 }
@@ -95,19 +97,20 @@
     };
     private final Broker broker;
 
-    public Topic(Broker broker, ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats,
+    public Topic(Broker broker, ActiveMQDestination destination, TopicMessageStore store, SystemUsage systemUsage, DestinationStatistics parentStats,
                  TaskRunnerFactory taskFactory) {
         this.broker = broker;
         this.destination = destination;
         this.store = store; // this could be NULL! (If an advisory)
-        this.usageManager = new UsageManager(memoryManager, destination.toString());
-        this.usageManager.setUsagePortion(1.0f);
+        this.systemUsage=systemUsage;
+        this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString());
+        this.memoryUsage.setUsagePortion(1.0f);
 
         // Let the store know what usage manager we are using so that he can
         // flush messages to disk
         // when usage gets high.
         if (store != null) {
-            store.setUsageManager(usageManager);
+            store.setMemoryUsage(memoryUsage);
         }
 
         // let's copy the enabled property from the parent DestinationStatistics
@@ -206,22 +209,16 @@
                 }
             }
             // Do we need to create the subscription?
-            if (info == null) {
-                info = new SubscriptionInfo();
+            if(info==null){
+                info=new SubscriptionInfo();
                 info.setClientId(clientId);
                 info.setSelector(selector);
                 info.setSubscriptionName(subscriptionName);
-                info.setDestination(getActiveMQDestination()); // This
-                // destination
-                // is an actual
-                // destination
-                // id.
-                info.setSubscribedDestination(subscription.getConsumerInfo().getDestination()); // This
-                // destination
-                // might
-                // be a
-                // pattern
-                store.addSubsciption(info, subscription.getConsumerInfo().isRetroactive());
+                info.setDestination(getActiveMQDestination()); 
+                // Thi destination is an actual destination id.
+                info.setSubscribedDestination(subscription.getConsumerInfo().getDestination()); 
+                // This destination might be a pattern
+                store.addSubsciption(info,subscription.getConsumerInfo().isRetroactive());
             }
 
             final MessageEvaluationContext msgContext = new MessageEvaluationContext();
@@ -287,8 +284,8 @@
             return;
         }
 
-        if (context.isProducerFlowControl() && usageManager.isFull()) {
-            if (usageManager.isSendFailIfNoSpace()) {
+        if (context.isProducerFlowControl() && memoryUsage.isFull()) {
+            if (systemUsage.isSendFailIfNoSpace()) {
                 throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
             }
 
@@ -327,7 +324,7 @@
 
                     // If the user manager is not full, then the task will not
                     // get called..
-                    if (!usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
+                    if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
                         // so call it directly here.
                         sendMessagesWaitingForSpaceTask.run();
                     }
@@ -340,7 +337,7 @@
                 // Producer flow control cannot be used, so we have do the flow
                 // control at the broker
                 // by blocking this thread until there is space available.
-                while (!usageManager.waitForSpace(1000)) {
+                while (!memoryUsage.waitForSpace(1000)) {
                     if (context.getStopping().get()) {
                         throw new IOException("Connection closed, send aborted.");
                     }
@@ -365,6 +362,7 @@
         message.setRegionDestination(this);
 
         if (store != null && message.isPersistent() && !canOptimizeOutPersistence()) {
+            systemUsage.getStoreUsage().waitForSpace();
             store.addMessage(context, message);
         }
 
@@ -427,16 +425,16 @@
 
     public void start() throws Exception {
         this.subscriptionRecoveryPolicy.start();
-        if (usageManager != null) {
-            usageManager.start();
+        if (memoryUsage != null) {
+            memoryUsage.start();
         }
 
     }
 
     public void stop() throws Exception {
         this.subscriptionRecoveryPolicy.stop();
-        if (usageManager != null) {
-            usageManager.stop();
+        if (memoryUsage != null) {
+            memoryUsage.stop();
         }
     }
 
@@ -474,8 +472,8 @@
     // Properties
     // -------------------------------------------------------------------------
 
-    public UsageManager getUsageManager() {
-        return usageManager;
+    public MemoryUsage getBrokerMemoryUsage() {
+        return memoryUsage;
     }
 
     public DestinationStatistics getDestinationStatistics() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Mon Aug 20 03:37:29 2007
@@ -35,9 +35,9 @@
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.SessionId;
 import org.apache.activemq.command.SubscriptionInfo;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.LongSequenceGenerator;
 import org.apache.activemq.util.SubscriptionKey;
 import org.apache.commons.logging.Log;
@@ -53,7 +53,7 @@
     private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId());
     private boolean keepDurableSubsActive;
 
-    public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
+    public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
                        DestinationFactory destinationFactory) {
         super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
 
@@ -140,7 +140,7 @@
     }
 
     public String toString() {
-        return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getPercentUsage() + "%";
+        return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + "%";
     }
 
     @Override

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Mon Aug 20 03:37:29 2007
@@ -36,8 +36,8 @@
 import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.Response;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -47,7 +47,7 @@
     private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0);
     
     protected PendingMessageCursor matched;
-    protected final UsageManager usageManager;
+    protected final SystemUsage usageManager;
     protected AtomicLong dispatchedCounter = new AtomicLong();
     protected AtomicLong prefetchExtension = new AtomicLong();
     
@@ -62,7 +62,7 @@
     private final AtomicLong dequeueCounter = new AtomicLong(0);
     private int memoryUsageHighWaterMark = 95;
 
-    public TopicSubscription(Broker broker, ConnectionContext context, ConsumerInfo info, UsageManager usageManager) throws Exception {
+    public TopicSubscription(Broker broker, ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
         super(broker, context, info);
         this.usageManager = usageManager;
         String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
@@ -71,7 +71,7 @@
     }
 
     public void init() throws Exception {
-        this.matched.setUsageManager(usageManager);
+        this.matched.setSystemUsage(usageManager);
         this.matched.start();
     }
 
@@ -317,7 +317,7 @@
     /**
      * @return the usageManager
      */
-    public UsageManager getUsageManager() {
+    public SystemUsage getUsageManager() {
         return this.usageManager;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Mon Aug 20 03:37:29 2007
@@ -20,7 +20,7 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
-import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.usage.SystemUsage;
 
 /**
  * Abstract method holder for pending message (messages awaiting disptach to a
@@ -31,7 +31,7 @@
 public class AbstractPendingMessageCursor implements PendingMessageCursor {
     protected int memoryUsageHighWaterMark = 90;
     protected int maxBatchSize = 100;
-    protected UsageManager usageManager;
+    protected SystemUsage systemUsage;
 
     public void start() throws Exception {
     }
@@ -110,16 +110,16 @@
     public void gc() {
     }
 
-    public void setUsageManager(UsageManager usageManager) {
-        this.usageManager = usageManager;
+    public void setSystemUsage(SystemUsage usageManager) {
+        this.systemUsage = usageManager;
     }
 
     public boolean hasSpace() {
-        return usageManager != null ? (usageManager.getPercentUsage() < memoryUsageHighWaterMark) : true;
+        return systemUsage != null ? (systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true;
     }
 
     public boolean isFull() {
-        return usageManager != null ? usageManager.isFull() : false;
+        return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : false;
     }
 
     public void release() {
@@ -146,8 +146,8 @@
     /**
      * @return the usageManager
      */
-    public UsageManager getUsageManager() {
-        return this.usageManager;
+    public SystemUsage getSystemUsage() {
+        return this.systemUsage;
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Mon Aug 20 03:37:29 2007
@@ -27,9 +27,10 @@
 import org.apache.activemq.kaha.CommandMarshaller;
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.Store;
-import org.apache.activemq.memory.UsageListener;
-import org.apache.activemq.memory.UsageManager;
 import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.usage.Usage;
+import org.apache.activemq.usage.UsageListener;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -65,8 +66,8 @@
 
     public void start() {
         if (started.compareAndSet(false, true)) {
-            if (usageManager != null) {
-                usageManager.addUsageListener(this);
+            if (systemUsage != null) {
+                systemUsage.getMemoryUsage().addUsageListener(this);
             }
         }
     }
@@ -74,8 +75,8 @@
     public void stop() {
         if (started.compareAndSet(true, false)) {
             gc();
-            if (usageManager != null) {
-                usageManager.removeUsageListener(this);
+            if (systemUsage != null) {
+                systemUsage.getMemoryUsage().removeUsageListener(this);
             }
         }
     }
@@ -147,9 +148,10 @@
             } else {
                 flushToDisk();
                 node.decrementReferenceCount();
+                systemUsage.getTempDiskUsage().waitForSpace();
                 getDiskList().addLast(node);
             }
-        } catch (IOException e) {
+        } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
@@ -166,10 +168,11 @@
                 memoryList.addFirst(node);
             } else {
                 flushToDisk();
+                systemUsage.getTempDiskUsage().waitForSpace();
                 node.decrementReferenceCount();
                 getDiskList().addFirst(node);
             }
-        } catch (IOException e) {
+        } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
@@ -238,12 +241,12 @@
         return !isEmpty();
     }
 
-    public void setUsageManager(UsageManager usageManager) {
-        super.setUsageManager(usageManager);
-        usageManager.addUsageListener(this);
+    public void setSystemUsage(SystemUsage usageManager) {
+        super.setSystemUsage(usageManager);
+        usageManager.getMemoryUsage().addUsageListener(this);
     }
 
-    public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
+    public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
         if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
             synchronized (this) {
                 flushRequired = true;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Mon Aug 20 03:37:29 2007
@@ -23,7 +23,7 @@
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
-import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.usage.SystemUsage;
 
 /**
  * Interface to pending message (messages awaiting disptach to a consumer)
@@ -166,15 +166,15 @@
     /**
      * Set the UsageManager
      * 
-     * @param usageManager
-     * @see org.apache.activemq.memory.UsageManager
+     * @param systemUsage
+     * @see org.apache.activemq.usage.SystemUsage
      */
-    void setUsageManager(UsageManager usageManager);
+    void setSystemUsage(SystemUsage systemUsage);
 
     /**
      * @return the usageManager
      */
-    UsageManager getUsageManager();
+    SystemUsage getSystemUsage();
 
     /**
      * @return the memoryUsageHighWaterMark

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Mon Aug 20 03:37:29 2007
@@ -29,7 +29,7 @@
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.kaha.Store;
-import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -96,7 +96,7 @@
         if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
             TopicStorePrefetch tsp = new TopicStorePrefetch((Topic)destination, clientId, subscriberName);
             tsp.setMaxBatchSize(getMaxBatchSize());
-            tsp.setUsageManager(usageManager);
+            tsp.setSystemUsage(systemUsage);
             topics.put(destination, tsp);
             storePrefetches.add(tsp);
             if (started) {
@@ -244,11 +244,11 @@
         }
     }
 
-    public synchronized void setUsageManager(UsageManager usageManager) {
-        super.setUsageManager(usageManager);
+    public synchronized void setSystemUsage(SystemUsage usageManager) {
+        super.setSystemUsage(usageManager);
         for (Iterator<PendingMessageCursor> i = storePrefetches.iterator(); i.hasNext();) {
             PendingMessageCursor tsp = i.next();
-            tsp.setUsageManager(usageManager);
+            tsp.setSystemUsage(usageManager);
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Mon Aug 20 03:37:29 2007
@@ -20,7 +20,7 @@
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.kaha.Store;
-import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -58,7 +58,7 @@
         if (nonPersistent == null) {
             nonPersistent = new FilePendingMessageCursor(queue.getDestination(), tmpStore);
             nonPersistent.setMaxBatchSize(getMaxBatchSize());
-            nonPersistent.setUsageManager(usageManager);
+            nonPersistent.setSystemUsage(systemUsage);
         }
         nonPersistent.start();
         persistent.start();
@@ -201,13 +201,13 @@
         }
     }
 
-    public synchronized void setUsageManager(UsageManager usageManager) {
-        super.setUsageManager(usageManager);
+    public synchronized void setSystemUsage(SystemUsage usageManager) {
+        super.setSystemUsage(usageManager);
         if (persistent != null) {
-            persistent.setUsageManager(usageManager);
+            persistent.setSystemUsage(usageManager);
         }
         if (nonPersistent != null) {
-            nonPersistent.setUsageManager(usageManager);
+            nonPersistent.setSystemUsage(usageManager);
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Mon Aug 20 03:37:29 2007
@@ -26,7 +26,7 @@
 import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
 import org.apache.activemq.filter.DestinationMapEntry;
 import org.apache.activemq.kaha.Store;
-import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.usage.SystemUsage;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -61,7 +61,7 @@
         }
         queue.setMessageGroupMapFactory(getMessageGroupMapFactory());
         if (memoryLimit > 0) {
-            queue.getUsageManager().setLimit(memoryLimit);
+            queue.getBrokerMemoryUsage().setLimit(memoryLimit);
         }
         if (pendingQueuePolicy != null) {
             PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(queue, tmpStore);
@@ -81,11 +81,11 @@
         }
         topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
         if (memoryLimit > 0) {
-            topic.getUsageManager().setLimit(memoryLimit);
+            topic.getBrokerMemoryUsage().setLimit(memoryLimit);
         }
     }
 
-    public void configure(Broker broker, UsageManager memoryManager, TopicSubscription subscription) {
+    public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
         if (pendingMessageLimitStrategy != null) {
             int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
             int consumerLimit = subscription.getInfo().getMaximumPendingMessageLimit();
@@ -111,13 +111,13 @@
         }
     }
 
-    public void configure(Broker broker, UsageManager memoryManager, DurableTopicSubscription sub) {
+    public void configure(Broker broker, SystemUsage memoryManager, DurableTopicSubscription sub) {
         String clientId = sub.getClientId();
         String subName = sub.getSubscriptionName();
         int prefetch = sub.getPrefetchSize();
         if (pendingDurableSubscriberPolicy != null) {
             PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(clientId, subName, broker.getTempDataStore(), prefetch);
-            cursor.setUsageManager(memoryManager);
+            cursor.setSystemUsage(memoryManager);
             sub.setPending(cursor);
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Message.java Mon Aug 20 03:37:29 2007
@@ -582,7 +582,7 @@
         }
 
         if (rc == 1 && regionDestination != null) {
-            regionDestination.getUsageManager().increaseUsage(size);
+            regionDestination.getBrokerMemoryUsage().increaseUsage(size);
         }
 
         // System.out.println(" + "+getDestination()+" :::: "+getMessageId()+"
@@ -599,7 +599,7 @@
         }
 
         if (rc == 0 && regionDestination != null) {
-            regionDestination.getUsageManager().decreaseUsage(size);
+            regionDestination.getBrokerMemoryUsage().decreaseUsage(size);
         }
         // System.out.println(" - "+getDestination()+" :::: "+getMessageId()+"
         // "+rc);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java?rev=567647&r1=567646&r2=567647&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java Mon Aug 20 03:37:29 2007
@@ -256,22 +256,14 @@
      */
     void setMaxDataFileLength(long maxDataFileLength);
 
-    /**
-     * @see org.apache.activemq.kaha.IndexTypes
-     * @return the default index type
-     */
-    String getIndexTypeAsString();
-
-    /**
-     * Set the default index type
-     * 
-     * @param type
-     * @see org.apache.activemq.kaha.IndexTypes
-     */
-    void setIndexTypeAsString(String type);
 
     /**
      * @return true if the store has been initialized
      */
     boolean isInitialized();
+    
+    /**
+     * @return the amount of disk space the store is occupying
+     */
+    long size();
 }



Mime
View raw message