activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r818418 - in /activemq/branches/activemq-5.3: ./ activemq-core/src/main/java/org/apache/activemq/broker/region/ activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/ activemq-core/src/main/java/org/apache/activemq/broker/re...
Date Thu, 24 Sep 2009 09:39:06 GMT
Author: rajdavies
Date: Thu Sep 24 09:39:05 2009
New Revision: 818418

URL: http://svn.apache.org/viewvc?rev=818418&view=rev
Log:
 Merged /activemq/trunk:r818140-818147,818160-818176,818225-818262

Added:
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java
      - copied, changed from r818147, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java
Modified:
    activemq/branches/activemq-5.3/   (props changed)
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java

Propchange: activemq/branches/activemq-5.3/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Sep 24 09:39:05 2009
@@ -1 +1 @@
-/activemq/trunk:816278-816279,816298,818138,818155,818209,818211,818224
+/activemq/trunk:816278-816279,816298,818138,818140-818147,818155,818160-818176,818209,818211,818224-818262

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=818418&r1=818417&r2=818418&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java Thu Sep 24 09:39:05 2009
@@ -47,6 +47,7 @@
     protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>();
     private BooleanExpression selectorExpression;
     private ObjectName objectName;
+    private int cursorMemoryHighWaterMark = 70;
 
 
     public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
@@ -211,6 +212,14 @@
         
     }
     
+    public int getCursorMemoryHighWaterMark(){
+    	return this.cursorMemoryHighWaterMark;
+    }
+
+	public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark){
+		this.cursorMemoryHighWaterMark=cursorMemoryHighWaterMark;
+	}
+    
     public int countBeforeFull() {
         return getDispatchedQueueSize() - info.getPrefetchSize();
     }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=818418&r1=818417&r2=818418&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Thu Sep 24 09:39:05 2009
@@ -50,7 +50,7 @@
     protected final MessageStore store;
     protected SystemUsage systemUsage;
     protected MemoryUsage memoryUsage;
-    private boolean producerFlowControl = false;
+    private boolean producerFlowControl = true;
     private int maxProducersToAudit = 1024;
     private int maxAuditDepth = 2048;
     private boolean enableAudit = true;
@@ -72,6 +72,7 @@
     protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
     protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD;
     private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
