activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r641525 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: Destination.java DestinationFilter.java PrefetchSubscription.java Queue.java Topic.java
Date Wed, 26 Mar 2008 19:56:31 GMT
Author: chirino
Date: Wed Mar 26 12:56:29 2008
New Revision: 641525

URL: http://svn.apache.org/viewvc?rev=641525&view=rev
Log:
When messages expire take them out of the paged in list so that we can dispatch more messages
to other consumers.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=641525&r1=641524&r2=641525&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
Wed Mar 26 12:56:29 2008
@@ -111,4 +111,6 @@
      * @param value
      */
     public void setLazyDispatch(boolean value);
+
+    void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription,
MessageReference node);
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=641525&r1=641524&r2=641525&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
Wed Mar 26 12:56:29 2008
@@ -202,4 +202,8 @@
     public void setLazyDispatch(boolean value) {
       next.setLazyDispatch(value);        
     }
+
+    public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription,
MessageReference node) {
+        next.messageExpired(context, prefetchSubscription, node);        
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=641525&r1=641524&r2=641525&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Wed Mar 26 12:56:29 2008
@@ -246,12 +246,17 @@
                 // the
                 // acknowledgment.
                 int index = 0;
-                for (Iterator<MessageReference> iter = dispatched.iterator(); iter
-                        .hasNext(); index++) {
+                for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext();
index++) {
                     final MessageReference node = iter.next();
+                    if( node.isExpired() ) {
+                        broker.messageExpired(getContext(), node);
+                        node.getRegionDestination().messageExpired(context, this, node);
+                        node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
+                        node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
+                        dispatched.remove(node);
+                    }
                     if (ack.getLastMessageId().equals(node.getMessageId())) {
-                        prefetchExtension = Math.max(prefetchExtension,
-                                index + 1);
+                        prefetchExtension = Math.max(prefetchExtension, index + 1);
                         callDispatchMatched = true;
                         break;
                     }
@@ -471,12 +476,11 @@
 
                                 // Message may have been sitting in the pending
                                 // list a while waiting for the consumer to ak the message.
-                                if (node != QueueMessageReference.NULL_MESSAGE
-                                        && node.isExpired()) {
+                                if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired())
{
                                     broker.messageExpired(getContext(), node);
-                                    dequeueCounter++;
                                     //increment number to dispatch
                                     numberToDispatch++;
+                                    node.getRegionDestination().messageExpired(context, this,
node);
                                     continue;
                                 }
                                 dispatch(node);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=641525&r1=641524&r2=641525&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Wed Mar 26 12:56:29 2008
@@ -1003,7 +1003,17 @@
         }
         wakeup();
     }
-
+    
+    public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription,
MessageReference reference) {
+        ((QueueMessageReference)reference).drop();
+        // Not sure.. perhaps we should forge an ack to remove the message from the store.
+        // acknowledge(context, sub, ack, reference);
+        destinationStatistics.getMessages().decrement();
+        synchronized(pagedInMessages) {
+            pagedInMessages.remove(reference.getMessageId());
+        }
+        wakeup();
+    }
     
     protected ConnectionContext createConnectionContext() {
         ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
@@ -1037,7 +1047,7 @@
         dispatchLock.lock();
         try{
         
-            int toPageIn = getMaxPageSize() - pagedInMessages.size();
+            int toPageIn = (getMaxPageSize()+(int)destinationStatistics.getInflight().getCount())
- pagedInMessages.size();
             if (isLazyDispatch()&& !force) {
              // Only page in the minimum number of messages which can be dispatched immediately.
              toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=641525&r1=641524&r2=641525&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Wed Mar 26 12:56:29 2008
@@ -631,4 +631,9 @@
         }
     }
 
+    public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription,
MessageReference node) {
+        // TODO Auto-generated method stub
+        
+    }
+
 }



Mime
View raw message