activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1056471 - in /activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb: BDBClient.scala BDBStore.scala HelperTrait.scala dto/BDBStoreDTO.java
Date Fri, 07 Jan 2011 19:37:42 GMT
Author: chirino
Date: Fri Jan  7 19:37:42 2011
New Revision: 1056471

URL: http://svn.apache.org/viewvc?rev=1056471&view=rev
Log:
Added zero copy support to the bdb store.

Modified:
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
    activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/dto/BDBStoreDTO.java

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala?rev=1056471&r1=1056470&r2=1056471&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBClient.scala
Fri Jan  7 19:37:42 2011
@@ -59,6 +59,13 @@ class BDBClient(store: BDBStore) extends
 
   var environment:Environment = _
 
+  var zero_copy_buffer_allocator: FileZeroCopyBufferAllocator = _
+
+  def zero_copy_dir = {
+    import FileSupport._
+    config.directory / "zerocp"
+  }
+
   def start() = {
     val env_config = new EnvironmentConfig();
     env_config.setAllowCreate(true);
@@ -66,6 +73,12 @@ class BDBClient(store: BDBStore) extends
     env_config.setTxnSerializableIsolation(false)
 
     directory.mkdirs
+
+    if( Option(config.zero_copy).map(_.booleanValue).getOrElse(false) ) {
+      zero_copy_buffer_allocator = new FileZeroCopyBufferAllocator(zero_copy_dir)
+      zero_copy_buffer_allocator.start
+    }
+
     environment = new Environment(directory, env_config);
 
     with_ctx { ctx=>
@@ -73,11 +86,23 @@ class BDBClient(store: BDBStore) extends
       messages_db
       message_refs_db
       queues_db
+
+      if( zero_copy_buffer_allocator!=null ) {
+        zerocp_db.cursor(tx) { (_,value)=>
+          val v = encode_zcp_value(value)
+          zero_copy_buffer_allocator.alloc_at(v._1, v._2, v._3)
+          true
+        }
+      }
     }
   }
 
   def stop() = {
     environment.close
+    if( zero_copy_buffer_allocator!=null ) {
+      zero_copy_buffer_allocator.stop
+      zero_copy_buffer_allocator = null
+    }
   }
 
   case class TxContext(tx:Transaction) {
@@ -100,6 +125,13 @@ class BDBClient(store: BDBStore) extends
       _messages_db
     }
 
+    private var _zerocp_db:Database = _
+    def zerocp_db:Database = {
+      if( _zerocp_db==null ) {
+        _zerocp_db = environment.openDatabase(tx, "zerocp", long_key_conf)
+      }
+      _zerocp_db
+    }
 
     private var _message_refs_db:Database = _
     def message_refs_db:Database = {
@@ -210,6 +242,20 @@ class BDBClient(store: BDBStore) extends
     callback.run
   }
 
