activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1146906 - /activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Date Thu, 14 Jul 2011 21:43:31 GMT
Author: chirino
Date: Thu Jul 14 21:43:31 2011
New Revision: 1146906

URL: http://svn.apache.org/viewvc?rev=1146906&view=rev
Log:
Optimize ack credit handling by using an event source

Modified:
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1146906&r1=1146905&r2=1146906&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Thu Jul 14 21:43:31 2011
@@ -162,6 +162,23 @@ class StompProtocolHandler extends Proto
 //      r.release
 //    }
 
+    val ack_source = createSource(new EventAggregator[(Int, Int), (Int, Int)] {
+      def mergeEvent(previous:(Int, Int), event:(Int, Int)) = {
+        if( previous == null ) {
+          event
+        } else {
+          (previous._1+event._1, previous._2+event._2)
+        }
+      }
+      def mergeEvents(previous:(Int, Int), events:(Int, Int)) = mergeEvent(previous, events)
+    }, dispatch_queue)
+
+    ack_source.setEventHandler(^ {
+      val data = ack_source.getData
+      credit_window_filter.credit(data._1, data._2)
+    });
+    ack_source.resume
+
     trait AckHandler {
       def track(delivery:Delivery):Unit
       def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit
@@ -169,11 +186,12 @@ class StompProtocolHandler extends Proto
     }
 
     class AutoAckHandler extends AckHandler {
+
       def track(delivery:Delivery) = {
         if( delivery.ack!=null ) {
           delivery.ack(Delivered, null)
         }
-        credit_window_filter.credit(delivery.size, 1)
+        ack_source.merge((delivery.size, 1))
       }
 
       def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit = {
@@ -217,12 +235,13 @@ class StompProtocolHandler extends Proto
           for( (id, delivery) <- acked ) {
             for( credit <- delivery.credit ) {
               credit_window_filter.credit(credit, 1)
+              ack_source.merge((credit, 1))
               delivery.credit = None
             }
           }
         } else {
           if( credit_value!=null ) {
-            credit_window_filter.credit(credit_value._1, credit_value._2)
+            ack_source.merge((credit_value._1, credit_value._2))
           }
         }
       }
@@ -280,13 +299,13 @@ class StompProtocolHandler extends Proto
         if( initial_credit_window._3 ) {
           for( delivery <- consumer_acks.get(msgid)) {
             for( credit <- delivery.credit ) {
-              credit_window_filter.credit(credit, 1)
+              ack_source.merge((credit,1))
               delivery.credit = None
             }
           }
         } else {
           if( credit_value!=null ) {
-            credit_window_filter.credit(credit_value._1, credit_value._2)
+            ack_source.merge((credit_value._1, credit_value._2))
           }
         }
       }
@@ -335,7 +354,7 @@ class StompProtocolHandler extends Proto
       sink_manager.close(consumer_sink)
     }
 
-    val dispatch_queue = StompProtocolHandler.this.dispatchQueue
+    def dispatch_queue = StompProtocolHandler.this.dispatchQueue
 
     override def connection = Some(StompProtocolHandler.this.connection)
     override def receive_buffer_size = codec.write_buffer_size
@@ -1004,6 +1023,7 @@ class StompProtocolHandler extends Proto
             (y.toInt, x.toInt, true)
           case x :: y :: z :: _ =>
             (y.toInt, x.toInt, z.toBoolean)
+          case _ => (codec.write_buffer_size, 1, true)
         }
       case None =>
         (codec.write_buffer_size, 1, true)
@@ -1134,6 +1154,7 @@ class StompProtocolHandler extends Proto
             (0, x.toInt)
           case x :: y :: _ =>
             (y.toInt, x.toInt)
+          case _ => (0,0)
         }
 
     }



Mime
View raw message