activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1447576 - in /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker: Queue.scala QueueEntry.scala
Date Tue, 19 Feb 2013 02:42:42 GMT
Author: chirino
Date: Tue Feb 19 02:42:42 2013
New Revision: 1447576

URL: http://svn.apache.org/r1447576
Log:
Fixes assertion failures logged during tests.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1447576&r1=1447575&r2=1447576&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Tue Feb 19 02:42:42 2013
@@ -30,6 +30,7 @@ import security.{SecuredResource, Securi
 import org.apache.activemq.apollo.dto._
 import java.util.regex.Pattern
 import collection.mutable.ListBuffer
+import org.fusesource.hawtbuf.Buffer
 
 object Queue extends Log {
   val subscription_counter = new AtomicInteger(0)
@@ -712,12 +713,12 @@ class Queue(val router: LocalRouter, val
         // Do we need to do a persistent enqueue???
         val uow = if( queue_delivery.persistent && tune_persistent ) {
           assert(delivery.uow !=null)
-          queue_delivery.uow = delivery.uow
+          val uow = delivery.uow
           entry.state match {
-            case state:entry.Loaded => state.store_enqueue
-            case state:entry.Swapped => queue_delivery.uow.enqueue(entry.toQueueEntryRecord)
+            case state:entry.Loaded => state.store_enqueue(uow)
+            case state:entry.Swapped => uow.enqueue(entry.toQueueEntryRecord)
           }
-          queue_delivery.uow
+          uow
         } else {
           null
         }
@@ -739,13 +740,12 @@ class Queue(val router: LocalRouter, val
         }
 
         if( delivery.ack!=null ) {
-          delivery.ack(Consumed, queue_delivery.uow)
+          delivery.ack(Consumed, uow)
         }
 
         // release the store batch...
         if (uow != null) {
           uow.release(binding.binding_kind+":"+id+":offer")
-          queue_delivery.uow = null
         }
 
         

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1447576&r1=1447575&r2=1447576&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Tue Feb 19 02:42:42 2013
@@ -440,14 +440,14 @@ class QueueEntry(val queue:Queue, val se
 
     override  def as_loaded = this
 
-    def store_enqueue = {
+    def store_enqueue(uow:StoreUOW) = {
       assert(queue.service_state.is_starting_or_started)
       if(!enqueue_stored && !storing_enqueue) {
         storing_enqueue = true
-        assert( delivery.uow!=null )
-        delivery.uow.enqueue(toQueueEntryRecord)
+        assert( uow!=null )
+        uow.enqueue(toQueueEntryRecord)
         queue.swapping_out_size+=size
-        delivery.uow.on_flush { canceled =>
+        uow.on_flush { canceled =>
           queue.swap_out_completes_source.merge(^{
             this.swapped_out(!canceled)
             queue.swapping_out_size-=size
@@ -465,25 +465,25 @@ class QueueEntry(val queue:Queue, val se
           switch_to_swapped
         } else {
           swapping_out=true
-          delivery.uow = queue.create_uow(queue.binding.binding_kind+":"+queue.id+":swap_out",
delivery.uow)
+          val uow = queue.create_uow(queue.binding.binding_kind+":"+queue.id+":swap_out")
 
           // Are we swapping out a non-persistent message?
           val flush = if( delivery.storeKey == -1 ) {
             delivery.storeLocator = new AtomicReference[Object]()
-            delivery.storeKey = delivery.uow.store(delivery.createMessageRecord )
+            delivery.storeKey = uow.store(delivery.createMessageRecord )
             false
           } else {
             true
           }
-          store_enqueue
+          store_enqueue(uow)
           if( asap ) {
             if ( flush ) {
               queue.virtual_host.store.flush_message(delivery.storeKey) {}
             } else {
-              delivery.uow.complete_asap
+              uow.complete_asap
             }
           }
-          delivery.uow.release(queue.binding.binding_kind+":"+queue.id+":swap_out")
+          uow.release(queue.binding.binding_kind+":"+queue.id+":swap_out")
         }
       }
     }
@@ -509,8 +509,6 @@ class QueueEntry(val queue:Queue, val se
         delivery.storeKey = -1
       }
 
-      delivery.uow = null
-
       if( swapping_out ) {
         swapping_out = false
         if( not_canceled ) {



Mime
View raw message