activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1374576 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-website/src/documentation/
Date Sat, 18 Aug 2012 13:30:49 GMT
Author: chirino
Date: Sat Aug 18 13:30:49 2012
New Revision: 1374576

URL: http://svn.apache.org/viewvc?rev=1374576&view=rev
Log:
Better producer rate throttling.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1374576&r1=1374575&r2=1374576&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Sat Aug 18 13:30:49 2012
@@ -881,9 +881,10 @@ class Queue(val router: LocalRouter, val
 
   }
 
-  var delivery_rate = 0L
   def swapped_out_size = queue_size - (producer_swapped_in.size + consumer_swapped_in.size)
 
+  var delivery_rate = 0
+
   def queue_maintenance:Unit = {
     var elapsed = System.currentTimeMillis-now
     now += elapsed
@@ -891,8 +892,7 @@ class Queue(val router: LocalRouter, val
     consumers_keeping_up_historically = consumers_keeping_up_counter!=0
     consumers_keeping_up_counter = 0
 
-    delivery_rate = 0L
-
+    delivery_rate = 0
     var consumer_stall_ms = 0L
     var load_stall_ms = 0L
 
@@ -906,18 +906,23 @@ class Queue(val router: LocalRouter, val
     }
 
     val rate_adjustment = elapsed.toFloat / 1000.toFloat
-    delivery_rate  = (delivery_rate / rate_adjustment).toLong
+    delivery_rate  = (delivery_rate / rate_adjustment).toInt
 
     val stall_ratio = ((consumer_stall_ms*100)+1).toFloat / ((load_stall_ms*100)+1).toFloat
 
     // Figure out what the max enqueue rate should be.
     max_enqueue_rate = Int.MaxValue
-    if( tune_fast_delivery_rate>=0 && tune_catchup_enqueue_rate>=0 &&
delivery_rate>tune_fast_delivery_rate && swapped_out_size > 0 && stall_ratio
< 1.0 ) {
-      max_enqueue_rate = tune_catchup_enqueue_rate
+    if( tune_fast_delivery_rate>=0 && delivery_rate>tune_fast_delivery_rate
&& swapped_out_size > 0 && stall_ratio < 10.0 ) {
+      if( tune_catchup_enqueue_rate >= 0 ) {
+        max_enqueue_rate = tune_catchup_enqueue_rate
+      } else {
+        max_enqueue_rate = delivery_rate / 2;
+      }
     }
     if(tune_max_enqueue_rate >=0 ) {
       max_enqueue_rate = max_enqueue_rate.min(tune_max_enqueue_rate)
     }
+
     if( max_enqueue_rate < Int.MaxValue ) {
       if(enqueues_remaining==null) {
         enqueues_remaining = new LongCounter()

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala?rev=1374576&r1=1374575&r2=1374576&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
Sat Aug 18 13:30:49 2012
@@ -46,7 +46,7 @@ class Subscription(val queue:Queue, val 
   var acquired_size = 0L
   def acquired_count = acquired.size()
 
-  var enqueue_size_per_interval = 0L
+  var enqueue_size_per_interval = 0
   var enqueue_size_at_last_interval = 0L
 
   var consumer_stall_ms = 0L
@@ -238,7 +238,7 @@ class Subscription(val queue:Queue, val 
   }
 
   def adjust_prefetch_size = {
-    enqueue_size_per_interval = session.enqueue_size_counter - enqueue_size_at_last_interval
+    enqueue_size_per_interval = (session.enqueue_size_counter - enqueue_size_at_last_interval).toInt
     enqueue_size_at_last_interval = session.enqueue_size_counter
 
     if(consumer_stall_start !=0) {

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1374576&r1=1374575&r2=1374576&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Sat Aug
18 13:30:49 2012
@@ -418,12 +418,10 @@ memory.  Defaults to true.
   rate to match the consumption rate if the consumers are at the 
   tail of the queue.  Defaults to `1M`           
   
-* `catchup_enqueue_rate`:  If set, and the the current delivery
-   rate is exceeding the configured value of `fast_delivery_rate` and 
-   the consumers are spending more time loading from the store than 
-   delivering, then the enqueue rate will be throttled to the
-   specified value so that the consumers can catch up and reach the 
-   tail of the queue.
+* `catchup_enqueue_rate`:  The rate that producers will be throttled to
+   when queue consumers are considered to be fast.  This allows consumers 
+   to catch up and reach the tail of the queue.  If not set, then it is
+   computed to be 1/2 the current consumer delivery rate.
 
 * `max_enqueue_rate`: The maximum enqueue rate of the queue.  Producers
   will be flow controlled once this enqueue rate is reached.  If not set



Mime
View raw message