activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1489180 - in /activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region: PrefetchSubscription.java policy/PolicyEntry.java
Date Mon, 03 Jun 2013 21:12:44 GMT
Author: tabish
Date: Mon Jun  3 21:12:44 2013
New Revision: 1489180

URL: http://svn.apache.org/r1489180
Log:
some fixes for: https://issues.apache.org/jira/browse/AMQ-4487

Allow the maxProducersToAudit value to be set on Queue and QueueBrowser subscriptions, which
can help to work around OOM errors on QueueBrowsers when there are more producers than the
maximum number audited.

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1489180&r1=1489179&r2=1489180&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Mon Jun  3 21:12:44 2013
@@ -834,6 +834,9 @@ public abstract class PrefetchSubscripti
 
     public void setMaxProducersToAudit(int maxProducersToAudit) {
         this.maxProducersToAudit = maxProducersToAudit;
+        if (this.pending != null) {
+            this.pending.setMaxProducersToAudit(maxProducersToAudit);
+        }
     }
 
     public int getMaxAuditDepth() {
@@ -842,6 +845,9 @@ public abstract class PrefetchSubscripti
 
     public void setMaxAuditDepth(int maxAuditDepth) {
         this.maxAuditDepth = maxAuditDepth;
+        if (this.pending != null) {
+            this.pending.setMaxAuditDepth(maxAuditDepth);
+        }
     }
 
     public boolean isUsePrefetchExtension() {

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1489180&r1=1489179&r2=1489180&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Mon Jun  3 21:12:44 2013
@@ -39,9 +39,9 @@ import org.slf4j.LoggerFactory;
 /**
  * Represents an entry in a {@link PolicyMap} for assigning policies to a
  * specific destination or a hierarchical wildcard area of destinations.
- * 
+ *
  * @org.apache.xbean.XBean
- * 
+ *
  */
 public class PolicyEntry extends DestinationMapEntry {
 
@@ -118,7 +118,7 @@ public class PolicyEntry extends Destina
             PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(broker,queue);
             queue.setMessages(messages);
         }
-        
+
         queue.setUseConsumerPriority(isUseConsumerPriority());
         queue.setStrictOrderDispatch(isStrictOrderDispatch());
         queue.setOptimizedDispatch(isOptimizedDispatch());
@@ -144,7 +144,7 @@ public class PolicyEntry extends Destina
         }
         topic.setLazyDispatch(isLazyDispatch());
     }
-    
+
     public void baseConfiguration(Broker broker,BaseDestination destination) {
         destination.setProducerFlowControl(isProducerFlowControl());
         destination.setAlwaysRetroactive(isAlwaysRetroactive());
@@ -231,19 +231,21 @@ public class PolicyEntry extends Destina
             sub.setMaxAuditDepth(auditDepth);
         }
         sub.setMaxProducersToAudit(getMaxProducersToAudit());
-        sub.setUsePrefetchExtension(isUsePrefetchExtension());        
+        sub.setUsePrefetchExtension(isUsePrefetchExtension());
     }
-    
+
     public void configure(Broker broker, SystemUsage memoryManager, QueueBrowserSubscription
sub) {
         configurePrefetch(sub);
         sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
         sub.setUsePrefetchExtension(isUsePrefetchExtension());
+        sub.setMaxProducersToAudit(getMaxProducersToAudit());
     }
-    
+
     public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub)
{
         configurePrefetch(sub);
         sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
         sub.setUsePrefetchExtension(isUsePrefetchExtension());
+        sub.setMaxProducersToAudit(getMaxProducersToAudit());
     }
 
     public void configurePrefetch(Subscription subscription) {
@@ -442,13 +444,13 @@ public class PolicyEntry extends Destina
     public void setAlwaysRetroactive(boolean alwaysRetroactive) {
         this.alwaysRetroactive = alwaysRetroactive;
     }
-    
-    
+
+
     /**
      * Set's the interval at which warnings about producers being blocked by
      * resource usage will be triggered. Values of 0 or less will disable
      * warnings
-     * 
+     *
      * @param blockedProducerWarningInterval the interval at which warning about
      *            blocked producers will be triggered.
      */
@@ -457,14 +459,14 @@ public class PolicyEntry extends Destina
     }
 
     /**
-     * 
+     *
      * @return the interval at which warning about blocked producers will be
      *         triggered.
      */
     public long getBlockedProducerWarningInterval() {
         return blockedProducerWarningInterval;
     }
