activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1159777 - in /activemq/activemq-apollo/trunk: apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-broker/src/main/proto/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ apollo-cli/src/main/sc...
Date Fri, 19 Aug 2011 20:37:09 GMT
Author: chirino
Date: Fri Aug 19 20:37:08 2011
New Revision: 1159777

URL: http://svn.apache.org/viewvc?rev=1159777&view=rev
Log:
Extend the store interface so that the broker can store arbitrary key/values.

Modified:
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
    activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
    activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/proto/data.proto
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helpers.scala
    activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
    activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
Fri Aug 19 20:37:08 2011
@@ -31,6 +31,7 @@ import scala.Some
 import java.sql.ClientInfoStatus
 import com.sleepycat.je._
 import javax.management.remote.rmi._RMIConnection_Stub
+import org.fusesource.hawtbuf.Buffer
 
 object BDBClient extends Log
 /**
@@ -160,6 +161,14 @@ class BDBClient(store: BDBStore) {
       _queues_db
     }
 
+    private var _map_db:Database = _
+    def map_db:Database = {
+      if( _map_db==null ) {
+        _map_db = environment.openDatabase(tx, "map", buffer_key_conf)
+      }
+      _map_db
+    }
+
     def close(ok:Boolean) = {
       if( _messages_db!=null ) {
         _messages_db.close
@@ -173,6 +182,9 @@ class BDBClient(store: BDBStore) {
       if( _entries_db!=null ) {
         _entries_db.close
       }
+      if( _map_db!=null ) {
+        _map_db.close
+      }
 
       if(ok){
         tx.commit
@@ -324,6 +336,15 @@ class BDBClient(store: BDBStore) {
       import ctx._
       var zcp_files_to_sync = Set[Int]()
       uows.foreach { uow =>
+
+          for((key,value) <- uow.map_actions) {
+            if( value==null ) {
+              map_db.delete(tx, key)
+            } else {
+              map_db.put(tx, key, value)
+            }
+          }
+
           uow.actions.foreach {
             case (msg, action) =>
 
@@ -518,6 +539,14 @@ class BDBClient(store: BDBStore) {
     }
   }
 
+
+  def get(key: Buffer):Option[Buffer] = {
+    with_ctx() { ctx=>
+      import ctx._
+      map_db.get(tx, to_database_entry(key)).map(x=> to_buffer(x))
+    }
+  }
+
   def getLastQueueKey:Long = {
     with_ctx() { ctx=>
       import ctx._
@@ -532,6 +561,16 @@ class BDBClient(store: BDBStore) {
         import ctx._
         import PBSupport._
 
+        streams.using_map_stream { stream =>
+          map_db.cursor(tx) { (key,value) =>
+            val record = new MapEntryPB.Bean
+            record.setKey(key)
+            record.setValue(value)
+            record.freeze().writeFramed(stream)
+            true
+          }
+        }
+
         streams.using_queue_stream { queue_stream =>
           queues_db.cursor(tx) { (_, value) =>
             val record:QueueRecord = value
@@ -605,6 +644,12 @@ class BDBClient(store: BDBStore) {
         var zcp_counter = 0
         val max_ctx = zero_copy_buffer_allocator.contexts.size
 
+        streams.using_map_stream { stream=>
+          foreach[MapEntryPB.Buffer](stream, MapEntryPB.FACTORY) { pb =>
+            map_db.put(tx, pb.getKey, pb.getValue)
+          }
+        }
+
         streams.using_message_stream { message_stream=>
           foreach[MessagePB.Buffer](message_stream, MessagePB.FACTORY) { pb=>
 

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
Fri Aug 19 20:37:08 2011
@@ -28,6 +28,7 @@ import org.apache.activemq.apollo.dto.{S
 import org.apache.activemq.apollo.util.OptionSupport._
 import java.io.{InputStream, OutputStream, File}
 import scala.util.continuations._
+import org.fusesource.hawtbuf.Buffer
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -127,6 +128,12 @@ class BDBStore(var config:BDBStoreDTO) e
   }
 
 
+  def get(key: Buffer)(callback: (Option[Buffer]) => Unit) = {
+    read_executor {
+      callback(client.get(key))
+    }
+  }
+
   /**
    * Ges the last queue key identifier stored.
    */

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
Fri Aug 19 20:37:08 2011
@@ -35,6 +35,9 @@ object HelperTrait {
   implicit def to_queue_record(entry: DatabaseEntry): QueueRecord = entry.getData
   implicit def to_database_entry(v: QueueRecord): DatabaseEntry = new DatabaseEntry(v)
 
+  implicit def to_buffer(entry: DatabaseEntry): Buffer = new Buffer(entry.getData)
+  implicit def to_database_entry(v: Buffer): DatabaseEntry = new DatabaseEntry(v.toByteArray)
+
   implicit def decode_zcp_value(entry: DatabaseEntry): (Int,Long,Int) = {
     val in = new DataByteArrayInputStream(entry.getData)
     (in.readVarInt(), in.readVarLong(), in.readVarInt())
@@ -124,6 +127,11 @@ object HelperTrait {
 
   }
 
+  val buffer_key_conf = new DatabaseConfig();
+  buffer_key_conf.setAllowCreate(true)
+  buffer_key_conf.setTransactional(true);
+  buffer_key_conf.setSortedDuplicates(false);
+
   val long_key_conf = new DatabaseConfig();
   long_key_conf.setAllowCreate(true)
   long_key_conf.setTransactional(true);

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto Fri Aug 19 20:37:08
2011
@@ -51,3 +51,8 @@ message QueueEntryPB {
   optional sint64 expiration=7;
   optional bytes messageLocator=8;
 }
+
+message MapEntryPB {
+  required bytes key = 1;
+  optional bytes value = 2;
+}

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
Fri Aug 19 20:37:08 2011
@@ -25,6 +25,7 @@ import atomic.{AtomicReference, AtomicIn
 import org.apache.activemq.apollo.util._
 import org.fusesource.hawtdispatch.{BaseRetained, ListEventAggregator}
 import org.apache.activemq.apollo.dto.{StoreStatusDTO, TimeMetricDTO, IntMetricDTO}
+import org.fusesource.hawtbuf.Buffer
 
 /**
  * <p>
@@ -87,6 +88,12 @@ trait DelayingStoreSupport extends Store
     var flush_listeners = ListBuffer[() => Unit]()
     var disable_delay = false
 
+    var map_actions = Map[Buffer, Buffer]()
+
+    def put(key: Buffer, value: Buffer) = {
+      map_actions += (key -> value)
+    }
+
     def on_flush(callback: =>Unit) = {
       if( this.synchronized {
         if( flushed ) {
@@ -121,7 +128,7 @@ trait DelayingStoreSupport extends Store
 
     def rm(msg:Long) = {
       actions -= msg
-      if( actions.isEmpty ) {
+      if( actions.isEmpty && map_actions.isEmpty ) {
         cancel
       }
     }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
Fri Aug 19 20:37:08 2011
@@ -21,8 +21,10 @@ import org.apache.activemq.apollo.util._
 import java.io.{InputStream, OutputStream}
 import scala.util.continuations._
 import java.util.concurrent.atomic.{AtomicReference, AtomicLong}
+import org.fusesource.hawtbuf.Buffer
 
 trait StreamManager[A] {
+  def using_map_stream(func: (A)=>Unit)
   def using_queue_stream(func: (A)=>Unit)
   def using_message_stream(func: (A)=>Unit)
   def using_queue_entry_stream(func: (A)=>Unit)
@@ -76,6 +78,11 @@ trait Store extends ServiceTrait {
   def remove_queue(queueKey:Long)(callback:(Boolean)=>Unit):Unit
 
   /**
+   * Gets a value of a previously stored map entry.
+   */
+  def get(key:Buffer)(callback:(Option[Buffer])=>Unit )
+
+  /**
    * Loads the queue information for a given queue key.
    */
   def get_queue(queueKey:Long)(callback:(Option[QueueRecord])=>Unit )

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
Fri Aug 19 20:37:08 2011
@@ -18,6 +18,7 @@ package org.apache.activemq.apollo.broke
 
 import org.fusesource.hawtdispatch.{Retained}
 import org.apache.activemq.apollo.broker.store._
+import org.fusesource.hawtbuf.Buffer
 
 /**
  * A store uow is used to perform persistent
@@ -56,6 +57,12 @@ trait StoreUOW extends Retained {
   def dequeue(entry:QueueEntryRecord)
 
   /**
+   * Creates or updates a map entry.  Set value to null to
+   * remove the entry.
+   */
+  def put(key:Buffer, value:Buffer)
+
+  /**
    * Marks this uow as needing to be completed
    * as soon as possible.  If not called, the Store
    * implementation may delay completing the uow in

Modified: activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala
Fri Aug 19 20:37:08 2011
@@ -91,6 +91,7 @@ class StoreExport extends Action {
           def using_queue_stream(func: (OutputStream) => Unit) = entry("queues.dat", func)
           def using_queue_entry_stream(func: (OutputStream) => Unit) = entry("queue_entries.dat", func)
           def using_message_stream(func: (OutputStream) => Unit) = entry("messages.dat", func)
+          def using_map_stream(func: (OutputStream) => Unit) = entry("map.dat", func)
         }
         reset {
           val rc = store.export_pb(manager)

Modified: activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala
Fri Aug 19 20:37:08 2011
@@ -96,6 +96,7 @@ class StoreImport extends Action {
           def using_queue_stream(func: (InputStream) => Unit) = entry("queues.dat", func)
           def using_queue_entry_stream(func: (InputStream) => Unit) = entry("queue_entries.dat", func)
           def using_message_stream(func: (InputStream) => Unit) = entry("messages.dat", func)
+          def using_map_stream(func: (InputStream) => Unit) = entry("map.dat", func)
         }
         reset {
           val rc = store.import_pb(manager)

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/proto/data.proto?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/proto/data.proto (original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/proto/data.proto Fri Aug 19 20:37:08
2011
@@ -29,10 +29,7 @@ enum Type {
   ADD_QUEUE = 10;
   REMOVE_QUEUE = 11;
 
-  ADD_MAP = 30;
-  REMOVE_MAP = 31;
-  PUT_MAP_ENTRY = 32;
-  REMOVE_MAP_ENTRY = 33;
+  MAP_ENTRY = 32;
 
   ADD_SUBSCRIPTION = 50;
   REMOVE_SUBSCRIPTION = 51;
@@ -109,24 +106,9 @@ message RemoveSubscription {
   required bytes name = 1 [java_override_type = "AsciiBuffer"];
 }
 
-
-///////////////////////////////////////////////////////////////
-// Map related operations.
-///////////////////////////////////////////////////////////////
-message AddMap {
-  optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
-}
-message RemoveMap {
-  optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
-}
-message PutMapEntry {
-  optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
-  optional bytes id = 2 [java_override_type = "AsciiBuffer"];
-  optional bytes value = 3;
-}
-message RemoveMapEntry {
-  optional bytes mapName = 1 [java_override_type = "AsciiBuffer"];
-  optional bytes id = 2 [java_override_type = "AsciiBuffer"];
+message MapEntry {
+  required bytes key=1;
+  optional bytes value = 2;
 }
 
 ///////////////////////////////////////////////////////////////

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala
Fri Aug 19 20:37:08 2011
@@ -228,11 +228,18 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     _store(update, callback)
   }
 
-  def store(txs: Seq[HawtDBStore#DelayableUOW], callback:Runnable) {
+  def store(uows: Seq[HawtDBStore#DelayableUOW], callback:Runnable) {
     var batch = ListBuffer[TypeCreatable]()
-    txs.foreach {
-      tx =>
-        tx.actions.foreach {
+    uows.foreach { uow =>
+
+        for((key,value) <- uow.map_actions) {
+          val entry = new MapEntry.Bean
+          entry.setKey(key)
+          entry.setValue(value)
+          batch += entry
+        }
+
+        uow.actions.foreach {
           case (msg, action) =>
             if (action.message_record != null) {
               val update: AddMessage.Bean = action.message_record
@@ -273,6 +280,14 @@ class HawtDBClient(hawtDBStore: HawtDBSt
     rc
   }
 
+  def get(key: Buffer):Option[Buffer] = {
+    withTx { tx =>
+        val helper = new TxHelper(tx)
+        import helper._
+        Option(mapIndex.get(key))
+    }
+  }
+
   def getQueue(queueKey: Long): Option[QueueRecord] = {
     withTx { tx =>
         val helper = new TxHelper(tx)
@@ -897,13 +912,13 @@ class HawtDBClient(hawtDBStore: HawtDBSt
         cleanup(_tx);
         info("Store purged.");
 
-      case x: AddSubscription.Getter =>
-      case x: RemoveSubscription.Getter =>
-
-      case x: AddMap.Getter =>
-      case x: RemoveMap.Getter =>
-      case x: PutMapEntry.Getter =>
-      case x: RemoveMapEntry.Getter =>
+      case x: MapEntry.Getter =>
+        val value = x.getValue
+        if( value==null ) {
+          mapIndex.remove(x.getKey)
+        } else {
+          mapIndex.put(x.getKey, value)
+        }
 
     }
   }
@@ -996,6 +1011,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt
   private case class Update(update: TypeCreatable, location: Location)
 
   private class TxHelper(val _tx: Transaction) {
+    lazy val mapIndex = MAP_INDEX_FACTORY.open(_tx, rootBuffer.getMapIndexPage)
     lazy val queueIndex = QUEUE_INDEX_FACTORY.open(_tx, rootBuffer.getQueueIndexPage)
     lazy val dataFileRefIndex = DATA_FILE_REF_INDEX_FACTORY.open(_tx, rootBuffer.getDataFileRefIndexPage)
     lazy val messageKeyIndex = MESSAGE_KEY_INDEX_FACTORY.open(_tx, rootBuffer.getMessageKeyIndexPage)

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala
Fri Aug 19 20:37:08 2011
@@ -28,6 +28,7 @@ import org.fusesource.hawtdispatch.ListE
 import org.apache.activemq.apollo.util.OptionSupport._
 import java.io.{InputStream, OutputStream}
 import scala.util.continuations._
+import org.fusesource.hawtbuf.Buffer
 
 object HawtDBStore extends Log {
   val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -144,6 +145,11 @@ class HawtDBStore(var config:HawtDBStore
     }
   }
 
+  def get(key: Buffer)(callback: (Option[Buffer]) => Unit) = {
+    executor_pool {
+      callback(client.get(key))
+    }
+  }
 
   /**
    * Ges the last queue key identifier stored.

Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helpers.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helpers.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helpers.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helpers.scala
Fri Aug 19 20:37:08 2011
@@ -140,6 +140,12 @@ object Helpers {
   QUEUE_INDEX_FACTORY.setValueCodec(QueueRootRecord.FRAMED_CODEC);
   QUEUE_INDEX_FACTORY.setDeferredEncoding(true);
 
+  // maps an arbitrary key buffer -> value buffer
+  val MAP_INDEX_FACTORY = new BTreeIndexFactory[Buffer, Buffer]();
+  MAP_INDEX_FACTORY.setKeyCodec(BufferCodec.INSTANCE);
+  MAP_INDEX_FACTORY.setValueCodec(BufferCodec.INSTANCE);
+  MAP_INDEX_FACTORY.setDeferredEncoding(true);
+
   // maps queue seq -> AddQueueEntry
   val QUEUE_ENTRY_INDEX_FACTORY = new BTreeIndexFactory[jl.Long, AddQueueEntry.Buffer]();
   QUEUE_ENTRY_INDEX_FACTORY.setKeyCodec(VarLongCodec.INSTANCE);

Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
Fri Aug 19 20:37:08 2011
@@ -31,6 +31,8 @@ import jdbm.helper._
 import PBSupport._
 import org.fusesource.hawtbuf.proto.PBMessageFactory
 import java.io._
+import org.fusesource.hawtbuf.Buffer
+
 object JDBM2Client extends Log {
 
   object MessageRecordSerializer extends Serializer[MessagePB.Buffer] {
@@ -60,6 +62,19 @@ object JDBM2Client extends Log {
     }
   }
 
+  object BufferSerializer extends Serializer[Buffer] {
+    def serialize(out: SerializerOutput, v: Buffer) = {
+      out.writePackedInt(v.length())
+      out.write(v.data, v.offset, v.length)
+    }
+
+    def deserialize(in: SerializerInput) = {
+      val rc = new Buffer(in.readPackedInt());
+      in.readFully(rc.data)
+      rc
+    }
+  }
+
   object QueueEntryKeySerializer extends Serializer[(Long,Long)] {
     def serialize(out: SerializerOutput, v: (Long,Long)) = {
       out.writePackedLong(v._1)
@@ -150,6 +165,7 @@ class JDBM2Client(store: JDBM2Store) {
   var messages_db:HTree[Long, MessagePB.Buffer] = _
   var zerocp_db:HTree[Long, (Int, Long, Int)] = _
   var message_refs_db:HTree[Long, java.lang.Integer] = _
+  var map_db:HTree[Buffer, Buffer] = _
 
   var last_message_key = 0L
   var last_queue_key = 0L
@@ -203,6 +219,7 @@ class JDBM2Client(store: JDBM2Store) {
 
     transaction {
       messages_db = init_htree("messages", value_serializer = MessageRecordSerializer)
+      map_db = init_htree("map", value_serializer = BufferSerializer, key_serializer = BufferSerializer)
       zerocp_db = init_htree("lobs", value_serializer = ZeroCopyValueSerializer)
       message_refs_db = init_htree("message_refs")
       queues_db = init_htree("queues", value_serializer = QueueRecordSerializer)
@@ -353,6 +370,15 @@ class JDBM2Client(store: JDBM2Store) {
     transaction {
       var zcp_files_to_sync = Set[Int]()
       uows.foreach { uow =>
+
+        for((key,value) <- uow.map_actions) {
+          if( value==null ) {
+            map_db.remove(key)
+          } else {
+            map_db.put(key, value)
+          }
+        }
+
         uow.actions.foreach { case (msg, action) =>
 
           val message_record = action.message_record
@@ -478,11 +504,24 @@ class JDBM2Client(store: JDBM2Store) {
 
   def getLastQueueKey:Long = last_queue_key
 
+  def get(key: Buffer):Option[Buffer] = {
+    Option(map_db.find(key))
+  }
 
   def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] = {
     try {
       import PBSupport._
 
+      streams.using_map_stream { stream=>
+        map_db.cursor { (key, value) =>
+          val record = new MapEntryPB.Bean
+          record.setKey(key)
+          record.setValue(value)
+          record.freeze().writeFramed(stream)
+          true
+        }
+      }
+
       streams.using_queue_stream { queue_stream=>
         queues_db.cursor { (_, value) =>
           val record:QueueRecord = value
@@ -552,6 +591,13 @@ class JDBM2Client(store: JDBM2Store) {
 
         import PBSupport._
 
+        streams.using_map_stream { stream=>
+          foreach[MapEntryPB.Buffer](stream, MapEntryPB.FACTORY) { pb =>
+            map_db.put(pb.getKey, pb.getValue)
+            check_flush(1, 10000)
+          }
+        }
+
         streams.using_queue_stream { queue_stream=>
           foreach[QueuePB.Buffer](queue_stream, QueuePB.FACTORY) { pb =>
             val record:QueueRecord = pb

Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala?rev=1159777&r1=1159776&r2=1159777&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala
Fri Aug 19 20:37:08 2011
@@ -28,6 +28,7 @@ import org.apache.activemq.apollo.dto.St
 import org.apache.activemq.apollo.util.OptionSupport._
 import java.io.{InputStream, OutputStream}
 import scala.util.continuations._
+import org.fusesource.hawtbuf.Buffer
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -115,6 +116,12 @@ class JDBM2Store(var config:JDBM2StoreDT
   }
 
 
+  def get(key: Buffer)(callback: (Option[Buffer]) => Unit) = {
+    executor {
+      callback(client.get(key))
+    }
+  }
+
   /**
    * Ges the last queue key identifier stored.
    */



Mime
View raw message