activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1234518 - /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Date Sun, 22 Jan 2012 14:43:08 GMT
Author: chirino
Date: Sun Jan 22 14:43:08 2012
New Revision: 1234518

URL: http://svn.apache.org/viewvc?rev=1234518&view=rev
Log:
Be a little less eager about swapping out message entries.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1234518&r1=1234517&r2=1234518&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Sun Jan 22 14:43:08 2012
@@ -199,7 +199,13 @@ class Queue(val router: LocalRouter, val
 
   var producer_counter = 0L
   var consumer_counter = 0L
+
   var consumers_keeping_up = true
+  var consumers_keeping_up_counter = 0
+
+  // This set to true if any consumer kept up within the
+  // last second.
+  var consumers_keeping_up_historically = false
 
   var individual_swapped_items = 0
 
@@ -558,7 +564,7 @@ class Queue(val router: LocalRouter, val
           entry.dispatch
         }
 
-        if( !consumers_keeping_up  ) {
+        if( !consumers_keeping_up_historically  ) {
           entry.swap(true)
         } else if( entry.as_loaded.is_acquired && persisted) {
           // If the message as dispatched and it's marked to get persisted anyways,
@@ -684,27 +690,49 @@ class Queue(val router: LocalRouter, val
     }
 
     // Set the prefetch flags
-    val was_keepingup = consumers_keeping_up
     consumers_keeping_up = false
     all_subscriptions.valuesIterator.foreach{ x=>
       x.refill_prefetch
     }
     consumers_keeping_up = consumers_keeping_up && delivery_rate > tune_fast_delivery_rate
+    if( consumers_keeping_up ) {
+      consumers_keeping_up_counter += 1
+      consumers_keeping_up_historically = true
+    }
 
     // swap out messages.
     cur = entries.getHead
     while( cur!=null ) {
       val next = cur.getNext
-      val loaded = cur.as_loaded
-      if( loaded!=null ) {
-        if( !cur.prefetched ) {
-          if( consumers_keeping_up && (loaded.space eq producer_swapped_in)) {
-            // don't move out. keeps producer mem maxed to slow down the producer
-          } else {
+      if( cur.prefetched ) {
+        // Prefteched entries need to get loaded..
+        cur.load(consumer_swapped_in)
+      } else {
+        // This is a non-prefetched entry.. entires ahead and behind the
+        // consumer subscriptions.
+        val loaded = cur.as_loaded
+        if( loaded!=null ) {
+          // It's in memory.. perhaps we need to swap it out..
+          if(!consumers_keeping_up_historically) {
+            // Swap out ASAP if consumers are not keeping up..
             cur.swap(true)
+          } else {
+            // Consumers seem to be keeping up.. so we have to be more selective
+            // about what gets swapped out..
+
+            if (cur.memory_space eq producer_swapped_in ) {
+              // Entry will be used soon..
+              cur.load(producer_swapped_in)
+            } else if ( cur.is_acquired ) {
+              // Entry was just used...
+              cur.load(consumer_swapped_in)
+//              cur.swap(false)
+            } else {
+              // Does not look to be anywhere close to the consumer.. so get
+              // rid of it asap.
+              cur.swap(true)
+            }
           }
-        } else {
-          cur.load(consumer_swapped_in)
         }
       }
       cur = next
@@ -760,6 +788,10 @@ class Queue(val router: LocalRouter, val
       var elapsed = System.currentTimeMillis-now
       now += elapsed
 
+      println("consumers_keeping_up_counter: "+consumers_keeping_up_counter)
+      consumers_keeping_up_historically = consumers_keeping_up_counter!=0
+      consumers_keeping_up_counter = 0
+      
       delivery_rate = 0L
 
       var consumer_stall_ms = 0L
@@ -1180,6 +1212,7 @@ class QueueEntry(val queue:Queue, val se
   def is_swapped_or_swapping_out = state.is_swapped_or_swapping_out
   def is_acquired = state.is_acquired
   def dispatch() = state.dispatch
+  def memory_space = state.memory_space
 
   // These methods may cause a change in the current state.
   def swap(asap:Boolean) = state.swap_out(asap)
@@ -1264,6 +1297,8 @@ class QueueEntry(val queue:Queue, val se
      */
     def swap_in(space:MemorySpace) = {}
 
+    def memory_space:MemorySpace = null
+
     /**
      * Triggers the entry to get swapped out if it's not already swapped.
      */
@@ -1354,7 +1389,9 @@ class QueueEntry(val queue:Queue, val se
 
     var acquirer:Subscription = _
     override def is_acquired = acquirer!=null
-    
+
+    override def memory_space = space
+
     var swapping_out = false
     var storing = false
 
@@ -1620,7 +1657,7 @@ class QueueEntry(val queue:Queue, val se
 
     queue.individual_swapped_items += 1
 
-    var swap_in_space:MemorySpace = _
+    var space:MemorySpace = _
 
     override def redelivery_count = _redeliveries
     override def redelivered = _redeliveries = ((_redeliveries+1).min(Short.MaxValue)).toShort
@@ -1633,24 +1670,26 @@ class QueueEntry(val queue:Queue, val se
     
     override def is_acquired = acquirer!=null
 
+    override def memory_space = space
+
     def label = {
       var rc = "swapped"
       if( is_acquired ) {
         rc += "|acquired"
       }
-      if( swap_in_space!=null ) {
+      if( space!=null ) {
         rc += "|swapping in"
       }
       rc
     }
 
-    override def toString = { "swapped:{ swapping_in: "+swap_in_space+", acquired:"+acquirer+",
size:"+size+"}" }
+    override def toString = { "swapped:{ swapping_in: "+space+", acquired:"+acquirer+", size:"+size+"}"
}
 
-    override def swap_in(space:MemorySpace) = {
-      if( swap_in_space==null ) {
+    override def swap_in(mem_space:MemorySpace) = {
+      if( this.space==null ) {
 //        trace("Start entry load of message seq: %s", seq)
         // start swapping in...
-        swap_in_space = space
+        space = mem_space
         queue.swapping_in_size += size
         queue.virtual_host.store.load_message(message_key, message_locator) { delivery =>
           // pass off to a source so it can aggregate multiple
@@ -1672,7 +1711,7 @@ class QueueEntry(val queue:Queue, val se
     }
 
     def swapped_in(messageRecord:MessageRecord) = {
-      if( swap_in_space!=null ) {
+      if( space!=null ) {
 //        debug("Loaded message seq: ", seq )
         queue.swapping_in_size -= size
 
@@ -1684,14 +1723,14 @@ class QueueEntry(val queue:Queue, val se
         delivery.storeLocator = messageRecord.locator
         delivery.redeliveries = redelivery_count
 
-        swap_in_space += delivery
+        space += delivery
 
         queue.swap_in_size_counter += size
         queue.swap_in_item_counter += 1
 
         queue.individual_swapped_items -= 1
-        state = new Loaded(delivery, true, swap_in_space)
-        swap_in_space = null
+        state = new Loaded(delivery, true, space)
+        space = null
       } else {
 //        debug("Ignoring store load of: ", messageKey)
       }
@@ -1699,8 +1738,8 @@ class QueueEntry(val queue:Queue, val se
 
 
     override def remove = {
-      if( swap_in_space!=null ) {
-        swap_in_space = null
+      if( space!=null ) {
+        space = null
         queue.swapping_in_size -= size
       }
       queue.individual_swapped_items -= 1
@@ -1710,8 +1749,8 @@ class QueueEntry(val queue:Queue, val se
     override def swap_range = {
       // You can't swap range an acquired entry.
       assert(!is_acquired)
-      if( swap_in_space!=null ) {
-        swap_in_space = null
+      if( space!=null ) {
+        space = null
         queue.swapping_in_size -= size
       }
       queue.individual_swapped_items -= 1
@@ -1749,7 +1788,7 @@ class QueueEntry(val queue:Queue, val se
       }
 
       if ( advancing.isEmpty ) {
-        if (swap_in_space==null && !parked.isEmpty) {
+        if (space==null && !parked.isEmpty) {
           // If we are not swapping in try to get a sub to prefetch us.
           parked.foreach(_.refill_prefetch)
         }
@@ -1759,7 +1798,7 @@ class QueueEntry(val queue:Queue, val se
         // The held back subs stay on this entry..
         parked = heldBack.toList
 
-        if (swap_in_space==null && !parked.isEmpty) {
+        if (space==null && !parked.isEmpty) {
           // If we are not swapping in try to get a sub to prefetch us.
           parked.foreach(_.refill_prefetch)
         }



Mime
View raw message