activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1235138 - in /activemq/activemq-apollo/trunk: apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/br...
Date Tue, 24 Jan 2012 06:12:21 GMT
Author: chirino
Date: Tue Jan 24 06:12:20 2012
New Revision: 1235138

URL: http://svn.apache.org/viewvc?rev=1235138&view=rev
Log:
The DelayingStoreSupport class now does better state tracking of the UoWs that it processes.
 Fixes bugs that would occur when UoWs would get canceled.

Modified:
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala?rev=1235138&r1=1235137&r2=1235138&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
Tue Jan 24 06:12:20 2012
@@ -207,7 +207,7 @@ class BDBStore(var config:BDBStoreDTO) e
         message_load_latency = message_load_latency_counter(true)
 //        client.metric_journal_append = client.metric_journal_append_counter(true)
 //        client.metric_index_update = client.metric_index_update_counter(true)
-        commit_latency = commit_latency_counter(true)
+        close_latency = close_latency_counter(true)
         message_load_batch_size =  message_load_batch_size_counter(true)
 
         poll_stats

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1235138&r1=1235137&r2=1235138&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
Tue Jan 24 06:12:20 2012
@@ -106,6 +106,9 @@ class Topic(val router:LocalRouter, val 
   }
 
   case class ProxyConsumerSession(proxy:ProxyDeliveryConsumer, session:DeliverySession) extends
