activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1133620 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
Date Thu, 09 Jun 2011 01:12:34 GMT
Author: chirino
Date: Thu Jun  9 01:12:33 2011
New Revision: 1133620

URL: http://svn.apache.org/viewvc?rev=1133620&view=rev
Log:
Tune the delivery sessions based on the connection's read/write buffer sizes.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1133620&r1=1133619&r2=1133620&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
Thu Jun  9 01:12:33 2011
@@ -39,6 +39,8 @@ trait DeliveryProducer {
 
   def connection:Option[BrokerConnection] = None
 
+  def send_buffer_size = 64*1024
+
   def collocate(value:DispatchQueue):Unit = {
     if( value.getTargetQueue ne dispatch_queue.getTargetQueue ) {
       debug("co-locating %s with %s", dispatch_queue.getLabel, value.getLabel);
@@ -57,6 +59,8 @@ trait DeliveryConsumer extends Retained 
 
   def connection:Option[BrokerConnection] = None
 
+  def receive_buffer_size = 64*1024
+
   def browser = false
   def exclusive = false
   def dispatch_queue:DispatchQueue;

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1133620&r1=1133619&r2=1133620&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Thu Jun  9 01:12:33 2011
@@ -90,11 +90,6 @@ class Queue(val router: LocalRouter, val
   //
 
   /**
-   *  The amount of memory buffer space for receiving messages.
-   */
-  def tune_producer_buffer = config.producer_buffer.getOrElse(256*1024)
-
-  /**
    *  The amount of memory buffer space for the queue..
    */
   def tune_queue_buffer = config.queue_buffer.getOrElse(32*1024)
@@ -468,11 +463,12 @@ class Queue(val router: LocalRouter, val
 
     override def producer = p
 
-    val session = session_manager.open(producer.dispatch_queue)
+    val session_max = producer.send_buffer_size
+    val session = session_manager.open(producer.dispatch_queue, session_max)
 
     dispatch_queue {
       inbound_sessions += this
-      addCapacity( tune_producer_buffer )
+      addCapacity( session_max )
     }
 
     def remaining_capacity = session.remaining_capacity
@@ -480,7 +476,7 @@ class Queue(val router: LocalRouter, val
     def close = {
       session_manager.close(session)
       dispatch_queue {
-        addCapacity( -tune_producer_buffer )
+        addCapacity( -session_max )
         inbound_sessions -= this
       }
       release
@@ -592,6 +588,7 @@ class Queue(val router: LocalRouter, val
 
   override def connection:Option[BrokerConnection] = None
 
+  override def send_buffer_size = tune_queue_buffer
 
   /////////////////////////////////////////////////////////////////////
   //

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java?rev=1133620&r1=1133619&r2=1133620&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
(original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
Thu Jun  9 01:12:33 2011
@@ -37,12 +37,6 @@ public class QueueDTO extends StringIdDT
     public Boolean unified;
 
     /**
-     *  The amount of memory buffer space for receiving messages.
-     */
-    @XmlAttribute(name="producer_buffer")
-    public Integer producer_buffer;
-
-    /**
      *  The amount of memory buffer space for the queue..
      */
     @XmlAttribute(name="queue_buffer")

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1133620&r1=1133619&r2=1133620&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Thu Jun  9 01:12:33 2011
@@ -253,6 +253,7 @@ class StompProtocolHandler extends Proto
     val dispatch_queue = StompProtocolHandler.this.dispatchQueue
 
     override def connection = Some(StompProtocolHandler.this.connection)
+    override def receive_buffer_size = codec.write_buffer_size
 
     def is_persistent = false
 
@@ -281,7 +282,7 @@ class StompProtocolHandler extends Proto
       def consumer = StompConsumer.this
       var closed = false
 
-      val session = session_manager.open(producer.dispatch_queue)
+      val session = session_manager.open(producer.dispatch_queue, codec.write_buffer_size)
 
       def remaining_capacity = session.remaining_capacity
 
@@ -373,6 +374,8 @@ class StompProtocolHandler extends Proto
 
   var destination_parser = Stomp.destination_parser
 
+  var codec:StompCodec = _
+
   implicit def toDestinationDTO(value:AsciiBuffer):Array[DestinationDTO] = {
     val rc = destination_parser.decode_destination(value.toString)
     if( rc==null ) {
@@ -385,7 +388,7 @@ class StompProtocolHandler extends Proto
     super.set_connection(connection)
     import collection.JavaConversions._
 
-    val codec = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
+    codec = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
     config = connection.connector.config.protocols.find( _.isInstanceOf[StompDTO]).map(_.asInstanceOf[StompDTO]).getOrElse(new StompDTO)
 
     protocol_filters = ProtocolFilter.create_filters(config.protocol_filters.toList, this)
@@ -758,6 +761,7 @@ class StompProtocolHandler extends Proto
         // create the producer route...
 
         val route = new DeliveryProducerRoute(host.router) {
+          override def send_buffer_size = codec.read_buffer_size
           override def connection = Some(StompProtocolHandler.this.connection)
           override def dispatch_queue = queue
 



Mime
View raw message