Author: chirino
Date: Fri Jan 7 19:37:42 2011
New Revision: 1056471
URL: http://svn.apache.org/viewvc?rev=1056471&view=rev
Log:
Added zero copy support to the bdb store.
Modified:
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/dto/BDBStoreDTO.java
Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1056471&r1=1056470&r2=1056471&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
Fri Jan 7 19:37:42 2011
@@ -59,6 +59,13 @@ class BDBClient(store: BDBStore) extends
var environment:Environment = _
+ var zero_copy_buffer_allocator: FileZeroCopyBufferAllocator = _
+
+ def zero_copy_dir = {
+ import FileSupport._
+ config.directory / "zerocp"
+ }
+
def start() = {
val env_config = new EnvironmentConfig();
env_config.setAllowCreate(true);
@@ -66,6 +73,12 @@ class BDBClient(store: BDBStore) extends
env_config.setTxnSerializableIsolation(false)
directory.mkdirs
+
+ if( Option(config.zero_copy).map(_.booleanValue).getOrElse(false) ) {
+ zero_copy_buffer_allocator = new FileZeroCopyBufferAllocator(zero_copy_dir)
+ zero_copy_buffer_allocator.start
+ }
+
environment = new Environment(directory, env_config);
with_ctx { ctx=>
@@ -73,11 +86,23 @@ class BDBClient(store: BDBStore) extends
messages_db
message_refs_db
queues_db
+
+ if( zero_copy_buffer_allocator!=null ) {
+ zerocp_db.cursor(tx) { (_,value)=>
+ val v = encode_zcp_value(value)
+ zero_copy_buffer_allocator.alloc_at(v._1, v._2, v._3)
+ true
+ }
+ }
}
}
def stop() = {
environment.close
+ if( zero_copy_buffer_allocator!=null ) {
+ zero_copy_buffer_allocator.stop
+ zero_copy_buffer_allocator = null
+ }
}
case class TxContext(tx:Transaction) {
@@ -100,6 +125,13 @@ class BDBClient(store: BDBStore) extends
_messages_db
}
+ private var _zerocp_db:Database = _
+ def zerocp_db:Database = {
+ if( _zerocp_db==null ) {
+ _zerocp_db = environment.openDatabase(tx, "zerocp", long_key_conf)
+ }
+ _zerocp_db
+ }
private var _message_refs_db:Database = _
def message_refs_db:Database = {
@@ -210,6 +242,20 @@ class BDBClient(store: BDBStore) extends
callback.run
}
+ def decrement_message_reference(ctx:TxContext, msg_key:Long) = {
+ import ctx._
+ if( add_and_get(message_refs_db, msg_key, -1, tx)==0 ) {
+ messages_db.delete(tx, msg_key)
+ if( zero_copy_buffer_allocator!=null ){
+ zerocp_db.get(tx, to_database_entry(msg_key)).foreach { v=>
+ val location = decode_zcp_value(v)
+ zero_copy_buffer_allocator.free(location._1, location._2, location._3)
+ }
+ zerocp_db.delete(tx, msg_key)
+ }
+ }
+ }
+
def removeQueue(queue_key: Long, callback:Runnable) = {
with_ctx { ctx=>
import ctx._
@@ -219,9 +265,7 @@ class BDBClient(store: BDBStore) extends
entries_db.cursor(tx) { (key,value)=>
val queueEntry:QueueEntryRecord = value
- if( add_and_get(message_refs_db, queueEntry.message_key, -1, tx)==0 ) {
- messages_db.delete(tx, queueEntry.message_key)
- }
+ decrement_message_reference(ctx, queueEntry.message_key)
true // keep cursoring..
}
@@ -235,13 +279,29 @@ class BDBClient(store: BDBStore) extends
def store(uows: Seq[BDBStore#DelayableUOW], callback:Runnable) {
with_ctx { ctx=>
import ctx._
-
+ var zcp_files_to_sync = Set[Int]()
uows.foreach { uow =>
uow.actions.foreach {
case (msg, action) =>
- if (action.messageRecord != null) {
- messages_db.put(tx, action.messageRecord.key, action.messageRecord)
+ val message_record = action.messageRecord
+ if (message_record != null) {
+ import PBSupport._
+
+ val pb = if( message_record.zero_copy_buffer != null ) {
+ val r = to_pb(action.messageRecord).copy
+ val buffer = zero_copy_buffer_allocator.to_alloc_buffer(message_record.zero_copy_buffer)
+ r.setZcpFile(buffer.file)
+ r.setZcpOffset(buffer.offset)
+ r.setZcpSize(buffer.size)
+ zerocp_db.put(tx, message_record.key, (buffer.file, buffer.offset, buffer.size))
+ zcp_files_to_sync += buffer.file
+ r.freeze
+ } else {
+ to_pb(action.messageRecord)
+ }
+
+ messages_db.put(tx, action.messageRecord.key, pb)
}
action.enqueues.foreach { queueEntry =>
@@ -254,13 +314,14 @@ class BDBClient(store: BDBStore) extends
action.dequeues.foreach { queueEntry =>
with_entries_db(queueEntry.queue_key) { entries_db=>
entries_db.delete(tx, queueEntry.entry_seq)
- if( add_and_get(message_refs_db, queueEntry.message_key, -1, tx)==0 ) {
- messages_db.delete(tx, queueEntry.message_key)
- }
+ decrement_message_reference(ctx, queueEntry.message_key)
}
}
}
}
+ if( zero_copy_buffer_allocator!=null ) {
+ zcp_files_to_sync.foreach(zero_copy_buffer_allocator.sync(_))
+ }
}
callback.run
}
@@ -346,27 +407,24 @@ class BDBClient(store: BDBStore) extends
var metric_load_from_index = metric_load_from_index_counter(false)
def loadMessages(requests: ListBuffer[(Long, (Option[MessageRecord])=>Unit)]) = {
- val records = with_ctx { ctx=>
- import ctx._
- requests.flatMap { case (message_key, callback)=>
+ with_ctx { ctx=>
+ import ctx._
+ requests.foreach { case (message_key, callback)=>
val record = metric_load_from_index_counter.time {
- messages_db.get(tx, to_database_entry(message_key)).map ( to_message_record _ )
- }
- record match {
- case None =>
- debug("Message not indexed: %s", message_key)
- callback(None)
- None
- case Some(x) => Some((record, callback))
+ messages_db.get(tx, to_database_entry(message_key)).map{ data=>
+ import PBSupport._
+ val pb:MessagePB.Buffer = data
+ val rc = from_pb(pb)
+ if( pb.hasZcpFile ) {
+ rc.zero_copy_buffer = zero_copy_buffer_allocator.view_buffer(pb.getZcpFile,
pb.getZcpOffset, pb.getZcpSize)
+ }
+ rc
+ }
}
+ callback(record)
}
}
-
- records.foreach { case (record, callback)=>
- callback( record )
- }
-
}
@@ -401,9 +459,20 @@ class BDBClient(store: BDBStore) extends
}
streams.using_message_stream { message_stream=>
- messages_db.cursor(tx) { (_, value) =>
- val record:MessageRecord = value
- record.writeFramed(message_stream)
+ messages_db.cursor(tx) { (_, data) =>
+ import PBSupport._
+ val pb = MessagePB.FACTORY.parseUnframed(data.getData)
+ if( pb.hasZcpFile ) {
+ val zcpb = zero_copy_buffer_allocator.view_buffer(pb.getZcpFile, pb.getZcpOffset,
pb.getZcpSize)
+ var data = pb.copy
+ data.clearZcpFile
+ data.clearZcpFile
+ // write the pb frame and then the direct buffer data..
+ data.freeze.writeFramed(message_stream)
+ zcpb.read(message_stream)
+ } else {
+ pb.writeFramed(message_stream)
+ }
true
}
}
@@ -459,10 +528,28 @@ class BDBClient(store: BDBStore) extends
}
}
+ var zcp_counter = 0
+ val max_ctx = zero_copy_buffer_allocator.contexts.size
+
streams.using_message_stream { message_stream=>
foreach[MessagePB.Buffer](message_stream, MessagePB.FACTORY) { pb=>
- val record:MessageRecord = pb
- messages_db.put(tx, record.key, record)
+
+ val record:MessagePB.Buffer = if( pb.hasZcpSize ) {
+ val cp = pb.copy
+ val zcpb = zero_copy_buffer_allocator.contexts(zcp_counter % max_ctx).alloc(cp.getZcpSize)
+ cp.setZcpFile(zcpb.file)
+ cp.setZcpOffset(zcpb.offset)
+
+ zcp_counter += 1
+ zcpb.write(message_stream)
+
+ zerocp_db.put(tx, pb.getMessageKey, (zcpb.file, zcpb.offset, zcpb.size))
+ cp.freeze
+ } else {
+ pb
+ }
+
+ messages_db.put(tx, record.getMessageKey, record)
}
}
Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala?rev=1056471&r1=1056470&r2=1056471&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
Fri Jan 7 19:37:42 2011
@@ -80,6 +80,8 @@ class BDBStore extends DelayingStoreSupp
protected def get_next_msg_key = next_msg_key.getAndIncrement
+ override def zero_copy_buffer_allocator():ZeroCopyBufferAllocator = client.zero_copy_buffer_allocator
+
protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
write_executor {
client.store(uows, ^{
Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala?rev=1056471&r1=1056470&r2=1056471&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
Fri Jan 7 19:37:42 2011
@@ -20,13 +20,14 @@ import java.util.Comparator
import java.nio.ByteBuffer
import com.sleepycat.je._
import java.io.Serializable
-import org.apache.activemq.apollo.broker.store.{PBSupport, MessageRecord, QueueRecord, QueueEntryRecord}
+import org.fusesource.hawtbuf.Buffer
+import org.apache.activemq.apollo.broker.store._
import PBSupport._
object HelperTrait {
- implicit def to_message_record(entry: DatabaseEntry): MessageRecord = entry.getData
- implicit def to_database_entry(v: MessageRecord): DatabaseEntry = new DatabaseEntry(v)
+ implicit def to_message_buffer(entry: DatabaseEntry): MessagePB.Buffer = MessagePB.FACTORY.parseUnframed(entry.getData)
+ implicit def to_database_entry(v: MessagePB.Buffer): DatabaseEntry = new DatabaseEntry(v.toUnframedByteArray)
implicit def to_queue_entry_record(entry: DatabaseEntry): QueueEntryRecord = entry.getData
implicit def to_database_entry(v: QueueEntryRecord): DatabaseEntry = new DatabaseEntry(v)
@@ -34,6 +35,18 @@ object HelperTrait {
implicit def to_queue_record(entry: DatabaseEntry): QueueRecord = entry.getData
implicit def to_database_entry(v: QueueRecord): DatabaseEntry = new DatabaseEntry(v)
+ implicit def encode_zcp_value(entry: DatabaseEntry): (Int,Long,Int) = {
+ val e = new Buffer(entry.getData).bigEndianEditor()
+ (e.readInt, e.readLong, e.readInt)
+ }
+ implicit def decode_zcp_value(v: (Int,Long,Int)): DatabaseEntry = {
+ val buf = new Buffer(16)
+ val e = buf.bigEndianEditor()
+ e.writeInt(v._1)
+ e.writeLong(v._2)
+ e.writeInt(v._1)
+ new DatabaseEntry(buf.data)
+ }
implicit def to_bytes(l:Long):Array[Byte] = ByteBuffer.wrap(new Array[Byte](8)).putLong(l).array()
implicit def to_long(bytes:Array[Byte]):Long = ByteBuffer.wrap(bytes).getLong()
Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/dto/BDBStoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/dto/BDBStoreDTO.java?rev=1056471&r1=1056470&r2=1056471&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/dto/BDBStoreDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/dto/BDBStoreDTO.java
Fri Jan 7 19:37:42 2011
@@ -37,4 +37,8 @@ public class BDBStoreDTO extends StoreDT
@XmlAttribute(name="read_threads")
public Integer read_threads;
+ @XmlAttribute(name="zero_copy")
+ public Boolean zero_copy;
+
+
}
|