activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1208432 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/ apollo-util/src/test/scala/org/apache/activemq/apollo/util/
Date Wed, 30 Nov 2011 13:57:12 GMT
Author: chirino
Date: Wed Nov 30 13:57:11 2011
New Revision: 1208432

URL: http://svn.apache.org/viewvc?rev=1208432&view=rev
Log:
Improved temp destination tests.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
    activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FunSuiteSupport.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1208432&r1=1208431&r2=1208432&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala	(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala	Wed Nov 30 13:57:11 2011
@@ -930,16 +930,24 @@ class LocalRouter(val virtual_host:Virtu
   
   def remove_temp_destinations(active_connections:scala.collection.Set[Long]) = {
     virtual_host.dispatch_queue.assertExecuting()
+    val min_create_time = virtual_host.broker.now + 1000;
+
     // Auto delete temp destinations..
     queue_domain.destinations.filter(x=> is_temp(x.destination_dto)).foreach { queue=>
       val owner = temp_owner(queue.destination_dto).get
-      if( owner._1==virtual_host.broker.id && !active_connections.contains(owner._2) ) {
+      if( owner._1==virtual_host.broker.id // are we the broker that owns the temp destination?
+          && !active_connections.contains(owner._2) // Is the connection no longer around?
+          && queue.service_state.since > min_create_time // It's not a recently created destination?
+      ) {
         _destroy_queue(queue)
       }
     }
     topic_domain.destinations.filter(x=> is_temp(x.destination_dto)).foreach { topic =>
       val owner = temp_owner(topic.destination_dto).get
-      if( owner._1==virtual_host.broker.id && !active_connections.contains(owner._2) ) {
+      if( owner._1==virtual_host.broker.id // are we the broker that owns the temp destination?
+          && !active_connections.contains(owner._2) // Is the connection no longer around?
+          && topic.created_at > min_create_time // It's not a recently created destination?
+      ) {
         topic_domain.destroy_destination(topic.path, topic.destination_dto, null)
       }
     }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1208432&r1=1208431&r2=1208432&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala	(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala	Wed Nov 30 13:57:11 2011
@@ -26,6 +26,7 @@ import java.net.InetSocketAddress
 import org.fusesource.hawtdispatch._
 import org.apache.activemq.apollo.broker.{LocalRouter, KeyStorage, Broker, BrokerFactory}
 import org.fusesource.hawtbuf.Buffer._
+import java.util.concurrent.TimeUnit._
 
 class StompTestSupport extends FunSuiteSupport with ShouldMatchers with BeforeAndAfterEach with Logging {
   var broker: Broker = null
@@ -93,6 +94,22 @@ class StompTestSupport extends FunSuiteS
     frame should include("receipt-id:"+id+"\n")
   }
 
+  def queue_exists(name:String):Boolean = {
+    val host = broker.virtual_hosts.get(ascii("default")).get
+    host.dispatch_queue.future {
+      val router = host.router.asInstanceOf[LocalRouter]
+      router.queue_domain.destination_by_id.get(name).isDefined
+    }.await()
+  }
+
+  def topic_exists(name:String):Boolean = {
+    val host = broker.virtual_hosts.get(ascii("default")).get
+    host.dispatch_queue.future {
+      val router = host.router.asInstanceOf[LocalRouter]
+      router.topic_domain.destination_by_id.get(name).isDefined
+    }.await()
+  }
+
 }
 
 class Stomp10ConnectTest extends StompTestSupport {
@@ -1616,16 +1633,8 @@ class StompAutoDeleteTest extends StompT
     }
     get("1")
 
-    def queue_exists:Boolean = {
-      val host = broker.virtual_hosts.get(ascii("default")).get
-      host.dispatch_queue.future {
-        val router = host.router.asInstanceOf[LocalRouter]
-        router.queue_domain.destination_by_id.get("autodel").isDefined
-      }.await()
-    }
-
     // The queue should still exist..
-    expect(true)(queue_exists)
+    expect(true)(queue_exists("autodel"))
 
     client.write(
       "UNSUBSCRIBE\n" +
@@ -1637,7 +1646,7 @@ class StompAutoDeleteTest extends StompT
     Thread.sleep(1000);
 
     // Now that we unsubscribe, it should not exist any more.
-    expect(false)(queue_exists)
+    expect(false)(queue_exists("autodel"))
 
   }
 }
@@ -1655,9 +1664,12 @@ class StompTempDestinationTest extends S
         "SEND\n" +
         "destination:/temp-queue/test\n" +
         "reply-to:/temp-queue/test\n" +
+        "receipt:0\n" +
         "\n" +
         "message:"+msg+"\n")
+      wait_for_receipt("0")
     }
+
     put("1")
 
     client.write(
@@ -1685,8 +1697,44 @@ class StompTempDestinationTest extends S
     // The destination and reply-to headers should get updated with actual
     // Queue names
     val message = get("1")
-    message.get("destination").get should startWith("/queue/temp.default.")
+    val actual_temp_dest_name = message.get("destination").get
+    actual_temp_dest_name should startWith("/queue/temp.default.")
     message.get("reply-to") should be === ( message.get("destination") )
+
+    // Different connection should be able to send a message to the temp destination..
+    var other = new StompClient
+    connect("1.1", other)
+    other.write(
+      "SEND\n" +
+      "destination:"+actual_temp_dest_name+"\n" +
+      "receipt:0\n" +
+      "\n")
+    wait_for_receipt("0", other)
+
+    // First client should get the message.
+    var frame = client.receive()
+    frame should startWith("MESSAGE\n")
+
+    // But not consume from it.
+    other.write(
+      "SUBSCRIBE\n" +
+      "destination:"+actual_temp_dest_name+"\n" +
+      "id:1\n" +
+      "receipt:0\n" +
+      "\n")
+    frame = other.receive()
+    frame should startWith("ERROR\n")
+    frame should include regex("""message:Not authorized to receive from the temporary destination""")
+    other.close()
+
+    // Check that temp queue is deleted once the client disconnects
+    put("2")
+    assert(queue_exists(actual_temp_dest_name))
+    client.close();
+
+    within(5, SECONDS) {
+      assert(!queue_exists(actual_temp_dest_name))
+    }
   }
 
   test("Temp Topic Send Receive") {
@@ -1719,30 +1767,56 @@ class StompTempDestinationTest extends S
         "SEND\n" +
         "destination:/temp-topic/test\n" +
         "reply-to:/temp-topic/test\n" +
+        "receipt:0\n" +
         "\n" +
         "message:"+msg+"\n")
+      wait_for_receipt("0", client)
     }
     put("1")
 
     // The destination and reply-to headers should get updated with actual
     // Queue names
     val message = get("1")
-    message.get("destination").get should startWith("/topic/temp.default.")
+    val actual_temp_dest_name = message.get("destination").get
+    actual_temp_dest_name should startWith("/topic/temp.default.")
     message.get("reply-to") should be === ( message.get("destination") )
-  }
 
