activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1387819 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/
Date Wed, 19 Sep 2012 23:44:47 GMT
Author: chirino
Date: Wed Sep 19 23:44:46 2012
New Revision: 1387819

URL: http://svn.apache.org/viewvc?rev=1387819&view=rev
Log:
Back-out the changes introduced in rev 1379944.  That change caused a queue performance regression.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1387819&r1=1387818&r2=1387819&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Sep 19 23:44:46 2012
@@ -30,7 +30,6 @@ import security.{SecuredResource, Securi
 import org.apache.activemq.apollo.dto._
 import java.util.regex.Pattern
 import collection.mutable.ListBuffer
-import java.util
 
 object Queue extends Log {
   val subcsription_counter = new AtomicInteger(0)
@@ -120,12 +119,6 @@ class Queue(val router: LocalRouter, val
   var tune_swap = true
 
   /**
-   * Todo.. see if we can remove this collection.  Don't think it's
-   * actually need.
-   */
-  val in_flight_removes = new util.HashSet[Long]()
-
-  /**
    * The number max number of swapped queue entries to load
    * for the store at a time.  Note that swapped entries are just
    * reference pointers to the actual messages.  When not loaded,
@@ -605,13 +598,19 @@ class Queue(val router: LocalRouter, val
             case state: entry.Loaded =>
               var next = entry.getNext
               if (!entry.is_acquired) {
-                entry.dequeue(null)
+                dequeue_item_counter += 1
+                dequeue_size_counter += entry.size
+                dequeue_ts = now
+                entry.remove
               }
               next
             case state: entry.Swapped =>
               var next = entry.getNext
               if (!entry.is_acquired) {
-                entry.dequeue(null)
+                dequeue_item_counter += 1
+                dequeue_size_counter += entry.size
+                dequeue_ts = now
+                entry.remove
               }
               next
             case state: entry.SwappedRange =>
@@ -669,13 +668,12 @@ class Queue(val router: LocalRouter, val
         tail_entry = new QueueEntry(Queue.this, next_message_seq)
         val queue_delivery = delivery.copy
         queue_delivery.seq = entry.seq
-
+        entry.init(queue_delivery)
+        
         if( tune_persistent ) {
           queue_delivery.uow = delivery.uow
         }
 
-        entry.init(queue_delivery)
-
         entries.addLast(entry)
         enqueue_item_counter += 1
         enqueue_size_counter += entry.size
@@ -684,6 +682,15 @@ class Queue(val router: LocalRouter, val
         // To decrease the enqueue throttle.
         enqueue_remaining_take(entry.size)
 
+        // Do we need to do a persistent enqueue???
+        val persisted = queue_delivery.uow != null
+        if (persisted) {
+          entry.state match {
+            case state:entry.Loaded => state.store
+            case state:entry.Swapped => delivery.uow.enqueue(entry.toQueueEntryRecord)
+          }
+        }
+
         if( entry.hasSubs ) {
           // try to dispatch it directly...
           entry.dispatch
@@ -693,7 +700,7 @@ class Queue(val router: LocalRouter, val
         if( entry.isLinked ) {
           if( !consumers_keeping_up_historically  ) {
             entry.swap(true)
-          } else if( entry.as_loaded.is_acquired && queue_delivery.uow != null) {
+          } else if( entry.as_loaded.is_acquired && persisted) {
             // If the message as dispatched and it's marked to get persisted anyways,
             // then it's ok if it falls out of memory since we won't need to load it again.
             entry.swap(false)
@@ -701,7 +708,7 @@ class Queue(val router: LocalRouter, val
         }
 
         // release the store batch...
-        if (queue_delivery.uow != null) {
+        if (persisted) {
           queue_delivery.uow.release
           queue_delivery.uow = null
         }
@@ -722,10 +729,17 @@ class Queue(val router: LocalRouter, val
   }
 
   def expired(entry:QueueEntry, dequeue:Boolean=true):Unit = {
+    if(dequeue) {
+      might_unfill {
+        dequeue_item_counter += 1
+        dequeue_size_counter += entry.size
+        dequeue_ts = now
+      }
+    }
+
     expired_ts = now
     expired_item_counter += 1
     expired_size_counter += entry.size
-    entry.dequeue(null)
   }
 
   def display_stats: Unit = {
@@ -795,12 +809,14 @@ class Queue(val router: LocalRouter, val
             // acquired.
             if( !x.is_acquired ) {
               expired(cur)
+              x.remove
             }
           case x:QueueEntry#Loaded =>
             // remove the expired message if it has not been
             // acquired.
             if( !x.is_acquired ) {
               expired(cur)
+              x.remove
             }
           case _ =>
         }
@@ -820,51 +836,36 @@ class Queue(val router: LocalRouter, val
     }
 
     // swap out messages.
-    cur = entries.getHead.getNext
-    var dropping_head_entries = is_topic_queue
+    cur = entries.getHead
     while( cur!=null ) {
       val next = cur.getNext
-      if ( dropping_head_entries ) {
-        if( cur.parked.isEmpty ) {
-          if( cur.is_swapped_range ) {
-            cur.load(producer_swapped_in)
-            dropping_head_entries=false
-          } else {
-            cur.dequeue(null)
-          }
-        } else {
-          cur.load(consumer_swapped_in)
-          dropping_head_entries = false
-        }
+      if( cur.prefetched ) {
+        // Prefteched entries need to get loaded..
+        cur.load(consumer_swapped_in)
       } else {
-        if( cur.prefetched ) {
-          // Prefteched entries need to get loaded..
-          cur.load(consumer_swapped_in)
-        } else {
-          // This is a non-prefetched entry.. entires ahead and behind the
-          // consumer subscriptions.
-          val loaded = cur.as_loaded
-          if( loaded!=null ) {
-            // It's in memory.. perhaps we need to swap it out..
-            if(!consumers_keeping_up_historically) {
-              // Swap out ASAP if consumers are not keeping up..
-              cur.swap(true)
-            } else {
-              // Consumers seem to be keeping up.. so we have to be more selective
-              // about what gets swapped out..
+        // This is a non-prefetched entry.. entires ahead and behind the
+        // consumer subscriptions.
+        val loaded = cur.as_loaded
+        if( loaded!=null ) {
+          // It's in memory.. perhaps we need to swap it out..
+          if(!consumers_keeping_up_historically) {
+            // Swap out ASAP if consumers are not keeping up..
+            cur.swap(true)
+          } else {
+            // Consumers seem to be keeping up.. so we have to be more selective
+            // about what gets swapped out..
 
-              if (cur.memory_space eq producer_swapped_in ) {
-                // Entry will be used soon..
-                cur.load(producer_swapped_in)
-              } else if ( cur.is_acquired ) {
-                // Entry was just used...
-                cur.load(consumer_swapped_in)
-  //              cur.swap(false)
-              } else {
-                // Does not look to be anywhere close to the consumer.. so get
-                // rid of it asap.
-                cur.swap(true)
-              }
+            if (cur.memory_space eq producer_swapped_in ) {
+              // Entry will be used soon..
+              cur.load(producer_swapped_in)
+            } else if ( cur.is_acquired ) {
+              // Entry was just used...
+              cur.load(consumer_swapped_in)
+//              cur.swap(false)
+            } else {
+              // Does not look to be anywhere close to the consumer.. so get
+              // rid of it asap.
+              cur.swap(true)
             }
           }
         }
@@ -1054,11 +1055,12 @@ class Queue(val router: LocalRouter, val
       case (entry, consumed, uow) =>
         consumed match {
           case Consumed =>
+//            debug("ack consumed: ("+store_id+","+entry.entry.seq+")")
             entry.ack(uow)
           case Expired=>
 //            debug("ack expired: ("+store_id+","+entry.entry.seq+")")
             entry.entry.queue.expired(entry.entry, false)
-            entry.remove()
+            entry.ack(uow)
           case Delivered =>
             entry.increment_nack
             entry.entry.redelivered

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1387819&r1=1387818&r2=1387819&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Wed Sep 19 23:44:46 2012
@@ -64,25 +64,15 @@ class QueueEntry(val queue:Queue, val se
   }
 
   def init(delivery:Delivery):QueueEntry = {
-
     if( delivery.message == null ) {
       // This must be a swapped out message which has been previously persisted in
       // another queue.  We need to enqueue it to this queue..
       queue.swap_out_size_counter += delivery.size
       queue.swap_out_item_counter += 1
       state = new Swapped(delivery.storeKey, delivery.storeLocator, delivery.size, delivery.expiration, 0, null, delivery.sender)
-      // store it..
-      if( delivery.uow != null ) {
-        delivery.uow.enqueue(toQueueEntryRecord)
-      }
     } else {
       queue.producer_swapped_in += delivery
-      val loaded: QueueEntry.this.type#Loaded = new Loaded(delivery, false, queue.producer_swapped_in)
-      state = loaded
-      // store it..
-      if( delivery.uow != null ) {
-        loaded.store
-      }
+      state = new Loaded(delivery, false, queue.producer_swapped_in)
     }
     this
   }
@@ -205,7 +195,6 @@ class QueueEntry(val queue:Queue, val se
   // These methods may cause a change in the current state.
   def swap(asap:Boolean) = state.swap_out(asap)
   def load(space:MemorySpace) = state.swap_in(space)
-  def dequeue(uow:StoreUOW) = state.dequeue(uow)
   def remove = state.remove
 
   def swapped_range = state.swap_range
@@ -301,48 +290,14 @@ class QueueEntry(val queue:Queue, val se
      * Removes the entry from the queue's linked list of entries.  This gets called
      * as a result of an acquired ack.
      */
-    def dequeue(uow:StoreUOW):Unit = {
-      if (messageKey != -1) {
-        val localuow = if( uow == null ) {
-          queue.virtual_host.store.create_uow
-        } else {
-          uow
-        }
-        localuow.dequeue(entry.toQueueEntryRecord)
-        remove()
-        queue.in_flight_removes.add(seq)
-        localuow.on_complete {
-          queue.dispatch_queue {
-            queue.in_flight_removes.remove(seq)
-            queue.might_unfill {
-              queue.dequeue_item_counter += 1
-              queue.dequeue_size_counter += size
-              queue.dequeue_ts = queue.now
-            }
-          }
-        }
-        if( uow == null ) {
-          localuow.release
-        }
-      } else {
-        queue.might_unfill {
-          remove()
-          queue.dequeue_item_counter += 1
-          queue.dequeue_size_counter += size
-          queue.dequeue_ts = queue.now
-        }
-      }
-    }
-
-    def remove():Unit = {
+    def remove:Unit = {
       // advance subscriptions that were on this entry..
-      if( !parked.isEmpty ) {
-        advance(parked)
-        parked = Nil
-      }
+      advance(parked)
+      parked = Nil
 
       // take the entry of the entries list..
       unlink
+      //TODO: perhaps refill subscriptions.
     }
 
     /**
@@ -382,7 +337,7 @@ class QueueEntry(val queue:Queue, val se
       }
     }
 
-    override def dequeue(uow:StoreUOW) = throw new AssertionError("Head entry cannot be removed")
+    override def remove = throw new AssertionError("Head entry cannot be removed")
     override def swap_in(space:MemorySpace) = throw new AssertionError("Head entry cannot be loaded")
     override def swap_out(asap:Boolean) = throw new AssertionError("Head entry cannot be swapped")
   }
@@ -398,7 +353,7 @@ class QueueEntry(val queue:Queue, val se
     override  def toString = "tail"
     override def as_tail:Tail = this
 
-    override def dequeue(uow:StoreUOW) = throw new AssertionError("Tail entry cannot be removed")
+    override def remove = throw new AssertionError("Tail entry cannot be removed")
     override def swap_in(space:MemorySpace) = throw new AssertionError("Tail entry cannot be loaded")
     override def swap_out(asap:Boolean) = throw new AssertionError("Tail entry cannot be swapped")
 
@@ -446,7 +401,7 @@ class QueueEntry(val queue:Queue, val se
 
     override def redelivered = delivery.redeliveries = ((delivery.redeliveries+1).min(Short.MaxValue)).toShort
 
-    var remove_pending:Option[StoreUOW] = None
+    var remove_pending = false
 
     override def is_swapped_or_swapping_out = {
       swapping_out
@@ -534,11 +489,11 @@ class QueueEntry(val queue:Queue, val se
         if( can_combine_with_prev ) {
           getPrevious.as_swapped_range.combineNext
         }
-        queue.loaded_items -= 1
-        queue.loaded_size -= size
-
-        if( remove_pending.isDefined ) {
-          state.dequeue(remove_pending.get)
+        if( remove_pending ) {
+          state.remove
+        } else {
+          queue.loaded_items -= 1
+          queue.loaded_size -= size
         }
 
         val on_swap_out_copy = on_swap_out
@@ -548,10 +503,10 @@ class QueueEntry(val queue:Queue, val se
         }
 
       } else {
-        if( remove_pending.isDefined ) {
+        if( remove_pending ) {
           delivery.message.release
           space -= delivery
-          super.dequeue(remove_pending.get)
+          super.remove
         }
       }
     }
@@ -565,20 +520,16 @@ class QueueEntry(val queue:Queue, val se
       swapping_out = false
     }
 
-    override def dequeue(uow:StoreUOW) = {
-      if( storing | remove_pending.isDefined ) {
-        remove_pending = Some(uow)
-      } else {
-        super.dequeue(uow)
-      }
-    }
-
-    override def remove() = {
+    override def remove = {
       queue.loaded_items -= 1
       queue.loaded_size -= size
-      delivery.message.release
-      space -= delivery
-      super.remove()
+      if( storing | remove_pending ) {
+        remove_pending = true
+      } else {
+        delivery.message.release
+        space -= delivery
+        super.remove
+      }
     }
 
     override def dispatch():Boolean = {
@@ -587,6 +538,7 @@ class QueueEntry(val queue:Queue, val se
 
       if( !is_acquired && expiration != 0 && expiration <= queue.now ) {
         queue.expired(entry)
+        remove
         return true
       }
 
@@ -695,7 +647,15 @@ class QueueEntry(val queue:Queue, val se
 
         // We can drop after dispatch in some cases.
         if( queue.is_topic_queue  && parked.isEmpty && getPrevious.is_head ) {
-          dequeue(null)
+          if (messageKey != -1) {
+            val storeBatch = queue.virtual_host.store.create_uow
+            storeBatch.dequeue(toQueueEntryRecord)
+            storeBatch.release
+          }
+          queue.dequeue_item_counter += 1
+          queue.dequeue_size_counter += size
+          queue.dequeue_ts = queue.now
+          remove
         }
 
         queue.trigger_swap
@@ -803,18 +763,14 @@ class QueueEntry(val queue:Queue, val se
       }
     }
 
-    override def dequeue(uow:StoreUOW) = {
+
+    override def remove = {
       if( space!=null ) {
         space = null
         queue.swapping_in_size -= size
       }
-      super.dequeue(uow)
-    }
-
-
-    override def remove() {
       queue.individual_swapped_items -= 1
-      super.remove()
+      super.remove
     }
 
     override def swap_range = {
@@ -833,6 +789,7 @@ class QueueEntry(val queue:Queue, val se
 
       if( !is_acquired && expiration != 0 && expiration <= queue.now ) {
         queue.expired(entry)
+        remove
         return true
       }
 
@@ -934,11 +891,9 @@ class QueueEntry(val queue:Queue, val se
             val tmpList = new LinkedNodeList[QueueEntry]()
             records.foreach { record =>
               val entry = new QueueEntry(queue, record.entry_seq).init(record)
-              if( !queue.in_flight_removes.contains(entry.seq) ) {
-                tmpList.addLast(entry)
-                item_count += 1
-                size_count += record.size
-              }
+              tmpList.addLast(entry)
+              item_count += 1
+              size_count += record.size
             }
 
             // we may need to adjust the enqueue count if entries

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala?rev=1387819&r1=1387818&r2=1387819&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala Wed Sep 19 23:44:46 2012
@@ -322,11 +322,21 @@ class Subscription(val queue:Queue, val 
 
       total_ack_count += 1
       total_ack_size += entry.size
-      remove()
-      entry.dequeue(uow) // entry size changes to 0
-    }
+      if (entry.messageKey != -1) {
+        val storeBatch = if( uow == null ) {
+          queue.virtual_host.store.create_uow
+        } else {
+          uow
+        }
+        storeBatch.dequeue(entry.toQueueEntryRecord)
+        if( uow == null ) {
+          storeBatch.release
+        }
+      }
+      queue.dequeue_item_counter += 1
+      queue.dequeue_size_counter += entry.size
+      queue.dequeue_ts = queue.now
 
-    def remove():Unit = {
       // removes this entry from the acquired list.
       unlink()
       if( acquired.isEmpty ) {
@@ -337,9 +347,12 @@ class Subscription(val queue:Queue, val 
       acquired_size -= entry.size
 
       val next = entry.nextOrTail
+      entry.remove // entry size changes to 0
+
       queue.trigger_swap
       next.task.run
       check_finish_close
+
     }
 
     def increment_nack = total_nack_count += 1

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala?rev=1387819&r1=1387818&r2=1387819&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala Wed Sep 19 23:44:46 2012
@@ -286,7 +286,7 @@ class StompLevelDBMetricsTest extends St
 
   override def broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
 
-  test("slow_consumer_policy='queue' /w 1 slow and 1 fast consumer.") {
+  ignore("slow_consumer_policy='queue' /w 1 slow and 1 fast consumer.") {
     var dest_name = next_id("queued.metrics")
     val dest = "/topic/"+dest_name
 



Mime
View raw message