activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1448756 - in /activemq/activemq-apollo/trunk: apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/...
Date Thu, 21 Feb 2013 18:23:17 GMT
Author: chirino
Date: Thu Feb 21 18:23:16 2013
New Revision: 1448756

URL: http://svn.apache.org/r1448756
Log:
Remove the extra uow retain/release debugging since those bugs have been stored.

Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala
    activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala
    activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/js/app.js

Modified: activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/src/main/scala/org/apache/activemq/apollo/amqp/AmqpProtocolHandler.scala
Thu Feb 21 18:23:16 2013
@@ -1250,14 +1250,14 @@ class AmqpProtocolHandler extends Protoc
 
     def commit(on_complete: => Unit) = {
       if( host.store!=null ) {
-        val uow = host.store.create_uow(toString)
+        val uow = host.store.create_uow
 //        println("UOW starting: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
         uow.on_complete {
 //          println("UOW completed: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
           on_complete
         }
         queue.foreach{ _._1(uow) }
-        uow.release(toString)
+        uow.release
       } else {
         queue.foreach{ _._1(null) }
         on_complete

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Thu Feb 21 18:23:16 2013
@@ -591,8 +591,8 @@ class Queue(val router: LocalRouter, val
 
 
   def is_topic_queue = resource_kind eq TopicQueueKind
-  def create_uow(owner:String):StoreUOW = if(virtual_host.store==null) null else virtual_host.store.create_uow(owner)
-  def create_uow(owner:String, uow:StoreUOW):StoreUOW = if(uow==null) create_uow(owner) else
{uow.retain(owner); uow}
+  def create_uow:StoreUOW = if(virtual_host.store==null) null else virtual_host.store.create_uow
+  def create_uow(uow:StoreUOW):StoreUOW = if(uow==null) create_uow else {uow.retain; uow}
 
   object messages extends Sink[(Session[Delivery], Delivery)] {
     def stall_check = {}
@@ -692,7 +692,7 @@ class Queue(val router: LocalRouter, val
           }
           if( delivery.persistent && tune_persistent ) {
             assert(delivery.uow!=null)
-            delivery.uow.release(binding.binding_kind+":"+id+":offer")
+            delivery.uow.release
           }
           return true
         }
@@ -746,7 +746,7 @@ class Queue(val router: LocalRouter, val
 
         // release the store batch...
         if (uow != null) {
-          uow.release(binding.binding_kind+":"+id+":offer")
+          uow.release
         }
 
         
@@ -844,7 +844,7 @@ class Queue(val router: LocalRouter, val
             // remove the expired message if it has not been
             // acquired.
             if( !state.is_acquired ) {
-              val uow = create_uow(binding.binding_kind+":"+id+":swap")
+              val uow = create_uow
               entry.dequeue(uow)
               expired(uow, entry) {
                 if( entry.isLinked ) {
@@ -856,7 +856,7 @@ class Queue(val router: LocalRouter, val
             // remove the expired message if it has not been
             // acquired.
             if( !state.is_acquired ) {
-              val uow = create_uow(binding.binding_kind+":"+id+":swap")
+              val uow = create_uow
               entry.dequeue(uow)
               expired(uow, entry) {
                 if( entry.isLinked ) {
@@ -1088,7 +1088,7 @@ class Queue(val router: LocalRouter, val
         delivery.uow = if(delivery.storeKey == -1) {
           null
         } else {
-          create_uow(binding.binding_kind+":"+id+":dlq", original_uow)
+          create_uow(original_uow)
         }
         delivery.expiration=0
 
@@ -1107,7 +1107,7 @@ class Queue(val router: LocalRouter, val
               val (delivery, callback) = value;
               callback(delivery.uow)
               if( delivery.uow!=null ) {
-                delivery.uow.release(binding.binding_kind+":"+id+":dlq")
+                delivery.uow.release
                 delivery.uow = null
               }
             }
@@ -1148,11 +1148,11 @@ class Queue(val router: LocalRouter, val
           case Consumed =>
             entry.ack(uow)
           case Expired=>
-            val actual = create_uow(binding.binding_kind+":"+id+":ack", uow)
+            val actual = create_uow(uow)
             expired(actual, entry.entry) {
               entry.ack(actual)
             }
-            actual.release(binding.binding_kind+":"+id+":ack")
+            actual.release
           case Delivered =>
             entry.increment_nack
             entry.entry.redelivered
@@ -1172,7 +1172,7 @@ class Queue(val router: LocalRouter, val
             }
         }
         if( uow!=null ) {
-          uow.release(binding.binding_kind+":"+id+":ack-merge:"+entry.entry.seq)
+          uow.release
         }
     }
   }
@@ -1220,7 +1220,7 @@ class Queue(val router: LocalRouter, val
           delivery.message.retain
         }
         if( delivery.persistent && tune_persistent ) {
-          delivery.uow = create_uow(binding.binding_kind+":"+id+":offer", delivery.uow)
+          delivery.uow = create_uow(delivery.uow)
         }
         val rc = downstream.offer(delivery)
         assert(rc, "session should accept since it was not full")

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueEntry.scala
Thu Feb 21 18:23:16 2013
@@ -205,9 +205,9 @@ class QueueEntry(val queue:Queue, val se
   def dequeue(uow: StoreUOW) = {
     if ( queued ) {
       if (messageKey != -1) {
-        val actual_uow = queue.create_uow(queue.binding.binding_kind+":"+queue.id+":dequeue",
uow)
+        val actual_uow = queue.create_uow(uow)
         actual_uow.dequeue(toQueueEntryRecord)
-        actual_uow.release(queue.binding.binding_kind+":"+queue.id+":dequeue")
+        actual_uow.release
       }
       queue.dequeue_item_counter += 1
       queue.dequeue_size_counter += size
@@ -465,7 +465,7 @@ class QueueEntry(val queue:Queue, val se
           switch_to_swapped
         } else {
           swapping_out=true
-          val uow = queue.create_uow(queue.binding.binding_kind+":"+queue.id+":swap_out")
+          val uow = queue.create_uow
 
           // Are we swapping out a non-persistent message?
           val flush = if( delivery.storeKey == -1 ) {
@@ -483,7 +483,7 @@ class QueueEntry(val queue:Queue, val se
               uow.complete_asap
             }
           }
-          uow.release(queue.binding.binding_kind+":"+queue.id+":swap_out")
+          uow.release
         }
       }
     }
@@ -570,14 +570,14 @@ class QueueEntry(val queue:Queue, val se
       queue.assert_executing
 
       if( !is_acquired && expiration != 0 && expiration <= queue.now )
{
-        val uow = queue.create_uow(queue.binding.binding_kind+":"+queue.id+":dispatch-expired")
+        val uow = queue.create_uow
         entry.dequeue(uow)
         queue.expired(uow, entry) {
           if( isLinked ) {
             remove
           }
         }
-        uow.release(queue.binding.binding_kind+":"+queue.id+":dispatch-expired")
+        uow.release
         return true
       }
 
@@ -669,7 +669,7 @@ class QueueEntry(val queue:Queue, val se
 
                 acquiredDelivery.ack = (consumed, uow)=> {
                   if( uow!=null ) {
-                    uow.retain(queue.binding.binding_kind+":"+queue.id+":ack-merge:"+seq)
+                    uow.retain
                   }
                   queue.ack_source.merge((acquiredQueueEntry, consumed, uow))
                 }
@@ -840,14 +840,14 @@ class QueueEntry(val queue:Queue, val se
       queue.assert_executing
 
       if( !is_acquired && expiration != 0 && expiration <= queue.now )
{
-        val uow = queue.create_uow(queue.binding.binding_kind+":"+queue.id+":expire")
+        val uow = queue.create_uow
         entry.dequeue(uow)
         queue.expired(uow, entry) {
           if( isLinked ) {
             remove
           }
         }
-        uow.release(queue.binding.binding_kind+":"+queue.id+":expire")
+        uow.release
         return true
       }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
Thu Feb 21 18:23:16 2013
@@ -294,7 +294,7 @@ abstract class DeliveryProducerRoute(rou
       false
     } else {
       if (delivery.uow != null) {
-        delivery.uow.retain("route:"+dispatch_queue.getLabel+":offer")
+        delivery.uow.retain
       }
       if ( !is_connected ) {
         overflow = delivery
@@ -346,7 +346,7 @@ abstract class DeliveryProducerRoute(rou
         if ( target.consumer.is_persistent && copy.persistent && store !=
null) {
 
           if (copy.uow == null) {
-            copy.uow = store.create_uow("route:"+dispatch_queue.getLabel+":offer")
+            copy.uow = store.create_uow
           }
 
           if( copy.storeKey == -1L ) {
@@ -376,7 +376,7 @@ abstract class DeliveryProducerRoute(rou
 
   private def release(delivery: Delivery): Unit = {
     if (delivery.uow != null) {
-      delivery.uow.release("route:"+dispatch_queue.getLabel+":offer")
+      delivery.uow.release
     }
     if( delivery.message!=null ) {
       delivery.message.release

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
Thu Feb 21 18:23:16 2013
@@ -108,37 +108,12 @@ trait DelayingStoreSupport extends Store
   // Implementation of the StoreBatch interface
   //
   /////////////////////////////////////////////////////////////////////
-  def create_uow(owner:String) = {
-    val rc = new DelayableUOW
-    rc.owners.add(owner)
-    rc
-  }
+  def create_uow = new DelayableUOW
 
   class DelayableUOW extends BaseRetained with StoreUOW {
 
     override def toString: String = uow_id.toString
 
-    val owners = scala.collection.mutable.HashSet[String]()
-
-    def release(owner: String) {
-      this.synchronized {
-        if( !owners.remove(owner) ) {
-          warn("UOW owner already removed! "+owner)
-        }
-      }
-      super.release()
-    }
-
-    def retain(owner: String) {
-      this.synchronized {
-        if( !owners.add(owner) ) {
-          warn("UOW owner already added! "+owner)
-        }
-      }
-      owners.add(owner)
-      super.retain()
-    }
-
     class MessageAction {
 
       var msg= 0L
@@ -387,7 +362,7 @@ trait DelayingStoreSupport extends Store
     out.println("--- Pending Stores Details ---")
     out.println("flush_source suspended: "+flush_source.isSuspended)
     pending_stores.valuesIterator.foreach{ action =>
-      out.println("uow: %d, state:%s, owners:%s".format(action.uow.uow_id, action.uow.state,
action.uow.owners))
+      out.println("uow: %d, state:%s".format(action.uow.uow_id, action.uow.state))
     }
     writer.toString
   }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/PersistentLongCounter.scala
Thu Feb 21 18:23:16 2013
@@ -86,11 +86,11 @@ case class PersistentLongCounter(name:St
   def update(value: Long)(on_complete: =>Unit) {
     val s = store
     if (s!=null) {
-      val uow = s.create_uow(toString)
+      val uow = s.create_uow
       uow.put(key, encode(value))
       uow.complete_asap()
       uow.on_complete(on_complete)
-      uow.release(toString)
+      uow.release
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/Store.scala
Thu Feb 21 18:23:16 2013
@@ -123,7 +123,7 @@ trait Store extends ServiceTrait {
    * Creates a store uow which is used to perform persistent
    * operations as unit of work.
    */
-  def create_uow(owner:String):StoreUOW
+  def create_uow:StoreUOW
 
   /**
    * Removes all previously stored data.

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/StoreUOW.scala
Thu Feb 21 18:23:16 2013
@@ -35,10 +35,7 @@ import org.fusesource.hawtbuf.Buffer
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait StoreUOW {
-
-  def release(owner:String);
-  def retain(owner:String);
+trait StoreUOW extends Retained {
 
   /**
    * Stores a message.  Messages a reference counted, so make sure you also

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
Thu Feb 21 18:23:16 2013
@@ -110,7 +110,7 @@ abstract class StoreFunSuiteSupport exte
   }
 
   def populate(queue_key:Long, messages:List[String], first_seq:Long=1) = {
-    var batch = store.create_uow("")
+    var batch = store.create_uow
     var msg_keys = ListBuffer[(Long, AtomicReference[Object], Long)]()
     var next_seq = first_seq
 
@@ -125,7 +125,7 @@ abstract class StoreFunSuiteSupport exte
 
     val task = tracker.task("uow complete")
     batch.on_complete(task.run)
-    batch.release("")
+    batch.release
 
     msg_keys.foreach { msgKey =>
       store.flush_message(msgKey._1) {}

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreTests.scala
Thu Feb 21 18:23:16 2013
@@ -119,7 +119,7 @@ abstract class StoreTests extends StoreF
   test("batch completes after a delay") {x}
   def x = {
     val A = add_queue("A")
-    var batch = store.create_uow("")
+    var batch = store.create_uow
 
     val m1 = add_message(batch, "message 1")
     batch.enqueue(entry(A, 1, m1))
@@ -127,7 +127,7 @@ abstract class StoreTests extends StoreF
     val tracker = new TaskTracker("unknown", 0)
     val task = tracker.task("uow complete")
     batch.on_complete(task.run)
-    batch.release("")
+    batch.release
 
     expect(false) {
       tracker.await(3, TimeUnit.SECONDS)
@@ -139,7 +139,7 @@ abstract class StoreTests extends StoreF
 
   test("flush cancels the delay") {
     val A = add_queue("A")
-    var batch = store.create_uow("")
+    var batch = store.create_uow
 
     val m1 = add_message(batch, "message 1")
     batch.enqueue(entry(A, 1, m1))
@@ -147,7 +147,7 @@ abstract class StoreTests extends StoreF
     val tracker = new TaskTracker("unknown", 0)
     val task = tracker.task("uow complete")
     batch.on_complete(task.run)
-    batch.release("")
+    batch.release
 
     store.flush_message(m1._1) {}
 

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/test/scala/org/apache/activemq/apollo/broker/store/leveldb/UowHaveLocatorsTest.scala
Thu Feb 21 18:23:16 2013
@@ -45,7 +45,7 @@ class UowHaveLocatorsTest extends StoreF
 
   test("APLO-201: Persistent Store: UOW with message locator and no message (previously flushed)"){
     val queue = add_queue("A")
-    val batch = store.create_uow("")
+    val batch = store.create_uow
     val m1 = add_message(batch, "Hello!")
     val queueEntryRecord: QueueEntryRecord =  entry(queue, 1, m1)
     batch.enqueue(queueEntryRecord)
@@ -53,7 +53,7 @@ class UowHaveLocatorsTest extends StoreF
     var tracker = new TaskTracker("uknown", 0)
     var task = tracker.task("uow complete")
     batch.on_complete(task.run)
-    batch.release("")
+    batch.release
 
     assert(queueEntryRecord.message_locator.get() == null)
 
@@ -62,13 +62,13 @@ class UowHaveLocatorsTest extends StoreF
     }
     assert(queueEntryRecord.message_locator.get() != null)
 
-    val batch2 = store.create_uow("")
+    val batch2 = store.create_uow
     batch2.enqueue(queueEntryRecord)
 
     tracker = new TaskTracker("uknown", 0)
     task = tracker.task("uow complete")
     batch2.on_complete(task.run)
-    batch2.release("")
+    batch2.release
 
     expect(true) {
       tracker.await(2, TimeUnit.SECONDS)

Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/main/scala/org/apache/activemq/apollo/mqtt/MqttProtocolHandler.scala
Thu Feb 21 18:23:16 2013
@@ -466,7 +466,7 @@ object MqttSessionManager {
     case class StoreStrategy(store:Store, client_id:UTF8Buffer) extends StorageStrategy {
       val session_key = new UTF8Buffer("mqtt:"+client_id)
       def update(cb: =>Unit) = {
-        val uow = store.create_uow(toString)
+        val uow = store.create_uow
         val session_pb = new SessionPB.Bean
         session_pb.setClientId(client_id)
         received_message_ids.foreach(session_pb.addReceivedMessageIds(_))
@@ -485,11 +485,11 @@ object MqttSessionManager {
             cb
           }
         }
-        uow.release(toString)
+        uow.release
       }
 
       def destroy(cb: =>Unit) {
-        val uow = store.create_uow(toString)
+        val uow = store.create_uow
         uow.put(session_key, null)
         val current = getCurrentQueue
         uow.on_complete {
@@ -498,7 +498,7 @@ object MqttSessionManager {
             cb
           }
         }
-        uow.release(toString)
+        uow.release
       }
       def create(store:Store, client_id:UTF8Buffer) = {
       }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Thu Feb 21 18:23:16 2013
@@ -653,7 +653,7 @@ class OpenwireProtocolHandler extends Pr
         val route = OpenwireDeliveryProducerRoute(addresses)
 
         if( uow!=null ) {
-          uow.retain(toString)
+          uow.retain
         }
         // don't process frames until producer is connected...
         suspend_read("connecting producer route")
@@ -671,7 +671,7 @@ class OpenwireProtocolHandler extends Pr
                 }
             }
             if( uow!=null ) {
-              uow.release(toString)
+              uow.release
             }
           }
         }
@@ -1243,7 +1243,7 @@ class OpenwireProtocolHandler extends Pr
     def commit(onComplete: => Unit) = {
 
       val uow = if( host.store!=null ) {
-        host.store.create_uow(toString)
+        host.store.create_uow
       } else {
         null
       }
@@ -1254,7 +1254,7 @@ class OpenwireProtocolHandler extends Pr
 
       if( uow!=null ) {
         uow.on_complete(dispatchQueue{ onComplete })
-        uow.release(toString)
+        uow.release
       } else {
         onComplete
       }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Thu Feb 21 18:23:16 2013
@@ -1231,7 +1231,7 @@ class StompProtocolHandler extends Proto
     }
   }
 
-  var producer_routes = new LRUCache[AsciiBuffer, StompProducerRoute](10) {
+  var producer_routes = new LRUCache[AsciiBuffer, StompProducerRoute](1) {
     override def onCacheEviction(eldest: Entry[AsciiBuffer, StompProducerRoute]) = {
       host.dispatch_queue {
         host.router.disconnect(eldest.getValue.addresses, eldest.getValue)
@@ -1249,7 +1249,7 @@ class StompProtocolHandler extends Proto
         val route = new StompProducerRoute(trimmed_dest)   // don't process frames until
producer is connected...
         suspend_read("Connecting to destination")
         if( uow !=null ) {
-          uow.retain(toString+":connecting")
+          uow.retain
         }
         host.dispatch_queue {
           val rc = host.router.connect(route.addresses, route, security_context)
@@ -1266,7 +1266,7 @@ class StompProtocolHandler extends Proto
                 }
             }
             if( uow !=null ) {
-              uow.release(toString+":connecting")
+              uow.release
             }
           }
         }
@@ -1703,14 +1703,14 @@ class StompProtocolHandler extends Proto
 
     def commit(on_complete: => Unit) = {
       if( host.store!=null ) {
-        val uow = host.store.create_uow(toString+":commit")
+        val uow = host.store.create_uow
 //        println("UOW starting: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
         uow.on_complete {
 //          println("UOW completed: "+uow.asInstanceOf[DelayingStoreSupport#DelayableUOW].uow_id)
           on_complete
         }
         queue.foreach{ _._1(uow) }
-        uow.release(toString+":commit")
+        uow.release
       } else {
         queue.foreach{ _._1(null) }
         on_complete

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
Thu Feb 21 18:23:16 2013
@@ -41,7 +41,7 @@
     <leveldb_store directory="${testdatadir}"/>
   </virtual_host>
 
-  <!--<web_admin bind="http://0.0.0.0:61680"/>-->
+  <web_admin bind="http://0.0.0.0:61680"/>
   <connector id="tcp" bind="tcp://0.0.0.0:0"/>
   <connector id="udp" bind="udp://0.0.0.0:0" protocol="udp"/>
   <connector id="stomp-udp" bind="udp://0.0.0.0:0" protocol="stomp-udp"/>

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/js/app.js
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/js/app.js?rev=1448756&r1=1448755&r2=1448756&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/js/app.js (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/console/js/app.js Thu Feb 21
18:23:16 2013
@@ -229,9 +229,24 @@ App.LoginController = Em.Controller.crea
     var was_logged_in = this.get('is_logged_in')
     var kind = this.get('kind')
     App.ajax("GET", "/session/whoami", function(data) {
-        App.LoginController.set('content', data);
-        if( App.LoginController.get('is_logged_in') ) {
-          App.refresh();
+        if( data.length==0 ) {
+          App.ajax("GET", "/broker", function(broker) {
+            data.push({kind:"UserPrincipal", name:"<anonymous>"});
+            App.LoginController.set('content', data);
+            if( App.LoginController.get('is_logged_in') ) {
+              App.refresh();
+            }
+          }, function(error){
+            App.LoginController.set('content', data);
+            if( App.LoginController.get('is_logged_in') ) {
+              App.refresh();
+            }
+          });
+        } else {
+          App.LoginController.set('content', data);
+          if( App.LoginController.get('is_logged_in') ) {
+            App.refresh();
+          }
         }
       },
       function(xhr, status, thrown) {
@@ -778,7 +793,7 @@ App.ConfigurationController = Ember.Cont
       App.ConfigurationController.set("files", json);
     },
     function(xhr, status, thrown) {
-      if( xhr.status == 401 ) {
+      if( xhr.status == 401 || xhr.status == 404 ) {
         App.ConfigurationController.set("files", null);
       } else {
         App.default_error_handler(xhr, status, thrown)



Mime
View raw message