activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1056071 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ apollo-hawtdb/src/main/resources/META-INF/services/org.apac...
Date Thu, 06 Jan 2011 20:56:18 GMT
Author: chirino
Date: Thu Jan  6 20:56:17 2011
New Revision: 1056071

URL: http://svn.apache.org/viewvc?rev=1056071&view=rev
Log:
Restructuring how direct buffers work.

Added:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
      - copied, changed from r1056021, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.java
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.scala
      - copied, changed from r1056021, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.java
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
      - copied, changed from r1056021, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.java
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala
      - copied, changed from r1056021, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.java
Removed:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.java
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.java
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.java
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.java
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueStatus.java
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/direct-buffer-pool-factory.index
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBDirectBufferPool.scala
    activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBDirectBufferPoolFactory.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/DirectBufferPool.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/DirectBufferPoolFactory.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1056071&r1=1056070&r2=1056071&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
Thu Jan  6 20:56:17 2011
@@ -22,7 +22,6 @@ import _root_.scala.collection.JavaConve
 import org.fusesource.hawtdispatch._
 
 import java.util.concurrent.TimeUnit
-import org.apache.activemq.apollo.broker.store.{Store, StoreFactory}
 import org.apache.activemq.apollo.util._
 import path.PathFilter
 import ReporterLevel._
@@ -33,6 +32,7 @@ import org.apache.activemq.apollo.util.O
 import org.apache.activemq.apollo.util.path.{Path, PathParser}
 import org.apache.activemq.apollo.dto.{DestinationDTO, QueueDTO, BindingDTO, VirtualHostDTO}
 import security.{AclAuthorizer, JaasAuthenticator, Authenticator, Authorizer}
+import org.apache.activemq.apollo.broker.store.{DirectBufferAllocator, Store, StoreFactory}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -81,7 +81,6 @@ class VirtualHost(val broker: Broker, va
   var names:List[String] = Nil;
 
   var store:Store = null
-  var direct_buffer_pool:DirectBufferPool = null
   val queue_id_counter = new LongCounter
 
   val session_counter = new AtomicLong(0)
@@ -106,7 +105,6 @@ class VirtualHost(val broker: Broker, va
     }
   } |>>: dispatch_queue
 
