activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1056699 - /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala
Date Sat, 08 Jan 2011 12:26:14 GMT
Author: chirino
Date: Sat Jan  8 12:26:13 2011
New Revision: 1056699

URL: http://svn.apache.org/viewvc?rev=1056699&view=rev
Log:
support using mmap buffers to the file to do the transferTo/transferFrom.  OSX implements
transferTo/transferFrom using mmap anyways but not as efficently as it's doing lots of mmap/munmap
which is slower than caching a single mmap buffer and reusing.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala?rev=1056699&r1=1056698&r2=1056699&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/FileZeroCopyBufferAllocator.scala
Sat Jan  8 12:26:13 2011
@@ -20,10 +20,13 @@ import org.fusesource.hawtdispatch._
 import org.fusesource.hawtdispatch.internal.DispatcherConfig
 import org.fusesource.hawtdispatch.BaseRetained
 import java.nio.channels.{FileChannel, WritableByteChannel, ReadableByteChannel}
-import java.nio.ByteBuffer
 import java.io._
 import org.apache.activemq.apollo.util._
-
+import java.util.concurrent.TimeUnit
+import java.nio.channels.FileChannel.MapMode
+import java.security.{AccessController, PrivilegedAction}
+import java.lang.reflect.Method
+import java.nio.{MappedByteBuffer, ByteBuffer}
 
 /**
  * <p>Tracks allocated space</p>
@@ -239,85 +242,74 @@ class ActiveAllocator(val range:Allocati
 }
 
 /**
- * <p>A ZeroCopyBuffer which was allocated on a file.</p>
+ * <p>The ByteBufferReleaser allows you to more eagerly deallocate byte buffers.</p>
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait FileZeroCopyBufferTrait extends ZeroCopyBuffer {
-
-  def offset:Long
-  def channel:FileChannel
+object ByteBufferReleaser {
+  val release: (ByteBuffer) => Unit = {
 
-  def remaining(pos: Int): Int = size-pos
-
-  def read(src: Int, target: WritableByteChannel): Int = {
-    assert(retained > 0)
-    val count: Int = remaining(src)
-    assert(count>=0)
-    channel.transferTo(offset+src, count, target).toInt
-  }
-
-  def read(target: OutputStream): Unit = {
-    assert(retained > 0)
-    val b = ByteBuffer.allocate(size.min(1024*4))
-    var pos = 0
-    while( remaining(pos)> 0 ) {
-      val count = channel.read(b, offset+pos)
-      if( count == -1 ) {
-        throw new EOFException()
+    // Try to drill into the java.nio.DirectBuffer internals...
+    AccessController.doPrivileged(new PrivilegedAction[(ByteBuffer) => Unit]() {
+      def run = {
+        try {
+
+          val cleanerMethod = ByteBuffer.allocateDirect(1).getClass().getMethod("cleaner")
+          cleanerMethod.setAccessible(true)
+          val cleanMethod = cleanerMethod.getReturnType().getMethod("clean")
+
+          def clean(buffer: ByteBuffer):Unit = {
+            try {
+              val cleaner = cleanerMethod.invoke(buffer)
+              if (cleaner != null) {
+                cleanMethod.invoke(cleaner)
+              }
+            } catch {
+              case e: Throwable => e.printStackTrace
+            }
+          }
+
+          clean _
+        } catch {
+          case _ =>
+            def noop(buffer: ByteBuffer):Unit = { }
+            noop _
+        }
       }
-      target.write(b.array, 0, count)
-      pos += count
-      b.clear
-    }
+    })
   }
+}
 
-  def write(src: ReadableByteChannel, target:Int): Int = {
-    assert(retained > 0)
-    val count: Int = remaining(target)
-    assert(count>=0)
-    channel.transferFrom(src, offset+target, count).toInt
-  }
+object FileZeroCopyBufferAllocator {
+  val OS = System.getProperty("os.name").toLowerCase
 
-  def write(src: ByteBuffer, target: Int): Int = {
-    assert(retained > 0)
-    val diff = src.remaining - remaining(target)
-    if( diff > 0 ) {
-      src.limit(src.limit-diff)
-    }
-    try {
-      channel.write(src, offset+target).toInt
-    } finally {
-      if( diff > 0 ) {
-        src.limit(src.limit+diff)
-      }
+  val MMAP_TRANSFER_TO = Option(System.getProperty("apollo.MMAP_TRANSFER_TO")).map(_ == "true").getOrElse{
+    // System prop is not set.. lets pick a good default based on OS
+    if( OS.startsWith("mac") ) {
+      // mmap is faster on the mac than the FileChannel.transferTo call.
+      true
+    } else {
+      false
     }
   }
-
-  def write(target: InputStream): Unit = {
-    assert(retained > 0)
-    val b = ByteBuffer.allocate(size.min(1024*4))
-    var pos = 0
-    while( remaining(pos)> 0 ) {
-      val max = remaining(pos).min(b.capacity)
-      b.clear
-      val count = target.read(b.array, 0, max)
-      if( count == -1 ) {
-        throw new EOFException()
-      }
-      val x = channel.write(b)
-      assert(x == count)
-      pos += count
+  val MMAP_TRANSFER_FROM = Option(System.getProperty("apollo.MMAP_TRANSFER_FROM")).map(_
== "true").getOrElse{
+    // System prop is not set.. lets pick a good default based on OS
+    if( OS.startsWith("mac") ) {
+      false
+    } else {
+      false
     }
   }
 }
 
+
 /**
  * <p>A ZeroCopyBufferAllocator which allocates on files.</p>
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 class FileZeroCopyBufferAllocator(val directory:File) extends ZeroCopyBufferAllocator {
+  import FileZeroCopyBufferAllocator._
 
   // we use thread local allocators to
   class AllocatorContext(val id:Int) {
@@ -340,12 +332,140 @@ class FileZeroCopyBufferAllocator(val di
       }
     }
 
+    var _mmap:MappedByteBuffer = _
+
+    def mmap_slice(offset:Long, size:Int) = {
+      if( _mmap == null ) {
+        _mmap = channel.map(MapMode.READ_WRITE, 0, current_size)
+      }
+
+      // remaps more of the file when needed.
+      if( _mmap.capacity < offset+size ) {
+        assert(current_size >= offset+size)
+        ByteBufferReleaser.release(_mmap)
+
+        val grow = 1024*1024*64
+        _mmap = channel.map(MapMode.READ_WRITE, 0, current_size+grow)
+
+        // initialize the grown part...
+        _mmap.position(current_size.toInt)
+        while(_mmap.hasRemaining) {
+          _mmap.put(0.toByte)
+        }
+        current_size += grow
+        _mmap.clear
+      }
+
+      _mmap.position(offset.toInt)
+      _mmap.limit(offset.toInt+size)
+      val slice = _mmap.slice
+      _mmap.clear
+      slice
+    }
+
     def sync = {
+      if( MMAP_TRANSFER_FROM && _mmap!=null ) {
+        _mmap.force
+      }
       channel.force(size_changed)
     }
 
-    class AllocationBuffer(val allocation:Allocation) extends BaseRetained with FileZeroCopyBufferTrait
{
-      def channel: FileChannel = AllocatorContext.this.channel
+    /**
+     * <p>A ZeroCopyBuffer which was allocated on a file.</p>
+     *
+     * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+     */
+    trait FileZeroCopyBufferTrait extends BaseRetained with ZeroCopyBuffer {
+
+      def offset:Long
+
+      def remaining(pos: Int): Int = size-pos
+
+      def time[T](name:String)(func: =>T):T = {
+        val c = new TimeCounter
+        try {
+          c.time(func)
+        } finally {
+          println("%s: %.2f".format(name, c.apply(true).maxTime(TimeUnit.MILLISECONDS)))
+        }
+      }
+
+      def read(src: Int, target: WritableByteChannel): Int = {
+        assert(retained > 0)
+        val count: Int = remaining(src)
+        assert(count>=0)
+
+        if( MMAP_TRANSFER_TO ) {
+          val buffer = mmap_slice(offset+src, count)
+          target.write(buffer)
+        } else {
+          channel.transferTo(offset+src, count, target).toInt
+        }
+      }
+
+      def read(target: OutputStream): Unit = {
+        assert(retained > 0)
+        val b = ByteBuffer.allocate(size.min(1024*4))
+        var pos = 0
+        while( remaining(pos)> 0 ) {
+          val count = channel.read(b, offset+pos)
+          if( count == -1 ) {
+            throw new EOFException()
+          }
+          target.write(b.array, 0, count)
+          pos += count
+          b.clear
+        }
+      }
+
+      def write(src: ReadableByteChannel, target:Int): Int = {
+        assert(retained > 0)
+        val count: Int = remaining(target)
+        assert(count>=0)
+
+        if( MMAP_TRANSFER_FROM ) {
+          val buffer = mmap_slice(offset+target, count)
+          src.read(buffer)
+        } else {
+          channel.transferFrom(src, offset+target, count).toInt
+        }
+
+      }
+
+      def write(src: ByteBuffer, target: Int): Int = {
+        assert(retained > 0)
+        val diff = src.remaining - remaining(target)
+        if( diff > 0 ) {
+          src.limit(src.limit-diff)
+        }
+        try {
+          channel.write(src, offset+target).toInt
+        } finally {
+          if( diff > 0 ) {
+            src.limit(src.limit+diff)
+          }
+        }
+      }
+
+      def write(target: InputStream): Unit = {
+        assert(retained > 0)
+        val b = ByteBuffer.allocate(size.min(1024*4))
+        var pos = 0
+        while( remaining(pos)> 0 ) {
+          val max = remaining(pos).min(b.capacity)
+          b.clear
+          val count = target.read(b.array, 0, max)
+          if( count == -1 ) {
+            throw new EOFException()
+          }
+          val x = channel.write(b)
+          assert(x == count)
+          pos += count
+        }
+      }
+    }
+
+    class AllocationBuffer(val allocation:Allocation) extends FileZeroCopyBufferTrait {
 
       def file = id
       def offset: Long = allocation.offset
@@ -367,6 +487,14 @@ class FileZeroCopyBufferAllocator(val di
       current_size = current_size.max(allocation.offset + allocation.size)
       new AllocationBuffer(allocation)
     }
+
+    def view_buffer(the_offset:Long, the_size:Int):ZeroCopyBuffer = {
+      new FileZeroCopyBufferTrait {
+        def offset: Long = the_offset
+        def size: Int = the_size
+      }
+    }
+
   }
 
   def to_alloc_buffer(buffer:ZeroCopyBuffer) = buffer.asInstanceOf[AllocatorContext#AllocationBuffer]
@@ -414,12 +542,7 @@ class FileZeroCopyBufferAllocator(val di
   }
 
   def view_buffer(file:Int, the_offset:Long, the_size:Int):ZeroCopyBuffer = {
-    val the_channel = contexts.get(file).get.channel
-    new BaseRetained with FileZeroCopyBufferTrait {
-      def offset: Long = the_offset
-      def size: Int = the_size
-      val channel: FileChannel = the_channel
-    }
+    contexts.get(file).get.view_buffer(the_offset, the_size)
   }
 
   def context(i:Int)(func: (AllocatorContext)=>Unit):Unit= {



Mime
View raw message