DeliverySession with SessionSinkFilter[Delivery] {
+
+    override def toString = proxy.consumer.toString
+
     def downstream = session
 
     dispatch_queue {
@@ -133,6 +136,8 @@ class Topic(val router:LocalRouter, val 
 
     def offer(value: Delivery) = {
       val copy = value.copy();
+      copy.uow = value.uow
+      copy.ack = value.ack
       copy.sender = destination_dto
       downstream.offer(copy)
     }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1235138&r1=1235137&r2=1235138&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
Tue Jan 24 06:12:20 2012
@@ -17,7 +17,6 @@
 package org.apache.activemq.apollo.broker.store
 
 import collection.mutable.ListBuffer
-import java.util.HashMap
 import collection.Seq
 import org.fusesource.hawtdispatch._
 import java.util.concurrent._
@@ -26,6 +25,50 @@ import org.apache.activemq.apollo.util._
 import org.fusesource.hawtdispatch.{BaseRetained, ListEventAggregator}
 import org.apache.activemq.apollo.dto.{StoreStatusDTO, TimeMetricDTO, IntMetricDTO}
 import org.fusesource.hawtbuf.Buffer
+import java.lang.ref.WeakReference
+
+object DelayingStoreSupport extends Log
+
+sealed trait UowState {
+  def stage:Int
+}
+// UoW is initial open.
+object UowOpen extends UowState {
+  override def stage = 0
+  override def toString = "UowOpen"
+}
+// UoW is Committed once the broker finished creating it.
+object UowClosed extends UowState {
+  override def stage = 1
+  override def toString = "UowClosed"
+}
+// UOW is delayed until we send it to get flushed.
+object UowDelayed extends UowState {
+  override def stage = 2
+  override def toString = "UowDelayed"
+}
+object UowFlushQueued extends UowState {
+  override def stage = 3
+  override def toString = "UowFlushQueued"
+}
+
+object UowFlushing extends UowState {
+  override def stage = 4
+  override def toString = "UowFlushing"
+}
+// Then it moves on to be flushed. Flushed just
+// means the message has been written to disk
+// and out of memory
+object UowFlushed extends UowState {
+  override def stage = 5
+  override def toString = "UowFlushed"
+}
+
+// Once completed then you know it has been synced to disk.
+object UowCompleted extends UowState {
+  override def stage = 6
+  override def toString = "UowCompleted"
+}
 
 /**
  * <p>
@@ -38,6 +81,8 @@ import org.fusesource.hawtbuf.Buffer
  */
 trait DelayingStoreSupport extends Store with BaseService {
 
+  import DelayingStoreSupport._
+
   protected def flush_delay:Long
 
   protected def get_next_msg_key:Long
@@ -50,7 +95,11 @@ trait DelayingStoreSupport extends Store
   //
   /////////////////////////////////////////////////////////////////////
   val dispatch_queue:DispatchQueue = createQueue(toString)
-  val aggregator = new AggregatingExecutor(dispatch_queue)
+  
+  val event_source = createSource(new ListEventAggregator[Runnable](), dispatch_queue)
+  event_source.setEventHandler(^{ event_source.getData.foreach(_.run()) });
+  event_source.resume
+  
 
   /////////////////////////////////////////////////////////////////////
   //
@@ -77,60 +126,69 @@ trait DelayingStoreSupport extends Store
     }
 
     val uow_id:Int = next_batch_id.getAndIncrement
-    var commit_ts:Long = 0
+    var close_ts:Long = 0
 
     // User might request the UOW to flush asap
     var flush_asap = false
     // Or to get canceled..
     var canceled = false
 
-    // Perhaps track the 4 states below with a single enum?
-
-    // UOW is delayed until we send it to get flushed.
-    var delayed = true
-    // Once completed it will be marked flushed. Flushed just
-    // means the message has been written to disk
-    // and out of memory
-    var flushed = false
-    // You have to wait for it to be completed
-    // to know the write has been synced to disk.
-    var completed = false
+    private var _state:UowState = UowOpen
+    
+    def state = this._state
+    def state_=(next:UowState) {
+      assert(this._state.stage < next.stage)
+      this._state = next
+    }
 
-    var flush_listeners = ListBuffer[(Boolean)=>Unit]()
-    var complete_listeners = ListBuffer[() => Unit]()
+    var complete_listeners = ListBuffer[(Boolean) => Unit]()
 
     var actions = Map[Long, MessageAction]()
     var map_actions = Map[Buffer, Buffer]()
 
+
     def put(key: Buffer, value: Buffer) = {
       map_actions += (key -> value)
     }
 
     def on_flush(callback: (Boolean)=>Unit) = {
       (this.synchronized {
-        if( flushed ) {
+        if( state.stage >= UowFlushed.stage ) {
           Some(canceled)
         } else {
-          flush_listeners += callback
+          complete_listeners += callback
           None
         }
       }).foreach(callback(_))
     }
 
-    def on_complete(callback: =>Unit) = {
-      if( this.synchronized {
-        if( completed ) {
+
+    def on_complete(callback: (Boolean)=>Unit) = {
+      if(this.synchronized {
+        if( state eq UowCompleted ) {
           true
         } else {
-          complete_listeners += ( ()=> callback  )
+          complete_listeners += callback
           false
         }
       }) {
         callback
       }
     }
+    
+    def complete_asap = flush_it
 
-    def complete_asap() = this.synchronized { flush_asap=true }
+    def flush_it() = this.synchronized { 
+      flush_asap=true
+      if( state eq UowDelayed ) {
+        queue_flush(this)
+        false
+      } else if( state eq UowCompleted ) {
+        true
+      } else {
+        false
+      }
+    }
 
     var delayable_actions = 0
 
@@ -138,15 +196,15 @@ trait DelayingStoreSupport extends Store
 
     def rm(msg:Long) = {
       actions -= msg
-      if( actions.isEmpty && map_actions.isEmpty ) {
+      if( actions.isEmpty && map_actions.isEmpty && state.stage < UowFlushing.stage
) {
         cancel
       }
     }
 
-    def cancel = {
+
+    private def cancel = {
       dispatch_queue.assertExecuting()
       canceled = true
-      delayed_uows.remove(uow_id)
       on_completed
     }
 
@@ -157,9 +215,7 @@ trait DelayingStoreSupport extends Store
       action.message_record = record
       this.synchronized {
         actions += record.key -> action
-      }
-      aggregator {
-        pending_stores.put(record.key, action)
+        pending_stores.put(action.message_record.key, action)
       }
       delayable_actions += 1
       record.key
@@ -181,12 +237,8 @@ trait DelayingStoreSupport extends Store
         val a = action(entry.message_key)
         a.enqueues += entry
         delayable_actions += 1
-        a
-      }
-      aggregator {
         cancelable_enqueue_actions.put(key(entry), a)
       }
-
     }
 
     def dequeue(entry: QueueEntryRecord) = {
@@ -194,47 +246,42 @@ trait DelayingStoreSupport extends Store
         action(entry.message_key).dequeues += entry
       }
     }
-
-    override def dispose = {
-      commit_ts = System.nanoTime
-      uow_source.merge(this)
+    
+    def have_locators:Boolean = {
+      actions.values.foreach{ a =>
+        // There must either be a dequeue or a message record for a enqueue request.
+        if( !a.enqueues.isEmpty && ( a.message_record==null && a.dequeues.isEmpty
) ) {
+          return false 
+        }
+        if( locator_based && a.message_record==null && !a.dequeues.isEmpty
) {
+          a.dequeues.foreach { d =>
+            if ( d.message_locator.get() == null ) {
+              return false
+            }
+          }
+        }
+      }
+      true  
     }
 
-    def on_flushed() = this.synchronized {
-      if( !flushed ) {
-        flushed = true
-        flush_listeners.foreach(_(canceled))
-      }
+    override def dispose = this.synchronized {
+      state = UowClosed
+      close_ts = System.nanoTime
+      uow_source.merge(this)
     }
 
-    def on_completed() = this.synchronized {
-      if ( !completed ) {
-        on_flushed
-        completed = true
-        commit_latency_counter += System.nanoTime-commit_ts
-        complete_listeners.foreach(_())
+    def on_completed = this.synchronized {
+      if ( state.stage < UowCompleted.stage ) {
+        state = UowCompleted
+        close_latency_counter += System.nanoTime-close_ts
+        complete_listeners.foreach(_(canceled))
         super.dispose
       }
     }
   }
 
-  def flush_message(message_key: Long)(cb: => Unit) = flush_message_source.merge((message_key,
cb _))
-
-  val flush_message_source = createSource(new ListEventAggregator[(Long, ()=>Unit)](),
dispatch_queue)
-  flush_message_source.setEventHandler(^{drain_flush_message});
-  flush_message_source.resume
+  protected def locator_based = false
   
-  def drain_flush_message:Unit = {
-    flush_message_source.getData.foreach { case (message_key, cb) =>
-      pending_stores.get(message_key) match {
-        case null => cb()
-        case action =>
-          action.uow.on_complete( cb() )
-          flush(action.uow)
-      }
-    }
-  }
-
   implicit def toTimeMetricDTO( m: TimeMetric) = {
     val rc = new TimeMetricDTO()
     rc.count = m.count
@@ -258,8 +305,8 @@ trait DelayingStoreSupport extends Store
   var metric_flushed_message_counter:Long = 0
   var metric_flushed_enqueue_counter:Long = 0
 
-  val commit_latency_counter = new TimeCounter
-  var commit_latency = commit_latency_counter(false)
+  val close_latency_counter = new TimeCounter
+  var close_latency = close_latency_counter(false)
 
   val message_load_latency_counter = new TimeCounter
   var message_load_latency = message_load_latency_counter(false)
@@ -308,9 +355,8 @@ trait DelayingStoreSupport extends Store
   uow_source.setEventHandler(^{drain_uows});
   uow_source.resume
 
-  var pending_stores = new HashMap[Long, DelayableUOW#MessageAction]()
-  var cancelable_enqueue_actions = new HashMap[(Long,Long), DelayableUOW#MessageAction]()
-  var delayed_uows = new HashMap[Int, DelayableUOW]()
+  val pending_stores = new ConcurrentHashMap[Long, DelayableUOW#MessageAction]()
+  var cancelable_enqueue_actions = new ConcurrentHashMap[(Long,Long), DelayableUOW#MessageAction]()
 
   val next_batch_id = new AtomicInteger(1)
 
@@ -318,68 +364,111 @@ trait DelayingStoreSupport extends Store
     dispatch_queue.assertExecuting()
     uow_source.getData.foreach { uow =>
 
-      delayed_uows.put(uow.uow_id, uow)
-
-      uow.actions.foreach { case (msg, action) =>
+      // Broker could issue a flush_message call before
+      // this stage runs.. which make the stage jump over UowDelayed
+      if( uow.state.stage < UowDelayed.stage ) {
+        uow.state = UowDelayed
+      }
 
-        // dequeues can cancel out previous enqueues
-        action.dequeues.foreach { currentDequeue=>
-          val currentKey = key(currentDequeue)
-          val prev_action:DelayableUOW#MessageAction = cancelable_enqueue_actions.remove(currentKey)
 
-          def prev_uow = prev_action.uow
+      if( uow.state.stage < UowFlushing.stage ) {
+        uow.actions.foreach { case (msg, action) =>
+
+          // The UoW may have been canceled.
+          if( action.message_record!=null && action.enqueues.isEmpty ) {
+            pending_stores.remove(msg)
+            action.message_record = null
+            uow.delayable_actions -= 1
+            metric_canceled_message_counter += 1
+          }
+          if( action.isEmpty ) {
+            action.cancel()
+          }
 
-          if( prev_action!=null && !prev_uow.canceled ) {
+          // dequeues can cancel out previous enqueues
+          action.dequeues.foreach { entry=>
 
+            val entry_key = key(entry)
+            val prev_action:DelayableUOW#MessageAction = cancelable_enqueue_actions.remove(entry_key)
 
-            prev_uow.delayable_actions -= 1
-            metric_canceled_enqueue_counter += 1
+            if( prev_action!=null ) {
+              val prev_uow = prev_action.uow
 
-            // yay we can cancel out a previous enqueue
-            prev_action.enqueues = prev_action.enqueues.filterNot( x=> key(x) == currentKey
)
+              prev_uow.synchronized {
+                if( !prev_uow.canceled ) {
+
+                  prev_uow.delayable_actions -= 1
+                  metric_canceled_enqueue_counter += 1
+
+                  // yay we can cancel out a previous enqueue
+                  prev_action.enqueues = prev_action.enqueues.filterNot( x=> key(x) ==
entry_key )
+
+                  if( prev_uow.state.stage >= UowDelayed.stage ) {
+                    // if the message is not in any queues.. we can gc it..
+                    if( prev_action.enqueues == Nil && prev_action.message_record
!=null ) {
+                      pending_stores.remove(msg)
+                      prev_action.message_record = null
+                      prev_uow.delayable_actions -= 1
+                      metric_canceled_message_counter += 1
+                    }
+
+                    // Cancel the action if it's now empty
+                    if( prev_action.isEmpty ) {
+                      prev_action.cancel()
+                    } else if( !prev_uow.delayable ) {
+                      // flush it if there is no point in delaying anymore
+                      prev_uow.complete_asap()
+                    }
+                  }
 
-            // if the message is not in any queues.. we can gc it..
-            if( prev_action.enqueues == Nil && prev_action.message_record !=null
) {
-              pending_stores.remove(msg)
-              prev_action.message_record = null
-              prev_uow.delayable_actions -= 1
-              metric_canceled_message_counter += 1
-            }
+                }
+              }
 
-            // Cancel the action if it's now empty
-            if( prev_action.isEmpty ) {
-              prev_action.cancel()
-            } else if( !prev_uow.delayable ) {
-              // flush it if there is no point in delyaing anymore
-              flush(prev_uow)
-            }
+              // since we canceled out the previous enqueue.. now cancel out the action
+              action.dequeues = action.dequeues.filterNot( _ == entry)
+              if( action.isEmpty ) {
+                action.cancel()
+              }
 
-            // since we canceled out the previous enqueue.. now cancel out the action
-            action.dequeues = action.dequeues.filterNot( _ == currentDequeue)
-            if( action.isEmpty ) {
-              action.cancel()
             }
           }
         }
       }
 
-      if( !uow.canceled ) {
+      if( !uow.canceled && uow.state.stage < UowFlushQueued.stage ) {
         if( uow.delayable ) {
-          val uow_id = uow.uow_id
-          dispatch_queue.executeAfter(flush_delay, TimeUnit.MILLISECONDS, ^{
-            flush(delayed_uows.get(uow_id))
-          })
+          // Let the uow get GCed if its' canceled during the delay window..
+          val ref = new WeakReference[DelayableUOW](uow)
+          schedule_flush(ref)
         } else {
-          flush(uow)
+          queue_flush(uow)
         }
       }
     }
   }
 
-  private def flush(uow:DelayableUOW) = {
-    if( uow!=null && uow.delayed && !uow.canceled ) {
-      uow.delayed = false
-      delayed_uows.remove(uow.uow_id)
+  def flush_message(message_key: Long)(cb: => Unit) = event_source.merge(^{
+    pending_stores.get(message_key) match {
+      case null => cb
+      case action =>
+        val uow = action.uow
+        uow.on_complete( (canceled)=>{ cb } )
+        uow.complete_asap
+    }
+  })
+
+  private def schedule_flush(ref: WeakReference[DelayableUOW]) {
+    dispatch_queue.executeAfter(flush_delay, TimeUnit.MILLISECONDS, ^ {
+      val uow = ref.get();
+      if (uow != null) {
+        queue_flush(uow)
+      }
+    })
+  }
+
+  private def queue_flush(uow:DelayableUOW) = {
+    if( uow!=null && !uow.canceled && uow.state.stage < UowFlushQueued.stage
) {
+      uow.state = UowFlushQueued
       flush_source.merge(uow)
     }
   }
@@ -398,16 +487,17 @@ trait DelayingStoreSupport extends Store
       return
     }
     
-    val uows = flush_source.getData
-
+    
     var fasap = 0
     var fdelayed = 0
     
     // Some UOWs may have been canceled.
-    uows.flatMap { uow=>
+    val uows = flush_source.getData.flatMap { uow=>
       if( uow.canceled ) {
         None
       } else {
+        uow.state = UowFlushing
+        assert( uow.have_locators )
         if( uow.flush_asap ) {
           fasap += 1
         } else {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala?rev=1235138&r1=1235137&r2=1235138&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
Tue Jan 24 06:12:20 2012
@@ -77,13 +77,15 @@ trait StoreUOW extends Retained {
    * has written to disk and flushed of the application
    * buffers.
    */
-  def on_flush(callback: (Boolean)=>Unit)
+  def on_flush(callback: (Boolean)=>Unit):Unit
+  def on_flush(callback: =>Unit):Unit = on_flush((canceled)=>{callback})
 
   /**
    * The specified callback is executed once the UOW
    * has fully completed, that is it's been flushed and
    * and synced to disk.
    */
-  def on_complete(callback: =>Unit)
+  def on_complete(callback: (Boolean)=>Unit):Unit
+  def on_complete(callback: =>Unit):Unit = on_complete((canceled)=>{callback})
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1235138&r1=1235137&r2=1235138&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
Tue Jan 24 06:12:20 2012
@@ -764,6 +764,7 @@ class LevelDBClient(store: LevelDBStore)
                 if( locator==null ) {
                   locator = entry.message_locator.get().asInstanceOf[(Long, Int)]
                 }
+                assert(locator!=null)
                 val (pos, len) = locator
                 val key = encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq)
                 appender.append(LOG_REMOVE_QUEUE_ENTRY, key)
@@ -773,6 +774,7 @@ class LevelDBClient(store: LevelDBStore)
 
               var locator_buffer:Buffer = null
               action.enqueues.foreach { entry =>
+                assert(locator!=null)
                 val (pos, len) = locator
                 entry.message_locator.set(locator)
 

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala?rev=1235138&r1=1235137&r2=1235138&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
Tue Jan 24 06:12:20 2012
@@ -58,6 +58,8 @@ class LevelDBStore(val config:LevelDBSto
 
   override def toString = store_kind+" store at "+config.directory
 
+  override protected def locator_based = true
+
   def flush_delay = config.flush_delay.getOrElse(500)
   
   protected def get_next_msg_key = next_msg_key.getAndIncrement
@@ -244,7 +246,7 @@ class LevelDBStore(val config:LevelDBSto
         message_load_latency = message_load_latency_counter(true)
 //        client.metric_journal_append = client.metric_journal_append_counter(true)
 //        client.metric_index_update = client.metric_index_update_counter(true)
-        commit_latency = commit_latency_counter(true)
+        close_latency = close_latency_counter(true)
         message_load_batch_size =  message_load_batch_size_counter(true)
 
         poll_stats



Mime
View raw message