activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1234262 - in /activemq/activemq-apollo/trunk: apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/br...
Date Sat, 21 Jan 2012 04:21:04 GMT
Author: chirino
Date: Sat Jan 21 04:21:03 2012
New Revision: 1234262

URL: http://svn.apache.org/viewvc?rev=1234262&view=rev
Log:
Store interface changes to better support the LevelDB store.

- Changed the store interface so that the locator is an AtomicReference[Object] to avoid forcing stores to serialize their locators if they don't want to.
- The QueueEntryRecords now holds an AtomicReference to the locator so that it matches MessageRecord.
- The LevelDB based store now exclusively looks up message payloads using locators.
- Eliminated the msg_key -> log index entry per enqueued message.
- The LevelDB store is now avoiding many of the implicit conversions that the PBSupport class was performing.
- Updated the base Store Test case so that it uses locators, as is now required for LevelDB.

Modified:
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
    activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
    activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1234262&r1=1234261&r2=1234262&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala Sat Jan 21 04:21:03 2012
@@ -358,7 +358,7 @@ class BDBClient(store: BDBStore) {
                   to_pb(action.message_record)
                 }
 
-                messages_db.put(tx, action.message_record.key, pb)
+                messages_db.put(tx, action.message_record.key, pb.freeze)
               }
 
               action.enqueues.foreach { queueEntry =>
@@ -567,7 +567,7 @@ class BDBClient(store: BDBStore) {
         streams.using_queue_stream { queue_stream =>
           queues_db.cursor(tx) { (_, value) =>
             val record:QueueRecord = value
-            record.writeFramed(queue_stream)
+            record.freeze.writeFramed(queue_stream)
             true
           }
         }
@@ -588,7 +588,7 @@ class BDBClient(store: BDBStore) {
         streams.using_queue_entry_stream { queue_entry_stream=>
           entries_db.cursor(tx) { (key, value) =>
             val record:QueueEntryRecord = value
-            record.writeFramed(queue_entry_stream)
+            record.freeze.writeFramed(queue_entry_stream)
             true
           }
         }

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala?rev=1234262&r1=1234261&r2=1234262&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala Sat Jan 21 04:21:03 2012
@@ -170,7 +170,7 @@ class BDBStore(var config:BDBStoreDTO) e
   load_source.resume
 
 
-  def load_message(messageKey: Long, locator:AtomicReference[Array[Byte]])(callback: (Option[MessageRecord]) => Unit) = {
+  def load_message(messageKey: Long, locator:AtomicReference[Object])(callback: (Option[MessageRecord]) => Unit) = {
     message_load_latency_counter.start { end=>
       load_source.merge((messageKey, { (result)=>
         end()

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1234262&r1=1234261&r2=1234262&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Sat Jan 21 04:21:03 2012
@@ -194,7 +194,7 @@ class Delivery {
    * After the store persists the message he may be able to supply us with  locator handle
    * which will load the message faster than looking it up via the store key.
    */
-  var storeLocator:AtomicReference[Array[Byte]] = null
+  var storeLocator:AtomicReference[Object] = null
 
   /**
    * The transaction the delivery is participating in.

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1234262&r1=1234261&r2=1234262&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Sat Jan 21 04:21:03 2012
@@ -1066,8 +1066,7 @@ class QueueEntry(val queue:Queue, val se
   }
 
   def init(qer:QueueEntryRecord):QueueEntry = {
-    val locator = new AtomicReference[Array[Byte]](Option(qer.message_locator).map(_.toByteArray).getOrElse(null))
-    state = new Swapped(qer.message_key, locator, qer.size, qer.expiration, qer.redeliveries, null)
+    state = new Swapped(qer.message_key, qer.message_locator, qer.size, qer.expiration, qer.redeliveries, null)
     this
   }
 
@@ -1126,7 +1125,7 @@ class QueueEntry(val queue:Queue, val se
     qer.queue_key = queue.store_id
     qer.entry_seq = seq
     qer.message_key = state.message_key
-    qer.message_locator = Option(state.message_locator).flatMap(x=> Option(x.get)).map(new Buffer(_)).getOrElse(null)
+    qer.message_locator = state.message_locator
     qer.size = state.size
     qer.expiration = expiration
     qer
@@ -1230,7 +1229,7 @@ class QueueEntry(val queue:Queue, val se
      */
     def message_key = -1L
 
-    def message_locator: AtomicReference[Array[Byte]] = null
+    def message_locator: AtomicReference[Object] = null
 
     /**
      * Attempts to dispatch the current entry to the subscriptions position at the entry.
@@ -1414,7 +1413,7 @@ class QueueEntry(val queue:Queue, val se
 
               delivery.uow = queue.virtual_host.store.create_uow
               val uow = delivery.uow
-              delivery.storeLocator = new AtomicReference[Array[Byte]]()
+              delivery.storeLocator = new AtomicReference[Object]()
               delivery.storeKey = uow.store(delivery.createMessageRecord )
               store
               if( asap ) {
@@ -1606,7 +1605,7 @@ class QueueEntry(val queue:Queue, val se
    * entry is persisted, it can move into this state.  This state only holds onto the
    * the massage key so that it can reload the message from the store quickly when needed.
    */
-  class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Array[Byte]], override val size:Int, override val expiration:Long, var _redeliveries:Short, var acquirer:Subscription) extends EntryState {
+  class Swapped(override val message_key:Long, override val message_locator:AtomicReference[Object], override val size:Int, override val expiration:Long, var _redeliveries:Short, var acquirer:Subscription) extends EntryState {
 
     queue.individual_swapped_items += 1
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1234262&r1=1234261&r2=1234262&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Sat Jan 21 04:21:03 2012
@@ -166,7 +166,7 @@ abstract class DeliveryProducerRoute(rou
             } else {
               copy.uow.retain
             }
-            copy.storeLocator = new AtomicReference[Array[Byte]]()
+            copy.storeLocator = new AtomicReference[Object]()
             copy.storeKey = copy.uow.store(copy.createMessageRecord)
           }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala?rev=1234262&r1=1234261&r2=1234262&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala Sat Jan 21 04:21:03 2012
@@ -32,6 +32,6 @@ class MessageRecord {
   var buffer: Buffer = _
   var direct_buffer: DirectBuffer = _
   var expiration = 0L
-  var locator:AtomicReference[Array[Byte]] = _
+  var locator:AtomicReference[Object] = _
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala?rev=1234262&r1=1234261&r2=1234262&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PBSupport.scala Sat Jan 21 04:21:03 2012
@@ -27,7 +27,7 @@ import org.fusesource.hawtbuf.Buffer
  */
 object PBSupport {
 
-  implicit def to_pb(v: MessageRecord):MessagePB.Buffer = {
+  implicit def to_pb(v: MessageRecord):MessagePB.Bean = {
     val pb = new MessagePB.Bean
     pb.setMessageKey(v.key)
     pb.setProtocol(v.protocol)
@@ -35,7 +35,7 @@ object PBSupport {
     pb.setValue(v.buffer)
     if(v.expiration!=0)
       pb.setExpiration(v.expiration)
-    pb.freeze
+    pb
   }
 
   implicit def from_pb(pb: MessagePB.Getter):MessageRecord = {
@@ -48,51 +48,49 @@ object PBSupport {
     rc
   }
 
-  def encode_message_record(out: OutputStream, v: MessageRecord) = to_pb(v).writeUnframed(out)
+  def encode_message_record(out: OutputStream, v: MessageRecord) = to_pb(v).freeze.writeUnframed(out)
   def decode_message_record(in: InputStream):MessageRecord = MessagePB.FACTORY.parseUnframed(in)
 
-  implicit def encode_message_record(v: MessageRecord):Array[Byte] = to_pb(v).toUnframedByteArray
+  implicit def encode_message_record(v: MessageRecord):Array[Byte] = to_pb(v).freeze.toUnframedByteArray
   implicit def decode_message_record(data: Array[Byte]):MessageRecord = MessagePB.FACTORY.parseUnframed(data)
 
-  implicit def encode_message_record_buffer(v: MessageRecord) = to_pb(v).toUnframedBuffer
+  implicit def encode_message_record_buffer(v: MessageRecord) = to_pb(v).freeze.toUnframedBuffer
   implicit def decode_message_record_buffer(data: Buffer):MessageRecord = MessagePB.FACTORY.parseUnframed(data)
 
 
-  implicit def to_pb(v: QueueRecord):QueuePB.Buffer = {
+  implicit def to_pb(v: QueueRecord):QueuePB.Bean = {
     val pb = new QueuePB.Bean
     pb.setKey(v.key)
     pb.setBindingData(v.binding_data)
     pb.setBindingKind(v.binding_kind)
-    pb.freeze
+    pb
   }
 
   implicit def from_pb(pb: QueuePB.Getter):QueueRecord = {
     QueueRecord(pb.getKey, pb.getBindingKind, pb.getBindingData)
   }
 
-  def encode_queue_record(out: OutputStream, v: QueueRecord) = to_pb(v).writeUnframed(out)
+  def encode_queue_record(out: OutputStream, v: QueueRecord) = to_pb(v).freeze.writeUnframed(out)
   def decode_queue_record(in: InputStream):QueueRecord = QueuePB.FACTORY.parseUnframed(in)
 
-  implicit def encode_queue_record(v: QueueRecord) = to_pb(v).toUnframedByteArray
+  implicit def encode_queue_record(v: QueueRecord) = to_pb(v).freeze.toUnframedByteArray
   implicit def decode_queue_record(data: Array[Byte]):QueueRecord = QueuePB.FACTORY.parseUnframed(data)
 
-  implicit def encode_queue_record_buffer(v: QueueRecord) = to_pb(v).toUnframedBuffer
+  implicit def encode_queue_record_buffer(v: QueueRecord) = to_pb(v).freeze.toUnframedBuffer
   implicit def decode_queue_record_buffer(data: Buffer):QueueRecord = QueuePB.FACTORY.parseUnframed(data)
 
-  implicit def to_pb(v: QueueEntryRecord):QueueEntryPB.Buffer = {
+  implicit def to_pb(v: QueueEntryRecord):QueueEntryPB.Bean = {
     val pb = new QueueEntryPB.Bean
     pb.setQueueKey(v.queue_key)
     pb.setQueueSeq(v.entry_seq)
     pb.setMessageKey(v.message_key)
     pb.setAttachment(v.attachment)
     pb.setSize(v.size)
-    if(v.message_locator!=0)
-      pb.setMessageLocator(v.message_locator)
     if(v.expiration!=0)
       pb.setExpiration(v.expiration)
     if(v.redeliveries!=0)
       pb.setRedeliveries(v.redeliveries)
-    pb.freeze
+    pb
   }
 
   implicit def from_pb(pb: QueueEntryPB.Getter):QueueEntryRecord = {
@@ -100,7 +98,6 @@ object PBSupport {
     rc.queue_key = pb.getQueueKey
     rc.entry_seq = pb.getQueueSeq
     rc.message_key = pb.getMessageKey
-    rc.message_locator = pb.getMessageLocator
     rc.attachment = pb.getAttachment
     rc.size = pb.getSize
     rc.expiration = pb.getExpiration
@@ -108,13 +105,13 @@ object PBSupport {
     rc
   }
 
-  def encode_queue_entry_record(out: OutputStream, v: QueueEntryRecord) = to_pb(v).writeUnframed(out)
+  def encode_queue_entry_record(out: OutputStream, v: QueueEntryRecord) = to_pb(v).freeze.writeUnframed(out)
   def decode_queue_entry_record(in: InputStream):QueueEntryRecord = QueueEntryPB.FACTORY.parseUnframed(in)
 
-  implicit def encode_queue_entry_record(v: QueueEntryRecord) = to_pb(v).toUnframedByteArray
+  implicit def encode_queue_entry_record(v: QueueEntryRecord) = to_pb(v).freeze.toUnframedByteArray
   implicit def decode_queue_entry_record(data: Array[Byte]):QueueEntryRecord = QueueEntryPB.FACTORY.parseUnframed(data)
 
-  implicit def encode_queue_entry_record_buffer(v: QueueEntryRecord) = to_pb(v).toUnframedBuffer
+  implicit def encode_queue_entry_record_buffer(v: QueueEntryRecord) = to_pb(v).freeze.toUnframedBuffer
   implicit def decode_queue_entry_record_buffer(data: Buffer):QueueEntryRecord = QueueEntryPB.FACTORY.parseUnframed(data)
 
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala?rev=1234262&r1=1234261&r2=1234262&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala Sat Jan 21 04:21:03 2012
@@ -20,6 +20,7 @@ package org.apache.activemq.apollo.broke
 
 
 import org.fusesource.hawtbuf.Buffer
+import java.util.concurrent.atomic.AtomicReference
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -29,7 +30,7 @@ class QueueEntryRecord {
   var queue_key = 0L
   var entry_seq = 0L
   var message_key = 0L
-  var message_locator:Buffer = _
+  var message_locator:AtomicReference[Object] = _
   var attachment:Buffer = _
   var size = 0
   var expiration = 0L

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1234262&r1=1234261&r2=1234262&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala Sat Jan 21 04:21:03 2012
@@ -110,7 +110,7 @@ trait Store extends ServiceTrait {
   /**
    * Loads a delivery with the associated id from persistent storage.
    */
-  def load_message(messageKey:Long, locator:AtomicReference[Array[Byte]])(callback:(Option[MessageRecord])=>Unit )
+  def load_message(messageKey:Long, locator:AtomicReference[Object])(callback:(Option[MessageRecord])=>Unit )
 
   /**
    * Exports the contents of the store to the provided streams.  Each stream should contain

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1234262&r1=1234261&r2=1234262&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala Sat Jan 21 04:21:03 2012
@@ -24,6 +24,7 @@ import org.apache.activemq.apollo.util.{
 import org.scalatest.BeforeAndAfterEach
 import java.io.File
 import org.apache.activemq.apollo.util.FileSupport._
+import java.util.concurrent.atomic.AtomicReference
 
 /**
  * <p>Implements generic testing of Store implementations.</p>
@@ -95,26 +96,29 @@ abstract class StoreFunSuiteSupport exte
     queue_a.key
   }
 
-  def add_message(batch:StoreUOW, content:String):Long = {
+  def add_message(batch:StoreUOW, content:String) = {
     var message = new MessageRecord
     message.protocol = ascii("test-protocol")
     message.buffer = ascii(content).buffer
     message.size = message.buffer.length
-    batch.store(message)
+    message.locator = new AtomicReference[Object]()
+    val key = batch.store(message)
+    (key, message.locator)
   }
 
 
-  def entry(queue_key:Long, entry_seq:Long, message_key:Long=0) = {
+  def entry(queue_key:Long, entry_seq:Long, message_key:(Long, AtomicReference[Object])) = {
     var queueEntry = new QueueEntryRecord
     queueEntry.queue_key = queue_key
     queueEntry.entry_seq = entry_seq
-    queueEntry.message_key = message_key
+    queueEntry.message_key = message_key._1
+    queueEntry.message_locator = message_key._2
     queueEntry
   }
 
   def populate(queue_key:Long, messages:List[String], first_seq:Long=1) = {
     var batch = store.create_uow
-    var msg_keys = ListBuffer[Long]()
+    var msg_keys = ListBuffer[(Long, AtomicReference[Object])]()
     var next_seq = first_seq
 
     messages.foreach { message=>
@@ -131,7 +135,7 @@ abstract class StoreFunSuiteSupport exte
     batch.release
 
     msg_keys.foreach { msgKey =>
-      store.flush_message(msgKey) {}
+      store.flush_message(msgKey._1) {}
     }
     tracker.await
     msg_keys
@@ -141,7 +145,7 @@ abstract class StoreFunSuiteSupport exte
     val A = add_queue("A")
     val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
 
-    val rc:Option[MessageRecord] = CB( cb=> store.load_message(msg_keys.head, null)(cb) )
+    val rc:Option[MessageRecord] = CB( cb=> store.load_message(msg_keys.head._1, msg_keys.head._2)(cb) )
     expect(ascii("message 1").buffer) {
       rc.get.buffer
     }
@@ -172,8 +176,8 @@ abstract class StoreFunSuiteSupport exte
     val A = add_queue("A")
     val msg_keys = populate(A, "message 1"::"message 2"::"message 3"::Nil)
 
-    val rc:Seq[QueueEntryRecord] = CB( cb=> store.list_queue_entries(A,msg_keys.head, msg_keys.last)(cb) )
-    expect(msg_keys.toSeq) {
+    val rc:Seq[QueueEntryRecord] = CB( cb=> store.list_queue_entries(A,0, Long.MaxValue)(cb) )
+    expect(msg_keys.toSeq.map(_._1)) {
       rc.map( _.message_key )
     }
   }
@@ -211,7 +215,7 @@ abstract class StoreFunSuiteSupport exte
     batch.on_complete(task.run)
     batch.release
 
-    store.flush_message(m1) {}
+    store.flush_message(m1._1) {}
 
     expect(true) {
       tracker.await(1, TimeUnit.SECONDS)

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala?rev=1234262&r1=1234261&r2=1234262&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala Sat Jan 21 04:21:03 2012
@@ -31,12 +31,12 @@ import java.util.concurrent.locks.Reentr
 import org.apache.activemq.apollo.util.{TreeMap=>ApolloTreeMap}
 import collection.immutable.TreeMap
 import org.fusesource.hawtbuf.{Buffer, AbstractVarIntSupport}
-import java.util.concurrent.atomic.AtomicReference
 import scala.Predef._
 import org.fusesource.hawtdb.api._
 import org.fusesource.hawtbuf.Buffer._
 import org.fusesource.hawtdb.internal.page.LFUPageCache
 import org.fusesource.hawtdispatch._
+import java.util.concurrent.atomic.AtomicReference
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -474,6 +474,25 @@ class HawtDBClient(store: HawtDBStore) {
     callback.run
   }
 
+  def encode_locator(value:AtomicReference[Object]):Buffer = {
+    if(value==null)
+      return null
+    val l = value.get().asInstanceOf[java.lang.Long]
+    if(l==null)
+      return null
+    encode_locator(l.longValue())
+  }
+
+  def encode_locator(value:Long):Buffer = {
+    encode_long(value.asInstanceOf[Long])
+  }
+
+  def decode_locator(value:Buffer):AtomicReference[Object] = {
+    if(value==null)
+      return null
+    new AtomicReference[Object](decode_long(value).asInstanceOf[AnyRef])
+  }
+  
   def store(uows: Seq[HawtDBStore#DelayableUOW], callback:Runnable) {
     retry_using_index { index =>
       log.appender { appender =>
@@ -500,17 +519,16 @@ class HawtDBClient(store: HawtDBStore) {
 
             if (message_record != null) {
               pos = appender.append(LOG_ADD_MESSAGE, message_record)
-              val pos_encoded = encode_long(pos)
-              pos_buffer = new Buffer(pos_encoded)
+              pos_buffer = encode_locator(pos)
               if( message_record.locator !=null ) {
-                message_record.locator.set(pos_encoded.toByteArray);
+                message_record.locator.set(pos.asInstanceOf[AnyRef]);
               }
-              index.put(encode(message_prefix, action.message_record.key), pos_encoded)
+              index.put(encode(message_prefix, action.message_record.key), pos_buffer)
             }
 
             action.dequeues.foreach { entry =>
-              if( pos_buffer==null && entry.message_locator!=null ) {
-                pos_buffer = entry.message_locator
+              if( pos_buffer==null ) {
+                pos_buffer = encode_locator(entry.message_locator)
               }
               val key = encode(queue_entry_prefix, entry.queue_key, entry.entry_seq)
               appender.append(LOG_REMOVE_QUEUE_ENTRY, key)
@@ -518,7 +536,7 @@ class HawtDBClient(store: HawtDBStore) {
             }
 
             action.enqueues.foreach { entry =>
-              entry.message_locator = pos_buffer
+              entry.message_locator = decode_locator(pos_buffer)
               val encoded:Buffer = entry
               appender.append(LOG_ADD_QUEUE_ENTRY, encoded)
               index.put(encode(queue_entry_prefix, entry.queue_key, entry.entry_seq), encoded)
@@ -540,7 +558,7 @@ class HawtDBClient(store: HawtDBStore) {
   val metric_load_from_index_counter = new TimeCounter
   var metric_load_from_index = metric_load_from_index_counter(false)
 
-  def loadMessages(requests: ListBuffer[(Long, AtomicReference[Array[Byte]], (Option[MessageRecord])=>Unit)]):Unit = {
+  def loadMessages(requests: ListBuffer[(Long, AtomicReference[Object], (Option[MessageRecord])=>Unit)]):Unit = {
 
     val missing = retry_using_index { index =>
       requests.flatMap { x =>
@@ -549,7 +567,7 @@ class HawtDBClient(store: HawtDBStore) {
           var pos = 0L
           var pos_array:Array[Byte] = null
           if( locator!=null ) {
-            pos_array = locator.get()
+            pos_array = encode_locator(locator).toByteArray
             if( pos_array!=null ) {
               pos = decode_long(new Buffer(pos_array))
             }
@@ -568,7 +586,7 @@ class HawtDBClient(store: HawtDBStore) {
           } else {
             log.read(pos).map { case (prefix, data, _)=>
               val rc:MessageRecord = data
-              rc.locator = new AtomicReference[Array[Byte]](pos_array)
+              rc.locator = new AtomicReference[Object](pos.asInstanceOf[AnyRef])
               rc
             }
           }
@@ -595,7 +613,7 @@ class HawtDBClient(store: HawtDBStore) {
             val pos = decode_long(pos_buffer)
             log.read(pos).map { case (prefix, data, _)=>
               val rc:MessageRecord = data
-              rc.locator = new AtomicReference[Array[Byte]](pos_buffer.toByteArray)
+              rc.locator = new AtomicReference[AnyRef](pos.asInstanceOf[AnyRef])
               rc
             }
           }
@@ -734,7 +752,7 @@ class HawtDBClient(store: HawtDBStore) {
         index.cursor_prefixed(queue_entry_prefix_array) { (_,value) =>
           val entry_record:QueueEntryRecord = value
           val pos = if(entry_record.message_locator!=null) {
-            decode_long(entry_record.message_locator)
+            entry_record.message_locator.get().asInstanceOf[Long].longValue()
           } else {
             index.get(encode(message_prefix, entry_record.message_key)).map(decode_long(_)).getOrElse(0L)
           }

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala?rev=1234262&r1=1234261&r2=1234262&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala Sat Jan 21 04:21:03 2012
@@ -214,12 +214,12 @@ class HawtDBStore(val config:HawtDBStore
     }
   }
 
-  val load_source = createSource(new ListEventAggregator[(Long, AtomicReference[Array[Byte]], (Option[MessageRecord])=>Unit)](), dispatch_queue)
+  val load_source = createSource(new ListEventAggregator[(Long, AtomicReference[Object], (Option[MessageRecord])=>Unit)](), dispatch_queue)
   load_source.setEventHandler(^{drain_loads});
   load_source.resume
 
 
-  def load_message(messageKey: Long, locator:AtomicReference[Array[Byte]])(callback: (Option[MessageRecord]) => Unit) = {
+  def load_message(messageKey: Long, locator:AtomicReference[Object])(callback: (Option[MessageRecord]) => Unit) = {
     message_load_latency_counter.start { end=>
       load_source.merge((messageKey, locator, { (result)=>
         end()

Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala?rev=1234262&r1=1234261&r2=1234262&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala Sat Jan 21 04:21:03 2012
@@ -386,7 +386,7 @@ class JDBM2Client(store: JDBM2Store) {
               to_pb(action.message_record)
             }
 
-            messages_db.put(action.message_record.key, pb)
+            messages_db.put(action.message_record.key, pb.freeze)
             if( action.message_record.key > last_message_key ) {
               last_message_key = action.message_record.key
               recman.setNamedObject("last_message_key", last_message_key)
@@ -514,7 +514,7 @@ class JDBM2Client(store: JDBM2Store) {
       streams.using_queue_stream { queue_stream=>
         queues_db.cursor { (_, value) =>
           val record:QueueRecord = value
-          record.writeFramed(queue_stream)
+          record.freeze.writeFramed(queue_stream)
           true
         }
       }
@@ -532,7 +532,7 @@ class JDBM2Client(store: JDBM2Store) {
       streams.using_queue_entry_stream { queue_entry_stream=>
         entries_db.cursor { (_, value) =>
           val record:QueueEntryRecord = value
-          record.writeFramed(queue_entry_stream)
+          record.freeze.writeFramed(queue_entry_stream)
           true
         }
       }

Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala?rev=1234262&r1=1234261&r2=1234262&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala Sat Jan 21 04:21:03 2012
@@ -158,7 +158,7 @@ class JDBM2Store(var config:JDBM2StoreDT
   load_source.resume
 
 
-  def load_message(messageKey: Long, locator:AtomicReference[Array[Byte]])(callback: (Option[MessageRecord]) => Unit) = {
+  def load_message(messageKey: Long, locator:AtomicReference[Object])(callback: (Option[MessageRecord]) => Unit) = {
     message_load_latency_counter.start { end=>
       load_source.merge((messageKey, { (result)=>
         end()

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1234262&r1=1234261&r2=1234262&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala Sat Jan 21 04:21:03 2012
@@ -31,28 +31,25 @@ import org.apache.activemq.apollo.util.{
 import collection.immutable.TreeMap
 import org.fusesource.leveldbjni.internal.Util
 import org.fusesource.hawtbuf.{Buffer, AbstractVarIntSupport}
-import java.util.concurrent.atomic.AtomicReference
 import org.apache.activemq.apollo.broker.Broker
 import org.apache.activemq.apollo.util.ProcessSupport._
 import collection.mutable.{HashMap, ListBuffer}
 import org.apache.activemq.apollo.dto.JsonCodec
-import java.util.Map
 import org.iq80.leveldb._
 import org.apache.activemq.apollo.broker.store.leveldb.RecordLog.LogInfo
-import org.apache.activemq.apollo.broker.store.PBSupport._
-import org.apache.activemq.apollo.broker.store.leveldb.HelperTrait._
+import org.apache.activemq.apollo.broker.store.PBSupport
+import java.util.concurrent.atomic.AtomicReference
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 object LevelDBClient extends Log {
 
-  final val message_prefix = 'm'.toByte
   final val queue_prefix = 'q'.toByte
   final val queue_entry_prefix = 'e'.toByte
   final val map_prefix = 'p'.toByte
+  final val tmp_prefix = 't'.toByte
 
-  final val message_prefix_array = Array(message_prefix)
   final val queue_prefix_array = Array(queue_prefix)
   final val map_prefix_array = Array(map_prefix)
   final val queue_entry_prefix_array = Array(queue_entry_prefix)
@@ -216,7 +213,6 @@ class LevelDBClient(store: LevelDBStore)
   }
 
   def log_size = {
-    import OptionSupport._
     Option(config.log_size).map(MemoryPropertyEditor.parse(_)).map{size=>
       if(size == MemoryPropertyEditor.parse("2G")) {
         Int.MaxValue // which is 2G - 1 (close enough!)
@@ -266,7 +262,7 @@ class LevelDBClient(store: LevelDBStore)
 
     index_options.cacheSize(Option(config.index_cache_size).map(MemoryPropertyEditor.parse(_).toLong).getOrElse(1024*1024*256L))
     index_options.logger(new Logger() {
-      def log(msg: String) = debug(store.store_kind+": "+msg.stripSuffix("\n"))
+      def log(msg: String) = trace(store.store_kind+": "+msg.stripSuffix("\n"))
     })
 
     log = create_log
@@ -348,43 +344,27 @@ class LevelDBClient(store: LevelDBStore)
               case (kind, data, next_pos) =>
                 kind match {
                   case LOG_ADD_MESSAGE =>
-                    replay_operations+=1
-                    val record: MessageRecord = data
-                    index.put(encode_key(message_prefix, record.key), encode_locator(pos, data.length))
                   case LOG_ADD_QUEUE_ENTRY =>
                     replay_operations+=1
-                    val record: QueueEntryRecord = data
-                    index.put(encode_key(queue_entry_prefix, record.queue_key, record.entry_seq), data)
-                    
-                    // Figure out which log file this message reference is pointing at..
-                    val pos = (if(record.message_locator!=null) {
-                      Some(decode_locator(record.message_locator)._1)
-                    } else {
-                      index.get(encode_key(message_prefix, record.message_key)).map(decode_locator(_)._1)
-                    })
-                    
-                    // Increment it.
+                    val record = QueueEntryPB.FACTORY.parseUnframed(data)
+                    val pos = decode_vlong(record.getMessageLocator)
+                    index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), data)
                     pos.foreach(log_ref_increment(_))
+
                   case LOG_REMOVE_QUEUE_ENTRY =>
                     replay_operations+=1
                     index.get(data, new ReadOptions).foreach { value=>
-                      val record: QueueEntryRecord = value
-  
-                      // Figure out which log file this message reference is pointing at..
-                      val pos = (if(record.message_locator!=null) {
-                        Some(decode_locator(record.message_locator)._1)
-                      } else {
-                        index.get(encode_key(message_prefix, record.message_key)).map(decode_locator(_)._1)
-                      })
+                      val record = QueueEntryPB.FACTORY.parseUnframed(value)
+                      val pos = decode_vlong(record.getMessageLocator)
                       pos.foreach(log_ref_decrement(_))
-
                       index.delete(data)
                     }
                     
                   case LOG_ADD_QUEUE =>
                     replay_operations+=1
-                    val record: QueueRecord = data
-                    index.put(encode_key(queue_prefix, record.key), data)
+                    val record = QueuePB.FACTORY.parseUnframed(data)
+                    index.put(encode_key(queue_prefix, record.getKey), data)
+
                   case LOG_REMOVE_QUEUE =>
                     replay_operations+=1
                     val ro = new ReadOptions
@@ -397,8 +377,8 @@ class LevelDBClient(store: LevelDBStore)
 
                       // Figure out what log file that message entry was in so we can,
                       // decrement the log file reference.
-                      val entry_record:QueueEntryRecord = value
-                      val pos = decode_locator(entry_record.getMessageLocator)._1
+                      val record = QueueEntryPB.FACTORY.parseUnframed(value)
+                      val pos = decode_vlong(record.getMessageLocator)
                       log_ref_decrement(pos)
                       true
                     }
@@ -441,21 +421,22 @@ class LevelDBClient(store: LevelDBStore)
     index.cursor_prefixed(queue_entry_prefix_array) { (key, value)=>
       try {
         val (_, queue_key, seq_key) = decode_long_long_key(key)
-        val record: QueueEntryRecord = value
+        val record = QueueEntryPB.FACTORY.parseUnframed(value)
+        val (pos, len) = decode_locator(record.getMessageLocator)
         if (record.getQueueKey != queue_key) {
           throw new IOException("key missmatch")
         }
         if (record.getQueueSeq != seq_key) {
           throw new IOException("key missmatch")
         }
-        val pos = decode_locator(record.getMessageLocator)._1
         log.log_info(pos).foreach {
           log_info =>
             actual_log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
         }
         referenced_queues += queue_key
       } catch {
-        case _ =>
+        case e =>
+          trace("invalid queue entry record: %s, error: %s", new Buffer(key), e)
           fixed_records += 1
           // Invalid record.
           index.delete(key)
@@ -467,13 +448,14 @@ class LevelDBClient(store: LevelDBStore)
     index.cursor_prefixed(queue_prefix_array) { (key, value)=>
       try {
         val (_, queue_key) = decode_long_key(key)
-        val record: QueueRecord = value
+        val record = QueuePB.FACTORY.parseUnframed(value)
         if (record.getKey != queue_key) {
           throw new IOException("key missmatch")
         }
         referenced_queues -= queue_key
       } catch {
-        case _ =>
+        case e =>
+          trace("invalid queue record: %s, error: %s", new Buffer(key), e)
           fixed_records += 1
           // Invalid record.
           index.delete(key)
@@ -484,11 +466,11 @@ class LevelDBClient(store: LevelDBStore)
     referenced_queues.foreach { queue_key=>
       // We have queue entries for a queue that does not exist..
       index.cursor_prefixed(encode_key(queue_entry_prefix, queue_key)) { (key, value)=>
-
+        trace("invalid queue entry record: %s, error: queue key does not exits %s", new Buffer(key), queue_key)
         fixed_records += 1
         index.delete(key)
-        val entry_record:QueueEntryRecord = value
-        val pos = decode_locator(entry_record.getMessageLocator)._1
+        val record = QueueEntryPB.FACTORY.parseUnframed(value)
+        val pos = decode_vlong(record.getMessageLocator)
         log.log_info(pos).foreach { log_info =>
           actual_log_refs.get(log_info.position).foreach { counter =>
             if (counter.decrementAndGet() == 0) {
@@ -501,6 +483,7 @@ class LevelDBClient(store: LevelDBStore)
     }
 
     if( actual_log_refs != log_refs ) {
+      debug("expected != actual log references. expected: %s, actual %s", log_refs, actual_log_refs)
       log_refs.clear()
       log_refs ++= actual_log_refs
     }
@@ -679,6 +662,7 @@ class LevelDBClient(store: LevelDBStore)
     try{
       log.close
       directory.list_files.foreach(_.recursive_delete)
+      log_refs.clear()
     } finally {
       retry {
         log.open
@@ -690,26 +674,34 @@ class LevelDBClient(store: LevelDBStore)
   def add_queue(record: QueueRecord, callback:Runnable) = {
     retry_using_index {
       log.appender { appender =>
-        appender.append(LOG_ADD_QUEUE, record)
-        index.put(encode_key(queue_prefix, record.key), record)
+        val value:Buffer = PBSupport.encode_queue_record(record)
+        appender.append(LOG_ADD_QUEUE, value)
+        index.put(encode_key(queue_prefix, record.key), value)
       }
     }
     callback.run
   }
 
-  def log_ref_decrement(pos: Long) {
-    log.log_info(pos).foreach { log_info =>
-      log_refs.get(log_info.position).foreach { counter =>
-        if (counter.decrementAndGet() == 0) {
-          log_refs.remove(log_info.position)
-        }
-      }
+  def log_ref_decrement(pos: Long) = this.synchronized {
+    log.log_info(pos) match {
+      case Some(log_info)=>
+        log_refs.get(log_info.position).foreach { counter =>
+          val count = counter.decrementAndGet()
+          if (count == 0) {
+            log_refs.remove(log_info.position)
+          }
+        }
+      case None =>
+        warn("Invalid log position: "+pos)
     }
   }
 
-  def log_ref_increment(pos: Long) {
-    log.log_info(pos).foreach { log_info =>
-      log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
+  def log_ref_increment(pos: Long) = this.synchronized {
+    log.log_info(pos) match {
+      case Some(log_info)=>
+        val count = log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
+      case None =>
+        warn("Invalid log position: "+pos)
     }
   }
 
@@ -726,8 +718,8 @@ class LevelDBClient(store: LevelDBStore)
 
           // Figure out what log file that message entry was in so we can,
           // decrement the log file reference.
-          val entry_record:QueueEntryRecord = value
-          val pos = decode_locator(entry_record.getMessageLocator)._1
+          val record = QueueEntryPB.FACTORY.parseUnframed(value)
+          val pos = decode_vlong(record.getMessageLocator)
           log_ref_decrement(pos)
           true
         }
@@ -758,38 +750,40 @@ class LevelDBClient(store: LevelDBStore)
 
             uow.actions.foreach { case (msg, action) =>
               val message_record = action.message_record
-              var pos = -1L
-              var len = 0
-              var locator_buffer:Buffer = null
+              var locator:(Long, Int) = null
 
               if (message_record != null) {
-                val message_data:Array[Byte] = message_record
-                len = message_data.length
-                pos = appender.append(LOG_ADD_MESSAGE, message_data)
-                val locator_data = encode_locator(pos, len)
-                locator_buffer = new Buffer(locator_data)
-                if( message_record.locator !=null ) {
-                  message_record.locator.set(locator_data);
-                }
-                batch.put(encode_key(message_prefix, action.message_record.key), locator_data)
+                val message_data = PBSupport.encode_message_record(message_record)
+                val len = message_data.length
+                val pos = appender.append(LOG_ADD_MESSAGE, message_data)
+                locator = (pos, len)
+                message_record.locator.set(locator);
               }
 
               action.dequeues.foreach { entry =>
-                if( locator_buffer==null && entry.message_locator!=null ) {
-                  locator_buffer = entry.message_locator
-                  val t = decode_locator(locator_buffer)
-                  pos = t._1
-                  len = t._2
+                if( locator==null ) {
+                  locator = entry.message_locator.get().asInstanceOf[(Long, Int)]
                 }
+                val (pos, len) = locator
                 val key = encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq)
                 appender.append(LOG_REMOVE_QUEUE_ENTRY, key)
                 batch.delete(key)
                 log_ref_decrement(pos)
               }
 
+              var locator_buffer:Buffer = null
               action.enqueues.foreach { entry =>
-                entry.message_locator = locator_buffer
-                val encoded:Array[Byte] = entry
+                val (pos, len) = locator
+                entry.message_locator.set(locator)
+
+                if ( locator_buffer==null ) {
+                  locator_buffer = encode_locator(pos, len)
+                }
+
+                val record = PBSupport.to_pb(entry)
+                record.setMessageLocator(locator_buffer)
+
+                val encoded = record.freeze().toUnframedBuffer
                 appender.append(LOG_ADD_QUEUE_ENTRY, encoded)
                 batch.put(encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq), encoded)
                 
@@ -815,7 +809,7 @@ class LevelDBClient(store: LevelDBStore)
   val metric_load_from_index_counter = new TimeCounter
   var metric_load_from_index = metric_load_from_index_counter(false)
 
-  def loadMessages(requests: ListBuffer[(Long, AtomicReference[Array[Byte]], (Option[MessageRecord])=>Unit)]):Unit = {
+  def loadMessages(requests: ListBuffer[(Long, AtomicReference[Object], (Option[MessageRecord])=>Unit)]):Unit = {
 
     val ro = new ReadOptions
     ro.verifyChecksums(verify_checksums)
@@ -825,40 +819,14 @@ class LevelDBClient(store: LevelDBStore)
       index.snapshot { snapshot =>
         ro.snapshot(snapshot)
         requests.flatMap { x =>
-          val (message_key, locator, callback) = x
+          val (_, locator, callback) = x
           val record = metric_load_from_index_counter.time {
-            var pos = 0L
-            var len = 0
-            var locator_data:Array[Byte] = null
-            if( locator!=null ) {
-              locator_data = locator.get()
-              if( locator_data!=null ) {
-                val t = decode_locator(locator_data)
-                pos = t._1
-                len = t._2
-              }
-            }
-            if( pos == 0L ) {
-              index.get(encode_key(message_prefix, message_key), ro) match {
-                case Some(value) =>
-                  locator_data = value
-                  val t = decode_locator(locator_data)
-                  pos = t._1
-                  len = t._2
-
-                case None =>
-                  pos = 0L
-              }
-            }
-            if (pos == 0L ) {
-              None
-            } else {
-              log.read(pos, len).map { data =>
-                val rc:MessageRecord = data
-                rc.locator = new AtomicReference[Array[Byte]](locator_data)
-                assert( rc.protocol!=null )
-                rc
-              }
+            val (pos, len ) = locator.get().asInstanceOf[(Long, Int)]
+            log.read(pos, len).map { data =>
+              val rc = PBSupport.decode_message_record(data)
+              rc.locator = locator
+              assert( rc.protocol!=null )
+              rc
             }
           }
           if( record.isDefined ) {
@@ -880,16 +848,13 @@ class LevelDBClient(store: LevelDBStore)
       index.snapshot { snapshot =>
         ro.snapshot(snapshot)
         missing.foreach { x =>
-          val (message_key, locator, callback) = x
-          val record = metric_load_from_index_counter.time {
-            index.get(encode_key(message_prefix, message_key), ro).flatMap{ locator_data=>
-              val (pos, len) = decode_locator(locator_data)
-              log.read(pos, len).map { data =>
-                val rc:MessageRecord = data
-                rc.locator = new AtomicReference[Array[Byte]](locator_data)
-                assert( rc.protocol!=null )
-                rc
-              }
+          val (_, locator, callback) = x
+          val record:Option[MessageRecord] = metric_load_from_index_counter.time {
+            val (pos, len ) = locator.get().asInstanceOf[(Long, Int)]
+            log.read(pos, len).map { x =>
+              val rc:MessageRecord = PBSupport.decode_message_record(x)
+              rc.locator = locator
+              rc
             }
           }
           callback(record)
@@ -917,7 +882,9 @@ class LevelDBClient(store: LevelDBStore)
       val ro = new ReadOptions
       ro.fillCache(false)
       ro.verifyChecksums(verify_checksums)
-      index.get(encode_key(queue_prefix, queue_key), ro).map( x=> decode_queue_record(x)  )
+      index.get(encode_key(queue_prefix, queue_key), ro).map{ x=>
+        PBSupport.decode_queue_record(x)
+      }
     }
   }
 
@@ -939,17 +906,18 @@ class LevelDBClient(store: LevelDBStore)
             group.first_entry_seq = current_key
           }
 
-          val entry:QueueEntryRecord = value
+          val entry = QueueEntryPB.FACTORY.parseUnframed(value)
+          val pos = decode_vlong(entry.getMessageLocator)
 
           group.last_entry_seq = current_key
           group.count += 1
-          group.size += entry.size
+          group.size += entry.getSize
 
           if(group.expiration == 0){
-            group.expiration = entry.expiration
+            group.expiration = entry.getExpiration
           } else {
-            if( entry.expiration != 0 ) {
-              group.expiration = entry.expiration.min(group.expiration)
+            if( entry.getExpiration != 0 ) {
+              group.expiration = entry.getExpiration.min(group.expiration)
             }
           }
 
@@ -979,7 +947,10 @@ class LevelDBClient(store: LevelDBStore)
         val start = encode_key(queue_entry_prefix, queue_key, firstSeq)
         val end = encode_key(queue_entry_prefix, queue_key, lastSeq+1)
         index.cursor_range( start, end, ro ) { (key, value) =>
-          rc += value
+          val record = QueueEntryPB.FACTORY.parseUnframed(value)
+          val entry = PBSupport.from_pb(record)
+          entry.message_locator = new AtomicReference[Object](decode_locator(record.getMessageLocator))
+          rc += entry
           true
         }
       }
@@ -987,11 +958,7 @@ class LevelDBClient(store: LevelDBStore)
     rc
   }
 
-  def getLastMessageKey:Long = {
-    retry_using_index {
-      index.last_key(message_prefix_array).map(decode_long_key(_)._2).getOrElse(0)
-    }
-  }
+  def getLastMessageKey:Long = 0
 
   def get(key: Buffer):Option[Buffer] = {
     retry_using_index {
@@ -1040,71 +1007,78 @@ class LevelDBClient(store: LevelDBStore)
   //
   // Collects detailed usage information about the journal like who's referencing it.
   //
-  def get_log_usage_details = {
-
-    val usage_map = new ApolloTreeMap[Long,UsageCounter]()
-    log.log_mutex.synchronized {
-      log.log_infos.foreach(entry=> usage_map.put(entry._1, UsageCounter(entry._2)) )
-    }
-
-    def lookup_usage(pos: Long) = {
-      var entry = usage_map.floorEntry(pos)
-      if (entry != null) {
-        val usage = entry.getValue()
-        if (pos < usage.info.limit) {
-          Some(usage)
-        } else {
-          None
-        }
-      } else {
-        None
-      }
-    }
-
-    val ro = new ReadOptions()
-    ro.fillCache(false)
-    ro.verifyChecksums(verify_checksums)
-
-    retry_using_index {
-      index.snapshot { snapshot =>
-        ro.snapshot(snapshot)
-
-        // Figure out which journal files are still in use by which queues.
-        index.cursor_prefixed(queue_entry_prefix_array, ro) { (_,value) =>
-
-          val entry_record:QueueEntryRecord = value
-          val pos = if(entry_record.message_locator!=null) {
-            Some(decode_locator(entry_record.message_locator)._1)
-          } else {
-            index.get(encode_key(message_prefix, entry_record.message_key)).map(decode_locator(_)._1)
-          }
-
-          pos.flatMap(lookup_usage(_)).foreach { usage =>
-            if( usage.first_reference_queue == null ) {
-              usage.first_reference_queue = index.get(encode_key(queue_prefix, entry_record.queue_key), ro).map( x=> decode_queue_record(x) ).getOrElse(null)
-            }
-            usage.increment(entry_record.size)
-          }
-
-          true
-        }
-      }
-    }
-
-    import collection.JavaConversions._
-    usage_map.values.toSeq.toArray
-  }
+//  def get_log_usage_details = {
+//
+//    val usage_map = new ApolloTreeMap[Long,UsageCounter]()
+//    log.log_mutex.synchronized {
+//      log.log_infos.foreach(entry=> usage_map.put(entry._1, UsageCounter(entry._2)) )
+//    }
+//
+//    def lookup_usage(pos: Long) = {
+//      var entry = usage_map.floorEntry(pos)
+//      if (entry != null) {
+//        val usage = entry.getValue()
+//        if (pos < usage.info.limit) {
+//          Some(usage)
+//        } else {
+//          None
+//        }
+//      } else {
+//        None
+//      }
+//    }
+//
+//    val ro = new ReadOptions()
+//    ro.fillCache(false)
+//    ro.verifyChecksums(verify_checksums)
+//
+//    retry_using_index {
+//      index.snapshot { snapshot =>
+//        ro.snapshot(snapshot)
+//
+//        // Figure out which journal files are still in use by which queues.
+//        index.cursor_prefixed(queue_entry_prefix_array, ro) { (_,value) =>
+//
+//          val entry_record:QueueEntryRecord = value
+//          val pos = if(entry_record.message_locator!=null) {
+//            Some(decode_locator(entry_record.message_locator)._1)
+//          } else {
+//            index.get(encode_key(message_prefix, entry_record.message_key)).map(decode_locator(_)._1)
+//          }
+//
+//          pos.flatMap(lookup_usage(_)).foreach { usage =>
+//            if( usage.first_reference_queue == null ) {
+//              usage.first_reference_queue = index.get(encode_key(queue_prefix, entry_record.queue_key), ro).map( x=> decode_queue_record(x) ).getOrElse(null)
+//            }
+//            usage.increment(entry_record.size)
+//          }
+//
+//          true
+//        }
+//      }
+//    }
+//
+//    import collection.JavaConversions._
+//    usage_map.values.toSeq.toArray
+//  }
 
 
   def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] = {
     try {
       retry_using_index {
+        
+        // Delete all the tmp keys..
+        index.cursor_keys_prefixed(Array(tmp_prefix)) { key =>
+          index.delete(key)
+          true
+        }
+        
         index.snapshot { snapshot=>
           val ro = new ReadOptions
           ro.snapshot(snapshot)
           ro.verifyChecksums(verify_checksums)
           ro.fillCache(false)
-
+          
           def write_framed(stream:OutputStream, value:Array[Byte]) = {
             val helper = new AbstractVarIntSupport {
               def readByte: Byte = throw new UnsupportedOperationException
@@ -1128,24 +1102,44 @@ class LevelDBClient(store: LevelDBStore)
           }
 
           streams.using_queue_stream { stream =>
-            index.cursor_prefixed(queue_prefix_array, ro) { (_, value) =>
+            index.cursor_prefixed(queue_prefix_array) { (_, value) =>
               write_framed(stream, value)
             }
           }
 
-          streams.using_message_stream { stream=>
-            index.cursor_prefixed(message_prefix_array, ro) { (_, value) =>
+          // Figure out the active log locations..
+          streams.using_queue_entry_stream { stream=>
+            index.cursor_prefixed(queue_entry_prefix_array, ro) { (_, value) =>
               write_framed(stream, value)
+              val record = QueueEntryPB.FACTORY.parseUnframed(value)
+              val (pos, len) = decode_locator(record.getMessageLocator)
+              index.put(encode_key(tmp_prefix, pos), encode_vlong(len))
+              true
             }
           }
 
-          streams.using_queue_entry_stream { stream=>
-            index.cursor_prefixed(queue_entry_prefix_array, ro) { (_, value) =>
-              write_framed(stream, value)
+          streams.using_message_stream { stream=>
+            index.cursor_prefixed(Array(tmp_prefix), ro) { (key, value) =>
+              val (_, pos) = decode_long_key(key)
+              val len = decode_vlong(value).toInt
+              log.read(pos, len).foreach { value =>
+                // Set the message key to be the position in the log.
+                val pb = MessagePB.FACTORY.parseUnframed(value).copy
+                pb.setMessageKey(pos)
+                write_framed(stream, pb.freeze.toUnframedBuffer)
+              }
+              true
             }
           }
 
         }
+
+        // Delete all the tmp keys..
+        index.cursor_keys_prefixed(Array(tmp_prefix)) { key =>
+          index.delete(key)
+          true
+        }
+
       }
       Success(Zilch)
     } catch {
@@ -1157,7 +1151,10 @@ class LevelDBClient(store: LevelDBStore)
   def import_pb(streams:StreamManager[InputStream]):Result[Zilch,String] = {
     try {
       purge
-
+      log_refs.clear()
+          
+      val actual_log_refs = HashMap[Long, LongCounter]()
+      
       retry_using_index {
         def foreach[Buffer] (stream:InputStream, fact:PBMessageFactory[_,_])(func: (Buffer)=>Unit):Unit = {
           var done = false
@@ -1180,31 +1177,42 @@ class LevelDBClient(store: LevelDBStore)
 
           streams.using_queue_stream { stream=>
             foreach[QueuePB.Buffer](stream, QueuePB.FACTORY) { record=>
-              index.put(encode_key(queue_prefix, record.key), record.toUnframedByteArray)
+              index.put(encode_key(queue_prefix, record.getKey), record.toUnframedByteArray)
             }
           }
 
           streams.using_message_stream { stream=>
             foreach[MessagePB.Buffer](stream, MessagePB.FACTORY) { record=>
-              val message_data = record.toUnframedByteArray
+              val message_data = record.toUnframedBuffer
               val pos = appender.append(LOG_ADD_MESSAGE, message_data)
-              index.put(encode_key(message_prefix, record.key), encode_locator(pos, message_data.length))
+              index.put(encode_key(tmp_prefix, record.getMessageKey), encode_locator(pos, message_data.length))
             }
           }
 
           streams.using_queue_entry_stream { stream=>
             foreach[QueueEntryPB.Buffer](stream, QueueEntryPB.FACTORY) { record=>
-              val r:QueueEntryRecord = record
               val copy = record.copy();
-              index.get(encode_key(message_prefix, r.message_key)).foreach { locator=>
-                copy.setMessageLocator(new Buffer(locator))
-                index.put(encode_key(queue_entry_prefix, r.queue_key, r.entry_seq), copy.freeze().toUnframedByteArray)
+              index.get(encode_key(tmp_prefix, record.getMessageKey)).foreach { locator=>
+                val (pos, len) = decode_locator(locator)
+                copy.setMessageLocator(locator)
+                index.put(encode_key(queue_entry_prefix, record.getQueueKey, record.getQueueSeq), copy.freeze().toUnframedBuffer)
+                
+                log.log_info(pos).foreach { log_info =>
+                  log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
+                }
               }
             }
           }
         }
+      }
 
+      store_log_refs
+      // Delete all the tmp keys..
+      index.cursor_keys_prefixed(Array(tmp_prefix)) { key =>
+        index.delete(key)
+        true
       }
+      
       snapshot_index
       Success(Zilch)
 

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala?rev=1234262&r1=1234261&r2=1234262&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBStore.scala Sat Jan 21 04:21:03 2012
@@ -202,12 +202,12 @@ class LevelDBStore(val config:LevelDBSto
     }
   }
 
-  val load_source = createSource(new ListEventAggregator[(Long, AtomicReference[Array[Byte]], (Option[MessageRecord])=>Unit)](), dispatch_queue)
+  val load_source = createSource(new ListEventAggregator[(Long, AtomicReference[Object], (Option[MessageRecord])=>Unit)](), dispatch_queue)
   load_source.setEventHandler(^{drain_loads});
   load_source.resume
 
 
-  def load_message(messageKey: Long, locator:AtomicReference[Array[Byte]])(callback: (Option[MessageRecord]) => Unit) = {
+  def load_message(messageKey: Long, locator:AtomicReference[Object])(callback: (Option[MessageRecord]) => Unit) = {
     message_load_latency_counter.start { end=>
       load_source.merge((messageKey, locator, { (result)=>
         end()



Mime
View raw message