+    protected int cursorMemoryHighWaterMark = 70;
 
     /**
      * @param broker
@@ -374,6 +375,14 @@
     public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
         this.deadLetterStrategy = deadLetterStrategy;
     }
+    
+    public int getCursorMemoryHighWaterMark() {
+		return this.cursorMemoryHighWaterMark;
+	}
+
+	public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
+		this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
+	}
 
     /**
      * called when message is consumed
@@ -510,5 +519,4 @@
     public void processDispatchNotification(
             MessageDispatchNotification messageDispatchNotification) throws Exception {
     }
-
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=818418&r1=818417&r2=818418&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Thu Sep 24 09:39:05 2009
@@ -105,6 +105,10 @@
 
     public void setMinimumMessageSize(int minimumMessageSize);
     
+    public int getCursorMemoryHighWaterMark();
+
+	public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
+    
     /**
      * optionally called by a Subscriber - to inform the Destination its
      * ready for more messages

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=818418&r1=818417&r2=818418&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Thu Sep 24 09:39:05 2009
@@ -265,4 +265,12 @@
             MessageDispatchNotification messageDispatchNotification) throws Exception {
         next.processDispatchNotification(messageDispatchNotification);   
     }
+
+	public int getCursorMemoryHighWaterMark() {
+		return next.getCursorMemoryHighWaterMark();
+	}
+
+	public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
+		next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
+	}
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=818418&r1=818417&r2=818418&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Thu Sep 24 09:39:05 2009
@@ -54,6 +54,7 @@
         super(broker,usageManager, context, info);
         this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
         this.pending.setSystemUsage(usageManager);
+        this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
         this.keepDurableSubsActive = keepDurableSubsActive;
         subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
         
@@ -115,6 +116,7 @@
             }
             synchronized (pending) {
                 pending.setSystemUsage(memoryManager);
+                pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
                 pending.start();
 
                 // If nothing was in the persistent store, then try to use the

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=818418&r1=818417&r2=818418&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Thu Sep 24 09:39:05 2009
@@ -525,6 +525,7 @@
         this.pending = pending;
         if (this.pending!=null) {
             this.pending.setSystemUsage(usageManager);
+            this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
         }
     }
 

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=818418&r1=818417&r2=818418&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Sep 24 09:39:05 2009
@@ -191,6 +191,7 @@
             messages.setMaxAuditDepth(getMaxAuditDepth());
             messages.setMaxProducersToAudit(getMaxProducersToAudit());
             messages.setUseCache(isUseCache());
+            messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
             if (messages.isRecoveryRequired()) {
                 store.recover(new MessageRecoveryListener() {
 

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=818418&r1=818417&r2=818418&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java Thu Sep 24 09:39:05 2009
@@ -223,4 +223,8 @@
     int countBeforeFull();
 
     ConnectionContext getContext();
+    
+    public int getCursorMemoryHighWaterMark();
+
+	public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java?rev=818418&r1=818417&r2=818418&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java Thu Sep 24 09:39:05 2009
@@ -52,6 +52,7 @@
     
     public void initialize() throws Exception {
         this.messages=new VMPendingMessageCursor();
+        this.messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
         this.systemUsage = brokerService.getSystemUsage();
         memoryUsage.setParent(systemUsage.getMemoryUsage());           
         this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue:  " + destination.getPhysicalName());

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=818418&r1=818417&r2=818418&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Thu Sep 24 09:39:05 2009
@@ -76,6 +76,7 @@
 
     public void init() throws Exception {
         this.matched.setSystemUsage(usageManager);
+        this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
         this.matched.start();
     }
 

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=818418&r1=818417&r2=818418&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Thu Sep 24 09:39:05 2009
@@ -249,6 +249,16 @@
             nonPersistent.setUseCache(useCache);
         }
     }
+    
+    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
+        super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
+        if (persistent != null) {
+            persistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
+        }
+        if (nonPersistent != null) {
+            nonPersistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
+        }
+    }
 
 
 

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=818418&r1=818417&r2=818418&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Thu Sep 24 09:39:05 2009
@@ -59,7 +59,7 @@
     private int maxAuditDepth=2048;
     private int maxQueueAuditDepth=2048;
     private boolean enableAudit=true;
-    private boolean producerFlowControl = false;
+    private boolean producerFlowControl = true;
     private boolean optimizedDispatch=false;
     private int maxPageSize=BaseDestination.MAX_PAGE_SIZE;
     private int maxBrowsePageSize=BaseDestination.MAX_BROWSE_PAGE_SIZE;
@@ -82,6 +82,7 @@
     private int queueBrowserPrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH;
     private int topicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH;
     private int durableTopicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH;
+    private int cursorMemoryHighWaterMark=70;
     
    
     public void configure(Broker broker,Queue queue) {
@@ -140,6 +141,7 @@
         destination.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
         destination.setExpireMessagesPeriod(getExpireMessagesPeriod());
         destination.setMaxExpirePageSize(getMaxExpirePageSize());
+        destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
     }
 
     public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
@@ -177,8 +179,8 @@
         String clientId = sub.getSubscriptionKey().getClientId();
         String subName = sub.getSubscriptionKey().getSubscriptionName();
         int prefetch = sub.getPrefetchSize();
+        sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
         //override prefetch size if not set by the Consumer
-        
         if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH){
             sub.setPrefetchSize(getDurableTopicPrefetch());
         }
@@ -189,6 +191,7 @@
         }
         sub.setMaxAuditDepth(getMaxAuditDepth());
         sub.setMaxProducersToAudit(getMaxProducersToAudit());
+        
     }
     
     public void configure(Broker broker, SystemUsage memoryManager, QueueBrowserSubscription sub) {
@@ -199,6 +202,7 @@
         if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH){
             sub.setPrefetchSize(getQueueBrowserPrefetch());
         }
+        sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
     }
     
     public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) {
@@ -209,6 +213,7 @@
         if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH){
             sub.setPrefetchSize(getQueuePrefetch());
         }
+        sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
     }
 
     // Properties
@@ -661,6 +666,14 @@
     public void setDurableTopicPrefetch(int durableTopicPrefetch) {
         this.durableTopicPrefetch = durableTopicPrefetch;
     }
+    
+    public int getCursorMemoryHighWaterMark() {
+		return this.cursorMemoryHighWaterMark;
+	}
+
+	public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
+		this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
+	}
 
 
 

Copied: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java
(from r818147, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java)
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java?p2=activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java&r1=818147&r2=818418&rev=818418&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java Thu Sep 24 09:39:05 2009
@@ -21,4 +21,7 @@
 
 public class PolicyConfigTest extends TestCase{
 
+    public void testNoop() {
+    }
+
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java?rev=818418&r1=818417&r2=818418&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java Thu Sep 24 09:39:05 2009
@@ -285,6 +285,14 @@
             public void acknowledge(ConnectionContext context, MessageAck ack)
                     throws Exception {
             }
+
+			public int getCursorMemoryHighWaterMark(){
+				return 0;
+			}
+
+			public void setCursorMemoryHighWaterMark(
+					int cursorMemoryHighWaterMark) {				
+			}
         };
 
         queue.addSubscription(contextNotInTx, subscription);



Mime
View raw message