activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r818147 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/broker/region/policy/ test/java/org/apache/activemq/broker/polic...
Date Wed, 23 Sep 2009 15:54:38 GMT
Author: rajdavies
Date: Wed Sep 23 15:54:37 2009
New Revision: 818147

URL: http://svn.apache.org/viewvc?rev=818147&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2403

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java Wed Sep 23 15:54:37 2009
@@ -47,6 +47,7 @@
     protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>();
     private BooleanExpression selectorExpression;
     private ObjectName objectName;
+    private int cursorMemoryHighWaterMark = 70;
 
 
     public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
@@ -211,6 +212,14 @@
         
     }
     
+    public int getCursorMemoryHighWaterMark(){
+    	return this.cursorMemoryHighWaterMark;
+    }
+
+	public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark){
+		this.cursorMemoryHighWaterMark=cursorMemoryHighWaterMark;
+	}
+    
     public int countBeforeFull() {
         return getDispatchedQueueSize() - info.getPrefetchSize();
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Wed Sep 23 15:54:37 2009
@@ -50,7 +50,7 @@
     protected final MessageStore store;
     protected SystemUsage systemUsage;
     protected MemoryUsage memoryUsage;
-    private boolean producerFlowControl = false;
+    private boolean producerFlowControl = true;
     protected boolean warnOnProducerFlowControl = true; 
     private int maxProducersToAudit = 1024;
     private int maxAuditDepth = 2048;
@@ -73,6 +73,7 @@
     protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
     protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD;
     private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
+    protected int cursorMemoryHighWaterMark = 70;
 
     /**
      * @param broker
@@ -375,6 +376,14 @@
     public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
         this.deadLetterStrategy = deadLetterStrategy;
     }
+    
+    public int getCursorMemoryHighWaterMark() {
+		return this.cursorMemoryHighWaterMark;
+	}
+
+	public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
+		this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
+	}
 
     /**
      * called when message is consumed
@@ -511,5 +520,4 @@
     public void processDispatchNotification(
             MessageDispatchNotification messageDispatchNotification) throws Exception {
     }
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java Wed Sep 23 15:54:37 2009
@@ -105,6 +105,10 @@
 
     public void setMinimumMessageSize(int minimumMessageSize);
     
+    public int getCursorMemoryHighWaterMark();
+
+	public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
+    
     /**
      * optionally called by a Subscriber - to inform the Destination its
      * ready for more messages

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java Wed Sep 23 15:54:37 2009
@@ -265,4 +265,12 @@
             MessageDispatchNotification messageDispatchNotification) throws Exception {
         next.processDispatchNotification(messageDispatchNotification);   
     }
+
+	public int getCursorMemoryHighWaterMark() {
+		return next.getCursorMemoryHighWaterMark();
+	}
+
+	public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
+		next.setCursorMemoryHighWaterMark(cursorMemoryHighWaterMark);
+	}
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Wed Sep 23 15:54:37 2009
@@ -54,6 +54,7 @@
         super(broker,usageManager, context, info);
         this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
         this.pending.setSystemUsage(usageManager);
+        this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
         this.keepDurableSubsActive = keepDurableSubsActive;
         subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
         
@@ -115,6 +116,7 @@
             }
             synchronized (pending) {
                 pending.setSystemUsage(memoryManager);
+                pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
                 pending.start();
 
                 // If nothing was in the persistent store, then try to use the

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Sep 23 15:54:37 2009
@@ -525,6 +525,7 @@
         this.pending = pending;
         if (this.pending!=null) {
             this.pending.setSystemUsage(usageManager);
+            this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Wed Sep 23 15:54:37 2009
@@ -191,6 +191,7 @@
             messages.setMaxAuditDepth(getMaxAuditDepth());
             messages.setMaxProducersToAudit(getMaxProducersToAudit());
             messages.setUseCache(isUseCache());
+            messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
             if (messages.isRecoveryRequired()) {
                 store.recover(new MessageRecoveryListener() {
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java Wed Sep 23 15:54:37 2009
@@ -223,4 +223,8 @@
     int countBeforeFull();
 
     ConnectionContext getContext();
+    
+    public int getCursorMemoryHighWaterMark();
+
+	public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark);
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java Wed Sep 23 15:54:37 2009
@@ -52,6 +52,7 @@
     
     public void initialize() throws Exception {
         this.messages=new VMPendingMessageCursor();
+        this.messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
         this.systemUsage = brokerService.getSystemUsage();
         memoryUsage.setParent(systemUsage.getMemoryUsage());           
         this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue:  " + destination.getPhysicalName());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Wed Sep 23 15:54:37 2009
@@ -76,6 +76,7 @@
 
     public void init() throws Exception {
         this.matched.setSystemUsage(usageManager);
+        this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
         this.matched.start();
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Wed Sep 23 15:54:37 2009
@@ -249,6 +249,16 @@
             nonPersistent.setUseCache(useCache);
         }
     }
+    
+    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
+        super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
+        if (persistent != null) {
+            persistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
+        }
+        if (nonPersistent != null) {
+            nonPersistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
+        }
+    }
 
 
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Wed Sep 23 15:54:37 2009
@@ -33,6 +33,7 @@
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.derby.impl.sql.compile.GetCurrentConnectionNode;
 
 /**
  * Represents an entry in a {@link PolicyMap} for assigning policies to a
@@ -59,7 +60,7 @@
     private int maxAuditDepth=2048;
     private int maxQueueAuditDepth=2048;
     private boolean enableAudit=true;
-    private boolean producerFlowControl = false;
+    private boolean producerFlowControl = true;
     private boolean optimizedDispatch=false;
     private int maxPageSize=BaseDestination.MAX_PAGE_SIZE;
     private int maxBrowsePageSize=BaseDestination.MAX_BROWSE_PAGE_SIZE;
@@ -82,6 +83,7 @@
     private int queueBrowserPrefetch=ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH;
     private int topicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH;
     private int durableTopicPrefetch=ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH;
+    private int cursorMemoryHighWaterMark=70;
     
    
     public void configure(Broker broker,Queue queue) {
@@ -140,6 +142,7 @@
         destination.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
         destination.setExpireMessagesPeriod(getExpireMessagesPeriod());
         destination.setMaxExpirePageSize(getMaxExpirePageSize());
+        destination.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
     }
 
     public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
@@ -177,8 +180,8 @@
         String clientId = sub.getSubscriptionKey().getClientId();
         String subName = sub.getSubscriptionKey().getSubscriptionName();
         int prefetch = sub.getPrefetchSize();
+        sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
         //override prefetch size if not set by the Consumer
-        
         if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH){
             sub.setPrefetchSize(getDurableTopicPrefetch());
         }
@@ -189,6 +192,7 @@
         }
         sub.setMaxAuditDepth(getMaxAuditDepth());
         sub.setMaxProducersToAudit(getMaxProducersToAudit());
+        
     }
     
     public void configure(Broker broker, SystemUsage memoryManager, QueueBrowserSubscription sub) {
@@ -199,6 +203,7 @@
         if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH){
             sub.setPrefetchSize(getQueueBrowserPrefetch());
         }
+        sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
     }
     
     public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) {
@@ -209,6 +214,7 @@
         if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH){
             sub.setPrefetchSize(getQueuePrefetch());
         }
+        sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
     }
 
     // Properties
@@ -661,6 +667,14 @@
     public void setDurableTopicPrefetch(int durableTopicPrefetch) {
         this.durableTopicPrefetch = durableTopicPrefetch;
     }
+    
+    public int getCursorMemoryHighWaterMark() {
+		return this.cursorMemoryHighWaterMark;
+	}
+
+	public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
+		this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
+	}
 
 
 

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java?rev=818147&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java Wed Sep 23 15:54:37 2009
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.policy;
+
+import junit.framework.TestCase;
+
+public class PolicyConfigTest extends TestCase{
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/PolicyConfigTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java?rev=818147&r1=818146&r2=818147&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java Wed Sep 23 15:54:37 2009
@@ -285,6 +285,14 @@
             public void acknowledge(ConnectionContext context, MessageAck ack)
                     throws Exception {
             }
+
+			public int getCursorMemoryHighWaterMark(){
+				return 0;
+			}
+
+			public void setCursorMemoryHighWaterMark(
+					int cursorMemoryHighWaterMark) {				
+			}
         };
 
         queue.addSubscription(contextNotInTx, subscription);



Mime
View raw message