Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 46486 invoked from network); 7 Jul 2010 04:23:00 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 7 Jul 2010 04:23:00 -0000 Received: (qmail 56586 invoked by uid 500); 7 Jul 2010 04:23:00 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 56540 invoked by uid 500); 7 Jul 2010 04:22:59 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 56533 invoked by uid 99); 7 Jul 2010 04:22:59 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jul 2010 04:22:59 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Jul 2010 04:22:54 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 925582388C10; Wed, 7 Jul 2010 04:22:01 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r961215 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/ activemq-hawtdb/src/main/resources/META... Date: Wed, 07 Jul 2010 04:22:01 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100707042201.925582388C10@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Wed Jul 7 04:22:00 2010 New Revision: 961215 URL: http://svn.apache.org/viewvc?rev=961215&view=rev Log: Hooked in the MemoryPool feature into the stomp protocol. It can now handle HUGE messages without a problem, it does not even affect the JVM memory usage since they will get received and transmitted from a memory mapped file. The store interfaces need to get hooked into the MemoryPool so that they do not bring in those huge messages into the JVM's memory space when they are working on persisting the message. Added: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/memory-pools - copied, changed from r961214, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org/apache/activemq/apollo/memory-pools Removed: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org/apache/activemq/apollo/memory-pools Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPoolSPI.scala activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala activemq/sandbox/activemq-apollo-actor/activemq-web/readme.md Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961215&r1=961214&r2=961215&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 04:22:00 2010 @@ -74,7 +74,7 @@ trait DeliverySession extends Sink[Deliv * * @author Hiram Chirino */ -trait Message extends Filterable { +trait Message extends Filterable with Retained { /** * the globally unique id of the message @@ -132,7 +132,7 @@ object Delivery extends Sizer[Delivery] class Delivery extends BaseRetained { /** - * memory size of the delivery. Used for resource allocation tracking + * Total size of the delivery. Used for resource allocation tracking */ var size:Int = 0 Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961215&r1=961214&r2=961215&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 04:22:00 2010 @@ -493,7 +493,7 @@ class Queue(val host: VirtualHost, val d if (session.full) { false } else { - + delivery.message.retain if( tune_persistent && delivery.uow!=null ) { delivery.uow.retain } @@ -755,7 +755,7 @@ class QueueEntry(val queue:Queue, val se def as_head:Head = null /** - * Gets the size of this entry in bytes. The head and tail entries allways return 0. + * Gets the size of this entry in bytes. The head and tail entries always return 0. */ def size = 0 @@ -1000,8 +1000,9 @@ class QueueEntry(val queue:Queue, val se if( flushing ) { queue.flushing_size-=size queue.capacity_used -= size - state = new Flushed(delivery.storeKey, size) + delivery.message.release + state = new Flushed(delivery.storeKey, size) if( can_combine_with_prev ) { getPrevious.as_flushed_range.combineNext } @@ -1020,6 +1021,7 @@ class QueueEntry(val queue:Queue, val se flushing = false queue.flushing_size-=size } + delivery.message.release queue.capacity_used -= size super.remove } @@ -1169,7 +1171,7 @@ class QueueEntry(val queue:Queue, val se delivery.size = messageRecord.size delivery.storeKey = messageRecord.key - queue.capacity_used += size + queue.capacity_used += delivery.size queue.flushed_items -= 1 state = new Loaded(delivery, true) } else { Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=961215&r1=961214&r2=961215&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul 7 04:22:00 2010 @@ -302,18 +302,20 @@ class DeliveryProducerRoute(val router:R // Do we need to store the message if we have a matching consumer? var storeOnMatch = delivery.message.persistent && router.host.store!=null + delivery.message.retain targets.foreach { target=> // only delivery to matching consumers if( target.consumer.matches(delivery) ) { - + if( storeOnMatch ) { delivery.uow = router.host.store.createStoreUOW delivery.storeKey = delivery.uow.store(delivery.createMessageRecord) storeOnMatch = false } + if( !target.offer(delivery) ) { overflowSessions ::= target } @@ -340,6 +342,7 @@ class DeliveryProducerRoute(val router:R if (delivery.uow != null) { delivery.uow.release } + delivery.message.release } val drainer = ^{ Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961215&r1=961214&r2=961215&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 04:22:00 2010 @@ -34,7 +34,7 @@ import org.apache.activemq.apollo.dto.{H import java.io.File import java.util.concurrent.TimeUnit import org.apache.activemq.apollo.util.LongCounter -import org.apache.activemq.apollo.MemoryPool +import org.apache.activemq.apollo.{MemoryPoolFactory, MemoryPool} /** * @author Hiram Chirino @@ -127,6 +127,17 @@ class VirtualHost(val broker: Broker, va override protected def _start(onCompleted:Runnable):Unit = { + +// val memory_pool_config: String = null + val memory_pool_config: String = "hawtdb:activemq.tmp" + + if( MemoryPoolFactory.validate(memory_pool_config) ) { + memory_pool = MemoryPoolFactory.create(memory_pool_config) + if( memory_pool!=null ) { + memory_pool.start + } + } + val tracker = new LoggingTracker("virtual host startup", dispatchQueue) store = StoreFactory.create(config.store) if( store!=null ) { @@ -208,6 +219,11 @@ class VirtualHost(val broker: Broker, va // } // done.await(); + if( memory_pool!=null ) { + memory_pool.stop + memory_pool = null + } + if( store!=null ) { store.stop(onCompleted); } else { Copied: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/memory-pools (from r961214, activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org/apache/activemq/apollo/memory-pools) URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/memory-pools?p2=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org.apache.activemq.apollo/memory-pools&p1=activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/resources/META-INF/services/org/apache/activemq/apollo/memory-pools&r1=961214&r2=961215&rev=961215&view=diff ============================================================================== (empty) Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPoolSPI.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPoolSPI.scala?rev=961215&r1=961214&r2=961215&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPoolSPI.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/HawtDBMemoryPoolSPI.scala Wed Jul 7 04:22:00 2010 @@ -39,7 +39,6 @@ class HawtDBMemoryPoolSPI extends Memory if( config.startsWith(prefix) ) { val file = new File(config.substring(prefix.length)) new HawtDBMemoryPool(file) - null } else { null } Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=961215&r1=961214&r2=961215&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul 7 04:22:00 2010 @@ -24,6 +24,7 @@ import java.lang.{String, Class} import java.io.DataOutput import org.apache.activemq.apollo.broker._ import org.apache.activemq.apollo.MemoryAllocation +import org.fusesource.hawtdispatch.BaseRetained /** * @@ -153,12 +154,17 @@ case class StompFrameMessage(frame:Stomp } } + + def setDisposer(disposer: Runnable) = throw new UnsupportedOperationException + def retained = throw new UnsupportedOperationException + def retain = frame.retain + def release = frame.release } object StompFrame extends Sizer[StompFrame] { - def size(value:StompFrame) = value.size + def size(value:StompFrame) = value.size } trait StompContent { @@ -170,6 +176,8 @@ trait StompContent { def utf8:UTF8Buffer + def retain = {} + def release = {} } object NilStompContent extends StompContent { @@ -208,6 +216,9 @@ case class DirectStompContent(direct:Mem def utf8:UTF8Buffer = { buffer.utf8 } + + override def retain = direct.retain + override def release = direct.release } /** @@ -272,4 +283,6 @@ case class StompFrame(action:AsciiBuffer ).map(_._2).getOrElse(null) } + def retain = content.retain + def release = content.release } Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961215&r1=961214&r2=961215&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 04:22:00 2010 @@ -138,6 +138,7 @@ class StompProtocolHandler extends Proto } } val frame = delivery.message.asInstanceOf[StompFrameMessage].frame + frame.retain val rc = session.offer(frame) assert(rc, "offer should be accepted since it was not full") true @@ -163,7 +164,8 @@ class StompProtocolHandler extends Proto var pendingAcks = HashMap[AsciiBuffer, (StoreUOW)=>Unit]() override def onTransportConnected() = { - session_manager = new SinkMux[StompFrame]( MapSink(connection.transportSink){ x=>x }, dispatchQueue, StompFrame) + + session_manager = new SinkMux[StompFrame]( MapSink(connection.transportSink){x=>x}, dispatchQueue, StompFrame) connection_sink = new OverflowSink(session_manager.open(dispatchQueue)); connection_sink.refiller = ^{} @@ -275,6 +277,7 @@ class StompProtocolHandler extends Proto } case None=> + frame.release die("destination not set.") } } @@ -328,8 +331,7 @@ class StompProtocolHandler extends Proto connection_sink.offer(StompFrame(Responses.RECEIPT, List((Stomp.Headers.Response.RECEIPT_ID, receipt)))) } } - - + frame.release } def on_stomp_subscribe(headers:HeaderMap) = { Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala?rev=961215&r1=961214&r2=961215&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala Wed Jul 7 04:22:00 2010 @@ -152,9 +152,11 @@ class StompWireFormat extends WireFormat var next_write_buffer = new DataByteArrayOutputStream(write_buffer_size) var next_write_direct:ByteBuffer = null + var next_write_direct_frame:StompFrame = null var write_buffer = ByteBuffer.allocate(0) var write_direct:ByteBuffer = null + var write_direct_frame:StompFrame = null def is_full = next_write_direct!=null || next_write_buffer.size() >= (write_buffer_size >> 2) def is_empty = write_buffer.remaining() == 0 && write_direct==null @@ -205,6 +207,7 @@ class StompWireFormat extends WireFormat val buffer2 = frame.content.asInstanceOf[BufferStompContent].content; val length = (buffer2.offset-buffer1.offset)+buffer2.length os.write( buffer1.data, offset, length) + END_OF_FRAME_BUFFER.writeTo(os) } else { for( (key, value) <- frame.headers ) { @@ -218,14 +221,15 @@ class StompWireFormat extends WireFormat frame.content match { case x:DirectStompContent=> next_write_direct = x.direct.buffer.duplicate - next_write_direct.limit(next_write_direct.limit-1) + next_write_direct.clear + next_write_direct_frame = frame case x:BufferStompContent=> x.content.writeTo(os) + END_OF_FRAME_BUFFER.writeTo(os) case _=> + END_OF_FRAME_BUFFER.writeTo(os) } - } - END_OF_FRAME_BUFFER.writeTo(os) } @@ -237,6 +241,11 @@ class StompWireFormat extends WireFormat } if ( write_buffer.remaining() == 0 && write_direct!=null ) { write_counter += write_channel.write(write_direct) + if( write_direct.remaining() == 0 ) { + write_direct = null + write_direct_frame.release + write_direct_frame = null + } } // if it is now empty try to refill... @@ -245,8 +254,11 @@ class StompWireFormat extends WireFormat val prev_size = (write_buffer.position()+512).max(512).min(write_buffer_size) write_buffer = next_write_buffer.toBuffer().toByteBuffer() write_direct = next_write_direct + write_direct_frame = next_write_direct_frame + next_write_buffer = new DataByteArrayOutputStream(prev_size) next_write_direct = null + next_write_direct_frame = null } if ( is_empty ) { @@ -382,13 +394,13 @@ class StompWireFormat extends WireFormat action = action.trim() } if (action.length() > 0) { - next_action = read_headers(action) + next_action = read_headers(action.ascii) } } null } - def read_headers(action:Buffer, headers:HeaderMapBuffer=new HeaderMapBuffer()):FrameReader = (buffer)=> { + def read_headers(action:AsciiBuffer, headers:HeaderMapBuffer=new HeaderMapBuffer()):FrameReader = (buffer)=> { var rc:StompFrame = null val line = read_line(buffer, MAX_HEADER_LENGTH, "The maximum header length was exceeded") if( line !=null ) { @@ -434,7 +446,10 @@ class StompWireFormat extends WireFormat throw new IOException("The maximum data length was exceeded") } - if( length > 1024 && memory_pool!=null ) { + // lets try to keep the content of big message outside of the JVM's garbage collection + // to keep the number of GCs down when moving big messages. + def is_message = action == Commands.SEND || action == Responses.MESSAGE + if( length > 1024 && memory_pool!=null && is_message) { val ma = memory_pool.alloc(length+1) @@ -492,7 +507,7 @@ class StompWireFormat extends WireFormat } - def read_binary_body_direct(action:Buffer, headers:HeaderMapBuffer, ma:MemoryAllocation):FrameReader = (buffer)=> { + def read_binary_body_direct(action:AsciiBuffer, headers:HeaderMapBuffer, ma:MemoryAllocation):FrameReader = (buffer)=> { if( read_content_direct(ma) ) { next_action = read_action new StompFrame(ascii(action), headers.toList, DirectStompContent(ma)) @@ -521,7 +536,7 @@ class StompWireFormat extends WireFormat } } - def read_binary_body(action:Buffer, headers:HeaderMapBuffer, contentLength:Int):FrameReader = (buffer)=> { + def read_binary_body(action:AsciiBuffer, headers:HeaderMapBuffer, contentLength:Int):FrameReader = (buffer)=> { val content:Buffer=read_content(buffer, contentLength) if( content != null ) { next_action = read_action @@ -562,7 +577,7 @@ class StompWireFormat extends WireFormat } - def read_text_body(action:Buffer, headers:HeaderMapBuffer):FrameReader = (buffer)=> { + def read_text_body(action:AsciiBuffer, headers:HeaderMapBuffer):FrameReader = (buffer)=> { val content:Buffer=read_to_null(buffer) if( content != null ) { next_action = read_action Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/readme.md URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/readme.md?rev=961215&r1=961214&r2=961215&view=diff ============================================================================== --- activemq/sandbox/activemq-apollo-actor/activemq-web/readme.md (original) +++ activemq/sandbox/activemq-apollo-actor/activemq-web/readme.md Wed Jul 7 04:22:00 2010 @@ -4,4 +4,4 @@ Run with ./run test with a json client.. perhaps curl like this: -curl -i -H "Accept: application/json" localhost:8080/default \ No newline at end of file +curl -i -H "Accept: application/json" localhost:8080/brokers \ No newline at end of file