activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1234517 - /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
Date Sun, 22 Jan 2012 14:43:00 GMT
Author: chirino
Date: Sun Jan 22 14:43:00 2012
New Revision: 1234517

URL: http://svn.apache.org/viewvc?rev=1234517&view=rev
Log:
Enhance UoW canceling support.
UoWs could queue up if the DB was backed up storing them.  This change allows more of those queued
UoWs to be canceled before they are sent to the DB to be stored.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1234517&r1=1234516&r2=1234517&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
Sun Jan 22 14:43:00 2012
@@ -21,7 +21,7 @@ import java.util.HashMap
 import collection.Seq
 import org.fusesource.hawtdispatch._
 import java.util.concurrent._
-import atomic.{AtomicReference, AtomicInteger}
+import atomic.AtomicInteger
 import org.apache.activemq.apollo.util._
 import org.fusesource.hawtdispatch.{BaseRetained, ListEventAggregator}
 import org.apache.activemq.apollo.dto.{StoreStatusDTO, TimeMetricDTO, IntMetricDTO}
@@ -61,10 +61,6 @@ trait DelayingStoreSupport extends Store
 
   class DelayableUOW extends BaseRetained with StoreUOW {
 
-    var dispose_start:Long = 0
-    var flushing = false;
-    var canceled = false;
-
     class MessageAction {
 
       var msg= 0L
@@ -81,14 +77,29 @@ trait DelayingStoreSupport extends Store
     }
 
     val uow_id:Int = next_batch_id.getAndIncrement
-    var actions = Map[Long, MessageAction]()
+    var commit_ts:Long = 0
 
-    var completed = false
-    var complete_listeners = ListBuffer[() => Unit]()
+    // User might request the UOW to flush asap
+    var flush_asap = false
+    // Or to get canceled..
+    var canceled = false
+
+    // Perhaps track the 4 states below with a single enum?
+
+    // UOW is delayed until we send it to get flushed.
+    var delayed = true
+    // Once completed it will be marked flushed. Flushed just
+    // means the message has been written to disk
+    // and out of memory
     var flushed = false
+    // You have to wait for it to be completed
+    // to know the write has been synced to disk.
+    var completed = false
+
     var flush_listeners = ListBuffer[(Boolean)=>Unit]()
-    var disable_delay = false
+    var complete_listeners = ListBuffer[() => Unit]()
 
+    var actions = Map[Long, MessageAction]()
     var map_actions = Map[Buffer, Buffer]()
 
     def put(key: Buffer, value: Buffer) = {
@@ -119,11 +130,11 @@ trait DelayingStoreSupport extends Store
       }
     }
 
-    def complete_asap() = this.synchronized { disable_delay=true }
+    def complete_asap() = this.synchronized { flush_asap=true }
 
     var delayable_actions = 0
 
-    def delayable = !disable_delay && delayable_actions>0 && flush_delay>=0
+    def delayable = !flush_asap && delayable_actions>0 && flush_delay>=0
 
     def rm(msg:Long) = {
       actions -= msg
@@ -173,7 +184,7 @@ trait DelayingStoreSupport extends Store
         a
       }
       aggregator {
-        pending_enqueues.put(key(entry), a)
+        cancelable_enqueue_actions.put(key(entry), a)
       }
 
     }
@@ -185,7 +196,7 @@ trait DelayingStoreSupport extends Store
     }
 
     override def dispose = {
-      dispose_start = System.nanoTime
+      commit_ts = System.nanoTime
       uow_source.merge(this)
     }
 
@@ -200,7 +211,7 @@ trait DelayingStoreSupport extends Store
       if ( !completed ) {
         on_flushed
         completed = true
-        commit_latency_counter += System.nanoTime-dispose_start
+        commit_latency_counter += System.nanoTime-commit_ts
         complete_listeners.foreach(_())
         super.dispose
       }
