activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r512637 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/BrokerService.java broker/region/Queue.java broker/region/Topic.java memory/UsageManager.java
Date Wed, 28 Feb 2007 07:17:56 GMT
Author: rajdavies
Date: Tue Feb 27 23:17:55 2007
New Revision: 512637

URL: http://svn.apache.org/viewvc?view=rev&rev=512637
Log:
Have separate child Usage Managers - one for producers, one for consumers.
This has been done so that paged-in messages do not have to contend for memory
with fast producers.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?view=diff&rev=512637&r1=512636&r2=512637
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Tue Feb 27 23:17:55 2007
@@ -120,7 +120,9 @@
     private ObjectName brokerObjectName;
     private TaskRunnerFactory taskRunnerFactory;
     private TaskRunnerFactory persistenceTaskRunnerFactory;
-    private UsageManager memoryManager;
+    private UsageManager usageManager;
+    private UsageManager producerUsageManager;
+    private UsageManager consumerUsageManager;
     private PersistenceAdapter persistenceAdapter;
     private PersistenceAdapterFactory persistenceFactory;
     private DestinationFactory destinationFactory;
@@ -621,18 +623,56 @@
     }
 
     public UsageManager getMemoryManager() {
-        if (memoryManager == null) {
-            memoryManager = new UsageManager();
-            memoryManager.setLimit(1024 * 1024 * 20); // Default to 20 Meg
+        if (usageManager == null) {
+            usageManager = new UsageManager("Main");
+            usageManager.setLimit(1024 * 1024 * 20); // Default to 20 Meg
             // limit
         }
-        return memoryManager;
+        return usageManager;
     }
+       
 
     public void setMemoryManager(UsageManager memoryManager) {
-        this.memoryManager = memoryManager;
+        this.usageManager = memoryManager;
     }
+    
+    /**
+     * @return the consumerUsageManager
+     */
+    public UsageManager getConsumerUsageManager(){
+        if (consumerUsageManager==null) {
+            consumerUsageManager = new UsageManager(getMemoryManager(),"Consumer",0.5f);
+        }
+        return consumerUsageManager;
+    }
+
+    
+    /**
+     * @param consumerUsageManager the consumerUsageManager to set
+     */
+    public void setConsumerUsageManager(UsageManager consumerUsageManager){
+        this.consumerUsageManager=consumerUsageManager;
+    }
+
+    
+    /**
+     * @return the producerUsageManager
+     */
+    public UsageManager getProducerUsageManager(){
+        if (producerUsageManager==null) {
+            producerUsageManager = new UsageManager(getMemoryManager(),"Producer",0.45f);
+        }
+        return producerUsageManager;
+    }
+    
+    /**
+     * @param producerUsageManager the producerUsageManager to set
+     */
+    public void setProducerUsageManager(UsageManager producerUsageManager){
+        this.producerUsageManager=producerUsageManager;
+    }    
 
+   
     public PersistenceAdapter getPersistenceAdapter() throws IOException {
         if (persistenceAdapter == null) {
             persistenceAdapter = createPersistenceAdapter();
@@ -1272,7 +1312,7 @@
     protected Broker createRegionBroker() throws Exception {
         // we must start the persistence adaptor before we can create the region
         // broker
-        getPersistenceAdapter().setUsageManager(getMemoryManager());
+        getPersistenceAdapter().setUsageManager(getProducerUsageManager());
         getPersistenceAdapter().start();
         
         DestinationInterceptor destinationInterceptor = null;
@@ -1284,15 +1324,15 @@
         }
 	RegionBroker regionBroker = null;
 	if (destinationFactory == null) {
-            destinationFactory = new DestinationFactoryImpl(getMemoryManager(), getTaskRunnerFactory(), getPersistenceAdapter());
+            destinationFactory = new DestinationFactoryImpl(getProducerUsageManager(), getTaskRunnerFactory(), getPersistenceAdapter());
         }
         if (isUseJmx()) {
             MBeanServer mbeanServer = getManagementContext().getMBeanServer();
-            regionBroker = new ManagedRegionBroker(this, mbeanServer, getBrokerObjectName(), getTaskRunnerFactory(), getMemoryManager(),
+            regionBroker = new ManagedRegionBroker(this, mbeanServer, getBrokerObjectName(), getTaskRunnerFactory(), getConsumerUsageManager(),
                     destinationFactory, destinationInterceptor);
         }
         else {
-            regionBroker = new RegionBroker(this,getTaskRunnerFactory(), getMemoryManager(), destinationFactory, destinationInterceptor);
+            regionBroker = new RegionBroker(this,getTaskRunnerFactory(), getConsumerUsageManager(), destinationFactory, destinationInterceptor);
         }
         destinationFactory.setRegionBroker(regionBroker);
         
@@ -1597,4 +1637,7 @@
         }
         LOCAL_HOST_NAME = localHostName;
     }
+
+    
+   
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=512637&r1=512636&r2=512637
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Feb 27 23:17:55 2007
@@ -96,7 +96,7 @@
     public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats,
             TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
         this.destination = destination;
-        this.usageManager = new UsageManager(memoryManager);
+        this.usageManager = new UsageManager(memoryManager,destination.toString());
         this.usageManager.setLimit(Long.MAX_VALUE);
         this.store = store;
         if(destination.isTemporary()){
@@ -455,6 +455,9 @@
 
     public void start() throws Exception {
         started = true;
+        if (usageManager != null) {
+            usageManager.start();
+        }
         messages.start();
         doPageIn(false);
     }
@@ -466,6 +469,9 @@
         }
         if(messages!=null){
             messages.stop();
+        }
+        if (usageManager != null) {
+            usageManager.stop();
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=512637&r1=512636&r2=512637
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Tue Feb 27 23:17:55 2007
@@ -74,7 +74,7 @@
 
         this.destination = destination;
         this.store = store; //this could be NULL! (If an advsiory)
-        this.usageManager = new UsageManager(memoryManager);
+        this.usageManager = new UsageManager(memoryManager,destination.toString());
         this.usageManager.setLimit(Long.MAX_VALUE);
         
         // Let the store know what usage manager we are using so that he can flush messages to disk
@@ -321,10 +321,17 @@
 
     public void start() throws Exception {
         this.subscriptionRecoveryPolicy.start();
+        if (usageManager != null) {
+            usageManager.start();
+        }
+        
     }
 
     public void stop() throws Exception {
         this.subscriptionRecoveryPolicy.stop();
+        if (usageManager != null) {
+            usageManager.stop();
+        }
     }
     
     public Message[] browse(){

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java?view=diff&rev=512637&r1=512636&r2=512637
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java Tue Feb 27 23:17:55 2007
@@ -18,7 +18,9 @@
 package org.apache.activemq.memory;
 
 import java.util.Iterator;
+import java.util.List;
 
+import org.apache.activemq.Service;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -35,7 +37,7 @@
  * 
  * @version $Revision: 1.3 $
  */
-public class UsageManager {
+public class UsageManager  implements Service{
 
     private static final Log log = LogFactory.getLog(UsageManager.class);
     
@@ -55,9 +57,12 @@
     /** True if someone called setSendFailIfNoSpace() on this particular usage manager */
     private boolean sendFailIfNoSpaceExplicitySet;
     private final boolean debug = log.isDebugEnabled();
+    private String name = "";
+    private float usagePortion = 1.0f;
+    private List<UsageManager> children = new CopyOnWriteArrayList<UsageManager>();
 
     public UsageManager() {
-        this(null);
+        this(null,"default");
     }
     
     /**
@@ -68,7 +73,25 @@
      * @param parent
      */
     public UsageManager(UsageManager parent) {
+        this(parent,"default");
+    }
+    
+    public UsageManager(String name) {
+        this(null,name);
+    }
+    
+    public UsageManager(UsageManager parent,String name) {
+        this(parent,name,1.0f);
+    }
+    
+    public UsageManager(UsageManager parent, String name, float portion) {
         this.parent = parent;
+        this.usagePortion=portion;
+        if (parent != null) {
+            this.limit=(long)(parent.limit * portion);
+            this.name= parent.name + ":";
+        }
+        this.name += name;
     }
     
     /**
@@ -91,9 +114,6 @@
             for( int i=0; percentUsage >= 100 ; i++) {
                 usageMutex.wait();
             }
-            for( int i=0; percentUsage > 90 ; i++) {
-                usageMutex.wait(100);
-            }
         }
     }
     
@@ -166,11 +186,14 @@
             throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0");
         }
         int percentUsage;
-        synchronized (usageMutex) {
-            this.limit = limit;
-            percentUsage = caclPercentUsage();
+        synchronized(usageMutex){
+            this.limit=parent!=null?(long)(parent.limit*usagePortion):limit;
+            percentUsage=caclPercentUsage();
         }
         setPercentUsage(percentUsage);
+        for (UsageManager child:children) {
+            child.setLimit(limit);
+        }
     }
     
     /*
@@ -259,8 +282,35 @@
             l.onMemoryUseChanged(this,oldPercentUsage,newPercentUsage);
         }
     }
+    
+    public String getName() {
+        return name;
+    }
+
+    public String toString(){
+       
+        
+        return "UsageManager("+ getName() +") percentUsage="+percentUsage+"%, usage="+usage+" limit="+limit+" percentUsageMinDelta="
+                +percentUsageMinDelta+"%";
+    }
+
+    public void start(){
+        if(parent!=null){
+            parent.addChild(this);
+        }
+    }
+
+    public void stop(){
+        if(parent!=null){
+            parent.removeChild(this);
+        }
+    }
+
+    private void addChild(UsageManager child){
+        children.add(child);
+    }
 
-    public String toString() {
-        return "UsageManager: percentUsage="+percentUsage+"%, usage="+usage+" limit="+limit+" percentUsageMinDelta="+percentUsageMinDelta+"%";
+    private void removeChild(UsageManager child){
+        children.remove(child);
     }
 }



Mime
View raw message