Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 001A9DC22 for ; Wed, 19 Sep 2012 02:28:44 +0000 (UTC) Received: (qmail 31251 invoked by uid 500); 19 Sep 2012 02:28:44 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 31220 invoked by uid 500); 19 Sep 2012 02:28:44 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 31212 invoked by uid 99); 19 Sep 2012 02:28:44 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Sep 2012 02:28:44 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Sep 2012 02:28:43 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 4AFDA238890B for ; Wed, 19 Sep 2012 02:28:00 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1387436 - in /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker: Queue.scala Sink.scala Date: Wed, 19 Sep 2012 02:28:00 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120919022800.4AFDA238890B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Wed Sep 19 02:27:59 2012 New Revision: 1387436 URL: http://svn.apache.org/viewvc?rev=1387436&view=rev Log: Last commit for APLO-244 introduced a perf 
regression for the queue case which this commit should fix. Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1387436&r1=1387435&r2=1387436&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Sep 19 02:27:59 2012 @@ -92,7 +92,7 @@ class Queue(val router: LocalRouter, val ack_source.setEventHandler(^ {drain_acks}); ack_source.resume - val session_manager = new SessionSinkMux[Delivery](messages, dispatch_queue, Delivery, Integer.MAX_VALUE, virtual_host.broker.auto_tuned_send_receiver_buffer_size) { + val session_manager = new SessionSinkMux[Delivery](messages, dispatch_queue, Delivery, Integer.MAX_VALUE, 1024*640) { override def time_stamp = now } @@ -223,10 +223,14 @@ class Queue(val router: LocalRouter, val def configure(update:QueueSettingsDTO) = { def mem_size(value:String, default:String) = MemoryPropertyEditor.parse(Option(value).getOrElse(default)).toInt - producer_swapped_in.size_max += mem_size(update.tail_buffer, "640k") - Option(config).map{ config=> + var new_tail_buffer = mem_size(update.tail_buffer, "640k") + var old_tail_buffer = Option(config).map { config => mem_size(config.tail_buffer, "640k") }.getOrElse(0) + producer_swapped_in.size_max += new_tail_buffer - old_tail_buffer + session_manager.resize(Int.MaxValue, new_tail_buffer) + tune_persistent = 
virtual_host.store !=null && update.persistent.getOrElse(true) tune_swap = tune_persistent && update.swap.getOrElse(true) tune_swap_range_size = update.swap_range_size.getOrElse(10000) Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1387436&r1=1387435&r2=1387436&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Wed Sep 19 02:27:59 2012 @@ -342,7 +342,7 @@ trait SessionSinkFilter[T] extends Sessi * * @author Hiram Chirino */ -class SessionSinkMux[T](val downstream:Sink[(Session[T], T)], val consumer_queue:DispatchQueue, val sizer:Sizer[T], delivery_credits:Int, val size_credits:Int) { +class SessionSinkMux[T](val downstream:Sink[(Session[T], T)], val consumer_queue:DispatchQueue, val sizer:Sizer[T], var delivery_credits:Int, var size_credits:Int) { var sessions = HashSet[Session[T]]() var overflowed_sessions = new LinkedNodeList[SessionLinkedNode[T]]() @@ -370,6 +370,18 @@ class SessionSinkMux[T](val downstream:S } } + def resize(new_delivery_credits:Int, new_size_credits:Int) = consumer_queue { + val delivery_credits_change = new_delivery_credits - delivery_credits + if ( delivery_credits_change!=0 ) { + for ( session <- sessions ) { + session.credit(delivery_credits_change, 0); + } + } + this.delivery_credits = new_delivery_credits + this.size_credits = new_size_credits + schedual_rebalance + } + def time_stamp = 0L downstream.refiller = ^{ drain_source.merge(0x01) }