activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r382459 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: ./ policy/
Date Thu, 02 Mar 2006 17:56:22 GMT
Author: jstrachan
Date: Thu Mar  2 09:56:21 2006
New Revision: 382459

URL: http://svn.apache.org/viewcvs?rev=382459&view=rev
Log:
Patch for AMQ-606 to allow slow consumers to have their old messages discarded. For background
on the issue, see http://docs.codehaus.org/display/ACTIVEMQ/Slow+Consumers

Provides a way to limit the number of pending messages waiting to be dispatched to a subscription,
so that once the limit is reached, the oldest messages are discarded. This prevents non-durable
topics from being blocked by slow consumers.

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/ConstantPendingMessageLimitStrategy.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingMessageLimitStrategy.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PrefetchRatePendingMessageLimitStrategy.java
  (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=382459&r1=382458&r2=382459&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
Thu Mar  2 09:56:21 2006
@@ -115,4 +115,16 @@
     public boolean isSlaveBroker(){
         return broker.isSlaveBroker();
     }
+
+    public ConnectionContext getContext() {
+        return context;
+    }
+
+    public ConsumerInfo getInfo() {
+        return info;
+    }
+
+    public BooleanExpression getSelector() {
+        return selector;
+    }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?rev=382459&r1=382458&r2=382459&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
Thu Mar  2 09:56:21 2006
@@ -204,7 +204,17 @@
             return sub;
         }
         else {
-            return new TopicSubscription(broker,context, info, memoryManager);
+            TopicSubscription answer = new TopicSubscription(broker,context, info, memoryManager);
+            
+            // lets configure the subscription depending on the destination
+            ActiveMQDestination destination = info.getDestination();
+            if (destination != null && policyMap != null) {
+                PolicyEntry entry = policyMap.getEntryFor(destination);
+                if (entry != null) {
+                    entry.configure(answer);
+                }
+            }
+            return answer;
         }
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=382459&r1=382458&r2=382459&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Thu Mar  2 09:56:21 2006
@@ -42,7 +42,7 @@
     final protected UsageManager usageManager;
     protected int dispatched=0;
     protected int delivered=0;
-    private int maximumPendingMessages = 0;
+    private int maximumPendingMessages = -1;
     
     public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info,
