Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EA490D5FE for ; Fri, 15 Feb 2013 15:43:05 +0000 (UTC) Received: (qmail 610 invoked by uid 500); 15 Feb 2013 15:43:05 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 530 invoked by uid 500); 15 Feb 2013 15:43:05 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 523 invoked by uid 99); 15 Feb 2013 15:43:05 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 15 Feb 2013 15:43:05 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 15 Feb 2013 15:42:59 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 38EAF23889D5; Fri, 15 Feb 2013 15:42:39 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1446639 - in /activemq/activemq-apollo/trunk: apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/... Date: Fri, 15 Feb 2013 15:42:38 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130215154239.38EAF23889D5@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Fri Feb 15 15:42:37 2013 New Revision: 1446639 URL: http://svn.apache.org/r1446639 Log: Adding more debug info to uow retainment so we know who fails to release the UOW. Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1446639&r1=1446638&r2=1446639&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala (original) +++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala Fri Feb 15 15:42:37 2013 @@ -1249,14 +1249,14 @@ class AmqpProtocolHandler extends Protoc def commit(on_complete: => Unit) = { if( host.store!=null ) { - val uow = host.store.create_uow + val uow = host.store.create_uow(toString) // println("UOW starting: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id) uow.on_complete { // println("UOW completed: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id) on_complete } queue.foreach{ _._1(uow) } - uow.release + uow.release(toString) } else { queue.foreach{ _._1(null) } on_complete Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1446639&r1=1446638&r2=1446639&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Fri Feb 15 15:42:37 2013 @@ -590,8 +590,8 @@ class Queue(val router: LocalRouter, val def is_topic_queue = resource_kind eq TopicQueueKind - def create_uow:StoreUOW = if(virtual_host.store==null) null else virtual_host.store.create_uow - def create_uow(uow:StoreUOW):StoreUOW = if(uow==null) create_uow else uow + def create_uow(owner:String):StoreUOW = if(virtual_host.store==null) null else virtual_host.store.create_uow(owner) + def create_uow(owner:String, uow:StoreUOW):StoreUOW = if(uow==null) create_uow(owner) else {uow.retain(owner); uow} object messages extends Sink[(Session[Delivery], Delivery)] { def stall_check = {} @@ -690,7 +690,7 @@ class Queue(val router: LocalRouter, val delivery.ack(if ( expired ) Expired else Undelivered, delivery.uow) } if( delivery.uow!=null ) { - delivery.uow.release() + delivery.uow.release(binding.binding_kind+":"+id) } return true } @@ -746,7 +746,7 @@ class Queue(val router: LocalRouter, val // release the store batch... if (uow != null) { - uow.release + uow.release(binding.binding_kind+":"+id+":offer") queue_delivery.uow = null } @@ -845,7 +845,7 @@ class Queue(val router: LocalRouter, val // remove the expired message if it has not been // acquired. if( !state.is_acquired ) { - val uow = create_uow + val uow = create_uow(binding.binding_kind+":"+id+":swap") entry.dequeue(uow) expired(uow, entry) { if( entry.isLinked ) { @@ -857,7 +857,7 @@ class Queue(val router: LocalRouter, val // remove the expired message if it has not been // acquired. if( !state.is_acquired ) { - val uow = create_uow + val uow = create_uow(binding.binding_kind+":"+id+":swap") entry.dequeue(uow) expired(uow, entry) { if( entry.isLinked ) { @@ -1090,9 +1090,9 @@ class Queue(val router: LocalRouter, val null } else { if( original_uow == null ) { - create_uow + create_uow(id) } else { - original_uow.retain() + original_uow.retain(binding.binding_kind+":"+id+":dlq") original_uow } } @@ -1113,7 +1113,7 @@ class Queue(val router: LocalRouter, val val (delivery, callback) = value; callback(delivery.uow) if( delivery.uow!=null ) { - delivery.uow.release() + delivery.uow.release(binding.binding_kind+":"+id+":dlq") } } } @@ -1153,10 +1153,11 @@ class Queue(val router: LocalRouter, val case Consumed => entry.ack(uow) case Expired=> - val actual = create_uow(uow) + val actual = create_uow(binding.binding_kind+":"+id+":ack", uow) expired(actual, entry.entry) { entry.ack(actual) } + actual.release(binding.binding_kind+":"+id+":ack") case Delivered => entry.increment_nack entry.entry.redelivered @@ -1176,7 +1177,7 @@ class Queue(val router: LocalRouter, val } } if( uow!=null ) { - uow.release() + uow.release(binding.binding_kind+":"+id+":ack-merge") } } } @@ -1224,7 +1225,7 @@ class Queue(val router: LocalRouter, val delivery.message.retain } if( tune_persistent && delivery.uow!=null ) { - delivery.uow.retain + delivery.uow.retain(binding.binding_kind+":"+id+":offer") } val rc = downstream.offer(delivery) assert(rc, "session should accept since it was not full") Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1446639&r1=1446638&r2=1446639&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala Fri Feb 15 15:42:37 2013 @@ -205,11 +205,9 @@ class QueueEntry(val queue:Queue, val se def dequeue(uow: StoreUOW) = { if ( queued ) { if (messageKey != -1) { - val actual_uow = queue.create_uow(uow) + val actual_uow = queue.create_uow(queue.binding.binding_kind+":"+queue.id+":dequeue", uow) actual_uow.dequeue(toQueueEntryRecord) - if( uow == null ) { - actual_uow.release - } + actual_uow.release(queue.binding.binding_kind+":"+queue.id+":dequeue") } queue.dequeue_item_counter += 1 queue.dequeue_size_counter += size @@ -475,7 +473,7 @@ class QueueEntry(val queue:Queue, val se } else { // Are we swapping out a non-persistent message? if( delivery.storeKey == -1 ) { - val uow = queue.create_uow + val uow = queue.create_uow(queue.binding.binding_kind+":"+queue.id+":swap_out") delivery.uow = uow delivery.storeLocator = new AtomicReference[Object]() delivery.storeKey = uow.store(delivery.createMessageRecord ) @@ -483,7 +481,7 @@ class QueueEntry(val queue:Queue, val se if( asap ) { uow.complete_asap } - uow.release() + uow.release(queue.binding.binding_kind+":"+queue.id+":swap_out") } else { store_enqueue if( asap ) { @@ -579,13 +577,14 @@ class QueueEntry(val queue:Queue, val se queue.assert_executing if( !is_acquired && expiration != 0 && expiration <= queue.now ) { - val uow = queue.create_uow + val uow = queue.create_uow(queue.binding.binding_kind+":"+queue.id+":dispatch-expired") entry.dequeue(uow) queue.expired(uow, entry) { if( isLinked ) { remove } } + uow.release(queue.binding.binding_kind+":"+queue.id+":dispatch-expired") return true } @@ -677,7 +676,7 @@ class QueueEntry(val queue:Queue, val se acquiredDelivery.ack = (consumed, uow)=> { if( uow!=null ) { - uow.retain() + uow.retain(queue.binding.binding_kind+":"+queue.id+":ack-merge") } queue.ack_source.merge((acquiredQueueEntry, consumed, uow)) } @@ -848,13 +847,14 @@ class QueueEntry(val queue:Queue, val se queue.assert_executing if( !is_acquired && expiration != 0 && expiration <= queue.now ) { - val uow = queue.create_uow + val uow = queue.create_uow(queue.binding.binding_kind+":"+queue.id+":expire") entry.dequeue(uow) queue.expired(uow, entry) { if( isLinked ) { remove } } + uow.release(queue.binding.binding_kind+":"+queue.id+":expire") return true } Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1446639&r1=1446638&r2=1446639&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Fri Feb 15 15:42:37 2013 @@ -293,7 +293,7 @@ abstract class DeliveryProducerRoute(rou false } else { if (delivery.uow != null) { - delivery.uow.retain() + delivery.uow.retain("route:"+dispatch_queue.getLabel+":offer") } if ( !is_connected ) { overflow = delivery @@ -345,7 +345,7 @@ abstract class DeliveryProducerRoute(rou if ( target.consumer.is_persistent && copy.persistent && store != null) { if (copy.uow == null) { - copy.uow = store.create_uow + copy.uow = store.create_uow("route:"+dispatch_queue.getLabel+":offer") } if( copy.storeKey == -1L ) { @@ -375,7 +375,7 @@ abstract class DeliveryProducerRoute(rou private def release(delivery: Delivery): Unit = { if (delivery.uow != null) { - delivery.uow.release + delivery.uow.release("route:"+dispatch_queue.getLabel+":offer") } if( delivery.message!=null ) { delivery.message.release Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1446639&r1=1446638&r2=1446639&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala Fri Feb 15 15:42:37 2013 @@ -108,10 +108,26 @@ trait DelayingStoreSupport extends Store // Implementation of the StoreBatch interface // ///////////////////////////////////////////////////////////////////// - def create_uow() = new DelayableUOW + def create_uow(owner:String) = { + val rc = new DelayableUOW + rc.owners.add(owner) + rc + } class DelayableUOW extends BaseRetained with StoreUOW { + val owners = scala.collection.mutable.HashSet[String]() + + def release(owner: String) { + owners.remove(owner) + super.release() + } + + def retain(owner: String) { + owners.add(owner) + super.retain() + } + class MessageAction { var msg= 0L @@ -359,7 +375,7 @@ trait DelayingStoreSupport extends Store out.println("--- Pending Stores Details ---") out.println("flush_source suspended: "+flush_source.isSuspended) pending_stores.valuesIterator.foreach{ action => - out.println("uow: %d, state:%s, delayable:%s, canceled:%s".format(action.uow.uow_id, action.uow.state, action.uow.delayable, action.uow.canceled)) + out.println("uow: %d, state:%s, owners:%s".format(action.uow.uow_id, action.uow.state, action.uow.owners)) } writer.toString } @@ -508,6 +524,9 @@ trait DelayingStoreSupport extends Store None } else { uow.state = UowFlushing + if(!( !locator_based || uow.have_locators )) { + println("crap") + } assert( !locator_based || uow.have_locators ) // It will not be possible to cancel the UOW anymore.. uow.actions.foreach { case (_, action) => Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala?rev=1446639&r1=1446638&r2=1446639&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala Fri Feb 15 15:42:37 2013 @@ -86,11 +86,11 @@ case class PersistentLongCounter(name:St def update(value: Long)(on_complete: =>Unit) { val s = store if (s!=null) { - val uow = s.create_uow() + val uow = s.create_uow(toString) uow.put(key, encode(value)) uow.complete_asap() uow.on_complete(on_complete) - uow.release() + uow.release(toString) } } Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1446639&r1=1446638&r2=1446639&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala Fri Feb 15 15:42:37 2013 @@ -123,7 +123,7 @@ trait Store extends ServiceTrait { * Creates a store uow which is used to perform persistent * operations as unit of work. */ - def create_uow():StoreUOW + def create_uow(owner:String):StoreUOW /** * Removes all previously stored data. Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala?rev=1446639&r1=1446638&r2=1446639&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala Fri Feb 15 15:42:37 2013 @@ -35,7 +35,10 @@ import org.fusesource.hawtbuf.Buffer * * @author Hiram Chirino */ -trait StoreUOW extends Retained { +trait StoreUOW { + + def release(owner:String); + def retain(owner:String); /** * Stores a message. Messages a reference counted, so make sure you also Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1446639&r1=1446638&r2=1446639&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala Fri Feb 15 15:42:37 2013 @@ -110,7 +110,7 @@ abstract class StoreFunSuiteSupport exte } def populate(queue_key:Long, messages:List[String], first_seq:Long=1) = { - var batch = store.create_uow + var batch = store.create_uow("") var msg_keys = ListBuffer[(Long, AtomicReference[Object], Long)]() var next_seq = first_seq @@ -125,7 +125,7 @@ abstract class StoreFunSuiteSupport exte val task = tracker.task("uow complete") batch.on_complete(task.run) - batch.release + batch.release("") msg_keys.foreach { msgKey => store.flush_message(msgKey._1) {} Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala?rev=1446639&r1=1446638&r2=1446639&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala Fri Feb 15 15:42:37 2013 @@ -119,7 +119,7 @@ abstract class StoreTests extends StoreF test("batch completes after a delay") {x} def x = { val A = add_queue("A") - var batch = store.create_uow + var batch = store.create_uow("") val m1 = add_message(batch, "message 1") batch.enqueue(entry(A, 1, m1)) @@ -127,7 +127,7 @@ abstract class StoreTests extends StoreF val tracker = new TaskTracker("unknown", 0) val task = tracker.task("uow complete") batch.on_complete(task.run) - batch.release + batch.release("") expect(false) { tracker.await(3, TimeUnit.SECONDS) @@ -139,7 +139,7 @@ abstract class StoreTests extends StoreF test("flush cancels the delay") { val A = add_queue("A") - var batch = store.create_uow + var batch = store.create_uow("") val m1 = add_message(batch, "message 1") batch.enqueue(entry(A, 1, m1)) @@ -147,7 +147,7 @@ abstract class StoreTests extends StoreF val tracker = new TaskTracker("unknown", 0) val task = tracker.task("uow complete") batch.on_complete(task.run) - batch.release + batch.release("") store.flush_message(m1._1) {} Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala?rev=1446639&r1=1446638&r2=1446639&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala (original) +++ activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala Fri Feb 15 15:42:37 2013 @@ -45,7 +45,7 @@ class UowHaveLocatorsTest extends StoreF test("APLO-201: Persistent Store: UOW with message locator and no message (previously flushed)"){ val queue = add_queue("A") - val batch = store.create_uow + val batch = store.create_uow("") val m1 = add_message(batch, "Hello!") val queueEntryRecord: QueueEntryRecord = entry(queue, 1, m1) batch.enqueue(queueEntryRecord) @@ -53,7 +53,7 @@ class UowHaveLocatorsTest extends StoreF var tracker = new TaskTracker("uknown", 0) var task = tracker.task("uow complete") batch.on_complete(task.run) - batch.release + batch.release("") assert(queueEntryRecord.message_locator.get() == null) @@ -62,13 +62,13 @@ class UowHaveLocatorsTest extends StoreF } assert(queueEntryRecord.message_locator.get() != null) - val batch2 = store.create_uow + val batch2 = store.create_uow("") batch2.enqueue(queueEntryRecord) tracker = new TaskTracker("uknown", 0) task = tracker.task("uow complete") batch2.on_complete(task.run) - batch2.release + batch2.release("") expect(true) { tracker.await(2, TimeUnit.SECONDS) Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala?rev=1446639&r1=1446638&r2=1446639&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala (original) +++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala Fri Feb 15 15:42:37 2013 @@ -466,7 +466,7 @@ object MqttSessionManager { case class StoreStrategy(store:Store, client_id:UTF8Buffer) extends StorageStrategy { val session_key = new UTF8Buffer("mqtt:"+client_id) def update(cb: =>Unit) = { - val uow = store.create_uow() + val uow = store.create_uow(toString) val session_pb = new SessionPB.Bean session_pb.setClientId(client_id) received_message_ids.foreach(session_pb.addReceivedMessageIds(_)) @@ -485,11 +485,11 @@ object MqttSessionManager { cb } } - uow.release() + uow.release(toString) } def destroy(cb: =>Unit) { - val uow = store.create_uow() + val uow = store.create_uow(toString) uow.put(session_key, null) val current = getCurrentQueue uow.on_complete { @@ -498,7 +498,7 @@ object MqttSessionManager { cb } } - uow.release() + uow.release(toString) } def create(store:Store, client_id:UTF8Buffer) = { } Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1446639&r1=1446638&r2=1446639&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original) +++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Fri Feb 15 15:42:37 2013 @@ -651,7 +651,7 @@ class OpenwireProtocolHandler extends Pr val route = OpenwireDeliveryProducerRoute(addresses) if( uow!=null ) { - uow.retain() + uow.retain(toString) } // don't process frames until producer is connected... suspend_read("connecting producer route") @@ -669,7 +669,7 @@ class OpenwireProtocolHandler extends Pr } } if( uow!=null ) { - uow.release() + uow.release(toString) } } } @@ -1240,7 +1240,7 @@ class OpenwireProtocolHandler extends Pr def commit(onComplete: => Unit) = { val uow = if( host.store!=null ) { - host.store.create_uow + host.store.create_uow(toString) } else { null } @@ -1251,7 +1251,7 @@ class OpenwireProtocolHandler extends Pr if( uow!=null ) { uow.on_complete(dispatchQueue{ onComplete }) - uow.release + uow.release(toString) } else { onComplete } Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1446639&r1=1446638&r2=1446639&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original) +++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Fri Feb 15 15:42:37 2013 @@ -1244,7 +1244,7 @@ class StompProtocolHandler extends Proto val route = new StompProducerRoute(trimmed_dest) // don't process frames until producer is connected... suspend_read("Connecting to destination") if( uow !=null ) { - uow.retain() + uow.retain(toString+":connecting") } host.dispatch_queue { val rc = host.router.connect(route.addresses, route, security_context) @@ -1261,7 +1261,7 @@ class StompProtocolHandler extends Proto } } if( uow !=null ) { - uow.release() + uow.release(toString+":connecting") } } } @@ -1696,14 +1696,14 @@ class StompProtocolHandler extends Proto def commit(on_complete: => Unit) = { if( host.store!=null ) { - val uow = host.store.create_uow + val uow = host.store.create_uow(toString+":commit") // println("UOW starting: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id) uow.on_complete { // println("UOW completed: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id) on_complete } queue.foreach{ _._1(uow) } - uow.release + uow.release(toString+":commit") } else { queue.foreach{ _._1(null) } on_complete