Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 26400 invoked from network); 6 Jan 2011 20:56:45 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 6 Jan 2011 20:56:45 -0000 Received: (qmail 30866 invoked by uid 500); 6 Jan 2011 20:56:45 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 30804 invoked by uid 500); 6 Jan 2011 20:56:45 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 30797 invoked by uid 99); 6 Jan 2011 20:56:45 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Jan 2011 20:56:45 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Jan 2011 20:56:40 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id F232223889B3; Thu, 6 Jan 2011 20:56:18 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1056071 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ apollo-hawtdb/src/main/resources/META-INF/services/org.apac... Date: Thu, 06 Jan 2011 20:56:18 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110106205618.F232223889B3@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Thu Jan 6 20:56:17 2011 New Revision: 1056071 URL: http://svn.apache.org/viewvc?rev=1056071&view=rev Log: Re structuring how direct buffers work. Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala - copied, changed from r1056021, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.java activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.scala - copied, changed from r1056021, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.java activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala - copied, changed from r1056021, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.java activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala - copied, changed from r1056021, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.java Removed: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.java activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.java activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.java activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.java activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueStatus.java activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/direct-buffer-pool-factory.index activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBDirectBufferPool.scala activemq/activemq-apollo/trunk/apollo-hawtdb/src/main/scala/org/apache/activemq/apollo/broker/store/hawtdb/HawtDBDirectBufferPoolFactory.scala activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/DirectBufferPool.scala activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/DirectBufferPoolFactory.scala Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1056071&r1=1056070&r2=1056071&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Thu Jan 6 20:56:17 2011 @@ -22,7 +22,6 @@ import _root_.scala.collection.JavaConve import org.fusesource.hawtdispatch._ import java.util.concurrent.TimeUnit -import org.apache.activemq.apollo.broker.store.{Store, StoreFactory} import org.apache.activemq.apollo.util._ import path.PathFilter import ReporterLevel._ @@ -33,6 +32,7 @@ import org.apache.activemq.apollo.util.O import org.apache.activemq.apollo.util.path.{Path, PathParser} import org.apache.activemq.apollo.dto.{DestinationDTO, QueueDTO, BindingDTO, VirtualHostDTO} import security.{AclAuthorizer, JaasAuthenticator, Authenticator, Authorizer} +import org.apache.activemq.apollo.broker.store.{DirectBufferAllocator, Store, StoreFactory} /** * @author Hiram Chirino @@ -81,7 +81,6 @@ class VirtualHost(val broker: Broker, va var names:List[String] = Nil; var store:Store = null - var direct_buffer_pool:DirectBufferPool = null val queue_id_counter = new LongCounter val session_counter = new AtomicLong(0) @@ -106,7 +105,6 @@ class VirtualHost(val broker: Broker, va } } |>>: dispatch_queue - override protected def _start(on_completed:Runnable):Unit = { val tracker = new LoggingTracker("virtual host startup", dispatch_queue) @@ -129,19 +127,6 @@ class VirtualHost(val broker: Broker, va store = StoreFactory.create(config.store) - // val memory_pool_config: String = null - var direct_buffer_pool_config: String = "hawtdb:activemq.tmp" - - if( direct_buffer_pool_config!=null && (store!=null && !store.supports_direct_buffers) ) { - warn("The direct buffer pool will not be used because the configured store does not support them.") - direct_buffer_pool_config = null - } - - if( direct_buffer_pool_config!=null ) { - direct_buffer_pool = DirectBufferPoolFactory.create(direct_buffer_pool_config) - direct_buffer_pool.start - } - if( store!=null ) { store.configure(config.store, LoggingReporter(VirtualHost)) val store_startup_done = tracker.task("store startup") @@ -204,11 +189,6 @@ class VirtualHost(val broker: Broker, va router.queues.valuesIterator.foreach { queue=> tracker.stop(queue) } - if( direct_buffer_pool!=null ) { - direct_buffer_pool.stop - direct_buffer_pool = null - } - if( store!=null ) { tracker.stop(store); } Added: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala?rev=1056071&view=auto ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala (added) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DirectBufferAllocator.scala Thu Jan 6 20:56:17 2011 @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.apollo.broker.store + +import org.fusesource.hawtdispatch._ +import java.nio.channels.{FileChannel, WritableByteChannel, ReadableByteChannel} +import java.nio.ByteBuffer +import java.io._ +import org.apache.activemq.apollo.util._ +/** + *

