activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1233767 - /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Date Fri, 20 Jan 2012 04:54:23 GMT
Author: chirino
Date: Fri Jan 20 04:54:22 2012
New Revision: 1233767

URL: http://svn.apache.org/viewvc?rev=1233767&view=rev
Log:
Fixes APLO-130 : Allow acquired queue entries to get swapped. This is especially handy if
the entry was already persisted anyways.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1233767&r1=1233766&r2=1233767&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Fri Jan 20 04:54:22 2012
@@ -36,8 +36,6 @@ import org.apache.activemq.apollo.dto._
 object Queue extends Log {
   val subcsription_counter = new AtomicInteger(0)
 
-  val PREFTCH_LOAD_FLAG = 1.toByte
-
   class MemorySpace {
     var items = 0
     var size = 0
@@ -526,13 +524,13 @@ class Queue(val router: LocalRouter, val
 
         val entry = tail_entry
         tail_entry = new QueueEntry(Queue.this, next_message_seq)
-        val queueDelivery = delivery.copy
-        queueDelivery.sender = destination_dto
-        queueDelivery.seq = entry.seq
-        entry.init(queueDelivery)
+        val queue_delivery = delivery.copy
+        queue_delivery.sender = destination_dto
+        queue_delivery.seq = entry.seq
+        entry.init(queue_delivery)
         
         if( tune_persistent ) {
-          queueDelivery.uow = delivery.uow
+          queue_delivery.uow = delivery.uow
         }
 
         entries.addLast(entry)
@@ -544,7 +542,8 @@ class Queue(val router: LocalRouter, val
         enqueue_remaining_take(entry.size)
 
         // Do we need to do a persistent enqueue???
-        if (queueDelivery.uow != null) {
+        val persisted = queue_delivery.uow != null
+        if (persisted) {
           entry.as_loaded.store
         }
 
@@ -553,14 +552,18 @@ class Queue(val router: LocalRouter, val
           entry.dispatch
         }
 
-        if( !(consumers_keeping_up || entry.as_loaded.acquired) ) {
+        if( !consumers_keeping_up  ) {
           entry.swap(true)
+        } else if( entry.as_loaded.is_acquired && persisted) {
+          // If the message as dispatched and it's marked to get persisted anyways,
+          // then it's ok if it falls out of memory since we won't need to load it again.
+          entry.swap(false)
         }
-
+        
         // release the store batch...
-        if (queueDelivery.uow != null) {
-          queueDelivery.uow.release
-          queueDelivery.uow = null
+        if (persisted) {
+          queue_delivery.uow.release
+          queue_delivery.uow = null
         }
 
         
@@ -645,7 +648,7 @@ class Queue(val router: LocalRouter, val
     while( cur!=null ) {
 
       // reset the prefetch flags and handle expiration...
-      cur.prefetch_flags = 0
+      cur.prefetched = false
       val next = cur.getNext
 
       // handle expiration...
@@ -655,13 +658,16 @@ class Queue(val router: LocalRouter, val
             // load the range to expire the messages in it.
             cur.load(null)
           case x:QueueEntry#Swapped =>
-            // remove the expired swapped message.
-            expired(cur)
-            x.remove
+            // remove the expired message if it has not been
+            // acquired.
+            if( !x.is_acquired ) {
+              expired(cur)
+              x.remove
+            }
           case x:QueueEntry#Loaded =>
             // remove the expired message if it has not been
             // acquired.
-            if( !x.acquired ) {
+            if( !x.is_acquired ) {
               expired(cur)
               x.remove
             }
@@ -685,7 +691,7 @@ class Queue(val router: LocalRouter, val
       val next = cur.getNext
       val loaded = cur.as_loaded
       if( loaded!=null ) {
-        if( cur.prefetch_flags==0 && !loaded.acquired  ) {
+        if( !cur.prefetched ) {
           if( consumers_keeping_up && (loaded.space eq producer_swapped_in)) {
             // don't move out. keeps producer mem maxed to slow down the producer
           } else {
@@ -714,7 +720,7 @@ class Queue(val router: LocalRouter, val
         // from the entry list.
         val next = cur.getNext
 
-        if( cur.prefetch_flags!=0 ) {
+        if( cur.prefetched ) {
           distance_from_sub = 0
         } else {
           distance_from_sub += 1
@@ -722,7 +728,7 @@ class Queue(val router: LocalRouter, val
             cur.getPrevious.as_swapped_range.combineNext
             combine_counter += 1
           } else {
-            if( cur.is_swapped && distance_from_sub > tune_swap_range_size ) {
+            if( cur.is_swapped && !cur.is_acquired && distance_from_sub >
tune_swap_range_size ) {
               cur.swapped_range
               combine_counter += 1
             }
@@ -1036,14 +1042,14 @@ class QueueEntry(val queue:Queue, val se
   // Subscriptions waiting to dispatch this entry.
   var parked:List[Subscription] = Nil
 
-  // subscriptions will set this to non-zero if they are interested
+  // subscriptions will set this to true if they are interested
   // in the entry.
-  var prefetch_flags:Byte = 0
+  var prefetched = false
 
   // The current state of the entry: Head | Tail | Loaded | Swapped | SwappedRange
   var state:EntryState = new Tail
 
-  def is_prefetched = prefetch_flags == 1
+  def is_prefetched = prefetched
 
   def <(value:QueueEntry) = this.seq < value.seq
   def <=(value:QueueEntry) = this.seq <= value.seq
@@ -1066,7 +1072,7 @@ class QueueEntry(val queue:Queue, val se
 
   def init(qer:QueueEntryRecord):QueueEntry = {
     val locator = new AtomicReference[Array[Byte]](Option(qer.message_locator).map(_.toByteArray).getOrElse(null))
-    state = new Swapped(qer.message_key, locator, qer.size, qer.expiration, qer.redeliveries)
+    state = new Swapped(qer.message_key, locator, qer.size, qer.expiration, qer.redeliveries,
null)
     this
   }
 
@@ -1132,7 +1138,7 @@ class QueueEntry(val queue:Queue, val se
   }
 
   override def toString = {
-    "{seq: "+seq+", prefetch_flags: "+prefetch_flags+", value: "+state+", subscriptions:
"+parked+"}"
+    "{seq: "+seq+", prefetched: "+prefetched+", value: "+state+", subscriptions: "+parked+"}"
   }
 
   /////////////////////////////////////////////////////
@@ -1162,10 +1168,11 @@ class QueueEntry(val queue:Queue, val se
   def count = state.count
   def size = state.size
   def expiration = state.expiration
-  def redeliveries = state.redeliveries
+  def redelivery_count = state.redelivery_count
   def redelivered = state.redelivered
   def messageKey = state.message_key
   def is_swapped_or_swapping_out = state.is_swapped_or_swapping_out
+  def is_acquired = state.is_acquired
   def dispatch() = state.dispatch
 
   // These methods may cause a change in the current state.
@@ -1178,7 +1185,7 @@ class QueueEntry(val queue:Queue, val se
   def can_combine_with_prev = {
     getPrevious !=null &&
       getPrevious.is_swapped_range &&
-        ( is_swapped || is_swapped_range ) &&
+        ( (is_swapped && !is_acquired) || is_swapped_range ) &&
           (getPrevious.count + count  < queue.tune_swap_range_size)
   }
 
@@ -1205,7 +1212,7 @@ class QueueEntry(val queue:Queue, val se
     /**
      * When the entry expires or 0 if it does not expire.
      */
-    def redeliveries:Short = throw new UnsupportedOperationException()
+    def redelivery_count:Short = throw new UnsupportedOperationException()
 
     /**
      * Called to increment the redelivery counter
@@ -1237,6 +1244,11 @@ class QueueEntry(val queue:Queue, val se
     def dispatch() = false
 
     /**
+     * Is the entry acquired by a subscription.
+     */
+    def is_acquired = false
+
+    /**
      * @returns true if the entry is either swapped or swapping.
      */
     def is_swapped_or_swapping_out = false
@@ -1255,7 +1267,7 @@ class QueueEntry(val queue:Queue, val se
 
     /**
      * Removes the entry from the queue's linked list of entries.  This gets called
-     * as a result of an aquired ack.
+     * as a result of an acquired ack.
      */
     def remove:Unit = {
       // advance subscriptions that were on this entry..
@@ -1334,7 +1346,9 @@ class QueueEntry(val queue:Queue, val se
 
     assert( delivery!=null, "delivery cannot be null")
 
-    var acquired = false
+    var acquirer:Subscription = _
+    override def is_acquired = acquirer!=null
+    
     var swapping_out = false
     var storing = false
 
@@ -1343,8 +1357,8 @@ class QueueEntry(val queue:Queue, val se
 
     def label = {
       var rc = "loaded"
-      if( acquired ) {
-        rc += "|aquired"
+      if( is_acquired ) {
+        rc += "|acquired"
       }
       if( swapping_out ) {
         rc += "|swapping out"
@@ -1352,14 +1366,14 @@ class QueueEntry(val queue:Queue, val se
       rc
     }
 
-    override def toString = { "loaded:{ stored: "+stored+", swapping_out: "+swapping_out+",
acquired: "+acquired+", size:"+size+"}" }
+    override def toString = { "loaded:{ stored: "+stored+", swapping_out: "+swapping_out+",
acquired: "+acquirer+", size:"+size+"}" }
 
     override def count = 1
     override def size = delivery.size
     override def expiration = delivery.message.expiration
     override def message_key = delivery.storeKey
     override def message_locator = delivery.storeLocator
-    override def redeliveries = delivery.redeliveries
+    override def redelivery_count = delivery.redeliveries
 
     override def redelivered = delivery.redeliveries = ((delivery.redeliveries+1).min(Short.MaxValue)).toShort
 
@@ -1429,6 +1443,7 @@ class QueueEntry(val queue:Queue, val se
       stored = true
       delivery.uow = null
       if( swapping_out ) {
+
         swapping_out = false
         space -= delivery
         queue.swapping_out_size-=size
@@ -1436,7 +1451,7 @@ class QueueEntry(val queue:Queue, val se
         queue.swap_out_size_counter += size
         queue.swap_out_item_counter += 1
 
-        state = new Swapped(delivery.storeKey, delivery.storeLocator, size, expiration, redeliveries)
+        state = new Swapped(delivery.storeKey, delivery.storeLocator, size, expiration, redelivery_count,
acquirer)
         if( can_combine_with_prev ) {
           getPrevious.as_swapped_range.combineNext
         }
@@ -1483,7 +1498,7 @@ class QueueEntry(val queue:Queue, val se
 
       queue.assert_executing
 
-      if( !acquired && expiration != 0 && expiration <= queue.now ) {
+      if( !is_acquired && expiration != 0 && expiration <= queue.now )
{
         queue.expired(entry)
         remove
         return true
@@ -1515,7 +1530,7 @@ class QueueEntry(val queue:Queue, val se
           }
 
         } else {
-          if( acquired ) {
+          if( is_acquired ) {
             // advance: another sub already acquired this entry..
             advancing += sub
           } else {
@@ -1539,7 +1554,7 @@ class QueueEntry(val queue:Queue, val se
                 } else {
                   // advance: accepted...
                   acquiringSub = sub
-                  acquired = true
+                  acquirer = sub
 
                   val acquiredQueueEntry = sub.acquire(entry)
                   val acquiredDelivery = delivery.copy
@@ -1592,13 +1607,13 @@ class QueueEntry(val queue:Queue, val se
    * entry is persisted, it can move into this state.  This state only holds onto the
    * the massage key so that it can reload the message from the store quickly when needed.
    */
-  class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Array[Byte]],
override val size:Int, override val expiration:Long, var _redeliveries:Short) extends EntryState
{
+  class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Array[Byte]],
override val size:Int, override val expiration:Long, var _redeliveries:Short, var acquirer:Subscription)
extends EntryState {
 
     queue.individual_swapped_items += 1
 
     var swap_in_space:MemorySpace = _
 
-    override def redeliveries = _redeliveries
+    override def redelivery_count = _redeliveries
     override def redelivered = _redeliveries = ((_redeliveries+1).min(Short.MaxValue)).toShort
 
     override def count = 1
@@ -1606,15 +1621,21 @@ class QueueEntry(val queue:Queue, val se
     override def as_swapped = this
 
     override def is_swapped_or_swapping_out = true
+    
+    override def is_acquired = acquirer!=null
 
     def label = {
       var rc = "swapped"
+      if( is_acquired ) {
+        rc += "|acquired"
+      }
       if( swap_in_space!=null ) {
         rc += "|swapping in"
       }
       rc
     }
-    override def toString = { "swapped:{ swapping_in: "+swap_in_space+", size:"+size+"}"
}
+
+    override def toString = { "swapped:{ swapping_in: "+swap_in_space+", acquired:"+acquirer+",
size:"+size+"}" }
 
     override def swap_in(space:MemorySpace) = {
       if( swap_in_space==null ) {
@@ -1652,7 +1673,7 @@ class QueueEntry(val queue:Queue, val se
         delivery.size = messageRecord.size
         delivery.storeKey = messageRecord.key
         delivery.storeLocator = messageRecord.locator
-        delivery.redeliveries = redeliveries
+        delivery.redeliveries = redelivery_count
 
         swap_in_space += delivery
 
@@ -1678,6 +1699,8 @@ class QueueEntry(val queue:Queue, val se
     }
 
     override def swap_range = {
+      // You can't swap range an acquired entry.
+      assert(!is_acquired)
       if( swap_in_space!=null ) {
         swap_in_space = null
         queue.swapping_in_size -= size
@@ -1685,6 +1708,58 @@ class QueueEntry(val queue:Queue, val se
       queue.individual_swapped_items -= 1
       state = new SwappedRange(seq, 1, size, expiration)
     }
+
+    override def dispatch():Boolean = {
+      queue.assert_executing
+
+      if( !is_acquired && expiration != 0 && expiration <= queue.now )
{
+        queue.expired(entry)
+        remove
+        return true
+      }
+
+      // Nothing to dispatch if we don't have subs..
+      if( parked.isEmpty ) {
+        return false
+      }
+
+      var heldBack = ListBuffer[Subscription]()
+      var advancing = ListBuffer[Subscription]()
+
+      parked.foreach{ sub=>
+        if( sub.browser ) {
+          heldBack += sub
+        } else {
+          if( is_acquired ) {
+            // advance: another sub already acquired this entry.. we don't need to load..
yay!
+            advancing += sub
+          } else {
+            heldBack += sub
+          }
+        }
+      }
+
+      if ( advancing.isEmpty ) {
+        if (swap_in_space==null && !parked.isEmpty) {
+          // If we are not swapping in try to get a sub to prefetch us.
+          parked.foreach(_.refill_prefetch)
+        }
+        return false
+      } else {
+
+        // The held back subs stay on this entry..
+        parked = heldBack.toList
+
+        if (swap_in_space==null && !parked.isEmpty) {
+          // If we are not swapping in try to get a sub to prefetch us.
+          parked.foreach(_.refill_prefetch)
+        }
+
+        // the advancing subs move on to the next entry...
+        advance(advancing)
+        return true
+      }
+    }
   }
 
   /**
@@ -1781,6 +1856,7 @@ class QueueEntry(val queue:Queue, val se
       val value = getNext
       assert(value!=null)
       assert(value.is_swapped || value.is_swapped_range)
+      assert(!value.is_acquired)
       if( value.is_swapped ) {
         assert(last < value.seq )
         last = value.seq
@@ -1907,7 +1983,7 @@ class Subscription(val queue:Queue, val 
       session.close
       session = null
 
-      // The following action gets executed once all aquired messages
+      // The following action gets executed once all acquired messages
       // ared acked or nacked.
       pending_close_action = ()=> {
         queue.change_consumer_capacity( - queue.tune_consumer_buffer )
@@ -1960,6 +2036,7 @@ class Subscription(val queue:Queue, val 
   def full = session.full
 
   def offer(delivery:Delivery) = try {
+    assert(delivery.seq > 0 )
     session.offer(delivery)
   } finally {
     check_consumer_stall
@@ -2026,12 +2103,14 @@ class Subscription(val queue:Queue, val 
       pos // start prefetching from the current position.
     }
 
-    var remaining = queue.tune_consumer_buffer - acquired_size; // 3/4 of the prefetch is
triggers loading
+    var remaining = queue.tune_consumer_buffer - acquired_size;
     while( remaining>0 && cursor!=null ) {
       val next = cursor.getNext
-      if( (cursor.prefetch_flags & PREFTCH_LOAD_FLAG) == 0 ) {
+      // Browsers prefetch all messages..
+      // Non-Browsers prefetch non-acquired messages.
+      if( !cursor.prefetched && (browser || !cursor.is_acquired) ) {
         remaining -= cursor.size
-        cursor.prefetch_flags = (cursor.prefetch_flags | PREFTCH_LOAD_FLAG).toByte
+        cursor.prefetched = true
         cursor.load(queue.consumer_swapped_in)
       }
       cursor = next
@@ -2095,7 +2174,10 @@ class Subscription(val queue:Queue, val 
       }
 
       total_nack_count += 1
-      entry.as_loaded.acquired = false
+      entry.state match {
+        case x:entry.Loaded=> x.acquirer = null
+        case x:entry.Swapped=> x.acquirer = null
+      }
       acquired_size -= entry.size
 
       // track for stats
@@ -2106,12 +2188,20 @@ class Subscription(val queue:Queue, val 
       // The following does not need to get done for exclusive subs because
       // they end up rewinding all the sub of the head of the queue.
       if( !exclusive ) {
-
         // rewind all the matching competing subs past the entry.. back to the entry
+        val loaded = entry.as_loaded
         queue.all_subscriptions.valuesIterator.foreach{ sub=>
-          if( !sub.browser && entry.seq < sub.pos.seq && sub.matches(entry.as_loaded.delivery))
{
+          val matches = if( loaded!=null ) {
+            // small perf optimization.. no need to rewind if the
+            // consumer is not interested in the message. (not the typical case).
+            sub.matches(loaded.delivery)
+          } else {
+            true // if message was not loaded lets just assume it was.
+          }
+          if( !sub.browser && entry.seq < sub.pos.seq && matches) {
             sub.rewind(entry)
           }
+
         }
 
       }



Mime
View raw message