activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1154233 - in /activemq/activemq-apollo/trunk: apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-broker/src/main/proto/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala...
Date Fri, 05 Aug 2011 14:16:06 GMT
Author: chirino
Date: Fri Aug  5 14:16:05 2011
New Revision: 1154233

URL: http://svn.apache.org/viewvc?rev=1154233&view=rev
Log:
Associate and track a locator attribute /w messages which may help some store implementations
perform faster loads from disk than just using the store message id.

Modified:
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
    activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala?rev=1154233&r1=1154232&r2=1154233&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
Fri Aug  5 14:16:05 2011
@@ -17,10 +17,10 @@
 package org.apache.activemq.apollo.broker.store.bdb
 
 import dto.{BDBStoreDTO, BDBStoreStatusDTO}
-import java.util.concurrent.atomic.AtomicLong
 import collection.Seq
 import org.fusesource.hawtdispatch._
 import java.util.concurrent._
+import atomic.{AtomicReference, AtomicLong}
 import org.apache.activemq.apollo.broker.store._
 import org.apache.activemq.apollo.util._
 import org.fusesource.hawtdispatch.ListEventAggregator
@@ -165,7 +165,7 @@ class BDBStore(var config:BDBStoreDTO) e
   load_source.resume
 
 
-  def load_message(messageKey: Long)(callback: (Option[MessageRecord]) => Unit) = {
+  def load_message(messageKey: Long, locator:AtomicLong)(callback: (Option[MessageRecord]) => Unit) = {
     message_load_latency_counter.start { end=>
       load_source.merge((messageKey, { (result)=>
         end()

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto?rev=1154233&r1=1154232&r2=1154233&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto Fri Aug  5 14:16:05
2011
@@ -49,4 +49,5 @@ message QueueEntryPB {
   optional bytes attachment=5;
   optional int32 redeliveries = 6;
   optional sint64 expiration=7;
+  optional int64 messageLocator=8;
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1154233&r1=1154232&r2=1154233&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
Fri Aug  5 14:16:05 2011
@@ -16,14 +16,13 @@
  */
 package org.apache.activemq.apollo.broker
 
-import _root_.java.lang.{String}
 import _root_.org.fusesource.hawtdispatch._
 import org.fusesource.hawtbuf._
 import protocol.Protocol
 import org.apache.activemq.apollo.filter.Filterable
-import org.apache.activemq.apollo.broker.store.{StoreUOW, MessageRecord}
-import org.apache.activemq.apollo.util.{Log, Logging}
-import org.apache.activemq.apollo.dto.DestinationDTO
+import org.apache.activemq.apollo.broker.store.StoreUOW
+import org.apache.activemq.apollo.util.Log
+import java.util.concurrent.atomic.AtomicLong
 
 object DeliveryProducer extends Log
 
@@ -168,11 +167,17 @@ class Delivery {
   var message: Message = null
 
   /**
-   * A reference to the stored version of the message.
+   * The id the store assigned the message
    */
   var storeKey:Long = -1
 
   /**
+   * After the store persists the message he may be able to supply us with  locator handle
+   * which will load the message faster than looking it up via the store key.
+   */
+  var storeLocator:AtomicLong = null
+
+  /**
    * The transaction the delivery is participating in.
    */
   var uow:StoreUOW = null
@@ -189,12 +194,14 @@ class Delivery {
     size = other.size
     message = other.message
     storeKey = other.storeKey
+    storeLocator = other.storeLocator
     this
   }
 
   def createMessageRecord() = {
     val record = message.protocol.encode(message)
     assert( record.size == size )
+    record.locator = storeLocator
     record
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1154233&r1=1154232&r2=1154233&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Fri Aug  5 14:16:05 2011
@@ -19,8 +19,6 @@ package org.apache.activemq.apollo.broke
 import java.util.concurrent.TimeUnit
 
 import org.fusesource.hawtdispatch._
-import java.util.concurrent.atomic.AtomicInteger
-
 import protocol.ProtocolFactory
 import collection.mutable.ListBuffer
 import org.apache.activemq.apollo.broker.store._
@@ -30,6 +28,7 @@ import org.fusesource.hawtdispatch.{List
 import OptionSupport._
 import security.SecurityContext
 import org.apache.activemq.apollo.dto.{DestinationDTO, QueueDTO}
+import java.util.concurrent.atomic.{AtomicLong, AtomicReference, AtomicInteger}
 
 object Queue extends Log {
   val subcsription_counter = new AtomicInteger(0)
@@ -823,7 +822,7 @@ class QueueEntry(val queue:Queue, val se
   }
 
   def init(qer:QueueEntryRecord):QueueEntry = {
-    state = new Swapped(qer.message_key, qer.size, qer.expiration)
+    state = new Swapped(qer.message_key, new AtomicLong(qer.message_locator), qer.size, qer.expiration)
     this
   }
 
@@ -882,6 +881,7 @@ class QueueEntry(val queue:Queue, val se
     qer.queue_key = queue.store_id
     qer.entry_seq = seq
     qer.message_key = state.message_key
+    qer.message_locator = state.message_locator.get()
     qer.size = state.size
     qer.expiration = expiration
     qer
@@ -972,6 +972,8 @@ class QueueEntry(val queue:Queue, val se
      */
     def message_key = -1L
 
+    def message_locator: AtomicLong = null
+
     /**
      * Attempts to dispatch the current entry to the subscriptions position at the entry.
     * @returns true if at least one subscription advanced to the next entry as a result of dispatching.
@@ -1097,6 +1099,8 @@ class QueueEntry(val queue:Queue, val se
     override def size = delivery.size
     override def expiration = delivery.message.expiration
     override def message_key = delivery.storeKey
+    override def message_locator = delivery.storeLocator
+
     var remove_pending = false
 
     override def is_swapped_or_swapping_out = {
@@ -1137,7 +1141,8 @@ class QueueEntry(val queue:Queue, val se
 
               delivery.uow = queue.virtual_host.store.create_uow
               val uow = delivery.uow
-              delivery.storeKey = uow.store(delivery.createMessageRecord)
+              delivery.storeLocator = new AtomicLong()
+              delivery.storeKey = uow.store(delivery.createMessageRecord )
               store
               if( asap ) {
                 uow.complete_asap
@@ -1170,7 +1175,7 @@ class QueueEntry(val queue:Queue, val se
         queue.swap_out_size_counter += size
         queue.swap_out_item_counter += 1
 
-        state = new Swapped(delivery.storeKey, size, expiration)
+        state = new Swapped(delivery.storeKey, delivery.storeLocator, size, expiration)
         if( can_combine_with_prev ) {
           getPrevious.as_swapped_range.combineNext
         }
@@ -1318,7 +1323,7 @@ class QueueEntry(val queue:Queue, val se
    * entry is persisted, it can move into this state.  This state only holds onto the
    * the massage key so that it can reload the message from the store quickly when needed.
    */
-  class Swapped(override val message_key:Long, override val size:Int, override val expiration:Long) extends EntryState {
+  class Swapped(override val message_key:Long, override val message_locator:AtomicLong, override val size:Int, override val expiration:Long) extends EntryState {
 
     queue.individual_swapped_items += 1
 
@@ -1346,7 +1351,7 @@ class QueueEntry(val queue:Queue, val se
         // start swapping in...
         swapping_in = true
         queue.swapping_in_size += size
-        queue.virtual_host.store.load_message(message_key) { delivery =>
+        queue.virtual_host.store.load_message(message_key, message_locator) { delivery =>
           // pass off to a source so it can aggregate multiple
           // loads to reduce cross thread synchronization
           if( delivery.isDefined ) {
@@ -1375,6 +1380,7 @@ class QueueEntry(val queue:Queue, val se
         delivery.message = ProtocolFactory.get(messageRecord.protocol.toString).get.decode(messageRecord)
         delivery.size = messageRecord.size
         delivery.storeKey = messageRecord.key
+        delivery.storeLocator = messageRecord.locator
 
         queue.swapped_in_size += delivery.size
         queue.swapped_in_items += 1

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1154233&r1=1154232&r2=1154233&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
Fri Aug  5 14:16:05 2011
@@ -18,12 +18,12 @@ package org.apache.activemq.apollo.broke
 
 import org.fusesource.hawtdispatch._
 import org.apache.activemq.apollo.util._
-import path.Path
 import scala.collection.immutable.List
 import org.apache.activemq.apollo.dto._
 import security.SecurityContext
 import store.StoreUOW
 import util.continuations._
+import java.util.concurrent.atomic.AtomicLong
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -188,6 +188,7 @@ abstract class DeliveryProducerRoute(val
             } else {
               copy.uow.retain
             }
+            copy.storeLocator = new AtomicLong()
             copy.storeKey = copy.uow.store(copy.createMessageRecord)
           }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1154233&r1=1154232&r2=1154233&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
Fri Aug  5 14:16:05 2011
@@ -21,7 +21,7 @@ import java.util.HashMap
 import collection.Seq
 import org.fusesource.hawtdispatch._
 import java.util.concurrent._
-import atomic.AtomicInteger
+import atomic.{AtomicReference, AtomicInteger}
 import org.apache.activemq.apollo.util._
 import org.fusesource.hawtdispatch.{BaseRetained, ListEventAggregator}
 import org.apache.activemq.apollo.dto.{StoreStatusDTO, TimeMetricDTO, IntMetricDTO}

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala?rev=1154233&r1=1154232&r2=1154233&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
Fri Aug  5 14:16:05 2011
@@ -19,6 +19,7 @@ package org.apache.activemq.apollo.broke
 
 import org.fusesource.hawtbuf.AsciiBuffer
 import org.fusesource.hawtbuf.Buffer
+import java.util.concurrent.atomic.AtomicLong
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -31,5 +32,6 @@ class MessageRecord {
   var buffer: Buffer = _
   var zero_copy_buffer: ZeroCopyBuffer = _
   var expiration = 0L
+  var locator:AtomicLong = _
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala?rev=1154233&r1=1154232&r2=1154233&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
Fri Aug  5 14:16:05 2011
@@ -85,6 +85,8 @@ object PBSupport {
     pb.setMessageKey(v.message_key)
     pb.setAttachment(v.attachment)
     pb.setSize(v.size)
+    if(v.message_locator!=0)
+      pb.setMessageLocator(v.message_locator)
     if(v.expiration!=0)
       pb.setExpiration(v.expiration)
     if(v.redeliveries!=0)
@@ -97,6 +99,7 @@ object PBSupport {
     rc.queue_key = pb.getQueueKey
     rc.entry_seq = pb.getQueueSeq
     rc.message_key = pb.getMessageKey
+    rc.message_locator = pb.getMessageLocator
     rc.attachment = pb.getAttachment
     rc.size = pb.getSize
     rc.expiration = pb.getExpiration

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala?rev=1154233&r1=1154232&r2=1154233&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
Fri Aug  5 14:16:05 2011
@@ -29,6 +29,7 @@ class QueueEntryRecord {
   var queue_key = 0L
   var entry_seq = 0L
   var message_key = 0L
+  var message_locator = 0L
   var attachment:Buffer = _
   var size = 0
   var expiration = 0L

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1154233&r1=1154232&r2=1154233&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
Fri Aug  5 14:16:05 2011
@@ -16,10 +16,11 @@ package org.apache.activemq.apollo.broke
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-import org.apache.activemq.apollo.dto.{StoreStatusDTO, StoreDTO}
+import org.apache.activemq.apollo.dto.StoreStatusDTO
 import org.apache.activemq.apollo.util._
 import java.io.{InputStream, OutputStream}
 import scala.util.continuations._
+import java.util.concurrent.atomic.AtomicLong
 
 trait StreamManager[A] {
   def using_queue_stream(func: (A)=>Unit)
@@ -108,7 +109,7 @@ trait Store extends ServiceTrait {
   /**
    * Loads a delivery with the associated id from persistent storage.
    */
-  def load_message(messageKey:Long)(callback:(Option[MessageRecord])=>Unit )
+  def load_message(messageKey:Long, locator:AtomicLong)(callback:(Option[MessageRecord])=>Unit )
 
   /**
    * Exports the contents of the store to the provided streams.  Each stream should contain

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala?rev=1154233&r1=1154232&r2=1154233&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreBenchmarkSupport.scala
Fri Aug  5 14:16:05 2011
@@ -22,8 +22,8 @@ import org.fusesource.hawtdispatch.TaskT
 import java.util.concurrent.{TimeUnit, CountDownLatch}
 import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll}
 import collection.mutable.ListBuffer
-import java.util.concurrent.atomic.{AtomicLong, AtomicInteger, AtomicBoolean}
 import org.apache.activemq.apollo.util.{LoggingTracker, FunSuiteSupport, LongCounter}
+import java.util.concurrent.atomic._
 
 /**
  * <p>Implements generic testing of Store implementations.</p>
@@ -99,6 +99,7 @@ abstract class StoreBenchmarkSupport ext
     message.protocol = ascii("test-protocol")
     message.buffer = ascii(content).buffer
     message.size = message.buffer.length
+    message.locator = new AtomicLong()
     batch.store(message)
   }
 
@@ -190,7 +191,7 @@ abstract class StoreBenchmarkSupport ext
     var keys = message_keys.toList
     val metric = benchmarkCount(keys.size) {
       val latch = new CountDownLatch(1)
-      store.load_message(keys.head) { msg=>
+      store.load_message(keys.head, new AtomicLong(0)) { msg=>
         assert(msg.isDefined, "message key not found: "+keys.head)
         latch.countDown
       }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1154233&r1=1154232&r2=1154233&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
Fri Aug  5 14:16:05 2011
@@ -17,12 +17,12 @@ package org.apache.activemq.apollo.broke
  * limitations under the License.
  */
 import org.fusesource.hawtbuf.AsciiBuffer._
-import org.fusesource.hawtdispatch._
 import org.fusesource.hawtdispatch.TaskTracker
 import java.util.concurrent.{TimeUnit, CountDownLatch}
-import org.scalatest.{BeforeAndAfterEach, BeforeAndAfterAll}
+import org.scalatest.BeforeAndAfterEach
 import collection.mutable.ListBuffer
 import org.apache.activemq.apollo.util.{LoggingTracker, FunSuiteSupport, LongCounter}
+import java.util.concurrent.atomic.AtomicLong
 
 /**
  * <p>Implements generic testing of Store implementations.</p>
@@ -139,7 +139,7 @@ abstract class StoreFunSuiteSupport exte
     val A = add_queue("A")
     val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
 
-    val rc:Option[MessageRecord] = CB( cb=> store.load_message(msg_keys.head)(cb) )
+    val rc:Option[MessageRecord] = CB( cb=> store.load_message(msg_keys.head, new AtomicLong())(cb) )
     expect(ascii("message 1").buffer) {
       rc.get.buffer
     }

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala?rev=1154233&r1=1154232&r2=1154233&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
Fri Aug  5 14:16:05 2011
@@ -20,7 +20,7 @@ import dto.{HawtDBStoreStatusDTO, HawtDB
 import collection.Seq
 import org.fusesource.hawtdispatch._
 import java.util.concurrent._
-import atomic.{AtomicInteger, AtomicLong}
+import atomic.{AtomicReference, AtomicInteger, AtomicLong}
 import org.apache.activemq.apollo.dto._
 import org.apache.activemq.apollo.broker.store._
 import org.apache.activemq.apollo.util._
@@ -178,7 +178,7 @@ class HawtDBStore(var config:HawtDBStore
     }
   }
 
-  def load_message(messageKey: Long)(callback: (Option[MessageRecord]) => Unit) = {
+  def load_message(messageKey: Long, locator:AtomicLong)(callback: (Option[MessageRecord]) => Unit) = {
     message_load_latency_counter.start { end=>
       load_source.merge((messageKey, { (result)=>
         end()

Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala?rev=1154233&r1=1154232&r2=1154233&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
Fri Aug  5 14:16:05 2011
@@ -17,10 +17,10 @@
 package org.apache.activemq.apollo.broker.store.jdbm2
 
 import dto.{JDBM2StoreDTO, JDBM2StoreStatusDTO}
-import java.util.concurrent.atomic.AtomicLong
 import collection.Seq
 import org.fusesource.hawtdispatch._
 import java.util.concurrent._
+import atomic.{AtomicReference, AtomicLong}
 import org.apache.activemq.apollo.broker.store._
 import org.apache.activemq.apollo.util._
 import org.fusesource.hawtdispatch.ListEventAggregator
@@ -153,7 +153,7 @@ class JDBM2Store(var config:JDBM2StoreDT
   load_source.resume
 
 
-  def load_message(messageKey: Long)(callback: (Option[MessageRecord]) => Unit) = {
+  def load_message(messageKey: Long, locator:AtomicLong)(callback: (Option[MessageRecord]) => Unit) = {
     message_load_latency_counter.start { end=>
       load_source.merge((messageKey, { (result)=>
         end()



Mime
View raw message