Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E4E187898 for ; Fri, 19 Aug 2011 20:37:37 +0000 (UTC) Received: (qmail 11925 invoked by uid 500); 19 Aug 2011 20:37:37 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 11852 invoked by uid 500); 19 Aug 2011 20:37:37 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 11845 invoked by uid 99); 19 Aug 2011 20:37:37 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Aug 2011 20:37:37 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Aug 2011 20:37:31 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id F278723889DE for ; Fri, 19 Aug 2011 20:37:09 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1159777 - in /activemq/activemq-apollo/trunk: apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/ apollo-broker/src/main/proto/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ apollo-cli/src/main/sc... Date: Fri, 19 Aug 2011 20:37:09 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110819203709.F278723889DE@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Fri Aug 19 20:37:08 2011 New Revision: 1159777 URL: http://svn.apache.org/viewvc?rev=1159777&view=rev Log: Extend the store interface so that the broker can store arbitrary key/values. Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/proto/data.proto activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helpers.scala activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1159777&r1=1159776&r2=1159777&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala (original) +++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala Fri Aug 19 20:37:08 2011 @@ -31,6 +31,7 @@ import scala.Some import java.sql.ClientInfoStatus import com.sleepycat.je._ import javax.management.remote.rmi._RMIConnection_Stub +import org.fusesource.hawtbuf.Buffer object BDBClient extends Log /** @@ -160,6 +161,14 @@ class BDBClient(store: BDBStore) { _queues_db } + private var _map_db:Database = _ + def map_db:Database = { + if( _map_db==null ) { + _map_db = environment.openDatabase(tx, "map", buffer_key_conf) + } + _map_db + } + def close(ok:Boolean) = { if( _messages_db!=null ) { _messages_db.close @@ -173,6 +182,9 @@ class BDBClient(store: BDBStore) { if( _entries_db!=null ) { _entries_db.close } + if( _map_db!=null ) { + _map_db.close + } if(ok){ tx.commit @@ -324,6 +336,15 @@ class BDBClient(store: BDBStore) { import ctx._ var zcp_files_to_sync = Set[Int]() uows.foreach { uow => + + for((key,value) <- uow.map_actions) { + if( value==null ) { + map_db.delete(tx, key) + } else { + map_db.put(tx, key, value) + } + } + uow.actions.foreach { case (msg, action) => @@ -518,6 +539,14 @@ class BDBClient(store: BDBStore) { } } + + def get(key: Buffer):Option[Buffer] = { + with_ctx() { ctx=> + import ctx._ + map_db.get(tx, to_database_entry(key)).map(x=> to_buffer(x)) + } + } + def getLastQueueKey:Long = { with_ctx() { ctx=> import ctx._ @@ -532,6 +561,16 @@ class BDBClient(store: BDBStore) { import ctx._ import PBSupport._ + streams.using_map_stream { stream => + map_db.cursor(tx) { (key,value) => + val record = new MapEntryPB.Bean + record.setKey(key) + record.setValue(value) + record.freeze().writeFramed(stream) + true + } + } + streams.using_queue_stream { queue_stream => queues_db.cursor(tx) { (_, value) => val record:QueueRecord = value @@ -605,6 +644,12 @@ class BDBClient(store: BDBStore) { var zcp_counter = 0 val max_ctx = zero_copy_buffer_allocator.contexts.size + streams.using_map_stream { stream=> + foreach[MapEntryPB.Buffer](stream, MapEntryPB.FACTORY) { pb => + map_db.put(tx, pb.getKey, pb.getValue) + } + } + streams.using_message_stream { message_stream=> foreach[MessagePB.Buffer](message_stream, MessagePB.FACTORY) { pb=> Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala?rev=1159777&r1=1159776&r2=1159777&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala (original) +++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala Fri Aug 19 20:37:08 2011 @@ -28,6 +28,7 @@ import org.apache.activemq.apollo.dto.{S import org.apache.activemq.apollo.util.OptionSupport._ import java.io.{InputStream, OutputStream, File} import scala.util.continuations._ +import org.fusesource.hawtbuf.Buffer /** * @author Hiram Chirino @@ -127,6 +128,12 @@ class BDBStore(var config:BDBStoreDTO) e } + def get(key: Buffer)(callback: (Option[Buffer]) => Unit) = { + read_executor { + callback(client.get(key)) + } + } + /** * Ges the last queue key identifier stored. */ Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala?rev=1159777&r1=1159776&r2=1159777&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala (original) +++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala Fri Aug 19 20:37:08 2011 @@ -35,6 +35,9 @@ object HelperTrait { implicit def to_queue_record(entry: DatabaseEntry): QueueRecord = entry.getData implicit def to_database_entry(v: QueueRecord): DatabaseEntry = new DatabaseEntry(v) + implicit def to_buffer(entry: DatabaseEntry): Buffer = new Buffer(entry.getData) + implicit def to_database_entry(v: Buffer): DatabaseEntry = new DatabaseEntry(v.toByteArray) + implicit def decode_zcp_value(entry: DatabaseEntry): (Int,Long,Int) = { val in = new DataByteArrayInputStream(entry.getData) (in.readVarInt(), in.readVarLong(), in.readVarInt()) @@ -124,6 +127,11 @@ object HelperTrait { } + val buffer_key_conf = new DatabaseConfig(); + buffer_key_conf.setAllowCreate(true) + buffer_key_conf.setTransactional(true); + buffer_key_conf.setSortedDuplicates(false); + val long_key_conf = new DatabaseConfig(); long_key_conf.setAllowCreate(true) long_key_conf.setTransactional(true); Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto?rev=1159777&r1=1159776&r2=1159777&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto Fri Aug 19 20:37:08 2011 @@ -51,3 +51,8 @@ message QueueEntryPB { optional sint64 expiration=7; optional bytes messageLocator=8; } + +message MapEntryPB { + required bytes key = 1; + optional bytes value = 2; +} Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1159777&r1=1159776&r2=1159777&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala Fri Aug 19 20:37:08 2011 @@ -25,6 +25,7 @@ import atomic.{AtomicReference, AtomicIn import org.apache.activemq.apollo.util._ import org.fusesource.hawtdispatch.{BaseRetained, ListEventAggregator} import org.apache.activemq.apollo.dto.{StoreStatusDTO, TimeMetricDTO, IntMetricDTO} +import org.fusesource.hawtbuf.Buffer /** *

