activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1487677 - in /activemq/activemq-apollo/trunk: apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Date Wed, 29 May 2013 23:19:14 GMT
Author: chirino
Date: Wed May 29 23:19:13 2013
New Revision: 1487677

URL: http://svn.apache.org/r1487677
Log:
Optimize the range ack case a bit when the first message is being acked.  This should lower
the CPU usage as we don't need to iterate all the messages sent to the consumer.

Modified:
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1487677&r1=1487676&r2=1487677&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Wed May 29 23:19:13 2013
@@ -1101,14 +1101,20 @@ class OpenwireProtocolHandler extends Pr
             }
           }
         } else {
-          var found = false
-          val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
-            if( id == msgid ) {
-              found = true
-              true
-            } else {
-              !found
+
+          val acked = if( !consumer_acks.isEmpty && consumer_acks.headOption.get._1
== msgid ) {
+            Seq(consumer_acks.headOption.get)
+          } else {
+            var found = false
+            val (acked, _) = consumer_acks.partition{ case (id, ack)=>
+              if( id == msgid ) {
+                found = true
+                true
+              } else {
+                !found
+              }
             }
+            acked
           }
 
           for( (id, delivery) <- acked ) {
@@ -1150,25 +1156,29 @@ class OpenwireProtocolHandler extends Pr
             }
           }
         } else {
-          // session acks ack all previously received messages..
-          var found = false
-          val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
-            if( id == msgid ) {
-              found = true
-              true
-            } else {
-              !found
+          val acked = if( !consumer_acks.isEmpty && consumer_acks.headOption.get._1
== msgid ) {
+            Seq(consumer_acks.remove(0))
+          } else {
+            // session acks ack all previously received messages..
+            var found = false
+            val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
+              if( id == msgid ) {
+                found = true
+                true
+              } else {
+                !found
+              }
+            }
+            if( !found ) {
+              trace("%s: ACK failed, invalid message id: %s, dest: %s".format(security_context.remote_address,
msgid, addresses.mkString(",")))
             }
+            consumer_acks = not_acked
+            acked
           }
 
-          if( !found ) {
-            trace("%s: ACK failed, invalid message id: %s, dest: %s".format(security_context.remote_address,
msgid, addresses.mkString(",")))
-          } else {
-            consumer_acks = not_acked
-            acked.foreach{case (id, delivery)=>
-              if( delivery.ack!=null ) {
-                delivery.ack(consumed, uow)
-              }
+          acked.foreach{case (id, delivery)=>
+            if( delivery.ack!=null ) {
+              delivery.ack(consumed, uow)
             }
           }
         }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1487677&r1=1487676&r2=1487677&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Wed May 29 23:19:13 2013
@@ -16,30 +16,28 @@
  */
 package org.apache.activemq.apollo.stomp
 
-import _root_.org.fusesource.hawtbuf._
+import org.fusesource.hawtbuf._
+import collection.mutable.{ListBuffer, HashMap}
 import dto.{StompConnectionStatusDTO, StompDTO}
-import org.fusesource.hawtdispatch._
-
-import org.apache.activemq.apollo.broker._
-import Buffer._
+import java.io.IOException
 import java.lang.String
-import protocol.{ProtocolFilter3, ProtocolHandler}
-import security.SecurityContext
-import Stomp._
-import org.apache.activemq.apollo.selector.SelectorParser
-import org.apache.activemq.apollo.filter.{BooleanExpression, FilterException}
-import org.apache.activemq.apollo.broker.store._
-import org.apache.activemq.apollo.util._
+import java.util
 import java.util.concurrent.TimeUnit
-import java.util.Map.Entry
-import collection.mutable.{ListBuffer, HashMap}
-import java.io.IOException
+import language.implicitConversions
+import org.apache.activemq.apollo.broker._
+import org.apache.activemq.apollo.broker.store._
 import org.apache.activemq.apollo.dto._
+import org.apache.activemq.apollo.filter.{BooleanExpression, FilterException}
+import org.apache.activemq.apollo.selector.SelectorParser
+import org.apache.activemq.apollo.util._
+import org.apache.activemq.apollo.util.path.LiteralPart
+import org.fusesource.hawtdispatch._
 import org.fusesource.hawtdispatch.transport.HeartBeatMonitor
-import path.{LiteralPart, Path, PathParser}
-import scala.Some
-import org.apache.activemq.apollo.broker.SubscriptionAddress
-import java.util
+import org.apache.activemq.apollo.util.path.{Path, PathParser}
+import org.apache.activemq.apollo.broker.protocol.{ProtocolFilter3, ProtocolHandler}
+import org.apache.activemq.apollo.broker.security.SecurityContext
+import org.fusesource.hawtbuf.Buffer._
+import Stomp._
 
 
 case class RichBuffer(self:Buffer) extends Proxy {
@@ -51,7 +49,6 @@ case class RichBuffer(self:Buffer) exten
   }
 }
 
-import language.implicitConversions
 object BufferSupport {
   implicit def to_rich_buffer(value:Buffer):RichBuffer = RichBuffer(value)
 }
@@ -153,6 +150,7 @@ object StompProtocolHandler extends Log 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 class StompProtocolHandler extends ProtocolHandler {
+
   import StompProtocolHandler._
 
   var connection_log:Log = StompProtocolHandler
@@ -363,14 +361,20 @@ class StompProtocolHandler extends Proto
       def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit = {
         queue.assertExecuting()
         if( initial_credit_window.auto_credit ) {
-          var found = false
-          val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
-            if( id == msgid ) {
-              found = true
-              true
-            } else {
-              !found
+
+          val acked = if( !consumer_acks.isEmpty && consumer_acks.headOption.get._1
== msgid ) {
+            Seq(consumer_acks.headOption.get)
+          } else {
+            var found = false
+            val (acked, _) = consumer_acks.partition{ case (id, ack)=>
+              if( id == msgid ) {
+                found = true
+                true
+              } else {
+                !found
+              }
             }
+            acked
           }
 
           for( (id, delivery) <- acked ) {
@@ -391,25 +395,29 @@ class StompProtocolHandler extends Proto
         queue.assertExecuting()
         assert(consumer_acks !=null)
 
-        // session acks ack all previously received messages..
-        var found = false
-        val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
-          if( id == msgid ) {
-            found = true
-            true
-          } else {
-            !found
+        val acked = if( !consumer_acks.isEmpty && consumer_acks.headOption.get._1
== msgid ) {
+          Seq(consumer_acks.remove(0))
+        } else {
+          // session acks ack all previously received messages..
+          var found = false
+          val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
+            if( id == msgid ) {
+              found = true
+              true
+            } else {
+              !found
+            }
+          }
+          if( !found ) {
+            trace("%s: ACK failed, invalid message id: %s, dest: %s".format(security_context.remote_address,
msgid, addresses.mkString(",")))
           }
+          consumer_acks = not_acked
+          acked
         }
 
-        if( !found ) {
-          trace("%s: ACK failed, invalid message id: %s, dest: %s".format(security_context.remote_address,
msgid, addresses.mkString(",")))
-        } else {
-          consumer_acks = not_acked
-          acked.foreach{case (id, delivery)=>
-            if( delivery.ack!=null ) {
-              delivery.ack(consumed, uow)
-            }
+        acked.foreach{case (id, delivery)=>
+          if( delivery.ack!=null ) {
+            delivery.ack(consumed, uow)
           }
         }
 
@@ -751,6 +759,7 @@ class StompProtocolHandler extends Proto
 
   override def set_connection(connection: BrokerConnection) = {
     super.set_connection(connection)
+    import OptionSupport._
     import collection.JavaConversions._
 
     codec = connection.protocol_codec(classOf[StompCodec])
@@ -759,7 +768,6 @@ class StompProtocolHandler extends Proto
 
     protocol_filters = ProtocolFilter3.create_filters(config.protocol_filters.toList, this)
 
-    import OptionSupport._
     Option(config.max_data_length).map(MemoryPropertyEditor.parse(_).toInt).foreach( codec.max_data_length
= _ )
     Option(config.max_header_length).map(MemoryPropertyEditor.parse(_).toInt).foreach( codec.max_header_length
= _ )
     config.max_headers.foreach( codec.max_headers = _ )



Mime
View raw message