-
   override protected def _start(on_completed:Runnable):Unit = {
 
     val tracker = new LoggingTracker("virtual host startup", dispatch_queue)
@@ -129,19 +127,6 @@ class VirtualHost(val broker: Broker, va
 
     store = StoreFactory.create(config.store)
 
-    //    val memory_pool_config: String = null
-    var direct_buffer_pool_config: String = "hawtdb:activemq.tmp"
-
-    if( direct_buffer_pool_config!=null &&  (store!=null && !store.supports_direct_buffers) ) {
-      warn("The direct buffer pool will not be used because the configured store does not support them.")
-      direct_buffer_pool_config = null
-    }
-
-    if( direct_buffer_pool_config!=null ) {
-      direct_buffer_pool = DirectBufferPoolFactory.create(direct_buffer_pool_config)
-      direct_buffer_pool.start
-    }
-
     if( store!=null ) {
       store.configure(config.store, LoggingReporter(VirtualHost))
       val store_startup_done = tracker.task("store startup")
@@ -204,11 +189,6 @@ class VirtualHost(val broker: Broker, va
     router.queues.valuesIterator.foreach { queue=>
       tracker.stop(queue)
     }
-    if( direct_buffer_pool!=null ) {
-      direct_buffer_pool.stop
-      direct_buffer_pool = null
-    }
-
     if( store!=null ) {
       tracker.stop(store);
     }

Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala?rev=1056071&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala
(added)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala
Thu Jan  6 20:56:17 2011
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker.store
+
+import org.fusesource.hawtdispatch._
+import java.nio.channels.{FileChannel, WritableByteChannel, ReadableByteChannel}
+import java.nio.ByteBuffer
+import java.io._
+import org.apache.activemq.apollo.util._
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait DirectBufferAllocator {
+  def alloc(size:Int):DirectBuffer
+}
+
+/**
+ * <p>
+ * A DirectBuffer is a reference counted buffer on
+ * temp storage designed to be accessed with direct io.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait DirectBuffer extends Retained {
+
+  def size:Int
+
+  def remaining(from_position: Int): Int
+
+  def read(target: OutputStream):Unit
+
+  def read(src: Int, target: WritableByteChannel): Int
+
+  def write(src:ReadableByteChannel, target:Int): Int
+
+  def write(src:ByteBuffer, target:Int):Int
+
+  def link(target:File):Long
+}
+
+trait FileDirectBufferTrait extends DirectBuffer {
+
+  def offset:Long
+  def channel:FileChannel
+
+  def remaining(pos: Int): Int = size-pos
+
+  def read(src: Int, target: WritableByteChannel): Int = {
+    assert(retained > 0)
+    val count: Int = remaining(src)
+    assert(count>=0)
+    channel.transferTo(offset+src, count, target).toInt
+  }
+
+  def read(target: OutputStream): Unit = {
+    assert(retained > 0)
+    val b = ByteBuffer.allocate(size.min(1024*4))
+    var pos = 0
+    while( remaining(pos)> 0 ) {
+      val count = channel.read(b, offset+pos)
+      if( count == -1 ) {
+        throw new EOFException()
+      }
+      target.write(b.array, 0, count)
+      pos += count
+      b.clear
+    }
+  }
+
+  def write(src: ReadableByteChannel, target:Int): Int = {
+    assert(retained > 0)
+    val count: Int = remaining(target)
+    assert(count>=0)
+    channel.transferFrom(src, offset+target, count).toInt
+  }
+
+  def write(src: ByteBuffer, target: Int): Int = {
+    assert(retained > 0)
+    val diff = src.remaining - remaining(target)
+    if( diff > 0 ) {
+      src.limit(src.limit-diff)
+    }
+    try {
+      channel.write(src, offset+target).toInt
+    } finally {
+      if( diff > 0 ) {
+        src.limit(src.limit+diff)
+      }
+    }
+  }
+
+  def link(target: File): Long = {
+    assert(retained > 0)
+    // TODO: implement with a real file system hard link
+    // to get copy on write goodness.
+    import FileSupport._
+    using(new FileOutputStream(target).getChannel) { target=>
+      val count = channel.transferTo(offset, size, target)
+      assert( count == size )
+    }
+    return 0;
+  }
+}
+
+case class Allocation(size:Long, offset:Long) extends Ordered[Allocation] {
+
+  var _free_func: (Allocation)=>Unit = _
+
+  def free() = {
+    _free_func(this)
+  }
+
+  def compare(that: Allocation): Int = {
+    var rc = longWrapper(size).compareTo(that.size)
+    if( rc!=0 ) {
+      rc
+    } else {
+      longWrapper(offset).compareTo(that.offset)
+    }
+  }
+
+  // split the allocation..
+  def split(request:Long):(Allocation, Allocation) = {
+    assert(request < size)
+    var first = Allocation(offset, request)
+    var second = Allocation(offset+request, size-request)
+    (first, second)
+  }
+
+  // join the allocation..
+  def join(that:Allocation):Allocation = {
+    assert( that.offset == offset+size)
+    Allocation(offset, size+that.size)
+  }
+
+}
+
+trait Allocator {
+  def alloc(request:Long):Allocation
+
+  def chain(that:Allocator):Allocator = new Allocator() {
+    def alloc(request: Long): Allocation = {
+      val rc = Allocator.this.alloc(request)
+      if( rc == null ) {
+        that.alloc(request)
+      } else {
+        rc
+      }
+    }
+  }
+}
+
+class TreeAllocator(range:Allocation) extends Allocator {
+
+  // list of the free allocation areas.  Sorted by size then offset
+  val free_by_size = new TreeMap[Allocation, Zilch]()
+  // list of the free allocation areas sorted by offset.
+  val free_by_offset = new TreeMap[Long, Allocation]()
+
+  {
+    val allocation = range.copy()
+    free_by_offset.put(allocation.offset, allocation)
+    free_by_size.put(allocation, null)
+  }
+
+  def alloc(request:Long):Allocation = {
+    var spot_entry = free_by_size.ceilingEntry(Allocation(request, 0))
+    if( spot_entry== null ) {
+      return null
+    }
+
+    val allocation = spot_entry.getKey
+    free_by_size.remove(allocation)
+
+    // might be the perfect size
+    if( allocation.size == request ) {
+      allocation._free_func = free
+      allocation
+    } else {
+      // split the allocation..
+      var (first, second) = allocation.split(request)
+
+      free_by_offset.remove(first.offset)
+      free_by_offset.put(second.offset, second)
+
+      // put the free part in the free map.
+      free_by_size.put(second, null)
+
+      first._free_func = free
+      first
+    }
+  }
+
+  def free(allocation:Allocation):Unit = {
+
+
+    var prev_e = free_by_offset.floorEntry(allocation.offset)
+    var next_e = if( prev_e!=null ) {
+      prev_e.next
+    } else {
+      free_by_offset.ceilingEntry(allocation.offset)
+    }
+
+    val prev = Option(prev_e).map(_.getValue).map( a=> if(a.offset+a.size == allocation.offset) a else null ).getOrElse(null)
+    val next = Option(prev_e).map(_.getValue).map( a=> if(allocation.offset+allocation.size == a.offset) a else null ).getOrElse(null)
+
+    (prev, next) match {
+      case (null, null)=>
+        allocation._free_func = null
+        free_by_size.put(allocation, null)
+        free_by_offset.put(allocation.offset, allocation)
+
+      case (prev, null)=>
+        val joined = prev.join(allocation)
+        free_by_size.remove(prev)
+        free_by_size.put(joined, null)
+        free_by_offset.put(joined.offset, joined)
+
+      case (null, next)=>
+        val joined = allocation.join(next)
+        free_by_size.remove(next)
+        free_by_size.put(joined, null)
+
+        free_by_offset.remove(next.offset)
+        free_by_offset.put(joined.offset, joined)
+
+      case (prev, next)=>
+        val joined = prev.join(allocation.join(next))
+        free_by_size.remove(prev)
+        free_by_size.remove(next)
+        free_by_size.put(joined, null)
+
+        free_by_offset.remove(next.offset)
+        free_by_offset.put(joined.offset, joined)
+    }
+  }
+}
+
+/**
+ * Helps minimize the active page set.
+ */
+class ActiveAllocator(val range:Allocation) extends Allocator {
+
+  // the cold allocated start with all the free space..
+  val inactive = new TreeAllocator(range)
+
+  // the hot is clear of any free space.
+  val active = new TreeAllocator(range)
+
+  active.free_by_offset.clear
+  active.free_by_size.clear
+
+  // allocate out of the hot area first since
+  // that should result in less vm swapping
+  val chain = active.chain(inactive)
+
+  def alloc(request:Long):Allocation = {
+    var rc = chain.alloc(request)
+    if( rc!=null ) {
+      rc._free_func = free
+    }
+    rc
+  }
+
+  def free(allocation:Allocation):Unit = {
+    // put stuff back in the hot tree.
+    active.free(allocation)
+  }
+
+}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class FileDirectBufferAllocator(val directory:File) extends DirectBufferAllocator {
+
+  // we use thread local allocators to
+  class AllocatorContext(val queue:DispatchQueue) {
+
+    val allocator = new TreeAllocator(Allocation(0, Long.MaxValue))
+    var channel:FileChannel = new RandomAccessFile(queue.getLabel, "rw").getChannel
+
+    class AllocationBuffer(val allocation:Allocation) extends BaseRetained with FileDirectBufferTrait {
+      def channel: FileChannel = AllocatorContext.this.channel
+      def offset: Long = allocation.offset
+      def size: Int = allocation.size.toInt
+
+      override def dispose: Unit = {
+        super.dispose
+        // since we might not get disposed from the same thread
+        // that did the allocation..
+        queue <<| ^{
+          allocation.free()
+        }
+      }
+    }
+
+    def alloc(size: Int): DirectBuffer = with_allocator_context { ctx=>
+      val allocation = allocator.alloc(size)
+      assert(allocation!=null)
+      new AllocationBuffer(allocation)
+    }
+  }
+
+  val _current_allocator_context = new ThreadLocal[AllocatorContext]()
+
+  protected def start() = {
+    directory.mkdirs
+  }
+
+  def alloc(size: Int): DirectBuffer = with_allocator_context { ctx=>
+    ctx.alloc(size)
+  }
+
+  def with_allocator_context[T](func: (AllocatorContext)=>T):T = {
+    if( getCurrentThreadQueue == null ) {
+      getGlobalQueue().future(func(current_allocator_context))()
+    } else {
+      func(current_allocator_context)
+    }
+  }
+
+  def current_allocator_context:AllocatorContext = {
+    val thread_queue = getCurrentThreadQueue
+    assert(thread_queue != null)
+    var rc = _current_allocator_context.get
+    if( rc==null ) {
+      rc = new AllocatorContext(thread_queue)
+      _current_allocator_context.set(rc)
+    }
+    rc
+  }
+}
\ No newline at end of file

Copied: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
(from r1056021, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.java&r1=1056021&r2=1056071&rev=1056071&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala
Thu Jan  6 20:56:17 2011
@@ -3,7 +3,7 @@
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
+ * (the "License") you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
  *      http://www.apache.org/licenses/LICENSE-2.0
@@ -14,22 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.broker.store;
+package org.apache.activemq.apollo.broker.store
 
-import org.apache.activemq.apollo.util.DirectBuffer;
-import org.fusesource.hawtbuf.AsciiBuffer;
-import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.AsciiBuffer
+import org.fusesource.hawtbuf.Buffer
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class MessageRecord {
+class MessageRecord {
 
-    public long key = -1;
-    public AsciiBuffer protocol;
-    public int size;
-    public Buffer buffer;
-    public DirectBuffer direct_buffer = null;
-    public long expiration = 0;
+  var key = -1L
+  var protocol: AsciiBuffer = _
+  var size = 0
+  var buffer: Buffer = _
+  var direct_buffer: DirectBuffer = _
+  var expiration = 0L
 
 }

Copied: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.scala
(from r1056021, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.scala?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.scala&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.java&r1=1056021&r2=1056071&rev=1056071&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.scala
Thu Jan  6 20:56:17 2011
@@ -3,7 +3,7 @@
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
+ * (the "License") you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
  *      http://www.apache.org/licenses/LICENSE-2.0
@@ -14,14 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.broker.store;
+package org.apache.activemq.apollo.broker.store
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class QueueEntryRange {
-    public long first_entry_seq;
-    public long last_entry_seq;
-    public int count;
-    public int size;
+class QueueEntryRange {
+  var first_entry_seq = 0L
+  var last_entry_seq = 0L
+  var count = 0
+  var size = 0
 }
\ No newline at end of file

Copied: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
(from r1056021, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.java&r1=1056021&r2=1056071&rev=1056071&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala
Thu Jan  6 20:56:17 2011
@@ -3,7 +3,7 @@
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
+ * (the "License") you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
  *      http://www.apache.org/licenses/LICENSE-2.0
@@ -14,20 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.broker.store;
+package org.apache.activemq.apollo.broker.store
 
-import org.fusesource.hawtbuf.Buffer;
+
+
+import org.fusesource.hawtbuf.Buffer
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class QueueEntryRecord {
+class QueueEntryRecord {
 
-    public long queue_key;
-    public long entry_seq;
-    public long message_key;
-    public Buffer attachment;
-    public int size;
-    public short redeliveries;
+  var queue_key = 0L
+  var entry_seq = 0L
+  var message_key = 0L
+  var attachment:Buffer = _
+  var size = 0
+  var redeliveries:Short = 0
 
 }

Copied: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala
(from r1056021, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.java)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.java&r1=1056021&r2=1056071&rev=1056071&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala
Thu Jan  6 20:56:17 2011
@@ -14,7 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.apollo.broker.store;
+package org.apache.activemq.apollo.broker.store
+
+;
 
 import org.fusesource.hawtbuf.AsciiBuffer;
 import org.fusesource.hawtbuf.Buffer;
@@ -23,8 +25,8 @@ import org.fusesource.hawtbuf.Buffer;
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-public class QueueRecord {
-    public long key = -1;
-    public AsciiBuffer binding_kind;
-    public Buffer binding_data;
+class QueueRecord {
+  var key = -1L
+  var binding_kind: AsciiBuffer = _
+  var binding_data: Buffer = _
 }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1056071&r1=1056070&r2=1056071&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
Thu Jan  6 20:56:17 2011
@@ -43,7 +43,7 @@ trait Store extends ServiceTrait {
    * @returns true if the store implementation can handle accepting
    *          MessageRecords with DirectBuffers in them.
    */
-  def supports_direct_buffers() = false
+  def direct_buffer_allocator():DirectBufferAllocator = null
 
   /**
    * Creates a store uow which is used to perform persistent

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1056071&r1=1056070&r2=1056071&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
Thu Jan  6 20:56:17 2011
@@ -27,10 +27,10 @@ import _root_.scala.collection.JavaConve
 import java.io.{EOFException, DataOutput, DataInput, IOException}
 import java.nio.channels.{SocketChannel, WritableByteChannel, ReadableByteChannel}
 import org.apache.activemq.apollo.transport._
-import org.apache.activemq.apollo.broker.store.MessageRecord
 import _root_.org.fusesource.hawtbuf._
 import Buffer._
 import org.apache.activemq.apollo.util._
+import org.apache.activemq.apollo.broker.store.{DirectBuffer, DirectBufferAllocator, MessageRecord}
 
 object StompCodec extends Log {
     val READ_BUFFFER_SIZE = 1024*64;
@@ -157,7 +157,7 @@ class StompCodec extends ProtocolCodec w
   import StompCodec._
   override protected def log: Log = StompCodec
 
-  var memory_pool:DirectBufferPool = null
+  var direct_buffer_allocator:DirectBufferAllocator = null
 
   implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length);
   implicit def wrap(x: Byte) = {
@@ -178,12 +178,11 @@ class StompCodec extends ProtocolCodec w
   var write_channel:WritableByteChannel = null
 
   var next_write_buffer = new DataByteArrayOutputStream(write_buffer_size)
-  var next_write_direct:ByteBuffer = null
-  var next_write_direct_frame:StompFrame = null
+  var next_write_direct:DirectBuffer = null
 
   var write_buffer = ByteBuffer.allocate(0)
-  var write_direct:ByteBuffer = null
-  var write_direct_frame:StompFrame = null
+  var write_direct:DirectBuffer = null
+  var write_direct_pos = 0
 
   def is_full = next_write_direct!=null || next_write_buffer.size() >= (write_buffer_size >> 2)
   def is_empty = write_buffer.remaining() == 0 && write_direct==null
@@ -252,9 +251,8 @@ class StompCodec extends ProtocolCodec w
 
       frame.content match {
         case x:DirectContent=>
-          next_write_direct = x.direct_buffer.buffer.duplicate
-          next_write_direct.clear
-          next_write_direct_frame = frame
+          assert(next_write_direct==null)
+          next_write_direct = x.direct_buffer
         case x:BufferContent=>
           x.content.writeTo(os)
           END_OF_FRAME_BUFFER.writeTo(os)
@@ -272,25 +270,28 @@ class StompCodec extends ProtocolCodec w
       write_counter += write_channel.write(write_buffer)
     }
     if ( write_buffer.remaining() == 0 && write_direct!=null ) {
-      write_counter += write_channel.write(write_direct)
-      if( write_direct.remaining() == 0 ) {
+      val count = write_direct.read(write_direct_pos, write_channel)
+      write_direct_pos += count
+      write_counter += count
+
+      if( write_direct.remaining(write_direct_pos) == 0 ) {
+        write_direct.release
         write_direct = null
-        write_direct_frame.release
-        write_direct_frame = null
+        write_direct_pos = 0
+
+        write_buffer = ByteBuffer.wrap(END_OF_FRAME_BUFFER.data)
       }
     }
 
     // if it is now empty try to refill...
-    if ( is_empty && next_write_buffer.size()!=0 ) {
+    if ( is_empty && write_direct==null ) {
         // size of next buffer is based on how much was used in the previous buffer.
         val prev_size = (write_buffer.position()+512).max(512).min(write_buffer_size)
         write_buffer = next_write_buffer.toBuffer().toByteBuffer()
         write_direct = next_write_direct
-        write_direct_frame = next_write_direct_frame
 
         next_write_buffer = new DataByteArrayOutputStream(prev_size)
         next_write_direct = null
-        next_write_direct_frame = null
     }
 
     if ( is_empty ) {
@@ -317,6 +318,10 @@ class StompCodec extends ProtocolCodec w
   var read_buffer = ByteBuffer.allocate(read_buffer_size)
   var read_end = 0
   var read_start = 0
+
+  var read_direct:DirectBuffer = null
+  var read_direct_pos = 0
+
   var next_action:FrameReader = read_action
 
   def setReadableByteChannel(channel: ReadableByteChannel) = {
@@ -339,7 +344,13 @@ class StompCodec extends ProtocolCodec w
     var command:Object = null
     while( command==null ) {
       // do we need to read in more data???
-      if (read_end == read_buffer.position()) {
+      if( read_direct!=null && read_direct.remaining(read_direct_pos) > 0) {
+        val count = read_direct.write(read_channel, read_direct_pos)
+        if (count == -1) {
+            throw new EOFException("Peer disconnected")
+        }
+        read_direct_pos += count
+      } else if (read_end == read_buffer.position() ) {
 
           // do we need a new data buffer to read data into??
           if (read_buffer.remaining() == 0) {
@@ -469,40 +480,23 @@ class StompCodec extends ProtocolCodec w
           // lets try to keep the content of big message outside of the JVM's garbage collection
           // to keep the number of GCs down when moving big messages.
           def is_message = action == SEND || action == MESSAGE
-          if( length > 1024 && memory_pool!=null && is_message) {
-
-            val ma = memory_pool.alloc(length+1)
+          if( length > 1024 && direct_buffer_allocator!=null && is_message) {
 
-            val read_limit = buffer.position
-            if( (read_limit-read_start) < length+1 ) {
-              // buffer did not contain the fully stomp body
+            read_direct = direct_buffer_allocator.alloc(length)
 
-              ma.buffer.put( buffer.array, read_start, read_limit-read_start )
+            val dup = buffer.duplicate
+            dup.position(read_start)
+            dup.limit(buffer.position)
 
-              read_buffer = ma.buffer
-              read_end = read_limit-read_start
-              read_start = 0
-
-              next_action = read_binary_body_direct(action, headers, ma)
-
-            } else {
-              // The current buffer already read in all the data...
-
-              if( buffer.array()(read_start+length)!= 0 ) {
-                 throw new IOException("Expected null termintor after "+length+" content bytes")
-              }
+            // copy in the body the was read so far...
+            read_direct_pos = read_direct.write(dup, 0)
 
-              // copy the body out to the direct buffer
-              ma.buffer.put( buffer.array, read_start, read_limit-read_start )
-
-              // and reposition to reuse non-direct space.
-              buffer.position(read_start)
-              read_end = read_start
-
-              next_action = read_action
-              rc = new StompFrame(ascii(action), headers.toList, DirectContent(ma))
-            }
+            // since it was copied.. reposition to re-use the copied area..
+            dup.compact
+            buffer.position(buffer.position - read_direct_pos)
+            read_end = read_start
 
+            next_action = read_binary_body_direct(action, headers, length)
           } else {
             next_action = read_binary_body(action, headers, length)
           }
@@ -526,9 +520,17 @@ class StompCodec extends ProtocolCodec w
     None
   }
 
+  def read_binary_body_direct(action:AsciiBuffer, headers:HeaderMapBuffer, contentLength:Int):FrameReader = (buffer)=> {
+    if( read_direct.remaining(read_direct_pos)==0 ) {
+      next_action = read_direct_terminator(action, headers, contentLength, read_direct)
+      read_direct = null
+      read_direct_pos = 0
+    }
+    null
+  }
 
-  def read_binary_body_direct(action:AsciiBuffer, headers:HeaderMapBuffer, ma:DirectBuffer):FrameReader = (buffer)=> {
-    if( read_content_direct(ma) ) {
+  def read_direct_terminator(action:AsciiBuffer, headers:HeaderMapBuffer, contentLength:Int, ma:DirectBuffer):FrameReader = (buffer)=> {
+    if( read_frame_terminator(buffer, contentLength) ) {
       next_action = read_action
       new StompFrame(ascii(action), headers.toList, DirectContent(ma))
     } else {
@@ -536,22 +538,17 @@ class StompCodec extends ProtocolCodec w
     }
   }
 
-  def read_content_direct(ma:DirectBuffer) = {
-      val read_limit = ma.buffer.position
-      if( read_limit < ma.size ) {
+  def read_frame_terminator(buffer:ByteBuffer, contentLength:Int):Boolean = {
+      val read_limit = buffer.position
+      if( (read_limit-read_start) < 1 ) {
         read_end = read_limit
         false
       } else {
-        ma.buffer.position(ma.size-1)
-        if( ma.buffer.get != 0 ) {
-           throw new IOException("Expected null termintor after "+(ma.size-1)+" content bytes")
+        if( buffer.array()(read_start)!= 0 ) {
+           throw new IOException("Expected null termintor after "+contentLength+" content bytes")
         }
-        ma.buffer.rewind
-        ma.buffer.limit(ma.size-1)
-
-        read_buffer = ByteBuffer.allocate(read_buffer_size)
-        read_end = 0
-        read_start = 0
+        read_end = read_start+1
+        read_start = read_end
         true
       }
   }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1056071&r1=1056070&r2=1056071&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Thu Jan  6 20:56:17 2011
@@ -25,6 +25,7 @@ import org.apache.activemq.apollo.broker
 import org.apache.activemq.apollo.util._
 import org.fusesource.hawtdispatch.BaseRetained
 import java.io.{OutputStream, DataOutput}
+import org.apache.activemq.apollo.broker.store.DirectBuffer
 
 /**
  *
@@ -207,11 +208,10 @@ case class DirectContent(direct_buffer:D
 
   def writeTo(os:OutputStream) = {
     val buff = new Array[Byte](1024*4)
-    val source = direct_buffer.buffer.duplicate
     var remaining = direct_buffer.size-1
     while( remaining> 0 ) {
       val c = remaining.min(buff.length)
-      source.get(buff, 0, c)
+      direct_buffer.read(os)
       os.write(buff, 0, c)
       remaining -= c
     }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1056071&r1=1056070&r2=1056071&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Thu Jan  6 20:56:17 2011
@@ -571,9 +571,9 @@ class StompProtocolHandler extends Proto
 
       connection_sink.offer(StompFrame(CONNECTED,connected_headers.toList))
 
-      if( this.host.direct_buffer_pool!=null ) {
+      if( this.host.store!=null && this.host.store.direct_buffer_allocator!=null ) {
         val wf = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
-        wf.memory_pool = this.host.direct_buffer_pool
+        wf.direct_buffer_allocator = this.host.store.direct_buffer_allocator
       }
     }
 



Mime
View raw message