@@ -87,6 +88,12 @@ trait DelayingStoreSupport extends Store var flush_listeners = ListBuffer[() => Unit]() var disable_delay = false + var map_actions = Map[Buffer, Buffer]() + + def put(key: Buffer, value: Buffer) = { + map_actions += (key -> value) + } + def on_flush(callback: =>Unit) = { if( this.synchronized { if( flushed ) { @@ -121,7 +128,7 @@ trait DelayingStoreSupport extends Store def rm(msg:Long) = { actions -= msg - if( actions.isEmpty ) { + if( actions.isEmpty && map_actions.isEmpty ) { cancel } } Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1159777&r1=1159776&r2=1159777&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala Fri Aug 19 20:37:08 2011 @@ -21,8 +21,10 @@ import org.apache.activemq.apollo.util._ import java.io.{InputStream, OutputStream} import scala.util.continuations._ import java.util.concurrent.atomic.{AtomicReference, AtomicLong} +import org.fusesource.hawtbuf.Buffer trait StreamManager[A] { + def using_map_stream(func: (A)=>Unit) def using_queue_stream(func: (A)=>Unit) def using_message_stream(func: (A)=>Unit) def using_queue_entry_stream(func: (A)=>Unit) @@ -76,6 +78,11 @@ trait Store extends ServiceTrait { def remove_queue(queueKey:Long)(callback:(Boolean)=>Unit):Unit /** + * Gets a value of a previously stored map entry. + */ + def get(key:Buffer)(callback:(Option[Buffer])=>Unit ) + + /** * Loads the queue information for a given queue key. */ def get_queue(queueKey:Long)(callback:(Option[QueueRecord])=>Unit ) Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala?rev=1159777&r1=1159776&r2=1159777&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala Fri Aug 19 20:37:08 2011 @@ -18,6 +18,7 @@ package org.apache.activemq.apollo.broke import org.fusesource.hawtdispatch.{Retained} import org.apache.activemq.apollo.broker.store._ +import org.fusesource.hawtbuf.Buffer /** * A store uow is used to perform persistent @@ -56,6 +57,12 @@ trait StoreUOW extends Retained { def dequeue(entry:QueueEntryRecord) /** + * Creates or updates a map entry. Set value to null to + * remove the entry. + */ + def put(key:Buffer, value:Buffer) + + /** * Marks this uow as needing to be completed * as soon as possible. If not called, the Store * implementation may delay completing the uow in Modified: activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala?rev=1159777&r1=1159776&r2=1159777&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala (original) +++ activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreExport.scala Fri Aug 19 20:37:08 2011 @@ -91,6 +91,7 @@ class StoreExport extends Action { def using_queue_stream(func: (OutputStream) => Unit) = entry("queues.dat", func) def using_queue_entry_stream(func: (OutputStream) => Unit) = entry("queue_entries.dat", func) def using_message_stream(func: (OutputStream) => Unit) = entry("messages.dat", func) + def using_map_stream(func: (OutputStream) => Unit) = entry("map.dat", func) } reset { val rc = store.export_pb(manager) Modified: activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala?rev=1159777&r1=1159776&r2=1159777&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala (original) +++ activemq/activemq-apollo/trunk/apollo-cli/src/main/scala/org/apache/activemq/apollo/cli/commands/StoreImport.scala Fri Aug 19 20:37:08 2011 @@ -96,6 +96,7 @@ class StoreImport extends Action { def using_queue_stream(func: (InputStream) => Unit) = entry("queues.dat", func) def using_queue_entry_stream(func: (InputStream) => Unit) = entry("queue_entries.dat", func) def using_message_stream(func: (InputStream) => Unit) = entry("messages.dat", func) + def using_map_stream(func: (InputStream) => Unit) = entry("map.dat", func) } reset { val rc = store.import_pb(manager) Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/proto/data.proto URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/proto/data.proto?rev=1159777&r1=1159776&r2=1159777&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/proto/data.proto (original) +++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/proto/data.proto Fri Aug 19 20:37:08 2011 @@ -29,10 +29,7 @@ enum Type { ADD_QUEUE = 10; REMOVE_QUEUE = 11; - ADD_MAP = 30; - REMOVE_MAP = 31; - PUT_MAP_ENTRY = 32; - REMOVE_MAP_ENTRY = 33; + MAP_ENTRY = 32; ADD_SUBSCRIPTION = 50; REMOVE_SUBSCRIPTION = 51; @@ -109,24 +106,9 @@ message RemoveSubscription { required bytes name = 1 [java_override_type = "AsciiBuffer"]; } - -/////////////////////////////////////////////////////////////// -// Map related operations. -/////////////////////////////////////////////////////////////// -message AddMap { - optional bytes mapName = 1 [java_override_type = "AsciiBuffer"]; -} -message RemoveMap { - optional bytes mapName = 1 [java_override_type = "AsciiBuffer"]; -} -message PutMapEntry { - optional bytes mapName = 1 [java_override_type = "AsciiBuffer"]; - optional bytes id = 2 [java_override_type = "AsciiBuffer"]; - optional bytes value = 3; -} -message RemoveMapEntry { - optional bytes mapName = 1 [java_override_type = "AsciiBuffer"]; - optional bytes id = 2 [java_override_type = "AsciiBuffer"]; +message MapEntry { + required bytes key=1; + optional bytes value = 2; } /////////////////////////////////////////////////////////////// Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala?rev=1159777&r1=1159776&r2=1159777&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala (original) +++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBClient.scala Fri Aug 19 20:37:08 2011 @@ -228,11 +228,18 @@ class HawtDBClient(hawtDBStore: HawtDBSt _store(update, callback) } - def store(txs: Seq[HawtDBStore#DelayableUOW], callback:Runnable) { + def store(uows: Seq[HawtDBStore#DelayableUOW], callback:Runnable) { var batch = ListBuffer[TypeCreatable]() - txs.foreach { - tx => - tx.actions.foreach { + uows.foreach { uow => + + for((key,value) <- uow.map_actions) { + val entry = new MapEntry.Bean + entry.setKey(key) + entry.setValue(value) + batch += entry + } + + uow.actions.foreach { case (msg, action) => if (action.message_record != null) { val update: AddMessage.Bean = action.message_record @@ -273,6 +280,14 @@ class HawtDBClient(hawtDBStore: HawtDBSt rc } + def get(key: Buffer):Option[Buffer] = { + withTx { tx => + val helper = new TxHelper(tx) + import helper._ + Option(mapIndex.get(key)) + } + } + def getQueue(queueKey: Long): Option[QueueRecord] = { withTx { tx => val helper = new TxHelper(tx) @@ -897,13 +912,13 @@ class HawtDBClient(hawtDBStore: HawtDBSt cleanup(_tx); info("Store purged."); - case x: AddSubscription.Getter => - case x: RemoveSubscription.Getter => - - case x: AddMap.Getter => - case x: RemoveMap.Getter => - case x: PutMapEntry.Getter => - case x: RemoveMapEntry.Getter => + case x: MapEntry.Getter => + val value = x.getValue + if( value==null ) { + mapIndex.remove(x.getKey) + } else { + mapIndex.put(x.getKey, value) + } } } @@ -996,6 +1011,7 @@ class HawtDBClient(hawtDBStore: HawtDBSt private case class Update(update: TypeCreatable, location: Location) private class TxHelper(val _tx: Transaction) { + lazy val mapIndex = MAP_INDEX_FACTORY.open(_tx, rootBuffer.getMapIndexPage) lazy val queueIndex = QUEUE_INDEX_FACTORY.open(_tx, rootBuffer.getQueueIndexPage) lazy val dataFileRefIndex = DATA_FILE_REF_INDEX_FACTORY.open(_tx, rootBuffer.getDataFileRefIndexPage) lazy val messageKeyIndex = MESSAGE_KEY_INDEX_FACTORY.open(_tx, rootBuffer.getMessageKeyIndexPage) Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala?rev=1159777&r1=1159776&r2=1159777&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala (original) +++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBStore.scala Fri Aug 19 20:37:08 2011 @@ -28,6 +28,7 @@ import org.fusesource.hawtdispatch.ListE import org.apache.activemq.apollo.util.OptionSupport._ import java.io.{InputStream, OutputStream} import scala.util.continuations._ +import org.fusesource.hawtbuf.Buffer object HawtDBStore extends Log { val DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; @@ -144,6 +145,11 @@ class HawtDBStore(var config:HawtDBStore } } + def get(key: Buffer)(callback: (Option[Buffer]) => Unit) = { + executor_pool { + callback(client.get(key)) + } + } /** * Ges the last queue key identifier stored. Modified: activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helpers.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helpers.scala?rev=1159777&r1=1159776&r2=1159777&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helpers.scala (original) +++ activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/Helpers.scala Fri Aug 19 20:37:08 2011 @@ -140,6 +140,12 @@ object Helpers { QUEUE_INDEX_FACTORY.setValueCodec(QueueRootRecord.FRAMED_CODEC); QUEUE_INDEX_FACTORY.setDeferredEncoding(true); + // maps queue key -> QueueRootRecord + val MAP_INDEX_FACTORY = new BTreeIndexFactory[Buffer, Buffer](); + MAP_INDEX_FACTORY.setKeyCodec(BufferCodec.INSTANCE); + MAP_INDEX_FACTORY.setValueCodec(BufferCodec.INSTANCE); + MAP_INDEX_FACTORY.setDeferredEncoding(true); + // maps queue seq -> AddQueueEntry val QUEUE_ENTRY_INDEX_FACTORY = new BTreeIndexFactory[jl.Long, AddQueueEntry.Buffer](); QUEUE_ENTRY_INDEX_FACTORY.setKeyCodec(VarLongCodec.INSTANCE); Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala?rev=1159777&r1=1159776&r2=1159777&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala (original) +++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala Fri Aug 19 20:37:08 2011 @@ -31,6 +31,8 @@ import jdbm.helper._ import PBSupport._ import org.fusesource.hawtbuf.proto.PBMessageFactory import java.io._ +import org.fusesource.hawtbuf.Buffer + object JDBM2Client extends Log { object MessageRecordSerializer extends Serializer[MessagePB.Buffer] { @@ -60,6 +62,19 @@ object JDBM2Client extends Log { } } + object BufferSerializer extends Serializer[Buffer] { + def serialize(out: SerializerOutput, v: Buffer) = { + out.writePackedInt(v.length()) + out.write(v.data, v.offset, v.length) + } + + def deserialize(in: SerializerInput) = { + val rc = new Buffer(in.readPackedInt()); + in.readFully(rc.data) + rc + } + } + object QueueEntryKeySerializer extends Serializer[(Long,Long)] { def serialize(out: SerializerOutput, v: (Long,Long)) = { out.writePackedLong(v._1) @@ -150,6 +165,7 @@ class JDBM2Client(store: JDBM2Store) { var messages_db:HTree[Long, MessagePB.Buffer] = _ var zerocp_db:HTree[Long, (Int, Long, Int)] = _ var message_refs_db:HTree[Long, java.lang.Integer] = _ + var map_db:HTree[Buffer, Buffer] = _ var last_message_key = 0L var last_queue_key = 0L @@ -203,6 +219,7 @@ class JDBM2Client(store: JDBM2Store) { transaction { messages_db = init_htree("messages", value_serializer = MessageRecordSerializer) + map_db = init_htree("map", value_serializer = BufferSerializer, key_serializer = BufferSerializer) zerocp_db = init_htree("lobs", value_serializer = ZeroCopyValueSerializer) message_refs_db = init_htree("message_refs") queues_db = init_htree("queues", value_serializer = QueueRecordSerializer) @@ -353,6 +370,15 @@ class JDBM2Client(store: JDBM2Store) { transaction { var zcp_files_to_sync = Set[Int]() uows.foreach { uow => + + for((key,value) <- uow.map_actions) { + if( value==null ) { + map_db.remove(key) + } else { + map_db.put(key, value) + } + } + uow.actions.foreach { case (msg, action) => val message_record = action.message_record @@ -478,11 +504,24 @@ class JDBM2Client(store: JDBM2Store) { def getLastQueueKey:Long = last_queue_key + def get(key: Buffer):Option[Buffer] = { + Option(map_db.find(key)) + } def export_pb(streams:StreamManager[OutputStream]):Result[Zilch,String] = { try { import PBSupport._ + streams.using_map_stream { stream=> + map_db.cursor { (key, value) => + val record = new MapEntryPB.Bean + record.setKey(key) + record.setValue(value) + record.freeze().writeFramed(stream) + true + } + } + streams.using_queue_stream { queue_stream=> queues_db.cursor { (_, value) => val record:QueueRecord = value @@ -552,6 +591,13 @@ class JDBM2Client(store: JDBM2Store) { import PBSupport._ + streams.using_map_stream { stream=> + foreach[MapEntryPB.Buffer](stream, MapEntryPB.FACTORY) { pb => + map_db.put(pb.getKey, pb.getValue) + check_flush(1, 10000) + } + } + streams.using_queue_stream { queue_stream=> foreach[QueuePB.Buffer](queue_stream, QueuePB.FACTORY) { pb => val record:QueueRecord = pb Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala?rev=1159777&r1=1159776&r2=1159777&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala (original) +++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Store.scala Fri Aug 19 20:37:08 2011 @@ -28,6 +28,7 @@ import org.apache.activemq.apollo.dto.St import org.apache.activemq.apollo.util.OptionSupport._ import java.io.{InputStream, OutputStream} import scala.util.continuations._ +import org.fusesource.hawtbuf.Buffer /** * @author Hiram Chirino @@ -115,6 +116,12 @@ class JDBM2Store(var config:JDBM2StoreDT } + def get(key: Buffer)(callback: (Option[Buffer]) => Unit) = { + executor { + callback(client.get(key)) + } + } + /** * Ges the last queue key identifier stored. */