Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8545321F5 for ; Tue, 3 May 2011 17:25:27 +0000 (UTC) Received: (qmail 96758 invoked by uid 500); 3 May 2011 17:25:26 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 96237 invoked by uid 500); 3 May 2011 17:25:25 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 96226 invoked by uid 99); 3 May 2011 17:25:25 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 May 2011 17:25:25 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 May 2011 17:25:24 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id DF6E823889C5; Tue, 3 May 2011 17:25:03 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1099136 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ Date: Tue, 03 May 2011 17:25:03 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110503172503.DF6E823889C5@eris.apache.org> Author: chirino Date: Tue May 3 17:25:03 2011 New Revision: 1099136 URL: http://svn.apache.org/viewvc?rev=1099136&view=rev Log: playing with optimizing the session window sizes. Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1099136&r1=1099135&r2=1099136&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Tue May 3 17:25:03 2011 @@ -75,6 +75,7 @@ trait DeliveryConsumer extends Retained trait DeliverySession extends Sink[Delivery] { def producer:DeliveryProducer def consumer:DeliveryConsumer + def remaining_capacity:Int def close:Unit } Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1099136&r1=1099135&r2=1099136&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Tue May 3 17:25:03 2011 @@ -95,7 +95,7 @@ class Queue(val router: LocalRouter, val /** * The amount of memory buffer space for receiving messages. */ - def tune_producer_buffer = config.producer_buffer.getOrElse(32*1024) + def tune_producer_buffer = config.producer_buffer.getOrElse(256*1024) /** * The amount of memory buffer space for the queue.. @@ -135,7 +135,7 @@ class Queue(val router: LocalRouter, val tune_persistent = virtual_host.store !=null && config.persistent.getOrElse(true) tune_swap = tune_persistent && config.swap.getOrElse(true) tune_swap_range_size = config.swap_range_size.getOrElse(10000) - tune_consumer_buffer = config.consumer_buffer.getOrElse(32*1024) + tune_consumer_buffer = config.consumer_buffer.getOrElse(256*1024) } configure(config) @@ -469,6 +469,7 @@ class Queue(val router: LocalRouter, val addCapacity( tune_producer_buffer ) } + def remaining_capacity = session.remaining_capacity def close = { session_manager.close(session) Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1099136&r1=1099135&r2=1099136&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Tue May 3 17:25:03 2011 @@ -179,6 +179,10 @@ class MutableSink[T] extends Sink[T] { } +trait SessionSink[T] extends Sink[T] { + def remaining_capacity:Int +} + /** *

* A SinkMux multiplexes access to a target sink so that multiple @@ -230,7 +234,7 @@ class SinkMux[T](val downstream:Sink[T], sessions.foreach(_.credit_adder.resume) } - def open(producer_queue:DispatchQueue):Sink[T] = { + def open(producer_queue:DispatchQueue):SessionSink[T] = { val session = new Session[T](producer_queue, 0, this) consumer_queue <<| ^{ if( overflow.full ) { @@ -259,7 +263,7 @@ class SinkMux[T](val downstream:Sink[T], /** * tracks one producer to consumer session / credit window. */ -class Session[T](val producer_queue:DispatchQueue, var credits:Int, mux:SinkMux[T]) extends Sink[T] { +class Session[T](val producer_queue:DispatchQueue, var credits:Int, mux:SinkMux[T]) extends SessionSink[T] { var refiller:Runnable = NOOP @@ -295,6 +299,7 @@ class Session[T](val producer_queue:Disp // producer serial dispatch queue /////////////////////////////////////////////////// + def remaining_capacity = credits override def full = { assert(producer_queue.isExecuting) Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1099136&r1=1099135&r2=1099136&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Tue May 3 17:25:03 2011 @@ -532,6 +532,8 @@ class OpenwireProtocolHandler extends Pr release } + def remaining_capacity = outbound_session.remaining_capacity + // Delegate all the flow control stuff to the session def offer(delivery:Delivery) = { if( outbound_session.full ) { Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1099136&r1=1099135&r2=1099136&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original) +++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Tue May 3 17:25:03 2011 @@ -260,6 +260,8 @@ class StompProtocolHandler extends Proto val session = session_manager.open(producer.dispatch_queue) + def remaining_capacity = session.remaining_capacity + def close = { assert(producer.dispatch_queue.isExecuting) if( !closed ) {