activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1032562 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Date Mon, 08 Nov 2010 13:01:45 GMT
Author: chirino
Date: Mon Nov  8 13:01:44 2010
New Revision: 1032562

URL: http://svn.apache.org/viewvc?rev=1032562&view=rev
Log:
Fixes problem where multiple receipts were being sent back to senders on topics which had
consumers.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1032562&r1=1032561&r2=1032562&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
Mon Nov  8 13:01:44 2010
@@ -25,10 +25,10 @@ import _root_.org.fusesource.hawtdispatc
 import collection.JavaConversions
 import org.apache.activemq.apollo.util._
 import collection.mutable.{ListBuffer, HashMap}
-import org.apache.activemq.apollo.store.QueueRecord
 import org.apache.activemq.apollo.dto.{PointToPointBindingDTO, BindingDTO}
 import path.{PathFilter, PathMap}
 import scala.collection.immutable.List
+import org.apache.activemq.apollo.store.{StoreUOW, QueueRecord}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -404,6 +404,7 @@ case class DeliveryProducerRoute(val rou
   // Dispatch.
   //
 
+  var pendingAck: (StoreUOW)=>Unit = null
   var overflow:Delivery=null
   var overflowSessions = List[DeliverySession]()
   var refiller:Runnable=null
@@ -416,42 +417,49 @@ case class DeliveryProducerRoute(val rou
     } else {
 
       // Do we need to store the message if we have a matching consumer?
-      delivery.message.retain
+      pendingAck = delivery.ack
+      val copy = delivery.copy
+      copy.message.retain
+
       targets.foreach { target=>
 
         // only deliver to matching consumers
-        if( target.consumer.matches(delivery) ) {
+        if( target.consumer.matches(copy) ) {
 
-          if( delivery.storeKey == -1L && target.consumer.is_persistent &&
delivery.message.persistent ) {
-            if( delivery.uow==null ) {
-              delivery.uow = router.host.store.createStoreUOW
+          if( copy.storeKey == -1L && target.consumer.is_persistent && copy.message.persistent
) {
+            if( copy.uow==null ) {
+              copy.uow = router.host.store.createStoreUOW
             } else {
-              delivery.uow.retain
+              copy.uow.retain
             }
-            delivery.storeKey = delivery.uow.store(delivery.createMessageRecord)
+            copy.storeKey = copy.uow.store(copy.createMessageRecord)
           }
 
-          if( !target.offer(delivery) ) {
+          if( !target.offer(copy) ) {
             overflowSessions ::= target
           }
         }
       }
 
       if( overflowSessions!=Nil ) {
-        overflow = delivery
+        overflow = copy
       } else {
-        delivered(delivery)
+        delivered(copy)
       }
       true
     }
   }
 
   private def delivered(delivery: Delivery): Unit = {
-    if (delivery.ack != null) {
+    if (pendingAck != null) {
       if (delivery.uow != null) {
-        delivery.uow.setDisposer(^ {delivery.ack(null)})
+        delivery.uow.setDisposer(^ {
+          pendingAck(null)
+          pendingAck=null
+        })
       } else {
-        delivery.ack(null)
+        pendingAck(null)
+        pendingAck==null
       }
     }
     if (delivery.uow != null) {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1032562&r1=1032561&r2=1032562&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Mon Nov  8 13:01:44 2010
@@ -34,6 +34,7 @@ class StompTestSupport extends FunSuiteS
   }
 
   var client = new StompClient
+  var clients = List[StompClient]()
 
   override protected def afterAll() = {
     broker.stop
@@ -41,7 +42,8 @@ class StompTestSupport extends FunSuiteS
 
   override protected def afterEach() = {
     super.afterEach
-    client.close
+    clients.foreach(_.close)
+    clients = Nil
   }
 
   def connect(version:String, c: StompClient = client) = {
@@ -63,6 +65,7 @@ class StompTestSupport extends FunSuiteS
     frame should startWith("CONNECTED\n")
     frame should include regex("""session:.+?\n""")
     frame should include("version:"+version+"\n")
+    clients ::= c
     c
   }
 
@@ -399,6 +402,58 @@ class StompDestinationTest extends Stomp
   }
 }
 
+class StompReceiptTest extends StompTestSupport {
+
+  test("Receipts on SEND to unconsummed topic") {
+    connect("1.1")
+
+    def put(id:Int) = {
+      client.write(
+        "SEND\n" +
+        "destination:/topic/receipt-test\n" +
+        "receipt:"+id+"\n" +
+        "\n" +
+        "message:"+id+"\n")
+    }
+
+    put(1)
+    put(2)
+    wait_for_receipt("1")
+    wait_for_receipt("2")
+
+
+  }
+
+  test("Receipts on SEND to a consumed topic") {
+    connect("1.1")
+
+    def put(id:Int) = {
+      client.write(
+        "SEND\n" +
+        "destination:/topic/receipt-test\n" +
+        "receipt:"+id+"\n" +
+        "\n" +
+        "message:"+id+"\n")
+    }
+
+    // start a consumer on a different connection
+    var consumer = new StompClient
+    connect("1.1", consumer)
+    consumer.write(
+      "SUBSCRIBE\n" +
+      "destination:/topic/receipt-test\n" +
+      "id:0\n" +
+      "receipt:0\n" +
+      "\n")
+    wait_for_receipt("0", consumer)
+
+    put(1)
+    put(2)
+    wait_for_receipt("1")
+    wait_for_receipt("2")
+    
+  }
+}
 class StompTransactionTest extends StompTestSupport {
   
 



Mime
View raw message