activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1234263 - /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Date Sat, 21 Jan 2012 04:21:11 GMT
Author: chirino
Date: Sat Jan 21 04:21:11 2012
New Revision: 1234263

URL: http://svn.apache.org/viewvc?rev=1234263&view=rev
Log:
When a queue is being stopped, wait for any in-flight message store operations
to complete before reporting that the queue has been stopped.

This avoids an ordering issue where a queue could be deleted from the store and
then have queue store operations performed against it afterwards.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1234263&r1=1234262&r2=1234263&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Sat Jan 21 04:21:11 2012
@@ -457,6 +457,8 @@ class Queue(val router: LocalRouter, val
     }
   }
 
+  var stop_listener_waiting_for_flush:Runnable = _
+
   protected def _stop(on_completed: Runnable) = {
     // Disconnect the producers..
     producers.foreach { producer =>
@@ -469,15 +471,25 @@ class Queue(val router: LocalRouter, val
 
     trigger_swap
 
-    destination_dto match {
-      case d:DurableSubscriptionDestinationDTO =>
-        DestinationMetricsSupport.add_destination_metrics(virtual_host.dead_dsub_metrics, get_queue_metrics)
-      case t:TopicDestinationDTO =>
-        // metrics are taken care of by topic
-      case _ =>
-        DestinationMetricsSupport.add_destination_metrics(virtual_host.dead_queue_metrics, get_queue_metrics)
+    stop_listener_waiting_for_flush = on_completed
+    if( swapping_out_size==0 ) {
+      on_queue_flushed
+    }
+  }
+
+  def on_queue_flushed = {
+    if(stop_listener_waiting_for_flush!=null) {
+      destination_dto match {
+        case d:DurableSubscriptionDestinationDTO =>
+          DestinationMetricsSupport.add_destination_metrics(virtual_host.dead_dsub_metrics, get_queue_metrics)
+        case t:TopicDestinationDTO =>
+          // metrics are taken care of by topic
+        case _ =>
+          DestinationMetricsSupport.add_destination_metrics(virtual_host.dead_queue_metrics, get_queue_metrics)
+      }
+      stop_listener_waiting_for_flush.run()
+      stop_listener_waiting_for_flush = null
     }
-    on_completed.run
   }
 
   def might_unfill[T](func: =>T):T = {
@@ -1383,9 +1395,14 @@ class QueueEntry(val queue:Queue, val se
       if(!storing) {
         storing = true
         delivery.uow.enqueue(toQueueEntryRecord)
+        queue.swapping_out_size+=size
         delivery.uow.on_flush { canceled =>
           queue.swap_out_completes_source.merge(^{
+            queue.swapping_out_size-=size
             this.swapped_out(!canceled)
+            if( queue.swapping_out_size==0 ) {
+              queue.on_queue_flushed
+            }
           })
         }
       }
@@ -1395,7 +1412,6 @@ class QueueEntry(val queue:Queue, val se
       if( queue.tune_swap && !swapping_out ) {
         swapping_out=true
 
-        queue.swapping_out_size+=size
         if( stored ) {
           swapped_out(false)
         } else {
@@ -1439,10 +1455,8 @@ class QueueEntry(val queue:Queue, val se
       stored = true
       delivery.uow = null
       if( swapping_out ) {
-
         swapping_out = false
         space -= delivery
-        queue.swapping_out_size-=size
 
         if( store_wrote_to_disk ) {
           queue.swap_out_size_counter += size
@@ -1474,10 +1488,7 @@ class QueueEntry(val queue:Queue, val se
         this.space = space
         this.space += delivery
       }
-      if( swapping_out ) {
-        swapping_out = false
-        queue.swapping_out_size-=size
-      }
+      swapping_out = false
     }
 
     override def remove = {



Mime
View raw message