activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r394707 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: AbstractRegion.java DurableTopicSubscription.java PrefetchSubscription.java QueueSubscription.java Subscription.java TopicSubscription.java
Date Mon, 17 Apr 2006 15:32:30 GMT
Author: chirino
Date: Mon Apr 17 08:32:28 2006
New Revision: 394707

URL: http://svn.apache.org/viewcvs?rev=394707&view=rev
Log:
If a topic consumer was hung up, it would eventually stop the producers since the broker memory
limit would be reached.
The problem was if the consumer was killed, the broker memory would not get freed up and so
the producer would remain blocked.
When a subscription is removed, the memory of the pending messages are now released.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=394707&r1=394706&r2=394707&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
Mon Apr 17 08:32:28 2006
@@ -180,9 +180,11 @@
         }
         
         destroySubscription(sub);
+        
     }
 
     protected void destroySubscription(Subscription sub) {        
+        sub.destroy();
     }
 
     public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info)
throws Exception {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=394707&r1=394706&r2=394707&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
Mon Apr 17 08:32:28 2006
@@ -181,5 +181,24 @@
     public SubscriptionKey getSubscriptionKey() {
         return subscriptionKey;
     }
+    
+    /**
+     * Release any references that we are holding.
+     */
+    synchronized public void destroy() {
+        
+        for (Iterator iter = pending.iterator(); iter.hasNext();) {
+            MessageReference node = (MessageReference) iter.next();
+            node.decrementReferenceCount();
+        }
+        pending.clear();
+        
+        for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
+            MessageReference node = (MessageReference) iter.next();
+            node.decrementReferenceCount();
+        }
+        dispatched.clear();
+        
+    }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=394707&r1=394706&r2=394707&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Mon Apr 17 08:32:28 2006
@@ -372,5 +372,4 @@
     protected void acknowledge(ConnectionContext context,final MessageAck ack,final MessageReference
node)
                     throws IOException{}
 
-
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=394707&r1=394706&r2=394707&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
Mon Apr 17 08:32:28 2006
@@ -28,6 +28,7 @@
 import javax.jms.InvalidSelectorException;
 
 import java.io.IOException;
+import java.util.Iterator;
 
 public class QueueSubscription extends PrefetchSubscription implements LockOwner {
     
@@ -184,4 +185,9 @@
         }
     }
     
+    /**
+     */
+    synchronized public void destroy() {        
+    }
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=394707&r1=394706&r2=394707&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
Mon Apr 17 08:32:28 2006
@@ -170,4 +170,10 @@
      *
      */
     public void optimizePrefetch();
+    
+    /**
+     * Called when the subscription is destroyed.
+     */
+    public void destroy();
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=394707&r1=394706&r2=394707&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Mon Apr 17 08:32:28 2006
@@ -53,7 +53,8 @@
     private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
     private int discarded = 0;
     private final Object matchedListMutex=new Object();
-    long enqueueCounter;
+    private final AtomicLong enqueueCounter = new AtomicLong(0);
+    private final AtomicLong dequeueCounter = new AtomicLong(0);
     
     public TopicSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,UsageManager
usageManager)
                     throws InvalidSelectorException{
@@ -62,8 +63,10 @@
     }
 
     public void add(MessageReference node) throws InterruptedException,IOException{
-        enqueueCounter++;
+        
+        enqueueCounter.incrementAndGet();        
         node.incrementReferenceCount();
+        
         if(!isFull()&&!isSlaveBroker()){
             optimizePrefetch();
             // if maximumPendingMessages is set we will only discard messages which
@@ -131,6 +134,7 @@
     }
 
     synchronized public void acknowledge(final ConnectionContext context,final MessageAck
ack) throws Exception{
+        
         // Handle the standard acknowledgment case.
         boolean wasFull=isFull();
         if(ack.isStandardAck()||ack.isPoisonAck()){
@@ -138,11 +142,13 @@
                 delivered.addAndGet(ack.getMessageCount());
                 context.getTransaction().addSynchronization(new Synchronization(){
                     public void afterCommit() throws Exception{
+                        dequeueCounter.addAndGet(ack.getMessageCount());
                         dispatched.addAndGet(-ack.getMessageCount());
                         delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
                     }
                 });
             }else{
+                dequeueCounter.addAndGet(ack.getMessageCount());
                 dispatched.addAndGet(-ack.getMessageCount());
                 delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
             }
@@ -178,14 +184,13 @@
 	}
 
 	public long getEnqueueCounter() {
-		return enqueueCounter;
+		return enqueueCounter.get();
 	}
+    
     public long getDequeueCounter(){
-        return delivered.get();
+        return dequeueCounter.get();
     }
 
-
-
     /**
      * @return the number of messages discarded due to being a slow consumer
      */
@@ -313,6 +318,16 @@
     public String toString(){
         return "TopicSubscription:"+" consumer="+info.getConsumerId()+", destinations="+destinations.size()
                         +", dispatched="+getDispatchedQueueSize()+", delivered="+getDequeueCounter()+",
matched="+matched()+", discarded="+discarded();
+    }
+
+    public void destroy() {
+        synchronized(matchedListMutex){
+            for (Iterator iter = matched.iterator(); iter.hasNext();) {
+                MessageReference node = (MessageReference) iter.next();
+                node.decrementReferenceCount();
+            }
+            matched.clear();
+        }
     }
 
 }



Mime
View raw message