activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1233768 - in /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker: Queue.scala store/DelayingStoreSupport.scala store/StoreUOW.scala
Date Fri, 20 Jan 2012 04:54:30 GMT
Author: chirino
Date: Fri Jan 20 04:54:30 2012
New Revision: 1233768

URL: http://svn.apache.org/viewvc?rev=1233768&view=rev
Log:
Update the store interface so that you know if a message store was canceled, that way we don't
add to the swapped out counter on a queue if a swap out gets canceled.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1233768&r1=1233767&r2=1233768&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Fri Jan 20 04:54:30 2012
@@ -1001,14 +1001,14 @@ class Queue(val router: LocalRouter, val
     rc
   }
 
-  val swap_out_completes_source = createSource(new ListEventAggregator[QueueEntry#Loaded](), dispatch_queue)
+  val swap_out_completes_source = createSource(new ListEventAggregator[Runnable](), dispatch_queue)
   swap_out_completes_source.setEventHandler(^ {drain_swap_out_completes});
   swap_out_completes_source.resume
 
   def drain_swap_out_completes() = might_unfill {
     val data = swap_out_completes_source.getData
     data.foreach { loaded =>
-      loaded.swapped_out
+      loaded.run()
     }
   }
 
@@ -1389,8 +1389,10 @@ class QueueEntry(val queue:Queue, val se
       if(!storing) {
         storing = true
         delivery.uow.enqueue(toQueueEntryRecord)
-        delivery.uow.on_flush {
-          queue.swap_out_completes_source.merge(this)
+        delivery.uow.on_flush { canceled =>
+          queue.swap_out_completes_source.merge(^{
+            this.swapped_out(!canceled)
+          })
         }
       }
     }
@@ -1401,7 +1403,7 @@ class QueueEntry(val queue:Queue, val se
 
         queue.swapping_out_size+=size
         if( stored ) {
-          swapped_out
+          swapped_out(false)
         } else {
 
           // The storeBatch is only set when called from the messages.offer method
@@ -1437,7 +1439,7 @@ class QueueEntry(val queue:Queue, val se
       }
     }
 
-    def swapped_out() = {
+    def swapped_out(store_wrote_to_disk:Boolean) = {
       assert( state == this )
       storing = false
       stored = true
@@ -1448,8 +1450,10 @@ class QueueEntry(val queue:Queue, val se
         space -= delivery
         queue.swapping_out_size-=size
 
-        queue.swap_out_size_counter += size
-        queue.swap_out_item_counter += 1
+        if( store_wrote_to_disk ) {
+          queue.swap_out_size_counter += size
+          queue.swap_out_item_counter += 1
+        }
 
         state = new Swapped(delivery.storeKey, delivery.storeLocator, size, expiration, redelivery_count, acquirer)
         if( can_combine_with_prev ) {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1233768&r1=1233767&r2=1233768&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala Fri Jan 20 04:54:30 2012
@@ -63,6 +63,7 @@ trait DelayingStoreSupport extends Store
 
     var dispose_start:Long = 0
     var flushing = false;
+    var canceled = false;
 
     class MessageAction {
 
@@ -85,7 +86,7 @@ trait DelayingStoreSupport extends Store
     var completed = false
     var complete_listeners = ListBuffer[() => Unit]()
     var flushed = false
-    var flush_listeners = ListBuffer[() => Unit]()
+    var flush_listeners = ListBuffer[(Boolean)=>Unit]()
     var disable_delay = false
 
     var map_actions = Map[Buffer, Buffer]()
@@ -94,17 +95,15 @@ trait DelayingStoreSupport extends Store
       map_actions += (key -> value)
     }
 
-    def on_flush(callback: =>Unit) = {
-      if( this.synchronized {
+    def on_flush(callback: (Boolean)=>Unit) = {
+      (this.synchronized {
         if( flushed ) {
-          true
+          Some(canceled)
         } else {
-          flush_listeners += ( ()=> callback  )
-          false
+          flush_listeners += callback
+          None
         }
-      }) {
-        callback
-      }
+      }).foreach(callback(_))
     }
 
     def on_complete(callback: =>Unit) = {
@@ -135,7 +134,7 @@ trait DelayingStoreSupport extends Store
 
     def cancel = {
       dispatch_queue.assertExecuting()
-      flushing = true
+      canceled = true
       delayed_uows.remove(uow_id)
       on_completed
     }
@@ -193,7 +192,7 @@ trait DelayingStoreSupport extends Store
     def on_flushed() = this.synchronized {
       if( !flushed ) {
         flushed = true
-        flush_listeners.foreach(_())
+        flush_listeners.foreach(_(canceled))
       }
     }
 
@@ -319,7 +318,7 @@ trait DelayingStoreSupport extends Store
 
           def prev_uow = prev_action.uow
 
-          if( prev_action!=null && !prev_uow.flushing ) {
+          if( prev_action!=null && !(prev_uow.flushing || prev_uow.canceled) ) {
 
 
             prev_uow.delayable_actions -= 1
@@ -366,7 +365,7 @@ trait DelayingStoreSupport extends Store
   }
 
   private def flush(uow:DelayableUOW) = {
-    if( uow!=null && !uow.flushing ) {
+    if( uow!=null && !(uow.flushing || uow.canceled) ) {
       uow.flushing = true
       delayed_uows.remove(uow.uow_id)
       flush_source.merge(uow)

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala?rev=1233768&r1=1233767&r2=1233768&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala Fri Jan 20 04:54:30 2012
@@ -77,7 +77,7 @@ trait StoreUOW extends Retained {
    * has written to disk and flushed of the application
    * buffers.
    */
-  def on_flush(callback: =>Unit)
+  def on_flush(callback: (Boolean)=>Unit)
 
   /**
    * The specified callback is executed once the UOW



Mime
View raw message