activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1235478 - in /activemq/activemq-apollo/trunk: apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ apollo-leveldb/src/main/scala/org/apache/activemq/ap...
Date Tue, 24 Jan 2012 20:33:20 GMT
Author: chirino
Date: Tue Jan 24 20:33:19 2012
New Revision: 1235478

URL: http://svn.apache.org/viewvc?rev=1235478&view=rev
Log:
Fixing async send with no consumers performance regression

Modified:
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1235478&r1=1235477&r2=1235478&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
Tue Jan 24 20:33:19 2012
@@ -341,7 +341,7 @@ class BDBClient(store: BDBStore) {
   }
 
   def store(uows: Seq[BDBStore#DelayableUOW], callback:Runnable) {
-    val sync = uows.find( ! _.complete_listeners.isEmpty ).isDefined
+    val sync = uows.find( _.flush_sync ).isDefined
     with_ctx(sync) { ctx=>
       import ctx._
       var sync_lobs = false

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1235478&r1=1235477&r2=1235478&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
Tue Jan 24 20:33:19 2012
@@ -130,6 +130,8 @@ trait DelayingStoreSupport extends Store
 
     // User might request the UOW to flush asap
     var flush_asap = false
+    // Or perhaps the user needs a disk sync too.
+    var flush_sync = false
     // Or to get canceled..
     var canceled = false
 
@@ -141,7 +143,7 @@ trait DelayingStoreSupport extends Store
       this._state = next
     }
 
-    var complete_listeners = ListBuffer[(Boolean) => Unit]()
+    private var complete_listeners = ListBuffer[(Boolean) => Unit]()
 
     var actions = Map[Long, MessageAction]()
     var map_actions = Map[Buffer, Buffer]()
@@ -168,6 +170,7 @@ trait DelayingStoreSupport extends Store
         if( state eq UowCompleted ) {
           true
         } else {
+          flush_sync = true
           complete_listeners += callback
           false
         }
@@ -176,9 +179,7 @@ trait DelayingStoreSupport extends Store
       }
     }
     
-    def complete_asap = flush_it
-
-    def flush_it() = this.synchronized { 
+    def complete_asap = this.synchronized {
       flush_asap=true
       if( state eq UowDelayed ) {
         queue_flush(this)
@@ -334,19 +335,11 @@ trait DelayingStoreSupport extends Store
     rc.flushed_enqueue_counter = metric_flushed_enqueue_counter
     rc.pending_stores = pending_stores.size
 
-//    import collection.JavaConversions._
-//    var last = ""
-//    var count = 0
-//    pending_stores.valuesIterator.map(_.uow.status).foreach{ line =>
-//      if( last!= "" && last!=line) {
-//        println(last+" occured "+count+" times")
-//        count = 0
-//      }
-//      count += 1
-//      last = line
-//    }
-//    println(last+" occured "+count+" times")
-//    println("--------------")
+    import collection.JavaConversions._
+    println("--------------")
+    pending_stores.valuesIterator.foreach{ action =>
+      println(action.uow.state+": "+action.uow.uow_id+" "+action.uow.delayable)
+    }
   }
 
   def key(x:QueueEntryRecord) = (x.queue_key, x.entry_seq)
@@ -452,7 +445,7 @@ trait DelayingStoreSupport extends Store
       case null => cb
       case action =>
         val uow = action.uow
-        uow.on_complete( (canceled)=>{ cb } )
+        uow.on_flush( (canceled)=>{ cb } )
         uow.complete_asap
     }
   })

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1235478&r1=1235477&r2=1235478&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
Tue Jan 24 20:33:19 2012
@@ -817,7 +817,7 @@ class LevelDBClient(store: LevelDBStore)
                 
               }
             }
-            if( !uow.complete_listeners.isEmpty ) {
+            if( uow.flush_sync ) {
               sync_needed = true
             }
           }



Mime
View raw message