activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1446639 - in /activemq/activemq-apollo/trunk: apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/...
Date Fri, 15 Feb 2013 15:42:38 GMT
Author: chirino
Date: Fri Feb 15 15:42:37 2013
New Revision: 1446639

URL: http://svn.apache.org/r1446639
Log:
Adding more debug info to uow retainment so we know who fails to release the UOW.

Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala
    activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1446639&r1=1446638&r2=1446639&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
Fri Feb 15 15:42:37 2013
@@ -1249,14 +1249,14 @@ class AmqpProtocolHandler extends Protoc
 
     def commit(on_complete: => Unit) = {
       if( host.store!=null ) {
-        val uow = host.store.create_uow
+        val uow = host.store.create_uow(toString)
 //        println("UOW starting: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
         uow.on_complete {
 //          println("UOW completed: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
           on_complete
         }
         queue.foreach{ _._1(uow) }
-        uow.release
+        uow.release(toString)
       } else {
         queue.foreach{ _._1(null) }
         on_complete

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1446639&r1=1446638&r2=1446639&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Fri Feb 15 15:42:37 2013
@@ -590,8 +590,8 @@ class Queue(val router: LocalRouter, val
 
 
   def is_topic_queue = resource_kind eq TopicQueueKind
-  def create_uow:StoreUOW = if(virtual_host.store==null) null else virtual_host.store.create_uow
-  def create_uow(uow:StoreUOW):StoreUOW = if(uow==null) create_uow else uow
+  def create_uow(owner:String):StoreUOW = if(virtual_host.store==null) null else virtual_host.store.create_uow(owner)
+  def create_uow(owner:String, uow:StoreUOW):StoreUOW = if(uow==null) create_uow(owner) else {uow.retain(owner); uow}
 
   object messages extends Sink[(Session[Delivery], Delivery)] {
     def stall_check = {}
@@ -690,7 +690,7 @@ class Queue(val router: LocalRouter, val
             delivery.ack(if ( expired ) Expired else Undelivered, delivery.uow)
           }
           if( delivery.uow!=null ) {
-            delivery.uow.release()
+            delivery.uow.release(binding.binding_kind+":"+id)
           }
           return true
         }
@@ -746,7 +746,7 @@ class Queue(val router: LocalRouter, val
 
         // release the store batch...
         if (uow != null) {
-          uow.release
+          uow.release(binding.binding_kind+":"+id+":offer")
           queue_delivery.uow = null
         }
 
@@ -845,7 +845,7 @@ class Queue(val router: LocalRouter, val
             // remove the expired message if it has not been
             // acquired.
             if( !state.is_acquired ) {
-              val uow = create_uow
+              val uow = create_uow(binding.binding_kind+":"+id+":swap")
               entry.dequeue(uow)
               expired(uow, entry) {
                 if( entry.isLinked ) {
@@ -857,7 +857,7 @@ class Queue(val router: LocalRouter, val
             // remove the expired message if it has not been
             // acquired.
             if( !state.is_acquired ) {
-              val uow = create_uow
+              val uow = create_uow(binding.binding_kind+":"+id+":swap")
               entry.dequeue(uow)
               expired(uow, entry) {
                 if( entry.isLinked ) {
@@ -1090,9 +1090,9 @@ class Queue(val router: LocalRouter, val
           null
         } else {
           if( original_uow == null ) {
-            create_uow
+            create_uow(id)
           } else {
-            original_uow.retain()
+            original_uow.retain(binding.binding_kind+":"+id+":dlq")
             original_uow
           }
         }
@@ -1113,7 +1113,7 @@ class Queue(val router: LocalRouter, val
               val (delivery, callback) = value;
               callback(delivery.uow)
               if( delivery.uow!=null ) {
-                delivery.uow.release()
+                delivery.uow.release(binding.binding_kind+":"+id+":dlq")
               }
             }
           }
@@ -1153,10 +1153,11 @@ class Queue(val router: LocalRouter, val
           case Consumed =>
             entry.ack(uow)
           case Expired=>
-            val actual = create_uow(uow)
+            val actual = create_uow(binding.binding_kind+":"+id+":ack", uow)
             expired(actual, entry.entry) {
               entry.ack(actual)
             }
+            actual.release(binding.binding_kind+":"+id+":ack")
           case Delivered =>
             entry.increment_nack
             entry.entry.redelivered
@@ -1176,7 +1177,7 @@ class Queue(val router: LocalRouter, val
             }
         }
         if( uow!=null ) {
-          uow.release()
+          uow.release(binding.binding_kind+":"+id+":ack-merge")
         }
     }
   }
@@ -1224,7 +1225,7 @@ class Queue(val router: LocalRouter, val
           delivery.message.retain
         }
         if( tune_persistent && delivery.uow!=null ) {
-          delivery.uow.retain
+          delivery.uow.retain(binding.binding_kind+":"+id+":offer")
         }
         val rc = downstream.offer(delivery)
         assert(rc, "session should accept since it was not full")

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1446639&r1=1446638&r2=1446639&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
Fri Feb 15 15:42:37 2013
@@ -205,11 +205,9 @@ class QueueEntry(val queue:Queue, val se
   def dequeue(uow: StoreUOW) = {
     if ( queued ) {
       if (messageKey != -1) {
-        val actual_uow = queue.create_uow(uow)
+        val actual_uow = queue.create_uow(queue.binding.binding_kind+":"+queue.id+":dequeue", uow)
         actual_uow.dequeue(toQueueEntryRecord)
-        if( uow == null ) {
-          actual_uow.release
-        }
+        actual_uow.release(queue.binding.binding_kind+":"+queue.id+":dequeue")
       }
       queue.dequeue_item_counter += 1
       queue.dequeue_size_counter += size
@@ -475,7 +473,7 @@ class QueueEntry(val queue:Queue, val se
           } else {
             // Are we swapping out a non-persistent message?
             if( delivery.storeKey == -1 ) {
-              val uow = queue.create_uow
+              val uow = queue.create_uow(queue.binding.binding_kind+":"+queue.id+":swap_out")
               delivery.uow = uow
               delivery.storeLocator = new AtomicReference[Object]()
               delivery.storeKey = uow.store(delivery.createMessageRecord )
@@ -483,7 +481,7 @@ class QueueEntry(val queue:Queue, val se
               if( asap ) {
                 uow.complete_asap
               }
-              uow.release()
+              uow.release(queue.binding.binding_kind+":"+queue.id+":swap_out")
             } else {
               store_enqueue
               if( asap ) {
@@ -579,13 +577,14 @@ class QueueEntry(val queue:Queue, val se
       queue.assert_executing
 
       if( !is_acquired && expiration != 0 && expiration <= queue.now ) {
-        val uow = queue.create_uow
+        val uow = queue.create_uow(queue.binding.binding_kind+":"+queue.id+":dispatch-expired")
         entry.dequeue(uow)
         queue.expired(uow, entry) {
           if( isLinked ) {
             remove
           }
         }
+        uow.release(queue.binding.binding_kind+":"+queue.id+":dispatch-expired")
         return true
       }
 
@@ -677,7 +676,7 @@ class QueueEntry(val queue:Queue, val se
 
                 acquiredDelivery.ack = (consumed, uow)=> {
                   if( uow!=null ) {
-                    uow.retain()
+                    uow.retain(queue.binding.binding_kind+":"+queue.id+":ack-merge")
                   }
                   queue.ack_source.merge((acquiredQueueEntry, consumed, uow))
                 }
@@ -848,13 +847,14 @@ class QueueEntry(val queue:Queue, val se
       queue.assert_executing
 
       if( !is_acquired && expiration != 0 && expiration <= queue.now ) {
-        val uow = queue.create_uow
+        val uow = queue.create_uow(queue.binding.binding_kind+":"+queue.id+":expire")
         entry.dequeue(uow)
         queue.expired(uow, entry) {
           if( isLinked ) {
             remove
           }
         }
+        uow.release(queue.binding.binding_kind+":"+queue.id+":expire")
         return true
       }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1446639&r1=1446638&r2=1446639&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
Fri Feb 15 15:42:37 2013
@@ -293,7 +293,7 @@ abstract class DeliveryProducerRoute(rou
       false
     } else {
       if (delivery.uow != null) {
-        delivery.uow.retain()
+        delivery.uow.retain("route:"+dispatch_queue.getLabel+":offer")
       }
       if ( !is_connected ) {
         overflow = delivery
@@ -345,7 +345,7 @@ abstract class DeliveryProducerRoute(rou
         if ( target.consumer.is_persistent && copy.persistent && store != null) {
 
           if (copy.uow == null) {
-            copy.uow = store.create_uow
+            copy.uow = store.create_uow("route:"+dispatch_queue.getLabel+":offer")
           }
 
           if( copy.storeKey == -1L ) {
@@ -375,7 +375,7 @@ abstract class DeliveryProducerRoute(rou
 
   private def release(delivery: Delivery): Unit = {
     if (delivery.uow != null) {
-      delivery.uow.release
+      delivery.uow.release("route:"+dispatch_queue.getLabel+":offer")
     }
     if( delivery.message!=null ) {
       delivery.message.release

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1446639&r1=1446638&r2=1446639&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
Fri Feb 15 15:42:37 2013
@@ -108,10 +108,26 @@ trait DelayingStoreSupport extends Store
   // Implementation of the StoreBatch interface
   //
   /////////////////////////////////////////////////////////////////////
-  def create_uow() = new DelayableUOW
+  def create_uow(owner:String) = {
+    val rc = new DelayableUOW
+    rc.owners.add(owner)
+    rc
+  }
 
   class DelayableUOW extends BaseRetained with StoreUOW {
 
+    val owners = scala.collection.mutable.HashSet[String]()
+
+    def release(owner: String) {
+      owners.remove(owner)
+      super.release()
+    }
+
+    def retain(owner: String) {
+      owners.add(owner)
+      super.retain()
+    }
+
     class MessageAction {
 
       var msg= 0L
@@ -359,7 +375,7 @@ trait DelayingStoreSupport extends Store
     out.println("--- Pending Stores Details ---")
     out.println("flush_source suspended: "+flush_source.isSuspended)
     pending_stores.valuesIterator.foreach{ action =>
-      out.println("uow: %d, state:%s, delayable:%s, canceled:%s".format(action.uow.uow_id, action.uow.state, action.uow.delayable, action.uow.canceled))
+      out.println("uow: %d, state:%s, owners:%s".format(action.uow.uow_id, action.uow.state, action.uow.owners))
     }
     writer.toString
   }
@@ -508,6 +524,9 @@ trait DelayingStoreSupport extends Store
         None
       } else {
         uow.state = UowFlushing
+        if(!( !locator_based || uow.have_locators )) {
+          println("crap")
+        }
         assert( !locator_based || uow.have_locators )
         // It will not be possible to cancel the UOW anymore..
         uow.actions.foreach { case (_, action) =>

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala?rev=1446639&r1=1446638&r2=1446639&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala
Fri Feb 15 15:42:37 2013
@@ -86,11 +86,11 @@ case class PersistentLongCounter(name:St
   def update(value: Long)(on_complete: =>Unit) {
     val s = store
     if (s!=null) {
-      val uow = s.create_uow()
+      val uow = s.create_uow(toString)
       uow.put(key, encode(value))
       uow.complete_asap()
       uow.on_complete(on_complete)
-      uow.release()
+      uow.release(toString)
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1446639&r1=1446638&r2=1446639&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
Fri Feb 15 15:42:37 2013
@@ -123,7 +123,7 @@ trait Store extends ServiceTrait {
    * Creates a store uow which is used to perform persistent
    * operations as unit of work.
    */
-  def create_uow():StoreUOW
+  def create_uow(owner:String):StoreUOW
 
   /**
    * Removes all previously stored data.

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala?rev=1446639&r1=1446638&r2=1446639&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
Fri Feb 15 15:42:37 2013
@@ -35,7 +35,10 @@ import org.fusesource.hawtbuf.Buffer
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait StoreUOW extends Retained {
+trait StoreUOW {
+
+  def release(owner:String);
+  def retain(owner:String);
 
   /**
    * Stores a message.  Messages a reference counted, so make sure you also

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1446639&r1=1446638&r2=1446639&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
Fri Feb 15 15:42:37 2013
@@ -110,7 +110,7 @@ abstract class StoreFunSuiteSupport exte
   }
 
   def populate(queue_key:Long, messages:List[String], first_seq:Long=1) = {
-    var batch = store.create_uow
+    var batch = store.create_uow("")
     var msg_keys = ListBuffer[(Long, AtomicReference[Object], Long)]()
     var next_seq = first_seq
 
@@ -125,7 +125,7 @@ abstract class StoreFunSuiteSupport exte
 
     val task = tracker.task("uow complete")
     batch.on_complete(task.run)
-    batch.release
+    batch.release("")
 
     msg_keys.foreach { msgKey =>
       store.flush_message(msgKey._1) {}

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala?rev=1446639&r1=1446638&r2=1446639&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala
Fri Feb 15 15:42:37 2013
@@ -119,7 +119,7 @@ abstract class StoreTests extends StoreF
   test("batch completes after a delay") {x}
   def x = {
     val A = add_queue("A")
-    var batch = store.create_uow
+    var batch = store.create_uow("")
 
     val m1 = add_message(batch, "message 1")
     batch.enqueue(entry(A, 1, m1))
@@ -127,7 +127,7 @@ abstract class StoreTests extends StoreF
     val tracker = new TaskTracker("unknown", 0)
     val task = tracker.task("uow complete")
     batch.on_complete(task.run)
-    batch.release
+    batch.release("")
 
     expect(false) {
       tracker.await(3, TimeUnit.SECONDS)
@@ -139,7 +139,7 @@ abstract class StoreTests extends StoreF
 
   test("flush cancels the delay") {
     val A = add_queue("A")
-    var batch = store.create_uow
+    var batch = store.create_uow("")
 
     val m1 = add_message(batch, "message 1")
     batch.enqueue(entry(A, 1, m1))
@@ -147,7 +147,7 @@ abstract class StoreTests extends StoreF
     val tracker = new TaskTracker("unknown", 0)
     val task = tracker.task("uow complete")
     batch.on_complete(task.run)
-    batch.release
+    batch.release("")
 
     store.flush_message(m1._1) {}
 

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala?rev=1446639&r1=1446638&r2=1446639&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala
Fri Feb 15 15:42:37 2013
@@ -45,7 +45,7 @@ class UowHaveLocatorsTest extends StoreF
 
   test("APLO-201: Persistent Store: UOW with message locator and no message (previously flushed)"){
     val queue = add_queue("A")
-    val batch = store.create_uow
+    val batch = store.create_uow("")
     val m1 = add_message(batch, "Hello!")
     val queueEntryRecord: QueueEntryRecord =  entry(queue, 1, m1)
     batch.enqueue(queueEntryRecord)
@@ -53,7 +53,7 @@ class UowHaveLocatorsTest extends StoreF
     var tracker = new TaskTracker("uknown", 0)
     var task = tracker.task("uow complete")
     batch.on_complete(task.run)
-    batch.release
+    batch.release("")
 
     assert(queueEntryRecord.message_locator.get() == null)
 
@@ -62,13 +62,13 @@ class UowHaveLocatorsTest extends StoreF
     }
     assert(queueEntryRecord.message_locator.get() != null)
 
-    val batch2 = store.create_uow
+    val batch2 = store.create_uow("")
     batch2.enqueue(queueEntryRecord)
 
     tracker = new TaskTracker("uknown", 0)
     task = tracker.task("uow complete")
     batch2.on_complete(task.run)
-    batch2.release
+    batch2.release("")
 
     expect(true) {
       tracker.await(2, TimeUnit.SECONDS)

Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala?rev=1446639&r1=1446638&r2=1446639&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
Fri Feb 15 15:42:37 2013
@@ -466,7 +466,7 @@ object MqttSessionManager {
     case class StoreStrategy(store:Store, client_id:UTF8Buffer) extends StorageStrategy {
       val session_key = new UTF8Buffer("mqtt:"+client_id)
       def update(cb: =>Unit) = {
-        val uow = store.create_uow()
+        val uow = store.create_uow(toString)
         val session_pb = new SessionPB.Bean
         session_pb.setClientId(client_id)
         received_message_ids.foreach(session_pb.addReceivedMessageIds(_))
@@ -485,11 +485,11 @@ object MqttSessionManager {
             cb
           }
         }
-        uow.release()
+        uow.release(toString)
       }
 
       def destroy(cb: =>Unit) {
-        val uow = store.create_uow()
+        val uow = store.create_uow(toString)
         uow.put(session_key, null)
         val current = getCurrentQueue
         uow.on_complete {
@@ -498,7 +498,7 @@ object MqttSessionManager {
             cb
           }
         }
-        uow.release()
+        uow.release(toString)
       }
       def create(store:Store, client_id:UTF8Buffer) = {
       }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1446639&r1=1446638&r2=1446639&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Fri Feb 15 15:42:37 2013
@@ -651,7 +651,7 @@ class OpenwireProtocolHandler extends Pr
         val route = OpenwireDeliveryProducerRoute(addresses)
 
         if( uow!=null ) {
-          uow.retain()
+          uow.retain(toString)
         }
         // don't process frames until producer is connected...
         suspend_read("connecting producer route")
@@ -669,7 +669,7 @@ class OpenwireProtocolHandler extends Pr
                 }
             }
             if( uow!=null ) {
-              uow.release()
+              uow.release(toString)
             }
           }
         }
@@ -1240,7 +1240,7 @@ class OpenwireProtocolHandler extends Pr
     def commit(onComplete: => Unit) = {
 
       val uow = if( host.store!=null ) {
-        host.store.create_uow
+        host.store.create_uow(toString)
       } else {
         null
       }
@@ -1251,7 +1251,7 @@ class OpenwireProtocolHandler extends Pr
 
       if( uow!=null ) {
         uow.on_complete(dispatchQueue{ onComplete })
-        uow.release
+        uow.release(toString)
       } else {
         onComplete
       }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1446639&r1=1446638&r2=1446639&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Fri Feb 15 15:42:37 2013
@@ -1244,7 +1244,7 @@ class StompProtocolHandler extends Proto
         val route = new StompProducerRoute(trimmed_dest)   // don't process frames until producer is connected...
         suspend_read("Connecting to destination")
         if( uow !=null ) {
-          uow.retain()
+          uow.retain(toString+":connecting")
         }
         host.dispatch_queue {
           val rc = host.router.connect(route.addresses, route, security_context)
@@ -1261,7 +1261,7 @@ class StompProtocolHandler extends Proto
                 }
             }
             if( uow !=null ) {
-              uow.release()
+              uow.release(toString+":connecting")
             }
           }
         }
@@ -1696,14 +1696,14 @@ class StompProtocolHandler extends Proto
 
     def commit(on_complete: => Unit) = {
       if( host.store!=null ) {
-        val uow = host.store.create_uow
+        val uow = host.store.create_uow(toString+":commit")
 //        println("UOW starting: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
         uow.on_complete {
 //          println("UOW completed: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
           on_complete
         }
         queue.foreach{ _._1(uow) }
-        uow.release
+        uow.release(toString+":commit")
       } else {
         queue.foreach{ _._1(null) }
         on_complete



Mime
View raw message