activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1146721 - /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Date Thu, 14 Jul 2011 14:00:49 GMT
Author: chirino
Date: Thu Jul 14 14:00:49 2011
New Revision: 1146721

URL: http://svn.apache.org/viewvc?rev=1146721&view=rev
Log:
Fixes failing assert due to sloppy UOW reference counting.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1146721&r1=1146720&r2=1146721&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala	(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala	Thu Jul 14 14:00:49 2011
@@ -566,15 +566,19 @@ class Queue(val router: LocalRouter, val
 
   def drain_acks = {
     ack_source.getData.foreach {
-      case (entry, consumed, tx) =>
+      case (entry, consumed, uow) =>
         consumed match {
-          case Delivered   => entry.ack(tx)
+          case Delivered   =>
+            entry.ack(uow)
           case Expired     =>
             entry.entry.queue.expired(entry.entry, false)
-            entry.ack(tx)
+            entry.ack(uow)
           case Poisoned    => entry.nack
           case Undelivered => entry.nack
         }
+        if( uow!=null ) {
+          uow.release()
+        }
     }
     messages.refiller.run
   }
@@ -1265,8 +1269,11 @@ class QueueEntry(val queue:Queue, val se
 
                   val acquiredQueueEntry = sub.acquire(entry)
                   val acquiredDelivery = delivery.copy
-                  acquiredDelivery.ack = (consumed, tx)=> {
-                    queue.ack_source.merge((acquiredQueueEntry, consumed, tx))
+                  acquiredDelivery.ack = (consumed, uow)=> {
+                    if( uow!=null ) {
+                      uow.retain()
+                    }
+                    queue.ack_source.merge((acquiredQueueEntry, consumed, uow))
                   }
 
                   val accepted = sub.offer(acquiredDelivery)
@@ -1707,7 +1714,7 @@ class Subscription(val queue:Queue, val 
     acquired.addLast(this)
     acquired_size += entry.size
 
-    def ack(sb:StoreUOW):Unit = {
+    def ack(uow:StoreUOW):Unit = {
       assert_executing
       if(!isLinked) {
         debug("Internal protocol error: message delivery acked/nacked multiple times: "+entry.seq)
@@ -1719,20 +1726,16 @@ class Subscription(val queue:Queue, val 
       }
       total_ack_count += 1
       if (entry.messageKey != -1) {
-        val storeBatch = if( sb == null ) {
+        val storeBatch = if( uow == null ) {
           queue.virtual_host.store.create_uow
         } else {
-          sb
+          uow
         }
         storeBatch.dequeue(entry.toQueueEntryRecord)
-        if( sb == null ) {
+        if( uow == null ) {
           storeBatch.release
         }
       }
-      if( sb != null ) {
-        sb.release
-      }
-
       queue.dequeue_item_counter += 1
       queue.dequeue_size_counter += entry.size
       queue.dequeue_ts = queue.now



Mime
View raw message