UsageManager usageManager) throws InvalidSelectorException {
         super(broker,context, info);
@@ -56,15 +56,17 @@
             // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
             dispatch(node);
         } else {
-            synchronized (matched) {
-                matched.addLast(node);
-                
-                // NOTE - be careful about the slaveBroker!
-                if (maximumPendingMessages > 0) {
-                    // lets discard old messages as we are a slow consumer
-                    while (matched.size() > maximumPendingMessages) {
-                        MessageReference oldMessage = (MessageReference) matched.removeFirst();
-                        oldMessage.decrementReferenceCount();
+            if (maximumPendingMessages != 0) {
+                synchronized (matched) {
+                    matched.addLast(node);
+
+                    // NOTE - be careful about the slaveBroker!
+                    if (maximumPendingMessages > 0) {
+                        // lets discard old messages as we are a slow consumer
+                        while (matched.size() > maximumPendingMessages) {
+                            MessageReference oldMessage = (MessageReference) matched.removeFirst();
+                            oldMessage.decrementReferenceCount();
+                        }
                     }
                 }
             }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/ConstantPendingMessageLimitStrategy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/ConstantPendingMessageLimitStrategy.java?rev=382459&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/ConstantPendingMessageLimitStrategy.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/ConstantPendingMessageLimitStrategy.java
Thu Mar  2 09:56:21 2006
@@ -0,0 +1,43 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.region.TopicSubscription;
+
+/**
+ * This PendingMessageLimitStrategy is configured to a constant value for all subscriptions.
+ * 
+ * @org.apache.xbean.XBean
+ * 
+ * @version $Revision$
+ */
+public class ConstantPendingMessageLimitStrategy implements PendingMessageLimitStrategy {
+
+    private int limit = -1;
+
+    public int getMaximumPendingMessageLimit(TopicSubscription subscription) {
+        return limit;
+    }
+
+    public int getLimit() {
+        return limit;
+    }
+
+    public void setLimit(int limit) {
+        this.limit = limit;
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/ConstantPendingMessageLimitStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/ConstantPendingMessageLimitStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/ConstantPendingMessageLimitStrategy.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingMessageLimitStrategy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingMessageLimitStrategy.java?rev=382459&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingMessageLimitStrategy.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingMessageLimitStrategy.java
Thu Mar  2 09:56:21 2006
@@ -0,0 +1,41 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.region.TopicSubscription;
+
+/**
+ * A pluggable strategy to calculate the maximum number of messages that are allowed to be
pending on 
+ * consumers (in addition to their prefetch sizes).
+ * 
+ * Once the limit is reached, non-durable topics can then start discarding old messages.
+ * This allows us to keep dispatching messages to slow consumers while not blocking fast
consumers
+ * and discarding the messages oldest first.
+ *  
+ * @version $Revision$
+ */
+public interface PendingMessageLimitStrategy {
+
+    /**
+     * Calculate the maximum number of pending messages (in excess of the prefetch size)
+     * for the given subscription
+     * 
+     * @return the maximum or -1 if there is no maximum
+     */
+    int getMaximumPendingMessageLimit(TopicSubscription subscription);
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingMessageLimitStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingMessageLimitStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingMessageLimitStrategy.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=382459&r1=382458&r2=382459&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Thu Mar  2 09:56:21 2006
@@ -18,6 +18,7 @@
 
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.broker.region.TopicSubscription;
 import org.apache.activemq.filter.DestinationMapEntry;
 
 /**
@@ -35,6 +36,7 @@
     private boolean sendAdvisoryIfNoConsumers;
     private DeadLetterStrategy deadLetterStrategy;
     private int messageGroupHashBucketCount = 1024;
+    private PendingMessageLimitStrategy pendingMessageLimitStrategy;
 
     public void configure(Queue queue) {
         if (dispatchPolicy != null) {
@@ -59,6 +61,15 @@
         topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
     }
 
+    public void configure(TopicSubscription subscription) {
+        if (pendingMessageLimitStrategy != null) {
+            int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
+            if (value >= 0) {
+                subscription.setMaximumPendingMessages(value);
+            }
+        }
+    }
+
     // Properties
     // -------------------------------------------------------------------------
     public DispatchPolicy getDispatchPolicy() {
@@ -94,7 +105,8 @@
     }
 
     /**
-     * Sets the policy used to determine which dead letter queue destination should be used
+     * Sets the policy used to determine which dead letter queue destination
+     * should be used
      */
     public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
         this.deadLetterStrategy = deadLetterStrategy;
@@ -105,14 +117,30 @@
     }
 
     /**
-     * Sets the number of hash buckets to use for the message group functionality. 
-     * This is only applicable to using message groups to parallelize processing of a queue
-     * while preserving order across an individual JMSXGroupID header value.
-     * This value sets the number of hash buckets that will be used (i.e. the maximum possible
concurrency).
+     * Sets the number of hash buckets to use for the message group
+     * functionality. This is only applicable to using message groups to
+     * parallelize processing of a queue while preserving order across an
+     * individual JMSXGroupID header value. This value sets the number of hash
+     * buckets that will be used (i.e. the maximum possible concurrency).
      */
     public void setMessageGroupHashBucketCount(int messageGroupHashBucketCount) {
         this.messageGroupHashBucketCount = messageGroupHashBucketCount;
     }
-    
-    
+
+    public PendingMessageLimitStrategy getPendingMessageLimitStrategy() {
+        return pendingMessageLimitStrategy;
+    }
+
+    /**
+     * Sets the strategy to calculate the maximum number of messages that are
+     * allowed to be pending on consumers (in addition to their prefetch sizes).
+     * 
+     * Once the limit is reached, non-durable topics can then start discarding
+     * old messages. This allows us to keep dispatching messages to slow
+     * consumers while not blocking fast consumers and discarding the messages
+     * oldest first.
+     */
+    public void setPendingMessageLimitStrategy(PendingMessageLimitStrategy pendingMessageLimitStrategy)
{
+        this.pendingMessageLimitStrategy = pendingMessageLimitStrategy;
+    }
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PrefetchRatePendingMessageLimitStrategy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PrefetchRatePendingMessageLimitStrategy.java?rev=382459&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PrefetchRatePendingMessageLimitStrategy.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PrefetchRatePendingMessageLimitStrategy.java
Thu Mar  2 09:56:21 2006
@@ -0,0 +1,50 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.region.TopicSubscription;
+
+/**
+ * This PendingMessageLimitStrategy sets the maximum pending message limit value to be
+ * a multiplier of the prefetch limit of the subscription.
+ * 
+ * @org.apache.xbean.XBean
+ * 
+ * @version $Revision$
+ */
+public class PrefetchRatePendingMessageLimitStrategy implements PendingMessageLimitStrategy
{
+
+    private double multiplier = 0.5;
+
+    public int getMaximumPendingMessageLimit(TopicSubscription subscription) {
+        int prefetchSize = subscription.getConsumerInfo().getPrefetchSize();
+        return (int) (prefetchSize * multiplier);
+    }
+
+    public double getMultiplier() {
+        return multiplier;
+    }
+
+    /**
+     * Sets the multiplier of the prefetch size which will be used to define the maximum
number of pending
+     * messages for non-durable topics before messages are discarded.
+     */
+    public void setMultiplier(double rate) {
+        this.multiplier = rate;
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PrefetchRatePendingMessageLimitStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PrefetchRatePendingMessageLimitStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PrefetchRatePendingMessageLimitStrategy.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message