activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1056470 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/proto/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/
Date Fri, 07 Jan 2011 19:37:34 GMT
Author: chirino
Date: Fri Jan  7 19:37:34 2011
New Revision: 1056470

URL: http://svn.apache.org/viewvc?rev=1056470&view=rev
Log:
handle import/export the zero copy data too.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ZeroCopyBufferAllocator.scala
    activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto?rev=1056470&r1=1056469&r2=1056470&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/proto/data.proto Fri Jan  7 19:37:34 2011
@@ -27,8 +27,12 @@ message MessagePB {
   required bytes protocol = 2 [java_override_type = "AsciiBuffer"];
   required int32 size = 3;
   optional bytes value = 4;
-  optional bool  direct = 5;
-  optional sint64 expiration = 6;
+  optional sint64 expiration = 5;
+  
+  optional bytes zcp_data = 10;
+  optional int32 zcp_file = 12;
+  optional int64 zcp_offset = 13;
+  optional int32 zcp_size = 14;
 }
 
 message QueuePB {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala?rev=1056470&r1=1056469&r2=1056470&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala Fri Jan  7 19:37:34 2011
@@ -294,6 +294,22 @@ trait FileZeroCopyBufferTrait extends Ze
     }
   }
 
+  def write(target: InputStream): Unit = {
+    assert(retained > 0)
+    val b = ByteBuffer.allocate(size.min(1024*4))
+    var pos = 0
+    while( remaining(pos)> 0 ) {
+      val max = remaining(pos).min(b.capacity)
+      b.clear
+      val count = target.read(b.array, 0, max)
+      if( count == -1 ) {
+        throw new EOFException()
+      }
+      val x = channel.write(b)
+      assert(x == count)
+      pos += count
+    }
+  }
 }
 
 /**
@@ -345,7 +361,7 @@ class FileZeroCopyBufferAllocator(val di
       }
     }
 
-    def alloc(size: Int): ZeroCopyBuffer = current_context { ctx=>
+    def alloc(size: Int) = current_context { ctx=>
       val allocation = allocator.alloc(size)
       assert(allocation!=null)
       current_size = current_size.max(allocation.offset + allocation.size)

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ZeroCopyBufferAllocator.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ZeroCopyBufferAllocator.scala?rev=1056470&r1=1056469&r2=1056470&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ZeroCopyBufferAllocator.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ZeroCopyBufferAllocator.scala Fri Jan  7 19:37:34 2011
@@ -54,4 +54,5 @@ trait ZeroCopyBuffer extends Retained {
 
   def write(src:ByteBuffer, target:Int):Int
 
+  def write(target:InputStream):Unit
 }

Modified: activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala?rev=1056470&r1=1056469&r2=1056470&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-jdbm2/src/main/scala/org/apache/activemq/apollo/broker/store/jdbm2/JDBM2Client.scala Fri Jan  7 19:37:34 2011
@@ -48,7 +48,7 @@ object JDBM2Client extends Log {
     def deserialize(in: SerializerInput) = decode_queue_entry_record(in)
   }
 
-  object LobValueSerializer extends Serializer[(Int, Long, Int)] {
+  object ZeroCopyValueSerializer extends Serializer[(Int, Long, Int)] {
     def serialize(out: SerializerOutput, v: (Int,Long, Int)) = {
       out.writePackedInt(v._1)
       out.writePackedLong(v._2)
@@ -148,7 +148,7 @@ class JDBM2Client(store: JDBM2Store) {
   var queues_db:HTree[Long, QueueRecord] = _
   var entries_db:BTree[(Long,Long), QueueEntryRecord] = _
   var messages_db:HTree[Long, MessagePB.Buffer] = _
-  var lobs_db:HTree[Long, (Int, Long, Int)] = _
+  var zerocp_db:HTree[Long, (Int, Long, Int)] = _
   var message_refs_db:HTree[Long, java.lang.Integer] = _
 
   var last_message_key = 0L
@@ -201,7 +201,7 @@ class JDBM2Client(store: JDBM2Store) {
 
     transaction {
       messages_db = init_htree("messages", value_serializer = MessageRecordSerializer)
-      lobs_db = init_htree("lobs", value_serializer = LobValueSerializer)
+      zerocp_db = init_htree("lobs", value_serializer = ZeroCopyValueSerializer)
       message_refs_db = init_htree("message_refs")
       queues_db = init_htree("queues", value_serializer = QueueRecordSerializer)
       entries_db = init_btree("enttries", new QueueEntryKeyComparator, QueueEntryKeySerializer, QueueEntryRecordSerializer)
@@ -210,7 +210,7 @@ class JDBM2Client(store: JDBM2Store) {
       last_queue_key = Option(recman.getNamedObject("last_queue_key")).map(_.longValue).getOrElse(0L)
 
       if( zero_copy_buffer_allocator!=null ) {
-        lobs_db.cursor { (_,v)=>
+        zerocp_db.cursor { (_,v)=>
           zero_copy_buffer_allocator.alloc_at(v._1, v._2, v._3)
           true
         }
@@ -249,7 +249,9 @@ class JDBM2Client(store: JDBM2Store) {
       if( config.directory.isDirectory ) {
         config.directory.listFiles.filter(_.getName.startsWith("jdbm2.")).foreach(_.delete)
       }
-      zero_copy_dir.delete
+      if( zero_copy_dir.isDirectory ) {
+        zero_copy_dir.listFiles.foreach(_.delete)
+      }
     }
     if( recman!=null ) {
       stop
@@ -304,9 +306,10 @@ class JDBM2Client(store: JDBM2Store) {
         message_refs_db.remove(key)
         messages_db.remove(key)
         if( zero_copy_buffer_allocator!=null ){
-          val location = lobs_db.find(key)
+          val location = zerocp_db.find(key)
           if( location!=null ) {
             zero_copy_buffer_allocator.free(location._1, location._2, location._3)
+            zerocp_db.remove(key)
           }
         }
       }
@@ -356,9 +359,11 @@ class JDBM2Client(store: JDBM2Store) {
             val pb = if( message_record.zero_copy_buffer != null ) {
               val r = to_pb(action.messageRecord).copy
               val buffer = zero_copy_buffer_allocator.to_alloc_buffer(message_record.zero_copy_buffer)
-              lobs_db.put(message_record.key, (buffer.file, buffer.offset, buffer.size))
+              r.setZcpFile(buffer.file)
+              r.setZcpOffset(buffer.offset)
+              r.setZcpSize(buffer.size)
+              zerocp_db.put(message_record.key, (buffer.file, buffer.offset, buffer.size))
               zcp_files_to_sync += buffer.file
-              r.setDirect(true)
               r.freeze
             } else {
               to_pb(action.messageRecord)
@@ -448,9 +453,8 @@ class JDBM2Client(store: JDBM2Store) {
       val record = metric_load_from_index_counter.time {
         Option(messages_db.find(message_key)).map{ pb=>
           val rc = from_pb(pb)
-          if( pb.getDirect ) {
-            val location = lobs_db.find(message_key)
-            rc.zero_copy_buffer = zero_copy_buffer_allocator.view_buffer(location._1, location._2, location._3)
+          if( pb.hasZcpFile ) {
+            rc.zero_copy_buffer = zero_copy_buffer_allocator.view_buffer(pb.getZcpFile, pb.getZcpOffset, pb.getZcpSize)
           }
           rc
         }
@@ -477,9 +481,18 @@ class JDBM2Client(store: JDBM2Store) {
         }
       }
       streams.using_message_stream { message_stream=>
-        messages_db.cursor { (_, value) =>
-          val record:MessageRecord = value
-          record.writeFramed(message_stream)
+        messages_db.cursor { (_, pb) =>
+          if( pb.hasZcpFile ) {
+            val zcpb = zero_copy_buffer_allocator.view_buffer(pb.getZcpFile, pb.getZcpOffset, pb.getZcpSize)
+            var data = pb.copy
+            data.clearZcpFile
+            data.clearZcpFile
+            // write the pb frame and then the direct buffer data..
+            data.freeze.writeFramed(message_stream)
+            zcpb.read(message_stream)
+          } else {
+            pb.writeFramed(message_stream)
+          }
           true
         }
       }
@@ -539,11 +552,29 @@ class JDBM2Client(store: JDBM2Store) {
 
         recman.commit
 
+        var zcp_counter = 0
+        val max_ctx = zero_copy_buffer_allocator.contexts.size
+
         streams.using_message_stream { message_stream=>
           foreach[MessagePB.Buffer](message_stream, MessagePB.FACTORY) { pb=>
-            val record:MessageRecord = pb
-            messages_db.put(record.key, record)
-            check_flush(record.size, 1024*124*10)
+
+            val record:MessagePB.Buffer = if( pb.hasZcpSize ) {
+              val cp = pb.copy
+              val zcpb = zero_copy_buffer_allocator.contexts(zcp_counter % max_ctx).alloc(cp.getZcpSize)
+              cp.setZcpFile(zcpb.file)
+              cp.setZcpOffset(zcpb.offset)
+
+              zcp_counter += 1
+              zcpb.write(message_stream)
+
+              zerocp_db.put(pb.getMessageKey, (zcpb.file, zcpb.offset, zcpb.size))
+              cp.freeze
+            } else {
+              pb
+            }
+
+            messages_db.put(record.getMessageKey, record)
+            check_flush(record.getSize, 1024*124*10)
           }
         }
 



Mime
View raw message