activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1447440 - in /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker: Queue.scala QueueEntry.scala store/DelayingStoreSupport.scala
Date Mon, 18 Feb 2013 19:10:41 GMT
Author: chirino
Date: Mon Feb 18 19:10:41 2013
New Revision: 1447440

URL: http://svn.apache.org/r1447440
Log:
Fixes failing stomp test cases.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1447440&r1=1447439&r2=1447440&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Mon Feb 18 19:10:41 2013
@@ -701,13 +701,6 @@ class Queue(val router: LocalRouter, val
         queue_delivery.seq = entry.seq
         entry.init(queue_delivery)
         
-        val uow = if( tune_persistent ) {
-          delivery.uow
-        } else {
-          null
-        }
-        queue_delivery.uow = uow
-
         entries.addLast(entry)
         enqueue_item_counter += 1
         enqueue_size_counter += entry.size
@@ -717,11 +710,15 @@ class Queue(val router: LocalRouter, val
         enqueue_remaining_take(entry.size)
 
         // Do we need to do a persistent enqueue???
-        if (uow != null) {
+        val uow = if( queue_delivery.persistent && tune_persistent ) {
+          queue_delivery.uow = create_uow(binding.binding_kind+":"+id+":offer", delivery.uow)
           entry.state match {
             case state:entry.Loaded => state.store_enqueue
-            case state:entry.Swapped => delivery.uow.enqueue(entry.toQueueEntryRecord)
+            case state:entry.Swapped => queue_delivery.uow.enqueue(entry.toQueueEntryRecord)
           }
+          queue_delivery.uow
+        } else {
+          null
         }
 
         if( entry.hasSubs ) {
@@ -1224,7 +1221,7 @@ class Queue(val router: LocalRouter, val
         if( delivery.message!=null ) {
           delivery.message.retain
         }
-        if( tune_persistent && delivery.uow!=null ) {
+        if( delivery.persistent && tune_persistent && delivery.uow!=null ) {
           delivery.uow.retain(binding.binding_kind+":"+id+":offer")
         }
         val rc = downstream.offer(delivery)

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1447440&r1=1447439&r2=1447440&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Mon Feb 18 19:10:41 2013
@@ -465,31 +465,25 @@ class QueueEntry(val queue:Queue, val se
           switch_to_swapped
         } else {
           swapping_out=true
-          if( delivery.uow!=null ) {
-            assert( delivery.storeKey != -1 )
-            if( asap ) {
-              delivery.uow.complete_asap
-            }
+          delivery.uow = queue.create_uow(queue.binding.binding_kind+":"+queue.id+":swap_out", delivery.uow)
+
+          // Are we swapping out a non-persistent message?
+          val flush = if( delivery.storeKey == -1 ) {
+            delivery.storeLocator = new AtomicReference[Object]()
+            delivery.storeKey = delivery.uow.store(delivery.createMessageRecord )
+            false
           } else {
-            // Are we swapping out a non-persistent message?
-            if( delivery.storeKey == -1 ) {
-              val uow = queue.create_uow(queue.binding.binding_kind+":"+queue.id+":swap_out")
-              delivery.uow = uow
-              delivery.storeLocator = new AtomicReference[Object]()
-              delivery.storeKey = uow.store(delivery.createMessageRecord )
-              store_enqueue
-              if( asap ) {
-                uow.complete_asap
-              }
-              uow.release(queue.binding.binding_kind+":"+queue.id+":swap_out")
+            true
+          }
+          store_enqueue
+          if( asap ) {
+            if ( flush ) {
+              queue.virtual_host.store.flush_message(delivery.storeKey) {}
             } else {
-              store_enqueue
-              if( asap ) {
-                queue.virtual_host.store.flush_message(delivery.storeKey) {
-                }
-              }
+              delivery.uow.complete_asap
             }
           }
+          delivery.uow.release(queue.binding.binding_kind+":"+queue.id+":swap_out")
         }
       }
     }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1447440&r1=1447439&r2=1447440&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala Mon Feb 18 19:10:41 2013
@@ -254,6 +254,7 @@ trait DelayingStoreSupport extends Store
     }
 
     def enqueue(entry: QueueEntryRecord) = {
+      assert( !locator_based || entry.message_locator!=null )
       val a = this.synchronized {
         val a = action(entry.message_key)
         a.enqueues += entry



Mime
View raw message