activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1447482 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/...
Date Mon, 18 Feb 2013 20:40:27 GMT
Author: chirino
Date: Mon Feb 18 20:40:26 2013
New Revision: 1447482

URL: http://svn.apache.org/r1447482
Log:
Fixes failing MQTT tests.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
    activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1447482&r1=1447481&r2=1447482&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Mon Feb 18 20:40:26 2013
@@ -711,7 +711,8 @@ class Queue(val router: LocalRouter, val
 
         // Do we need to do a persistent enqueue???
         val uow = if( queue_delivery.persistent && tune_persistent ) {
-          queue_delivery.uow = create_uow(binding.binding_kind+":"+id+":offer", delivery.uow)
+          assert(delivery.uow !=null)
+          queue_delivery.uow = delivery.uow
           entry.state match {
             case state:entry.Loaded => state.store_enqueue
             case state:entry.Swapped => queue_delivery.uow.enqueue(entry.toQueueEntryRecord)
@@ -1221,8 +1222,8 @@ class Queue(val router: LocalRouter, val
         if( delivery.message!=null ) {
           delivery.message.retain
         }
-        if( delivery.persistent && tune_persistent && delivery.uow!=null ) {
-          delivery.uow.retain(binding.binding_kind+":"+id+":offer")
+        if( delivery.persistent && tune_persistent ) {
+          delivery.uow = create_uow(binding.binding_kind+":"+id+":offer", delivery.uow)
         }
         val rc = downstream.offer(delivery)
         assert(rc, "session should accept since it was not full")

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1447482&r1=1447481&r2=1447482&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala Mon Feb 18 20:40:26 2013
@@ -119,11 +119,16 @@ trait DelayingStoreSupport extends Store
     val owners = scala.collection.mutable.HashSet[String]()
 
     def release(owner: String) {
-      owners.remove(owner)
+      if( !owners.remove(owner) ) {
+        warn("UOW owner already removed! "+owner)
+      }
       super.release()
     }
 
     def retain(owner: String) {
+      if( !owners.add(owner) ) {
+        warn("UOW owner already added! "+owner)
+      }
       owners.add(owner)
       super.retain()
     }

Modified: activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala?rev=1447482&r1=1447481&r2=1447482&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-mqtt/src/test/scala/org/apache/activemq/apollo/mqtt/test/MqttLoadTest.scala Mon Feb 18 20:40:26 2013
@@ -21,6 +21,7 @@ import org.fusesource.mqtt.client._
 import org.apache.activemq.apollo.broker.Broker
 import org.fusesource.hawtdispatch._
 import java.util.concurrent.{TimeUnit, CountDownLatch}
+import java.util.concurrent.atomic.AtomicReference
 
 class MqttLoadTest extends MqttTestSupport {
 
@@ -31,15 +32,21 @@ class MqttLoadTest extends MqttTestSuppo
       val topic = prefix+"/load"
 
       val receiver = create_client
+      val error = new AtomicReference[Throwable]()
       connect(receiver)
       subscribe(topic, QoS.AT_LEAST_ONCE, receiver)
 
       val done = new CountDownLatch(1)
       Broker.BLOCKABLE_THREAD_POOL {
-        for(i <- 1 to 1000) {
-          should_receive("%0256d".format(i), topic, receiver)
+        try {
+          for (i <- 1 to 1000) {
+            should_receive("%0256d".format(i), topic, receiver)
+          }
+        } catch {
+          case e:Throwable => error.set(e)
+        } finally {
+          done.countDown()
         }
-        done.countDown()
       }
 
       connect()
@@ -48,6 +55,9 @@ class MqttLoadTest extends MqttTestSuppo
       }
 
       done.await(30, TimeUnit.SECONDS) should be(true)
+      if( error.get() != null ) {
+        throw error.get()
+      }
     }
 
   }



Mime
View raw message