activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1291171 - /activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Date Mon, 20 Feb 2012 10:02:48 GMT
Author: chirino
Date: Mon Feb 20 10:02:48 2012
New Revision: 1291171

URL: http://svn.apache.org/viewvc?rev=1291171&view=rev
Log:
Openwire should not remove message on a DELIVERED_ACK_TYPE ack.

Modified:
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1291171&r1=1291170&r2=1291171&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Mon Feb 20 10:02:48 2012
@@ -1072,12 +1072,14 @@ class OpenwireProtocolHandler extends Pr
         }
       }
 
-      def perform_ack(messageAck: MessageAck, uow:StoreUOW=null) = {
+      def perform_ack(messageAck: MessageAck, uow:StoreUOW=null):Unit = {
         queue.assertExecuting()
 
-        val msgid = messageAck.getLastMessageId
         val consumed = messageAck.getAckType match {
-          case MessageAck.DELIVERED_ACK_TYPE => Consumed
+          case MessageAck.DELIVERED_ACK_TYPE =>
+            // DELIVERED_ACK_TYPE are just used to send flow control credits
+            // to the broker.. return since we won't be consuming the message.
+            return
           case MessageAck.INDIVIDUAL_ACK_TYPE => Consumed
           case MessageAck.STANDARD_ACK_TYPE => Consumed
           case MessageAck.POSION_ACK_TYPE => Poisoned
@@ -1085,11 +1087,12 @@ class OpenwireProtocolHandler extends Pr
           case MessageAck.UNMATCHED_ACK_TYPE => Consumed
         }
 
+        val msgid = messageAck.getLastMessageId
         if( messageAck.getAckType == MessageAck.INDIVIDUAL_ACK_TYPE) {
           consumer_acks = consumer_acks.filterNot{ case (id, delivery)=>
             if( id == msgid) {
               if( delivery.ack!=null ) {
-                delivery.ack(consumed, uow)
+                delivery.ack(Consumed, uow)
               }
               true
             } else {



Mime
View raw message