@@ -298,7 +309,7 @@ trait DelayingStoreSupport extends Store
   uow_source.resume
 
   var pending_stores = new HashMap[Long, DelayableUOW#MessageAction]()
-  var pending_enqueues = new HashMap[(Long,Long), DelayableUOW#MessageAction]()
+  var cancelable_enqueue_actions = new HashMap[(Long,Long), DelayableUOW#MessageAction]()
   var delayed_uows = new HashMap[Int, DelayableUOW]()
 
   val next_batch_id = new AtomicInteger(1)
@@ -314,11 +325,11 @@ trait DelayingStoreSupport extends Store
         // dequeues can cancel out previous enqueues
         action.dequeues.foreach { currentDequeue=>
           val currentKey = key(currentDequeue)
-          val prev_action:DelayableUOW#MessageAction = pending_enqueues.remove(currentKey)
+          val prev_action:DelayableUOW#MessageAction = cancelable_enqueue_actions.remove(currentKey)
 
           def prev_uow = prev_action.uow
 
-          if( prev_action!=null && !(prev_uow.flushing || prev_uow.canceled) ) {
+          if( prev_action!=null && !prev_uow.canceled ) {
 
 
             prev_uow.delayable_actions -= 1
@@ -352,21 +363,22 @@ trait DelayingStoreSupport extends Store
         }
       }
 
-      val uow_id = uow.uow_id
-      if( uow.delayable ) {
-        dispatch_queue.executeAfter(flush_delay, TimeUnit.MILLISECONDS, ^{
-          flush(delayed_uows.get(uow_id))
-        })
-      } else {
-        flush(uow)
+      if( !uow.canceled ) {
+        if( uow.delayable ) {
+          val uow_id = uow.uow_id
+          dispatch_queue.executeAfter(flush_delay, TimeUnit.MILLISECONDS, ^{
+            flush(delayed_uows.get(uow_id))
+          })
+        } else {
+          flush(uow)
+        }
       }
-
     }
   }
 
   private def flush(uow:DelayableUOW) = {
-    if( uow!=null && !(uow.flushing || uow.canceled) ) {
-      uow.flushing = true
+    if( uow!=null && uow.delayed && !uow.canceled ) {
+      uow.delayed = false
       delayed_uows.remove(uow.uow_id)
       flush_source.merge(uow)
     }
@@ -385,14 +397,37 @@ trait DelayingStoreSupport extends Store
     if( !service_state.is_started ) {
       return
     }
-
+    
     val uows = flush_source.getData
+
+    var fasap = 0
+    var fdelayed = 0
+    
+    // Some UOWs may have been canceled.
+    uows.flatMap { uow=>
+      if( uow.canceled ) {
+        None
+      } else {
+        if( uow.flush_asap ) {
+          fasap += 1
+        } else {
+          fdelayed +=1
+        }
+        // It will not be possible to cancel the UOW anymore..
+        uow.actions.foreach { case (_, action) =>
+          action.enqueues.foreach { queue_entry=>
+            val action = cancelable_enqueue_actions.remove(key(queue_entry))
+            assert(action!=null)
+          }
+        }
+        Some(uow)
+      }
+    }
     if( !uows.isEmpty ) {
       flush_latency_counter.start { end=>
         flush_source.suspend
         store(uows) {
           store_completed(uows)
-
           flush_source.resume
           dispatch_queue.assertExecuting()
           uows.foreach { uow=>
@@ -403,7 +438,6 @@ trait DelayingStoreSupport extends Store
               }
               action.enqueues.foreach { queue_entry=>
                 metric_flushed_enqueue_counter += 1
-                pending_enqueues.remove(key(queue_entry))
               }
             }
           }
@@ -413,7 +447,7 @@ trait DelayingStoreSupport extends Store
     }
   }
 
-  def store_completed(uows: ListBuffer[DelayingStoreSupport.this.type#DelayableUOW]) {
+  def store_completed(uows: ListBuffer[DelayingStoreSupport.this.type#DelayableUOW]) = {
     uows.foreach { uow =>
         uow.on_completed
     }



Mime
View raw message