+  def decrement_message_reference(ctx:TxContext, msg_key:Long) = {
+    import ctx._
+    if( add_and_get(message_refs_db, msg_key, -1, tx)==0 ) {
+      messages_db.delete(tx, msg_key)
+      if( zero_copy_buffer_allocator!=null ){
+        zerocp_db.get(tx, to_database_entry(msg_key)).foreach { v=>
+          val location  = decode_zcp_value(v)
+          zero_copy_buffer_allocator.free(location._1, location._2, location._3)
+        }
+        zerocp_db.delete(tx, msg_key)
+      }
+    }
+  }
+
   def removeQueue(queue_key: Long, callback:Runnable) = {
     with_ctx { ctx=>
       import ctx._
@@ -219,9 +265,7 @@ class BDBClient(store: BDBStore) extends
 
         entries_db.cursor(tx) { (key,value)=>
           val queueEntry:QueueEntryRecord = value
-          if( add_and_get(message_refs_db, queueEntry.message_key, -1, tx)==0 ) {
-            messages_db.delete(tx, queueEntry.message_key)
-          }
+          decrement_message_reference(ctx, queueEntry.message_key)
           true // keep cursoring..
         }
 
@@ -235,13 +279,29 @@ class BDBClient(store: BDBStore) extends
   def store(uows: Seq[BDBStore#DelayableUOW], callback:Runnable) {
     with_ctx { ctx=>
       import ctx._
-
+      var zcp_files_to_sync = Set[Int]()
       uows.foreach { uow =>
           uow.actions.foreach {
             case (msg, action) =>
 
-              if (action.messageRecord != null) {
-                messages_db.put(tx, action.messageRecord.key, action.messageRecord)
+              val message_record = action.messageRecord
+              if (message_record != null) {
+                import PBSupport._
+
+                val pb = if( message_record.zero_copy_buffer != null ) {
+                  val r = to_pb(action.messageRecord).copy
+                  val buffer = zero_copy_buffer_allocator.to_alloc_buffer(message_record.zero_copy_buffer)
+                  r.setZcpFile(buffer.file)
+                  r.setZcpOffset(buffer.offset)
+                  r.setZcpSize(buffer.size)
+                  zerocp_db.put(tx, message_record.key, (buffer.file, buffer.offset, buffer.size))
+                  zcp_files_to_sync += buffer.file
+                  r.freeze
+                } else {
+                  to_pb(action.messageRecord)
+                }
+
+                messages_db.put(tx, action.messageRecord.key, pb)
               }
 
               action.enqueues.foreach { queueEntry =>
@@ -254,13 +314,14 @@ class BDBClient(store: BDBStore) extends
               action.dequeues.foreach { queueEntry =>
                 with_entries_db(queueEntry.queue_key) { entries_db=>
                   entries_db.delete(tx, queueEntry.entry_seq)
-                  if( add_and_get(message_refs_db, queueEntry.message_key, -1, tx)==0 ) {
-                    messages_db.delete(tx, queueEntry.message_key)
-                  }
+                  decrement_message_reference(ctx, queueEntry.message_key)
                 }
               }
           }
       }
+      if( zero_copy_buffer_allocator!=null ) {
+        zcp_files_to_sync.foreach(zero_copy_buffer_allocator.sync(_))
+      }
     }
     callback.run
   }
@@ -346,27 +407,24 @@ class BDBClient(store: BDBStore) extends
   var metric_load_from_index = metric_load_from_index_counter(false)
 
   def loadMessages(requests: ListBuffer[(Long, (Option[MessageRecord])=>Unit)]) = {
-    val records = with_ctx { ctx=>
-      import ctx._
 
-      requests.flatMap { case (message_key, callback)=>
+    with_ctx { ctx=>
+      import ctx._
+      requests.foreach { case (message_key, callback)=>
         val record = metric_load_from_index_counter.time {
-          messages_db.get(tx, to_database_entry(message_key)).map ( to_message_record _ )
-        }
-        record match {
-          case None =>
-          debug("Message not indexed: %s", message_key)
-          callback(None)
-          None
-          case Some(x) => Some((record, callback))
+          messages_db.get(tx, to_database_entry(message_key)).map{ data=>
+            import PBSupport._
+            val pb:MessagePB.Buffer = data
+            val rc = from_pb(pb)
+            if( pb.hasZcpFile ) {
+              rc.zero_copy_buffer = zero_copy_buffer_allocator.view_buffer(pb.getZcpFile,
pb.getZcpOffset, pb.getZcpSize)
+            }
+            rc
+          }
         }
+        callback(record)
       }
     }
-
-    records.foreach { case (record, callback)=>
-      callback( record )
-    }
-
   }
 
 
@@ -401,9 +459,20 @@ class BDBClient(store: BDBStore) extends
         }
 
         streams.using_message_stream { message_stream=>
-          messages_db.cursor(tx) { (_, value) =>
-            val record:MessageRecord = value
-            record.writeFramed(message_stream)
+          messages_db.cursor(tx) { (_, data) =>
+            import PBSupport._
+            val pb = MessagePB.FACTORY.parseUnframed(data.getData)
+            if( pb.hasZcpFile ) {
+              val zcpb = zero_copy_buffer_allocator.view_buffer(pb.getZcpFile, pb.getZcpOffset,
pb.getZcpSize)
+              var data = pb.copy
+              data.clearZcpFile
+              data.clearZcpFile
+              // write the pb frame and then the direct buffer data..
+              data.freeze.writeFramed(message_stream)
+              zcpb.read(message_stream)
+            } else {
+              pb.writeFramed(message_stream)
+            }
             true
           }
         }
@@ -459,10 +528,28 @@ class BDBClient(store: BDBStore) extends
           }
         }
 
+        var zcp_counter = 0
+        val max_ctx = zero_copy_buffer_allocator.contexts.size
+
         streams.using_message_stream { message_stream=>
           foreach[MessagePB.Buffer](message_stream, MessagePB.FACTORY) { pb=>
-            val record:MessageRecord = pb
-            messages_db.put(tx, record.key, record)
+
+            val record:MessagePB.Buffer = if( pb.hasZcpSize ) {
+              val cp = pb.copy
+              val zcpb = zero_copy_buffer_allocator.contexts(zcp_counter % max_ctx).alloc(cp.getZcpSize)
+              cp.setZcpFile(zcpb.file)
+              cp.setZcpOffset(zcpb.offset)
+
+              zcp_counter += 1
+              zcpb.write(message_stream)
+
+              zerocp_db.put(tx, pb.getMessageKey, (zcpb.file, zcpb.offset, zcpb.size))
+              cp.freeze
+            } else {
+              pb
+            }
+
+            messages_db.put(tx, record.getMessageKey, record)
           }
         }
 

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala?rev=1056471&r1=1056470&r2=1056471&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/BDBStore.scala
Fri Jan  7 19:37:42 2011
@@ -80,6 +80,8 @@ class BDBStore extends DelayingStoreSupp
   
   protected def get_next_msg_key = next_msg_key.getAndIncrement
 
+  override def zero_copy_buffer_allocator():ZeroCopyBufferAllocator = client.zero_copy_buffer_allocator
+
   protected def store(uows: Seq[DelayableUOW])(callback: =>Unit) = {
     write_executor {
       client.store(uows, ^{

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala?rev=1056471&r1=1056470&r2=1056471&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/HelperTrait.scala
Fri Jan  7 19:37:42 2011
@@ -20,13 +20,14 @@ import java.util.Comparator
 import java.nio.ByteBuffer
 import com.sleepycat.je._
 import java.io.Serializable
-import org.apache.activemq.apollo.broker.store.{PBSupport, MessageRecord, QueueRecord, QueueEntryRecord}
+import org.fusesource.hawtbuf.Buffer
+import org.apache.activemq.apollo.broker.store._
 import PBSupport._
 
 object HelperTrait {
 
-  implicit def to_message_record(entry: DatabaseEntry): MessageRecord = entry.getData
-  implicit def to_database_entry(v: MessageRecord): DatabaseEntry = new DatabaseEntry(v)
+  implicit def to_message_buffer(entry: DatabaseEntry): MessagePB.Buffer = MessagePB.FACTORY.parseUnframed(entry.getData)
+  implicit def to_database_entry(v: MessagePB.Buffer): DatabaseEntry = new DatabaseEntry(v.toUnframedByteArray)
 
   implicit def to_queue_entry_record(entry: DatabaseEntry): QueueEntryRecord = entry.getData
   implicit def to_database_entry(v: QueueEntryRecord): DatabaseEntry = new DatabaseEntry(v)
@@ -34,6 +35,18 @@ object HelperTrait {
   implicit def to_queue_record(entry: DatabaseEntry): QueueRecord = entry.getData
   implicit def to_database_entry(v: QueueRecord): DatabaseEntry = new DatabaseEntry(v)
 
+  implicit def encode_zcp_value(entry: DatabaseEntry): (Int,Long,Int) = {
+    val e = new Buffer(entry.getData).bigEndianEditor()
+    (e.readInt, e.readLong, e.readInt)
+  }
+  implicit def decode_zcp_value(v: (Int,Long,Int)): DatabaseEntry = {
+    val buf = new Buffer(16)
+    val e = buf.bigEndianEditor()
+    e.writeInt(v._1)
+    e.writeLong(v._2)
+    e.writeInt(v._1)
+    new DatabaseEntry(buf.data)
+  }
 
   implicit def to_bytes(l:Long):Array[Byte] = ByteBuffer.wrap(new Array[Byte](8)).putLong(l).array()
   implicit def to_long(bytes:Array[Byte]):Long = ByteBuffer.wrap(bytes).getLong()

Modified: activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/dto/BDBStoreDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/dto/BDBStoreDTO.java?rev=1056471&r1=1056470&r2=1056471&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/dto/BDBStoreDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-bdb/src/main/scala/org/apache/activemq/apollo/broker/store/bdb/dto/BDBStoreDTO.java
Fri Jan  7 19:37:42 2011
@@ -37,4 +37,8 @@ public class BDBStoreDTO extends StoreDT
     @XmlAttribute(name="read_threads")
     public Integer read_threads;
 
+    @XmlAttribute(name="zero_copy")
+    public Boolean zero_copy;
+
+
 }



Mime
View raw message