activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1330910 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Date Thu, 26 Apr 2012 15:46:10 GMT
Author: chirino
Date: Thu Apr 26 15:46:09 2012
New Revision: 1330910

URL: http://svn.apache.org/viewvc?rev=1330910&view=rev
Log:
Fixes APLO-198: Apollo sometimes does not send all the messages in a queue

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1330910&r1=1330909&r2=1330910&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Thu Apr 26 15:46:09 2012
@@ -1027,6 +1027,8 @@ class Queue(val router: LocalRouter, val
             entry.increment_nack
             entry.entry.redelivered
             entry.nack
+          case Undelivered =>
+            entry.nack
           case Poisoned =>
             entry.increment_nack
             entry.entry.redelivered
@@ -1040,8 +1042,6 @@ class Queue(val router: LocalRouter, val
             } else {
               entry.nack
             }
-          case Undelivered =>
-            entry.increment_nack
         }
         if( uow!=null ) {
           uow.release()

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1330910&r1=1330909&r2=1330910&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Thu Apr 26 15:46:09 2012
@@ -147,10 +147,19 @@ class StompTestSupport extends BrokerFun
     }
   }
 
-  def wait_for_receipt(id:String, c: StompClient = client): Unit = {
-    val frame = c.receive()
-    frame should startWith("RECEIPT\n")
-    frame should include("receipt-id:"+id+"\n")
+  def wait_for_receipt(id:String, c: StompClient = client, discard_others:Boolean=false):
Unit = {
+    if( !discard_others ) {
+      val frame = c.receive()
+      frame should startWith("RECEIPT\n")
+      frame should include("receipt-id:"+id+"\n")
+    } else {
+      while(true) {
+        val frame = c.receive()
+        if( frame.startsWith("RECEIPT\n") && frame.indexOf("receipt-id:"+id+"\n")>=0
) {
+          return
+        }
+      }
+    }
   }
 }
 
@@ -518,6 +527,43 @@ class Stomp11HeartBeatTest extends Stomp
   }
 
 }
+
+class StompPersistentQueueTest extends StompTestSupport {
+
+  override val broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
+
+  test("(APLO-198) Apollo sometimes does not send all the messages in a queue") {
+    connect("1.1")
+    for( i <- 0 until 10000 ) {
+      async_send("/queue/BIGQUEUE", "message #"+i)
+    }
+    sync_send("/queue/BIGQUEUE", "END")
+    client.close
+    
+    var counter = 0
+    for( i <- 0 until 100 ) {
+      connect("1.1")
+      subscribe("1", "/queue/BIGQUEUE", "client", false, "", false)
+      for( j <- 0 until 100 ) {
+        assert_received("message #"+counter)(true)
+        counter+=1
+      }
+      client.write(
+        "DISCONNECT\n" +
+        "receipt:disco\n" +
+        "\n")
+      wait_for_receipt("disco", client, true)
+      client.close
+    }
+
+    connect("1.1")
+    subscribe("1", "/queue/BIGQUEUE", "client")
+    assert_received("END")(true)
+
+  }
+
+}
+
 class StompDestinationTest extends StompTestSupport {
 
   test("Browsing queues does not cause AssertionError.  Reported in APLO-156") {



Mime
View raw message