-  test("Receive not allowed on another connection's temp queue") {
+    // Different connection should be able to send a message to the temp destination..
+    var other = new StompClient
+    connect("1.1", other)
+    other.write(
+      "SEND\n" +
+      "destination:"+actual_temp_dest_name+"\n" +
+      "receipt:0\n" +
+      "\n")
+    wait_for_receipt("0", other)
 
-    connect("1.1")
-    client.write(
+    // First client should get the message.
+    var frame = client.receive()
+    frame should startWith("MESSAGE\n")
+
+    // But not consume from it.
+    other.write(
       "SUBSCRIBE\n" +
-      "destination:/queue/temp.default.1212112.test\n" +
+      "destination:"+actual_temp_dest_name+"\n" +
       "id:1\n" +
+      "receipt:0\n" +
       "\n")
-
-    val frame = client.receive()
+    frame = other.receive()
     frame should startWith("ERROR\n")
     frame should include regex("""message:Not authorized to receive from the temporary destination""")
+    other.close()
+
+    // Check that temp queue is deleted once the client disconnects
+    put("2")
+    assert(topic_exists(actual_temp_dest_name))
+    client.close();
+
+    within(5, SECONDS) {
+      assert(!topic_exists(actual_temp_dest_name))
+    }
+
 
   }
+
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FunSuiteSupport.scala?rev=1208432&r1=1208431&r2=1208432&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FunSuiteSupport.scala	(original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FunSuiteSupport.scala	Wed Nov 30 13:57:11 2011
@@ -24,6 +24,7 @@ import java.lang.String
 import collection.immutable.Map
 import org.scalatest._
 import FileSupport._
+import java.util.concurrent.TimeUnit
 
 /**
  * @version $Revision : 1.1 $
@@ -85,4 +86,36 @@ abstract class FunSuiteSupport extends F
     }
   }
 
+  private class ShortCircuitFailure(msg:String) extends RuntimeException(msg)
+
+  def within[T](timeout:Long, unit:TimeUnit)(func: => Unit ):Unit = {
+    val start = System.currentTimeMillis
+    var amount = unit.toMillis(timeout)
+    var sleep_amount = amount / 100
+    var last:Throwable = null
+
+    if( sleep_amount < 1 ) {
+      sleep_amount = 1
+    }
+    try {
+      func
+      return
+    } catch {
+      case e:ShortCircuitFailure => throw e
+      case e:Throwable => last = e
+    }
+
+    while( (System.currentTimeMillis-start) < amount ) {
+      Thread.sleep(sleep_amount)
+      try {
+        func
+        return
+      } catch {
+        case e:ShortCircuitFailure => throw e
+        case e:Throwable => last = e
+      }
+    }
+
+    throw last
+  }
 }
\ No newline at end of file



Mime
View raw message