activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1389435 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/
Date Mon, 24 Sep 2012 15:35:01 GMT
Author: chirino
Date: Mon Sep 24 15:35:01 2012
New Revision: 1389435

URL: http://svn.apache.org/viewvc?rev=1389435&view=rev
Log:
Re-implemented the backed out changes in rev 1379944 in a way that does not cause a performance
regression.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1389435&r1=1389434&r2=1389435&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Mon Sep 24 15:35:01 2012
@@ -598,18 +598,14 @@ class Queue(val router: LocalRouter, val
             case state: entry.Loaded =>
               var next = entry.getNext
               if (!entry.is_acquired) {
-                dequeue_item_counter += 1
-                dequeue_size_counter += entry.size
-                dequeue_ts = now
+                entry.dequeue(null)
                 entry.remove
               }
               next
             case state: entry.Swapped =>
               var next = entry.getNext
               if (!entry.is_acquired) {
-                dequeue_item_counter += 1
-                dequeue_size_counter += entry.size
-                dequeue_ts = now
+                entry.dequeue(null)
                 entry.remove
               }
               next
@@ -722,21 +718,7 @@ class Queue(val router: LocalRouter, val
     }
   }
 
-  def expired(delivery:Delivery):Unit = {
-    expired_ts = now
-    expired_item_counter += 1
-    expired_size_counter += delivery.size
-  }
-
-  def expired(entry:QueueEntry, dequeue:Boolean=true):Unit = {
-    if(dequeue) {
-      might_unfill {
-        dequeue_item_counter += 1
-        dequeue_size_counter += entry.size
-        dequeue_ts = now
-      }
-    }
-
+  def expired(entry:QueueEntry):Unit = {
     expired_ts = now
     expired_item_counter += 1
     expired_size_counter += entry.size
@@ -809,6 +791,7 @@ class Queue(val router: LocalRouter, val
             // acquired.
             if( !x.is_acquired ) {
               expired(cur)
+              cur.dequeue(null)
               x.remove
             }
           case x:QueueEntry#Loaded =>
@@ -816,6 +799,7 @@ class Queue(val router: LocalRouter, val
             // acquired.
             if( !x.is_acquired ) {
               expired(cur)
+              cur.dequeue(null)
               x.remove
             }
           case _ =>
@@ -836,36 +820,52 @@ class Queue(val router: LocalRouter, val
     }
 
     // swap out messages.
-    cur = entries.getHead
+    cur = entries.getHead.getNext
+    var dropping_head_entries = is_topic_queue
     while( cur!=null ) {
       val next = cur.getNext
-      if( cur.prefetched ) {
-        // Prefteched entries need to get loaded..
-        cur.load(consumer_swapped_in)
-      } else {
-        // This is a non-prefetched entry.. entires ahead and behind the
-        // consumer subscriptions.
-        val loaded = cur.as_loaded
-        if( loaded!=null ) {
-          // It's in memory.. perhaps we need to swap it out..
-          if(!consumers_keeping_up_historically) {
-            // Swap out ASAP if consumers are not keeping up..
-            cur.swap(true)
+      if ( dropping_head_entries ) {
+        if( cur.parked.isEmpty ) {
+          if( cur.is_swapped_range ) {
+            cur.load(producer_swapped_in)
+            dropping_head_entries=false
           } else {
-            // Consumers seem to be keeping up.. so we have to be more selective
-            // about what gets swapped out..
-
-            if (cur.memory_space eq producer_swapped_in ) {
-              // Entry will be used soon..
-              cur.load(producer_swapped_in)
-            } else if ( cur.is_acquired ) {
-              // Entry was just used...
-              cur.load(consumer_swapped_in)
-//              cur.swap(false)
-            } else {
-              // Does not look to be anywhere close to the consumer.. so get
-              // rid of it asap.
+            cur.dequeue(null)
+            cur.remove
+          }
+        } else {
+          cur.load(consumer_swapped_in)
+          dropping_head_entries = false
+        }
+      } else {
+        if( cur.prefetched ) {
+          // Prefteched entries need to get loaded..
+          cur.load(consumer_swapped_in)
+        } else {
+          // This is a non-prefetched entry.. entires ahead and behind the
+          // consumer subscriptions.
+          val loaded = cur.as_loaded
+          if( loaded!=null ) {
+            // It's in memory.. perhaps we need to swap it out..
+            if(!consumers_keeping_up_historically) {
+              // Swap out ASAP if consumers are not keeping up..
               cur.swap(true)
+            } else {
+              // Consumers seem to be keeping up.. so we have to be more selective
+              // about what gets swapped out..
+
+              if (cur.memory_space eq producer_swapped_in ) {
+                // Entry will be used soon..
+                cur.load(producer_swapped_in)
+              } else if ( cur.is_acquired ) {
+                // Entry was just used...
+                cur.load(consumer_swapped_in)
+  //              cur.swap(false)
+              } else {
+                // Does not look to be anywhere close to the consumer.. so get
+                // rid of it asap.
+                cur.swap(true)
+              }
             }
           }
         }
@@ -1059,7 +1059,9 @@ class Queue(val router: LocalRouter, val
             entry.ack(uow)
           case Expired=>
 //            debug("ack expired: ("+store_id+","+entry.entry.seq+")")
-            entry.entry.queue.expired(entry.entry, false)
+            expired_ts = now
+            expired_item_counter += 1
+            expired_size_counter += entry.entry.size
             entry.ack(uow)
           case Delivered =>
             entry.increment_nack

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1389435&r1=1389434&r2=1389435&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Mon Sep 24 15:35:01 2012
@@ -197,6 +197,26 @@ class QueueEntry(val queue:Queue, val se
   def load(space:MemorySpace) = state.swap_in(space)
   def remove = state.remove
 
+  def dequeue(uow: StoreUOW) = {
+
+    if (messageKey != -1) {
+      val storeBatch = if( uow == null ) {
+        queue.virtual_host.store.create_uow
+      } else {
+        uow
+      }
+      storeBatch.dequeue(toQueueEntryRecord)
+      if( uow == null ) {
+        storeBatch.release
+      }
+    }
+
+    queue.dequeue_item_counter += 1
+    queue.dequeue_size_counter += size
+    queue.dequeue_ts = queue.now
+  }
+
+
   def swapped_range = state.swap_range
 
   def can_combine_with_prev = {
@@ -538,6 +558,7 @@ class QueueEntry(val queue:Queue, val se
 
      if( !is_acquired && expiration != 0 && expiration <= queue.now ) {
         queue.expired(entry)
+        entry.dequeue(null)
         remove
         return true
       }
@@ -652,9 +673,7 @@ class QueueEntry(val queue:Queue, val se
             storeBatch.dequeue(toQueueEntryRecord)
             storeBatch.release
           }
-          queue.dequeue_item_counter += 1
-          queue.dequeue_size_counter += size
-          queue.dequeue_ts = queue.now
+          dequeue(null)
           remove
         }
 
@@ -789,6 +808,7 @@ class QueueEntry(val queue:Queue, val se
 
      if( !is_acquired && expiration != 0 && expiration <= queue.now ) {
         queue.expired(entry)
+        entry.dequeue(null)
         remove
         return true
       }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala?rev=1389435&r1=1389434&r2=1389435&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala Mon Sep 24 15:35:01 2012
@@ -322,20 +322,7 @@ class Subscription(val queue:Queue, val 
 
       total_ack_count += 1
       total_ack_size += entry.size
-      if (entry.messageKey != -1) {
-        val storeBatch = if( uow == null ) {
-          queue.virtual_host.store.create_uow
-        } else {
-          uow
-        }
-        storeBatch.dequeue(entry.toQueueEntryRecord)
-        if( uow == null ) {
-          storeBatch.release
-        }
-      }
-      queue.dequeue_item_counter += 1
-      queue.dequeue_size_counter += entry.size
-      queue.dequeue_ts = queue.now
+      entry.dequeue(uow)
 
       // removes this entry from the acquired list.
       unlink()

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala?rev=1389435&r1=1389434&r2=1389435&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala Mon Sep 24 15:35:01 2012
@@ -286,7 +286,7 @@ class StompLevelDBMetricsTest extends St
 
   override def broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
 
-  ignore("slow_consumer_policy='queue' /w 1 slow and 1 fast consumer.") {
+  test("slow_consumer_policy='queue' /w 1 slow and 1 fast consumer.") {
     var dest_name = next_id("queued.metrics")
     val dest = "/topic/"+dest_name
 



Mime
View raw message