Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CF3D710619 for ; Mon, 3 Jun 2013 21:13:08 +0000 (UTC) Received: (qmail 18614 invoked by uid 500); 3 Jun 2013 21:13:08 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 18580 invoked by uid 500); 3 Jun 2013 21:13:08 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 18573 invoked by uid 99); 3 Jun 2013 21:13:08 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 03 Jun 2013 21:13:08 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 03 Jun 2013 21:13:05 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id E3291238889B; Mon, 3 Jun 2013 21:12:44 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1489180 - in /activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region: PrefetchSubscription.java policy/PolicyEntry.java Date: Mon, 03 Jun 2013 21:12:44 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130603211244.E3291238889B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tabish Date: Mon Jun 3 21:12:44 2013 New Revision: 1489180 URL: http://svn.apache.org/r1489180 Log: some fixes for: https://issues.apache.org/jira/browse/AMQ-4487 Allow 
the maxProducersToAudit value to be set on Queue and QueueBrowser subscriptions which can help to work around OOM errors on QueueBrowsers when there are more than the max number of producers audited. Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1489180&r1=1489179&r2=1489180&view=diff ============================================================================== --- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original) +++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Jun 3 21:12:44 2013 @@ -834,6 +834,9 @@ public abstract class PrefetchSubscripti public void setMaxProducersToAudit(int maxProducersToAudit) { this.maxProducersToAudit = maxProducersToAudit; + if (this.pending != null) { + this.pending.setMaxProducersToAudit(maxProducersToAudit); + } } public int getMaxAuditDepth() { @@ -842,6 +845,9 @@ public abstract class PrefetchSubscripti public void setMaxAuditDepth(int maxAuditDepth) { this.maxAuditDepth = maxAuditDepth; + if (this.pending != null) { + this.pending.setMaxAuditDepth(maxAuditDepth); + } } public boolean isUsePrefetchExtension() { Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1489180&r1=1489179&r2=1489180&view=diff ============================================================================== --- 
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original) +++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Mon Jun 3 21:12:44 2013 @@ -39,9 +39,9 @@ import org.slf4j.LoggerFactory; /** * Represents an entry in a {@link PolicyMap} for assigning policies to a * specific destination or a hierarchical wildcard area of destinations. - * + * * @org.apache.xbean.XBean - * + * */ public class PolicyEntry extends DestinationMapEntry { @@ -118,7 +118,7 @@ public class PolicyEntry extends Destina PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(broker,queue); queue.setMessages(messages); } - + queue.setUseConsumerPriority(isUseConsumerPriority()); queue.setStrictOrderDispatch(isStrictOrderDispatch()); queue.setOptimizedDispatch(isOptimizedDispatch()); @@ -144,7 +144,7 @@ public class PolicyEntry extends Destina } topic.setLazyDispatch(isLazyDispatch()); } - + public void baseConfiguration(Broker broker,BaseDestination destination) { destination.setProducerFlowControl(isProducerFlowControl()); destination.setAlwaysRetroactive(isAlwaysRetroactive()); @@ -231,19 +231,21 @@ public class PolicyEntry extends Destina sub.setMaxAuditDepth(auditDepth); } sub.setMaxProducersToAudit(getMaxProducersToAudit()); - sub.setUsePrefetchExtension(isUsePrefetchExtension()); + sub.setUsePrefetchExtension(isUsePrefetchExtension()); } - + public void configure(Broker broker, SystemUsage memoryManager, QueueBrowserSubscription sub) { configurePrefetch(sub); sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); sub.setUsePrefetchExtension(isUsePrefetchExtension()); + sub.setMaxProducersToAudit(getMaxProducersToAudit()); } - + public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) { configurePrefetch(sub); sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); 
sub.setUsePrefetchExtension(isUsePrefetchExtension()); + sub.setMaxProducersToAudit(getMaxProducersToAudit()); } public void configurePrefetch(Subscription subscription) { @@ -442,13 +444,13 @@ public class PolicyEntry extends Destina public void setAlwaysRetroactive(boolean alwaysRetroactive) { this.alwaysRetroactive = alwaysRetroactive; } - - + + /** * Set's the interval at which warnings about producers being blocked by * resource usage will be triggered. Values of 0 or less will disable * warnings - * + * * @param blockedProducerWarningInterval the interval at which warning about * blocked producers will be triggered. */ @@ -457,14 +459,14 @@ public class PolicyEntry extends Destina } /** - * + * * @return the interval at which warning about blocked producers will be * triggered. */ public long getBlockedProducerWarningInterval() { return blockedProducerWarningInterval; } - + /** * @return the maxProducersToAudit */ @@ -522,23 +524,23 @@ public class PolicyEntry extends Destina public void setOptimizedDispatch(boolean optimizedDispatch) { this.optimizedDispatch = optimizedDispatch; } - + public int getMaxPageSize() { return maxPageSize; } public void setMaxPageSize(int maxPageSize) { this.maxPageSize = maxPageSize; - } - + } + public int getMaxBrowsePageSize() { return maxBrowsePageSize; } public void setMaxBrowsePageSize(int maxPageSize) { this.maxBrowsePageSize = maxPageSize; - } - + } + public boolean isUseCache() { return useCache; } @@ -553,8 +555,8 @@ public class PolicyEntry extends Destina public void setMinimumMessageSize(long minimumMessageSize) { this.minimumMessageSize = minimumMessageSize; - } - + } + public boolean isUseConsumerPriority() { return useConsumerPriority; } @@ -665,7 +667,7 @@ public class PolicyEntry extends Destina public void setAdvisoryForConsumed(boolean advisoryForConsumed) { this.advisoryForConsumed = advisoryForConsumed; } - + /** * @return the advisdoryForFastProducers */ @@ -683,15 +685,15 @@ public class PolicyEntry extends 
Destina public void setMaxExpirePageSize(int maxExpirePageSize) { this.maxExpirePageSize = maxExpirePageSize; } - + public int getMaxExpirePageSize() { return maxExpirePageSize; } - + public void setExpireMessagesPeriod(long expireMessagesPeriod) { this.expireMessagesPeriod = expireMessagesPeriod; } - + public long getExpireMessagesPeriod() { return expireMessagesPeriod; } @@ -759,7 +761,7 @@ public class PolicyEntry extends Destina public void setDurableTopicPrefetch(int durableTopicPrefetch) { this.durableTopicPrefetch = durableTopicPrefetch; } - + public boolean isUsePrefetchExtension() { return this.usePrefetchExtension; } @@ -767,17 +769,17 @@ public class PolicyEntry extends Destina public void setUsePrefetchExtension(boolean usePrefetchExtension) { this.usePrefetchExtension = usePrefetchExtension; } - + public int getCursorMemoryHighWaterMark() { return this.cursorMemoryHighWaterMark; } public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) { this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark; - } + } public void setStoreUsageHighWaterMark(int storeUsageHighWaterMark) { - this.storeUsageHighWaterMark = storeUsageHighWaterMark; + this.storeUsageHighWaterMark = storeUsageHighWaterMark; } public int getStoreUsageHighWaterMark() { @@ -787,12 +789,12 @@ public class PolicyEntry extends Destina public void setSlowConsumerStrategy(SlowConsumerStrategy slowConsumerStrategy) { this.slowConsumerStrategy = slowConsumerStrategy; } - + public SlowConsumerStrategy getSlowConsumerStrategy() { return this.slowConsumerStrategy; } - - + + public boolean isPrioritizedMessages() { return this.prioritizedMessages; }