+ *

+ * + * @author Hiram Chirino + */ +trait DirectBufferAllocator { + def alloc(size:Int):DirectBuffer +} + +/** + *

+ * A DirectBuffer is a reference counted buffer on + * temp storage designed to be accessed with direct io. + *

+ * + * @author Hiram Chirino + */ +trait DirectBuffer extends Retained { + + def size:Int + + def remaining(from_position: Int): Int + + def read(target: OutputStream):Unit + + def read(src: Int, target: WritableByteChannel): Int + + def write(src:ReadableByteChannel, target:Int): Int + + def write(src:ByteBuffer, target:Int):Int + + def link(target:File):Long +} + +trait FileDirectBufferTrait extends DirectBuffer { + + def offset:Long + def channel:FileChannel + + def remaining(pos: Int): Int = size-pos + + def read(src: Int, target: WritableByteChannel): Int = { + assert(retained > 0) + val count: Int = remaining(src) + assert(count>=0) + channel.transferTo(offset+src, count, target).toInt + } + + def read(target: OutputStream): Unit = { + assert(retained > 0) + val b = ByteBuffer.allocate(size.min(1024*4)) + var pos = 0 + while( remaining(pos)> 0 ) { + val count = channel.read(b, offset+pos) + if( count == -1 ) { + throw new EOFException() + } + target.write(b.array, 0, count) + pos += count + b.clear + } + } + + def write(src: ReadableByteChannel, target:Int): Int = { + assert(retained > 0) + val count: Int = remaining(target) + assert(count>=0) + channel.transferFrom(src, offset+target, count).toInt + } + + def write(src: ByteBuffer, target: Int): Int = { + assert(retained > 0) + val diff = src.remaining - remaining(target) + if( diff > 0 ) { + src.limit(src.limit-diff) + } + try { + channel.write(src, offset+target).toInt + } finally { + if( diff > 0 ) { + src.limit(src.limit+diff) + } + } + } + + def link(target: File): Long = { + assert(retained > 0) + // TODO: implement with a real file system hard link + // to get copy on write goodness. + import FileSupport._ + using(new FileOutputStream(target).getChannel) { target=> + val count = channel.transferTo(offset, size, target) + assert( count == size ) + } + return 0; + } +} + +case class Allocation(size:Long, offset:Long) extends Ordered[Allocation] { + + var _free_func: (Allocation)=>Unit = _ + + def free() = { + _free_func(this) + } + + def compare(that: Allocation): Int = { + var rc = longWrapper(size).compareTo(that.size) + if( rc!=0 ) { + rc + } else { + longWrapper(offset).compareTo(that.offset) + } + } + + // split the allocation.. + def split(request:Long):(Allocation, Allocation) = { + assert(request < size) + var first = Allocation(offset, request) + var second = Allocation(offset+request, size-request) + (first, second) + } + + // join the allocation.. + def join(that:Allocation):Allocation = { + assert( that.offset == offset+size) + Allocation(offset, size+that.size) + } + +} + +trait Allocator { + def alloc(request:Long):Allocation + + def chain(that:Allocator):Allocator = new Allocator() { + def alloc(request: Long): Allocation = { + val rc = Allocator.this.alloc(request) + if( rc == null ) { + that.alloc(request) + } else { + rc + } + } + } +} + +class TreeAllocator(range:Allocation) extends Allocator { + + // list of the free allocation areas. Sorted by size then offset + val free_by_size = new TreeMap[Allocation, Zilch]() + // list of the free allocation areas sorted by offset. + val free_by_offset = new TreeMap[Long, Allocation]() + + { + val allocation = range.copy() + free_by_offset.put(allocation.offset, allocation) + free_by_size.put(allocation, null) + } + + def alloc(request:Long):Allocation = { + var spot_entry = free_by_size.ceilingEntry(Allocation(request, 0)) + if( spot_entry== null ) { + return null + } + + val allocation = spot_entry.getKey + free_by_size.remove(allocation) + + // might be the perfect size + if( allocation.size == request ) { + allocation._free_func = free + allocation + } else { + // split the allocation.. + var (first, second) = allocation.split(request) + + free_by_offset.remove(first.offset) + free_by_offset.put(second.offset, second) + + // put the free part in the free map. + free_by_size.put(second, null) + + first._free_func = free + first + } + } + + def free(allocation:Allocation):Unit = { + + + var prev_e = free_by_offset.floorEntry(allocation.offset) + var next_e = if( prev_e!=null ) { + prev_e.next + } else { + free_by_offset.ceilingEntry(allocation.offset) + } + + val prev = Option(prev_e).map(_.getValue).map( a=> if(a.offset+a.size == allocation.offset) a else null ).getOrElse(null) + val next = Option(prev_e).map(_.getValue).map( a=> if(allocation.offset+allocation.size == a.offset) a else null ).getOrElse(null) + + (prev, next) match { + case (null, null)=> + allocation._free_func = null + free_by_size.put(allocation, null) + free_by_offset.put(allocation.offset, allocation) + + case (prev, null)=> + val joined = prev.join(allocation) + free_by_size.remove(prev) + free_by_size.put(joined, null) + free_by_offset.put(joined.offset, joined) + + case (null, next)=> + val joined = allocation.join(next) + free_by_size.remove(next) + free_by_size.put(joined, null) + + free_by_offset.remove(next.offset) + free_by_offset.put(joined.offset, joined) + + case (prev, next)=> + val joined = prev.join(allocation.join(next)) + free_by_size.remove(prev) + free_by_size.remove(next) + free_by_size.put(joined, null) + + free_by_offset.remove(next.offset) + free_by_offset.put(joined.offset, joined) + } + } +} + +/** + * Helps minimize the active page set. + */ +class ActiveAllocator(val range:Allocation) extends Allocator { + + // the cold allocated start with all the free space.. + val inactive = new TreeAllocator(range) + + // the hot is clear of any free space. + val active = new TreeAllocator(range) + + active.free_by_offset.clear + active.free_by_size.clear + + // allocate out of the hot area first since + // that should result in less vm swapping + val chain = active.chain(inactive) + + def alloc(request:Long):Allocation = { + var rc = chain.alloc(request) + if( rc!=null ) { + rc._free_func = free + } + rc + } + + def free(allocation:Allocation):Unit = { + // put stuff back in the hot tree. + active.free(allocation) + } + +} + +/** + *

+ *

+ * + * @author Hiram Chirino + */ +class FileDirectBufferAllocator(val directory:File) extends DirectBufferAllocator { + + // we use thread local allocators to + class AllocatorContext(val queue:DispatchQueue) { + + val allocator = new TreeAllocator(Allocation(0, Long.MaxValue)) + var channel:FileChannel = new RandomAccessFile(queue.getLabel, "rw").getChannel + + class AllocationBuffer(val allocation:Allocation) extends BaseRetained with FileDirectBufferTrait { + def channel: FileChannel = AllocatorContext.this.channel + def offset: Long = allocation.offset + def size: Int = allocation.size.toInt + + override def dispose: Unit = { + super.dispose + // since we might not get disposed from the same thread + // that did the allocation.. + queue <<| ^{ + allocation.free() + } + } + } + + def alloc(size: Int): DirectBuffer = with_allocator_context { ctx=> + val allocation = allocator.alloc(size) + assert(allocation!=null) + new AllocationBuffer(allocation) + } + } + + val _current_allocator_context = new ThreadLocal[AllocatorContext]() + + protected def start() = { + directory.mkdirs + } + + def alloc(size: Int): DirectBuffer = with_allocator_context { ctx=> + ctx.alloc(size) + } + + def with_allocator_context[T](func: (AllocatorContext)=>T):T = { + if( getCurrentThreadQueue == null ) { + getGlobalQueue().future(func(current_allocator_context))() + } else { + func(current_allocator_context) + } + } + + def current_allocator_context:AllocatorContext = { + val thread_queue = getCurrentThreadQueue + assert(thread_queue != null) + var rc = _current_allocator_context.get + if( rc==null ) { + rc = new AllocatorContext(thread_queue) + _current_allocator_context.set(rc) + } + rc + } +} \ No newline at end of file Copied: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala (from r1056021, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.java) URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.java&r1=1056021&r2=1056071&rev=1056071&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.java (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/MessageRecord.scala Thu Jan 6 20:56:17 2011 @@ -3,7 +3,7 @@ * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with + * (the "License") you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 @@ -14,22 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.apollo.broker.store; +package org.apache.activemq.apollo.broker.store -import org.apache.activemq.apollo.util.DirectBuffer; -import org.fusesource.hawtbuf.AsciiBuffer; -import org.fusesource.hawtbuf.Buffer; +import org.fusesource.hawtbuf.AsciiBuffer +import org.fusesource.hawtbuf.Buffer /** * @author Hiram Chirino */ -public class MessageRecord { +class MessageRecord { - public long key = -1; - public AsciiBuffer protocol; - public int size; - public Buffer buffer; - public DirectBuffer direct_buffer = null; - public long expiration = 0; + var key = -1L + var protocol: AsciiBuffer = _ + var size = 0 + var buffer: Buffer = _ + var direct_buffer: DirectBuffer = _ + var expiration = 0L } Copied: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.scala (from r1056021, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.java) URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.scala?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.scala&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.java&r1=1056021&r2=1056071&rev=1056071&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.java (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRange.scala Thu Jan 6 20:56:17 2011 @@ -3,7 +3,7 @@ * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with + * (the "License") you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 @@ -14,14 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.apollo.broker.store; +package org.apache.activemq.apollo.broker.store /** * @author Hiram Chirino */ -public class QueueEntryRange { - public long first_entry_seq; - public long last_entry_seq; - public int count; - public int size; +class QueueEntryRange { + var first_entry_seq = 0L + var last_entry_seq = 0L + var count = 0 + var size = 0 } \ No newline at end of file Copied: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala (from r1056021, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.java) URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.java&r1=1056021&r2=1056071&rev=1056071&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.java (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueEntryRecord.scala Thu Jan 6 20:56:17 2011 @@ -3,7 +3,7 @@ * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with + * (the "License") you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 @@ -14,20 +14,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.apollo.broker.store; +package org.apache.activemq.apollo.broker.store -import org.fusesource.hawtbuf.Buffer; + + +import org.fusesource.hawtbuf.Buffer /** * @author Hiram Chirino */ -public class QueueEntryRecord { +class QueueEntryRecord { - public long queue_key; - public long entry_seq; - public long message_key; - public Buffer attachment; - public int size; - public short redeliveries; + var queue_key = 0L + var entry_seq = 0L + var message_key = 0L + var attachment:Buffer = _ + var size = 0 + var redeliveries:Short = 0 } Copied: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala (from r1056021, activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.java) URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala?p2=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala&p1=activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.java&r1=1056021&r2=1056071&rev=1056071&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.java (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/QueueRecord.scala Thu Jan 6 20:56:17 2011 @@ -14,7 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.apollo.broker.store; +package org.apache.activemq.apollo.broker.store + +; import org.fusesource.hawtbuf.AsciiBuffer; import org.fusesource.hawtbuf.Buffer; @@ -23,8 +25,8 @@ import org.fusesource.hawtbuf.Buffer; /** * @author Hiram Chirino */ -public class QueueRecord { - public long key = -1; - public AsciiBuffer binding_kind; - public Buffer binding_data; +class QueueRecord { + var key = -1L + var binding_kind: AsciiBuffer = _ + var binding_data: Buffer = _ } Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1056071&r1=1056070&r2=1056071&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala Thu Jan 6 20:56:17 2011 @@ -43,7 +43,7 @@ trait Store extends ServiceTrait { * @returns true if the store implementation can handle accepting * MessageRecords with DirectBuffers in them. */ - def supports_direct_buffers() = false + def direct_buffer_allocator():DirectBufferAllocator = null /** * Creates a store uow which is used to perform persistent Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1056071&r1=1056070&r2=1056071&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original) +++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Thu Jan 6 20:56:17 2011 @@ -27,10 +27,10 @@ import _root_.scala.collection.JavaConve import java.io.{EOFException, DataOutput, DataInput, IOException} import java.nio.channels.{SocketChannel, WritableByteChannel, ReadableByteChannel} import org.apache.activemq.apollo.transport._ -import org.apache.activemq.apollo.broker.store.MessageRecord import _root_.org.fusesource.hawtbuf._ import Buffer._ import org.apache.activemq.apollo.util._ +import org.apache.activemq.apollo.broker.store.{DirectBuffer, DirectBufferAllocator, MessageRecord} object StompCodec extends Log { val READ_BUFFFER_SIZE = 1024*64; @@ -157,7 +157,7 @@ class StompCodec extends ProtocolCodec w import StompCodec._ override protected def log: Log = StompCodec - var memory_pool:DirectBufferPool = null + var direct_buffer_allocator:DirectBufferAllocator = null implicit def wrap(x: Buffer) = ByteBuffer.wrap(x.data, x.offset, x.length); implicit def wrap(x: Byte) = { @@ -178,12 +178,11 @@ class StompCodec extends ProtocolCodec w var write_channel:WritableByteChannel = null var next_write_buffer = new DataByteArrayOutputStream(write_buffer_size) - var next_write_direct:ByteBuffer = null - var next_write_direct_frame:StompFrame = null + var next_write_direct:DirectBuffer = null var write_buffer = ByteBuffer.allocate(0) - var write_direct:ByteBuffer = null - var write_direct_frame:StompFrame = null + var write_direct:DirectBuffer = null + var write_direct_pos = 0 def is_full = next_write_direct!=null || next_write_buffer.size() >= (write_buffer_size >> 2) def is_empty = write_buffer.remaining() == 0 && write_direct==null @@ -252,9 +251,8 @@ class StompCodec extends ProtocolCodec w frame.content match { case x:DirectContent=> - next_write_direct = x.direct_buffer.buffer.duplicate - next_write_direct.clear - next_write_direct_frame = frame + assert(next_write_direct==null) + next_write_direct = x.direct_buffer case x:BufferContent=> x.content.writeTo(os) END_OF_FRAME_BUFFER.writeTo(os) @@ -272,25 +270,28 @@ class StompCodec extends ProtocolCodec w write_counter += write_channel.write(write_buffer) } if ( write_buffer.remaining() == 0 && write_direct!=null ) { - write_counter += write_channel.write(write_direct) - if( write_direct.remaining() == 0 ) { + val count = write_direct.read(write_direct_pos, write_channel) + write_direct_pos += count + write_counter += count + + if( write_direct.remaining(write_direct_pos) == 0 ) { + write_direct.release write_direct = null - write_direct_frame.release - write_direct_frame = null + write_direct_pos = 0 + + write_buffer = ByteBuffer.wrap(END_OF_FRAME_BUFFER.data) } } // if it is now empty try to refill... - if ( is_empty && next_write_buffer.size()!=0 ) { + if ( is_empty && write_direct==null ) { // size of next buffer is based on how much was used in the previous buffer. val prev_size = (write_buffer.position()+512).max(512).min(write_buffer_size) write_buffer = next_write_buffer.toBuffer().toByteBuffer() write_direct = next_write_direct - write_direct_frame = next_write_direct_frame next_write_buffer = new DataByteArrayOutputStream(prev_size) next_write_direct = null - next_write_direct_frame = null } if ( is_empty ) { @@ -317,6 +318,10 @@ class StompCodec extends ProtocolCodec w var read_buffer = ByteBuffer.allocate(read_buffer_size) var read_end = 0 var read_start = 0 + + var read_direct:DirectBuffer = null + var read_direct_pos = 0 + var next_action:FrameReader = read_action def setReadableByteChannel(channel: ReadableByteChannel) = { @@ -339,7 +344,13 @@ class StompCodec extends ProtocolCodec w var command:Object = null while( command==null ) { // do we need to read in more data??? - if (read_end == read_buffer.position()) { + if( read_direct!=null && read_direct.remaining(read_direct_pos) > 0) { + val count = read_direct.write(read_channel, read_direct_pos) + if (count == -1) { + throw new EOFException("Peer disconnected") + } + read_direct_pos += count + } else if (read_end == read_buffer.position() ) { // do we need a new data buffer to read data into?? if (read_buffer.remaining() == 0) { @@ -469,40 +480,23 @@ class StompCodec extends ProtocolCodec w // lets try to keep the content of big message outside of the JVM's garbage collection // to keep the number of GCs down when moving big messages. def is_message = action == SEND || action == MESSAGE - if( length > 1024 && memory_pool!=null && is_message) { - - val ma = memory_pool.alloc(length+1) + if( length > 1024 && direct_buffer_allocator!=null && is_message) { - val read_limit = buffer.position - if( (read_limit-read_start) < length+1 ) { - // buffer did not contain the fully stomp body + read_direct = direct_buffer_allocator.alloc(length) - ma.buffer.put( buffer.array, read_start, read_limit-read_start ) + val dup = buffer.duplicate + dup.position(read_start) + dup.limit(buffer.position) - read_buffer = ma.buffer - read_end = read_limit-read_start - read_start = 0 - - next_action = read_binary_body_direct(action, headers, ma) - - } else { - // The current buffer already read in all the data... - - if( buffer.array()(read_start+length)!= 0 ) { - throw new IOException("Expected null termintor after "+length+" content bytes") - } + // copy in the body the was read so far... + read_direct_pos = read_direct.write(dup, 0) - // copy the body out to the direct buffer - ma.buffer.put( buffer.array, read_start, read_limit-read_start ) - - // and reposition to reuse non-direct space. - buffer.position(read_start) - read_end = read_start - - next_action = read_action - rc = new StompFrame(ascii(action), headers.toList, DirectContent(ma)) - } + // since it was copied.. reposition to re-use the copied area.. + dup.compact + buffer.position(buffer.position - read_direct_pos) + read_end = read_start + next_action = read_binary_body_direct(action, headers, length) } else { next_action = read_binary_body(action, headers, length) } @@ -526,9 +520,17 @@ class StompCodec extends ProtocolCodec w None } + def read_binary_body_direct(action:AsciiBuffer, headers:HeaderMapBuffer, contentLength:Int):FrameReader = (buffer)=> { + if( read_direct.remaining(read_direct_pos)==0 ) { + next_action = read_direct_terminator(action, headers, contentLength, read_direct) + read_direct = null + read_direct_pos = 0 + } + null + } - def read_binary_body_direct(action:AsciiBuffer, headers:HeaderMapBuffer, ma:DirectBuffer):FrameReader = (buffer)=> { - if( read_content_direct(ma) ) { + def read_direct_terminator(action:AsciiBuffer, headers:HeaderMapBuffer, contentLength:Int, ma:DirectBuffer):FrameReader = (buffer)=> { + if( read_frame_terminator(buffer, contentLength) ) { next_action = read_action new StompFrame(ascii(action), headers.toList, DirectContent(ma)) } else { @@ -536,22 +538,17 @@ class StompCodec extends ProtocolCodec w } } - def read_content_direct(ma:DirectBuffer) = { - val read_limit = ma.buffer.position - if( read_limit < ma.size ) { + def read_frame_terminator(buffer:ByteBuffer, contentLength:Int):Boolean = { + val read_limit = buffer.position + if( (read_limit-read_start) < 1 ) { read_end = read_limit false } else { - ma.buffer.position(ma.size-1) - if( ma.buffer.get != 0 ) { - throw new IOException("Expected null termintor after "+(ma.size-1)+" content bytes") + if( buffer.array()(read_start)!= 0 ) { + throw new IOException("Expected null termintor after "+contentLength+" content bytes") } - ma.buffer.rewind - ma.buffer.limit(ma.size-1) - - read_buffer = ByteBuffer.allocate(read_buffer_size) - read_end = 0 - read_start = 0 + read_end = read_start+1 + read_start = read_end true } } Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1056071&r1=1056070&r2=1056071&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original) +++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Thu Jan 6 20:56:17 2011 @@ -25,6 +25,7 @@ import org.apache.activemq.apollo.broker import org.apache.activemq.apollo.util._ import org.fusesource.hawtdispatch.BaseRetained import java.io.{OutputStream, DataOutput} +import org.apache.activemq.apollo.broker.store.DirectBuffer /** * @@ -207,11 +208,10 @@ case class DirectContent(direct_buffer:D def writeTo(os:OutputStream) = { val buff = new Array[Byte](1024*4) - val source = direct_buffer.buffer.duplicate var remaining = direct_buffer.size-1 while( remaining> 0 ) { val c = remaining.min(buff.length) - source.get(buff, 0, c) + direct_buffer.read(os) os.write(buff, 0, c) remaining -= c } Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1056071&r1=1056070&r2=1056071&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original) +++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Thu Jan 6 20:56:17 2011 @@ -571,9 +571,9 @@ class StompProtocolHandler extends Proto connection_sink.offer(StompFrame(CONNECTED,connected_headers.toList)) - if( this.host.direct_buffer_pool!=null ) { + if( this.host.store!=null && this.host.store.direct_buffer_allocator!=null ) { val wf = connection.transport.getProtocolCodec.asInstanceOf[StompCodec] - wf.memory_pool = this.host.direct_buffer_pool + wf.direct_buffer_allocator = this.host.store.direct_buffer_allocator } }