activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r382421 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Date Thu, 02 Mar 2006 15:56:31 GMT
Author: jstrachan
Date: Thu Mar  2 07:56:30 2006
New Revision: 382421

URL: http://svn.apache.org/viewcvs?rev=382421&view=rev
Log:
added the ability to discard old messages for non-durable topics if a maximum number of pending
messages is reached

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=382421&r1=382420&r2=382421&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Thu Mar  2 07:56:30 2006
@@ -42,6 +42,7 @@
     final protected UsageManager usageManager;
     protected int dispatched=0;
     protected int delivered=0;
+    private int maximumPendingMessages = 0;
     
     public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info,
UsageManager usageManager) throws InvalidSelectorException {
         super(broker,context, info);
@@ -50,11 +51,20 @@
 
     public void add(MessageReference node) throws InterruptedException, IOException {
         node.incrementReferenceCount();
-        if( !isFull() && !isSlaveBroker() ) {
+        if( !isFull() && !isSlaveBroker()) {
+            // TODO - if we have already dispatched too many messages to this slow consumer
+            // should we avoid dispatching and just discard old messages as shown below
             dispatch(node);
         } else {
-            synchronized(matched){
-            matched.addLast(node);
+            synchronized (matched) {
+                matched.addLast(node);
+                if (maximumPendingMessages > 0) {
+                    // lets discard old messages as we are a slow consumer
+                    while (matched.size() > maximumPendingMessages) {
+                        MessageReference oldMessage = (MessageReference) matched.removeFirst();
+                        oldMessage.decrementReferenceCount();
+                    }
+                }
             }
         }        
     }
@@ -122,6 +132,18 @@
         return delivered;
     }
     
+    public int getMaximumPendingMessages() {
+        return maximumPendingMessages;
+    }
+
+    /**
+     * Sets the maximum number of pending messages that can be matched against this consumer
+     * before old messages are discarded.
+     */
+    public void setMaximumPendingMessages(int maximumPendingMessages) {
+        this.maximumPendingMessages = maximumPendingMessages;
+    }
+
     private boolean isFull() {
         return dispatched-delivered >= info.getPrefetchSize();
     }



Mime
View raw message