activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1391410 - in /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker: Queue.scala Subscription.scala
Date Fri, 28 Sep 2012 11:50:00 GMT
Author: chirino
Date: Fri Sep 28 11:50:00 2012
New Revision: 1391410

URL: http://svn.apache.org/viewvc?rev=1391410&view=rev
Log:
Improving the slow consumer detection.  We were getting too many false positives.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1391410&r1=1391409&r2=1391410&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Fri Sep 28 11:50:00 2012
@@ -186,9 +186,6 @@ class Queue(val router: LocalRouter, val
   var producer_counter = 0L
   var consumer_counter = 0L
 
-  var consumers_keeping_up = true
-  var consumers_keeping_up_counter = 0
-
   // This set to true if any consumer kept up within the
   // last second.
   var consumers_keeping_up_historically = false
@@ -227,7 +224,7 @@ class Queue(val router: LocalRouter, val
     tune_persistent = virtual_host.store !=null && update.persistent.getOrElse(true)
     tune_swap = tune_persistent && update.swap.getOrElse(true)
     tune_swap_range_size = update.swap_range_size.getOrElse(10000)
-    tune_fast_delivery_rate = mem_size(update.fast_delivery_rate,"1M")
+    tune_fast_delivery_rate = mem_size(update.fast_delivery_rate,"512k")
     tune_catchup_enqueue_rate = mem_size(update.catchup_enqueue_rate,"-1")
     tune_max_enqueue_rate = mem_size(update.max_enqueue_rate,"-1")
     tune_quota = mem_size(update.quota,"-1")
@@ -546,6 +543,7 @@ class Queue(val router: LocalRouter, val
       func
     } finally {
       if( was_full && !messages.full ) {
+        messages.stall_check
         messages.refiller.run
       }
     }
@@ -559,7 +557,7 @@ class Queue(val router: LocalRouter, val
   def is_topic_queue = resource_kind eq TopicQueueKind
 
   object messages extends Sink[(Session[Delivery], Delivery)] {
-
+    def stall_check = {}
     var refiller: Task = null
 
     def is_quota_exceeded = (tune_quota >= 0 && queue_size > tune_quota) ||
(tune_quota_messages >= 0 && queue_items > tune_quota_messages)
@@ -713,6 +711,7 @@ class Queue(val router: LocalRouter, val
         if( full ) {
           trigger_swap
         }
+        stall_check
         true
       }
     }
@@ -809,15 +808,9 @@ class Queue(val router: LocalRouter, val
     }
 
     // Set the prefetch flags
-    consumers_keeping_up = false
     all_subscriptions.valuesIterator.foreach{ x=>
       x.refill_prefetch
     }
-    consumers_keeping_up = consumers_keeping_up && delivery_rate > tune_fast_delivery_rate
-    if( consumers_keeping_up ) {
-      consumers_keeping_up_counter += 1
-      consumers_keeping_up_historically = true
-    }
 
     // swap out messages.
     cur = entries.getHead.getNext
@@ -908,6 +901,7 @@ class Queue(val router: LocalRouter, val
     }
     
     if(!messages.full) {
+      messages.stall_check
       messages.refiller.run
     }
 
@@ -915,36 +909,50 @@ class Queue(val router: LocalRouter, val
 
   def swapped_out_size = queue_size - (producer_swapped_in.size + consumer_swapped_in.size)
 
-  var delivery_rate = 0
-
   def queue_maintenance:Unit = {
     var elapsed = System.currentTimeMillis-now
     now += elapsed
 
-    consumers_keeping_up_historically = consumers_keeping_up_counter!=0
-    consumers_keeping_up_counter = 0
-
-    delivery_rate = 0
-    var consumer_stall_ms = 0L
-    var load_stall_ms = 0L
+    var delivery_rate = 0
+    var avg_browser_delivery_rate = 0
+    var avg_sub_stall_ms = 0L
 
     all_subscriptions.values.foreach{ sub=>
-      val (cs, ls) = sub.adjust_prefetch_size
-      consumer_stall_ms += cs
-      load_stall_ms += ls
-      if(!sub.browser) {
+      sub.adjust_prefetch_size
+      avg_sub_stall_ms += sub.reset_stall_timer
+      if(sub.browser) {
+        avg_browser_delivery_rate += sub.avg_enqueue_size_per_interval
+      } else {
         delivery_rate += sub.avg_enqueue_size_per_interval
       }
     }
 
+    if ( !all_subscriptions.isEmpty ) {
+      avg_sub_stall_ms = avg_sub_stall_ms / all_subscriptions.size
+      avg_browser_delivery_rate = avg_browser_delivery_rate / all_subscriptions.size
+    }
+
+    // add the browser delivery rate in as an average.
+    delivery_rate += avg_browser_delivery_rate
+
     val rate_adjustment = elapsed.toFloat / 1000.toFloat
     delivery_rate  = (delivery_rate / rate_adjustment).toInt
 
-    val stall_ratio = ((consumer_stall_ms*100)+1).toFloat / ((load_stall_ms*100)+1).toFloat
+    consumers_keeping_up_historically = (
+      // No brainer.. we see consumers are fast..
+      ( delivery_rate > tune_fast_delivery_rate )
+      ||
+      // also if the queue size is small and there's not much
+      // much consumer stalling happening.
+      ( queue_size < delivery_rate && avg_sub_stall_ms < 200 )
+    )
+
+//    println("delivery_rate:%d, tune_fast_delivery_rate: %d, queue_size: %d, avg_consumer_stall_ms:
%d, consumers_keeping_up_historically: %s".
+//            format(delivery_rate, tune_fast_delivery_rate, queue_size, avg_sub_stall_ms,
consumers_keeping_up_historically))
 
     // Figure out what the max enqueue rate should be.
     max_enqueue_rate = Int.MaxValue
-    if( tune_fast_delivery_rate>=0 && delivery_rate>tune_fast_delivery_rate
&& swapped_out_size > 0 && stall_ratio < 10.0 ) {
+    if( consumers_keeping_up_historically && swapped_out_size > 0 ) {
       if( tune_catchup_enqueue_rate >= 0 ) {
         max_enqueue_rate = tune_catchup_enqueue_rate
       } else {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala?rev=1391410&r1=1391409&r2=1391410&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
Fri Sep 28 11:50:00 2012
@@ -34,7 +34,7 @@ object Subscription extends Log
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Subscription(val queue:Queue, val consumer:DeliveryConsumer) extends DeliveryProducer
with Dispatched {
+class Subscription(val queue:Queue, val consumer:DeliveryConsumer) extends DeliveryProducer
with Dispatched with StallCheckSupport {
   import Subscription._
 
   def dispatch_queue = queue.dispatch_queue
@@ -63,12 +63,6 @@ class Subscription(val queue:Queue, val 
 
   var enqueue_size_at_last_interval = 0L
 
-  var consumer_stall_ms = 0L
-  var load_stall_ms = 0L
-
-  var consumer_stall_start = 0L
-  var load_stall_start = 0L
-
   var started_at = Broker.now
   var total_ack_count = 0L
   var total_ack_size = 0L
@@ -117,7 +111,7 @@ class Subscription(val queue:Queue, val 
     session = consumer.connect(this)
     session.refiller = dispatch_queue.runnable {
       if(session!=null) {
-        check_consumer_stall
+        stall_check
       }
       if( pos!=null ) {
         pos.task.run
@@ -190,7 +184,6 @@ class Subscription(val queue:Queue, val 
   def advance(value:QueueEntry):Unit = {
     assert(value!=null)
     pos = value
-    check_load_stall
     if( tail_parked ) {
         if(consumer.close_on_drain) {
           close
@@ -207,7 +200,6 @@ class Subscription(val queue:Queue, val 
     pos -= this
     value ::= this
     pos = value
-    check_load_stall
     queue.dispatch_queue << value.task // queue up the entry to get dispatched..
   }
 
@@ -218,59 +210,23 @@ class Subscription(val queue:Queue, val 
 
   def offer(delivery:Delivery) = try {
     assert(delivery.seq > 0 )
-    session.offer(delivery)
+    if( session.full ) {
+      false
+    } else {
+      val accepted = session.offer(delivery)
+      assert(accepted)
+      true
+    }
   } finally {
-    check_consumer_stall
+    stall_check
   }
 
   def acquire(entry:QueueEntry) = new AcquiredQueueEntry(entry)
 
-  def check_load_stall = {
-    if ( pos.is_swapped_or_swapped_range ) {
-      if(load_stall_start==0) {
-        load_stall_start = queue.virtual_host.broker.now
-      }
-    } else {
-      if(load_stall_start!=0) {
-        load_stall_ms += queue.virtual_host.broker.now - load_stall_start
-        load_stall_start = 0
-      }
-    }
-  }
-
-  def check_consumer_stall = {
-    if ( full ) {
-      if(consumer_stall_start==0) {
-        consumer_stall_start = queue.virtual_host.broker.now
-      }
-    } else {
-      if( consumer_stall_start!=0 ) {
-        consumer_stall_ms += queue.virtual_host.broker.now - consumer_stall_start
-        consumer_stall_start = 0
-      }
-    }
-  }
 
   def adjust_prefetch_size = {
     enqueue_size_per_interval += (session.enqueue_size_counter - enqueue_size_at_last_interval).toInt
     enqueue_size_at_last_interval = session.enqueue_size_counter
-
-    if(consumer_stall_start !=0) {
-      val now = queue.virtual_host.broker.now
-      consumer_stall_ms += now - consumer_stall_start
-      consumer_stall_start = now
-    }
-
-    if(load_stall_start !=0) {
-      val now = queue.virtual_host.broker.now
-      load_stall_ms += now - load_stall_start
-      load_stall_start = now
-    }
-
-    val rc = (consumer_stall_ms, load_stall_ms)
-    consumer_stall_ms = 0
-    load_stall_ms = 0
-    rc
   }
 
   def refill_prefetch = {
@@ -295,12 +251,6 @@ class Subscription(val queue:Queue, val 
       }
       cursor = next
     }
-
-    // If we hit the tail or the producer swap in area.. let the queue know we are keeping
up.
-    if( !queue.consumers_keeping_up && (cursor == null || (cursor.as_loaded!=null
&& (cursor.as_loaded.space eq queue.producer_swapped_in))) ) {
-      queue.consumers_keeping_up = true
-    }
-
   }
 
   class AcquiredQueueEntry(val entry:QueueEntry) extends LinkedNode[AcquiredQueueEntry] {
@@ -391,3 +341,35 @@ class Subscription(val queue:Queue, val 
   }
 
 }
+
+trait StallCheckSupport {
+  def full:Boolean
+
+  var stall_start = Broker.now
+  var stall_ms = 0L
+
+  def reset_stall_timer = {
+    if( stall_start!=0 ) {
+      val now = Broker.now
+      stall_ms += now - stall_start
+      stall_start = now
+    }
+    val rc = stall_ms
+    stall_ms = 0
+    rc
+  }
+
+  def stall_check = {
+    if ( full ) {
+      if(stall_start==0) {
+        stall_start = Broker.now
+      }
+    } else {
+      if( stall_start!=0 ) {
+        stall_ms += Broker.now - stall_start
+        stall_start = 0
+      }
+    }
+  }
+
+}



Mime
View raw message