-    
+
     /**
      * @return the maxProducersToAudit
      */
@@ -522,23 +524,23 @@ public class PolicyEntry extends Destina
     public void setOptimizedDispatch(boolean optimizedDispatch) {
         this.optimizedDispatch = optimizedDispatch;
     }
-    
+
     public int getMaxPageSize() {
         return maxPageSize;
     }
 
     public void setMaxPageSize(int maxPageSize) {
         this.maxPageSize = maxPageSize;
-    } 
-    
+    }
+
     public int getMaxBrowsePageSize() {
         return maxBrowsePageSize;
     }
 
     public void setMaxBrowsePageSize(int maxPageSize) {
         this.maxBrowsePageSize = maxPageSize;
-    } 
-    
+    }
+
     public boolean isUseCache() {
         return useCache;
     }
@@ -553,8 +555,8 @@ public class PolicyEntry extends Destina
 
     public void setMinimumMessageSize(long minimumMessageSize) {
         this.minimumMessageSize = minimumMessageSize;
-    }   
-    
+    }
+
     public boolean isUseConsumerPriority() {
         return useConsumerPriority;
     }
@@ -665,7 +667,7 @@ public class PolicyEntry extends Destina
     public void setAdvisoryForConsumed(boolean advisoryForConsumed) {
         this.advisoryForConsumed = advisoryForConsumed;
     }
-    
+
     /**
      * @return the advisdoryForFastProducers
      */
@@ -683,15 +685,15 @@ public class PolicyEntry extends Destina
     public void setMaxExpirePageSize(int maxExpirePageSize) {
         this.maxExpirePageSize = maxExpirePageSize;
     }
-    
+
     public int getMaxExpirePageSize() {
         return maxExpirePageSize;
     }
-    
+
     public void setExpireMessagesPeriod(long expireMessagesPeriod) {
         this.expireMessagesPeriod = expireMessagesPeriod;
     }
-    
+
     public long getExpireMessagesPeriod() {
         return expireMessagesPeriod;
     }
@@ -759,7 +761,7 @@ public class PolicyEntry extends Destina
     public void setDurableTopicPrefetch(int durableTopicPrefetch) {
         this.durableTopicPrefetch = durableTopicPrefetch;
     }
-    
+
     public boolean isUsePrefetchExtension() {
         return this.usePrefetchExtension;
     }
@@ -767,17 +769,17 @@ public class PolicyEntry extends Destina
     public void setUsePrefetchExtension(boolean usePrefetchExtension) {
         this.usePrefetchExtension = usePrefetchExtension;
     }
-    
+
     public int getCursorMemoryHighWaterMark() {
         return this.cursorMemoryHighWaterMark;
     }
 
     public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
         this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
-	}
+    }
 
     public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) {
-        this.storeUsageHighWaterMark = storeUsageHighWaterMark;   
+        this.storeUsageHighWaterMark = storeUsageHighWaterMark;
     }
 
     public int getStoreUsageHighWaterMark() {
@@ -787,12 +789,12 @@ public class PolicyEntry extends Destina
     public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) {
         this.slowConsumerStrategy = slowConsumerStrategy;
     }
-    
+
     public SlowConsumerStrategy getSlowConsumerStrategy() {
         return this.slowConsumerStrategy;
     }
-    
-    
+
+
     public boolean isPrioritizedMessages() {
         return this.prioritizedMessages;
     }



Mime
View raw message