activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r818540 - in /activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region: BaseDestination.java Queue.java Topic.java
Date Thu, 24 Sep 2009 16:32:07 GMT
Author: dejanb
Date: Thu Sep 24 16:32:07 2009
New Revision: 818540

URL: http://svn.apache.org/viewvc?rev=818540&view=rev
Log:
merging revisions 817222:817842

Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=818540&r1=818539&r2=818540&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Thu Sep 24 16:32:07 2009
@@ -51,6 +51,7 @@
     protected SystemUsage systemUsage;
     protected MemoryUsage memoryUsage;
     private boolean producerFlowControl = true;
+    protected boolean warnOnProducerFlowControl = true; 
     private int maxProducersToAudit = 1024;
     private int maxAuditDepth = 2048;
     private boolean enableAudit = true;

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=818540&r1=818539&r2=818540&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Sep 24 16:32:07 2009
@@ -413,8 +413,15 @@
             isFull(context, memoryUsage);
             fastProducer(context, producerInfo);
             if (isProducerFlowControl() && context.isProducerFlowControl()) {
+                if(warnOnProducerFlowControl) {
+                    warnOnProducerFlowControl = false;
+                    LOG.info("Usage Manager memory limit reached on " +getActiveMQDestination().getQualifiedName() + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." +
+                            " See http://activemq.apache.org/producer-flow-control.html for more info");
+                }
+                
                 if (systemUsage.isSendFailIfNoSpace()) {
-                    throw new javax.jms.ResourceAllocationException("SystemUsage memory limit reached");
+                    throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " +getActiveMQDestination().getQualifiedName() + "." +
+                            " See http://activemq.apache.org/producer-flow-control.html for more info");
                 }
     
                 // We can avoid blocking due to low usage if the producer is sending
@@ -498,8 +505,13 @@
         final ConnectionContext context = producerExchange.getConnectionContext();
         synchronized (sendLock) {
             if (store != null && message.isPersistent()) {
-                if (systemUsage.isSendFailIfNoSpace() && systemUsage.getStoreUsage().isFull()) {
-                	throw new javax.jms.ResourceAllocationException("Usage Manager Store is Full");
+                if (systemUsage.getStoreUsage().isFull()) {
+                    final String logMessage = "Usage Manager Store is Full. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." +
+                            " See http://activemq.apache.org/producer-flow-control.html for more info";
+                    LOG.info(logMessage);
+                    if (systemUsage.isSendFailIfNoSpace()) {
+                        throw new javax.jms.ResourceAllocationException(logMessage);
+                    }
                 }
                 while (!systemUsage.getStoreUsage().waitForSpace(1000)) {
                     if (context.getStopping().get()) {
@@ -1279,6 +1291,14 @@
 
     final void sendMessage(final ConnectionContext context, Message msg) throws Exception {
         if (!msg.isPersistent() && messages.getSystemUsage() != null) {
+        	if (systemUsage.getTempUsage().isFull()) {
+                final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + msg.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." +
+                        " See http://activemq.apache.org/producer-flow-control.html for more info";
+                LOG.info(logMessage);
+                if (systemUsage.isSendFailIfNoSpace()) {
+                    throw new javax.jms.ResourceAllocationException(logMessage);
+                }
+            }
             messages.getSystemUsage().getTempUsage().waitForSpace();
         }
         synchronized(messages) {

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=818540&r1=818539&r2=818540&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Thu Sep 24 16:32:07 2009
@@ -280,11 +280,20 @@
         if(memoryUsage.isFull()) {
             isFull(context, memoryUsage);
             fastProducer(context, producerInfo);
+            
             if (isProducerFlowControl() && context.isProducerFlowControl()) {
+                
+                if(warnOnProducerFlowControl) {
+                    warnOnProducerFlowControl = false;
+                    LOG.info("Usage Manager memory limit reached for " +getActiveMQDestination().getQualifiedName() + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." +
+                            " See http://activemq.apache.org/producer-flow-control.html for more info");
+                }
+                
                 if (systemUsage.isSendFailIfNoSpace()) {
-                    throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
+                    throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " +getActiveMQDestination().getQualifiedName() + "." +
+                            " See http://activemq.apache.org/producer-flow-control.html for more info");
                 }
-    
+   
                 // We can avoid blocking due to low usage if the producer is sending
                 // a sync message or
                 // if it is using a producer window
@@ -390,8 +399,13 @@
 
         if (topicStore != null && message.isPersistent()
                 && !canOptimizeOutPersistence()) {
-            if (systemUsage.isSendFailIfNoSpace() && systemUsage.getStoreUsage().isFull()) {
-                throw new javax.jms.ResourceAllocationException("Usage Manager Store is Full");
+            if (systemUsage.getStoreUsage().isFull()) {
+                final String logMessage = "Usage Manager Store is Full. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." +
+                        " See http://activemq.apache.org/producer-flow-control.html for more info";
+                LOG.info(logMessage);
+                if (systemUsage.isSendFailIfNoSpace()) {
+            	    throw new javax.jms.ResourceAllocationException(logMessage);
+                }
             }
             while (!systemUsage.getStoreUsage().waitForSpace(1000)) {
                 if (context.getStopping().get()) {



Mime
View raw message