activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1459006 - /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Date Wed, 20 Mar 2013 19:20:34 GMT
Author: chirino
Date: Wed Mar 20 19:20:34 2013
New Revision: 1459006

URL: http://svn.apache.org/r1459006
Log:
Fixes APLO-313: Avoid blocking producers if consumers are not likely to catch up within a
few seconds.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1459006&r1=1459005&r2=1459006&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Mar 20 19:20:34 2013
@@ -878,6 +878,7 @@ class Queue(val router: LocalRouter, val
     // swap out messages.
     cur = entries.getHead.getNext
     var dropping_head_entries = is_topic_queue
+    var distance_from_last_prefetch = 0L
     while( cur!=null ) {
       val next = cur.getNext
       if ( dropping_head_entries ) {
@@ -897,7 +898,9 @@ class Queue(val router: LocalRouter, val
         if( cur.prefetched ) {
           // Prefteched entries need to get loaded..
           cur.load(consumer_swapped_in)
+          distance_from_last_prefetch = 0
         } else {
+
           // This is a non-prefetched entry.. entires ahead and behind the
           // consumer subscriptions.
           val loaded = cur.as_loaded
@@ -911,8 +914,17 @@ class Queue(val router: LocalRouter, val
               // about what gets swapped out..
 
               if (cur.memory_space eq producer_swapped_in ) {
-                // Entry will be used soon..
-                cur.load(producer_swapped_in)
+                // If we think we can catch up in seconds.. lets keep it in producer_swapped_in to
+                // pause the producer.
+                val max_distance = delivery_rate * 2;
+                if( distance_from_last_prefetch < max_distance ) {
+                  // Looks like the entry will be used soon..
+                  cur.load(producer_swapped_in)
+                } else {
+                  // Does not look to be anywhere close to the consumer.. so get
+                  // rid of it asap.
+                  cur.swap(true)
+                }
               } else if ( cur.is_acquired ) {
                 // Entry was just used...
                 cur.load(consumer_swapped_in)
@@ -924,6 +936,8 @@ class Queue(val router: LocalRouter, val
               }
             }
           }
+
+          distance_from_last_prefetch += cur.size
         }
       }
       cur = next
@@ -971,12 +985,13 @@ class Queue(val router: LocalRouter, val
   }
 
   def swapped_out_size = queue_size - (producer_swapped_in.size + consumer_swapped_in.size)
+  var delivery_rate = 0
 
   def queue_maintenance:Unit = {
     var elapsed = System.currentTimeMillis-now
     now += elapsed
 
-    var delivery_rate = 0
+    delivery_rate = 0
     var avg_browser_delivery_rate = 0
     var avg_sub_stall_ms = 0L
 



Mime
View raw message