activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1390597 - in /activemq/activemq-apollo/trunk/apollo-stomp/src: main/scala/org/apache/activemq/apollo/stomp/ test/scala/org/apache/activemq/apollo/stomp/test/
Date Wed, 26 Sep 2012 16:41:07 GMT
Author: chirino
Date: Wed Sep 26 16:41:06 2012
New Revision: 1390597

URL: http://svn.apache.org/viewvc?rev=1390597&view=rev
Log:
Fixes APLO-261 : Added support the 1.2 STOMP spec ACK style.

Modified:
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompClient.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1390597&r1=1390596&r2=1390597&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
Wed Sep 26 16:41:06 2012
@@ -392,7 +392,7 @@ object Stomp {
   val TIMESTAMP = ascii("timestamp")
   val SUBSCRIPTION = ascii("subscription")
 
-  val ACK_MODE = ascii("ack")
+  val ACK_HEADER = ascii("ack")
   val ID = ascii("id")
   val SELECTOR = ascii("selector")
   val CREDIT = ascii("credit")

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1390597&r1=1390596&r2=1390597&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Wed Sep 26 16:41:06 2012
@@ -39,6 +39,7 @@ import org.fusesource.hawtdispatch.trans
 import path.{LiteralPart, Path, PathParser}
 import scala.Some
 import org.apache.activemq.apollo.broker.SubscriptionAddress
+import java.util
 
 
 case class RichBuffer(self:Buffer) extends Proxy {
@@ -161,13 +162,43 @@ class StompProtocolHandler extends Proto
   def broker = connection.connector.broker
 
   protected def dispatchQueue:DispatchQueue = connection.dispatch_queue
-  
-  def id(message:Message) = {
-    message.asInstanceOf[StompFrameMessage].id
-  }
 
   case class InitialCreditWindow(count:Int,size:Int,auto_credit:Boolean)
 
+
+  //////////////////////////////////////////////////////////////////
+  //
+  // Since ack id's are re-useable once they are acked by the client,
+  // try to re-use them since the first ones generated will be the
+  // shortest ack-ids available.
+  //
+  //////////////////////////////////////////////////////////////////
+  var ack_id_counter = 1L
+  var ack_id_pool = new util.HashSet[AsciiBuffer]()
+  def checkout_ack_id:AsciiBuffer = {
+    if( ack_id_pool!=null && ack_id_pool.size()>0 ) {
+      val i = ack_id_pool.iterator();
+      val rc = i.next()
+      i.remove()
+      rc
+    } else {
+      ack_id_counter += 1
+      ascii(java.lang.Long.toHexString(ack_id_counter))
+    }
+  }
+
+  def checkin_ack_id(id:AsciiBuffer) = {
+    if( ack_id_pool!=null ) {
+      if( ack_id_pool.size() < 0xFFF ) {
+        ack_id_pool.add(id);
+      } else {
+        // if we are sending too many messages at once
+        // then it might not makes sense to pool..
+        ack_id_pool = null
+      }
+    }
+  }
+
   class StompConsumer (
 
     val subscription_id:Option[AsciiBuffer],
@@ -237,7 +268,7 @@ class StompProtocolHandler extends Proto
     credit_window_source.resume
 
     trait AckHandler {
-      def track(event:(Session[Delivery], Delivery)):Unit
+      def track(session:Session[Delivery], msgid: AsciiBuffer, size:Int, ack:(DeliveryResult,
StoreUOW)=>Unit):Unit
       def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit
       def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null):Unit
       def close:Unit
@@ -249,19 +280,18 @@ class StompProtocolHandler extends Proto
 
       def close = { closed  = true}
 
-      def track(event:(Session[Delivery], Delivery)) = {
-        val (session, delivery) = event
-        session_manager.delivered(session, delivery.size)
+      def track(session:Session[Delivery], msgid: AsciiBuffer, size:Int, ack:(DeliveryResult,
StoreUOW)=>Unit) = {
+        session_manager.delivered(session, size)
         if( closed ) {
-          if( delivery.ack!=null ) {
-            delivery.ack(Undelivered, null)
+          if( ack!=null ) {
+            ack(Undelivered, null)
           }
         } else {
-          if( delivery.ack!=null ) {
-            delivery.ack(Consumed, null)
+          if( ack!=null ) {
+            ack(Consumed, null)
           }
           if( !dead ) {
-            credit_window_source.merge((1, delivery.size))
+            credit_window_source.merge((1, size))
           }
         }
       }
@@ -290,23 +320,22 @@ class StompProtocolHandler extends Proto
         consumer_acks = null
       }
 
-      def track(event:(Session[Delivery], Delivery)) = {
+      def track(session:Session[Delivery], msgid: AsciiBuffer, size:Int, ack:(DeliveryResult,
StoreUOW)=>Unit) = {
         queue.assertExecuting()
-        val (session, delivery) = event
         if( consumer_acks == null ) {
           // It can happen if we get closed.. but destination is still sending data..
-          if( delivery.ack!=null ) {
-            delivery.ack(Undelivered, null)
+          if( ack!=null ) {
+            ack(Undelivered, null)
           }
         } else {
-          if( protocol_version eq V1_0 ) {
+          if( (protocol_version eq V1_0) || (protocol_version eq V1_2) ) {
             // register on the connection since 1.0 acks may not include the subscription
id
-            connection_ack_handlers += ( id(delivery.message) -> this )
+            connection_ack_handlers += ( msgid -> this )
           }
           if( initial_credit_window.auto_credit ) {
-            consumer_acks += id(delivery.message) -> new TrackedAck(Some((session, delivery.size)),
delivery.ack )
+            consumer_acks += msgid -> new TrackedAck(Some((session, size)), ack )
           } else {
-            session_manager.delivered(session, delivery.size)
+            session_manager.delivered(session, size)
           }
         }
       }
@@ -366,6 +395,9 @@ class StompProtocolHandler extends Proto
 
         if( protocol_version eq V1_0 ) {
           connection_ack_handlers.remove(msgid)
+        } else if( protocol_version eq V1_2 ) {
+          connection_ack_handlers.remove(msgid)
+          checkin_ack_id(msgid)
         }
       }
 
@@ -384,23 +416,22 @@ class StompProtocolHandler extends Proto
         consumer_acks = null
       }
 
-      def track(event:(Session[Delivery], Delivery)) = {
+      def track(session:Session[Delivery], msgid: AsciiBuffer, size:Int, ack:(DeliveryResult,
StoreUOW)=>Unit) = {
         queue.assertExecuting();
-        val (session, delivery) = event
         if( consumer_acks == null ) {
           // It can happen if we get closed.. but destination is still sending data..
-          if( delivery.ack!=null ) {
-            delivery.ack(Undelivered, null)
+          if( ack!=null ) {
+            ack(Undelivered, null)
           }
         } else {
-          if( protocol_version eq V1_0 ) {
+          if( (protocol_version eq V1_0) || (protocol_version eq V1_2) ) {
             // register on the connection since 1.0 acks may not include the subscription
id
-            connection_ack_handlers += ( id(delivery.message) -> this )
+            connection_ack_handlers += ( msgid -> this )
           }
           if( initial_credit_window.auto_credit ) {
-            consumer_acks += id(delivery.message) -> new TrackedAck(Some((session, delivery.size)),
delivery.ack)
+            consumer_acks += msgid -> new TrackedAck(Some((session, size)), ack)
           } else {
-            session_manager.delivered(session, delivery.size)
+            session_manager.delivered(session, size)
           }
         }
       }
@@ -435,6 +466,9 @@ class StompProtocolHandler extends Proto
 
         if( protocol_version eq V1_0 ) {
           connection_ack_handlers.remove(msgid)
+        } else if( protocol_version eq V1_2 ) {
+          connection_ack_handlers.remove(msgid)
+          checkin_ack_id(msgid)
         }
       }
     }
@@ -450,9 +484,7 @@ class StompProtocolHandler extends Proto
 
     val consumer_sink = sink_manager.open()
     val credit_window_filter = new CreditWindowFilter[(Session[Delivery], Delivery)](consumer_sink.map
{ event =>
-      ack_handler.track(event)
-      val (_, delivery) = event
-
+      val (session, delivery) = event
       val message = delivery.message
       var frame = if( message.codec eq StompMessageCodec ) {
         message.asInstanceOf[StompFrameMessage].frame
@@ -469,6 +501,17 @@ class StompProtocolHandler extends Proto
         StompFrame(MESSAGE, headers, BufferContent(body))
       }
 
+      val ack_id = if( (protocol_version eq V1_0) || (protocol_version eq V1_1) ) {
+        frame.header(MESSAGE_ID)
+      } else {
+        val ack_id = checkout_ack_id
+        // we need to add the ACK id.
+        frame = frame.append_headers((ACK_HEADER->ack_id)::Nil)
+        ack_id
+      }
+
+      ack_handler.track(session, ack_id, delivery.size, delivery.ack)
+
       if( subscription_id != None ) {
         frame = frame.append_headers((SUBSCRIPTION, subscription_id.get)::Nil)
       }
@@ -630,7 +673,7 @@ class StompProtocolHandler extends Proto
 
   private def queue = connection.dispatch_queue
 
-  // uses by STOMP 1.0 clients
+  // uses by STOMP 1.0 and 1.2 clients
   var connection_ack_handlers = HashMap[AsciiBuffer, StompConsumer#AckHandler]()
 
   var protocol_version:AsciiBuffer = _
@@ -1333,7 +1376,7 @@ class StompProtocolHandler extends Proto
       die("The from-seq header is only supported when you subscribe to one destination");
     }
 
-    val ack_mode = get(headers, ACK_MODE).getOrElse(ACK_MODE_AUTO)
+    val ack_mode = get(headers, ACK_HEADER).getOrElse(ACK_MODE_AUTO)
     val credit_window = get(headers, CREDIT) match {
       case Some(value) =>
         value.toString.split(",").toList match {
@@ -1470,21 +1513,32 @@ class StompProtocolHandler extends Proto
         }
 
     }
-    val messageId = get(headers, MESSAGE_ID).getOrElse(null)
 
-    if( credit==null && messageId==null) {
-      die("message id header not set")
-    }
+    val (messageId,handler) = if( (protocol_version eq V1_0) || (protocol_version eq V1_1)
) {
+      val messageId = get(headers, MESSAGE_ID).getOrElse(null)
+      if( credit==null && messageId==null) {
+        die("message id header not set")
+      }
 
-    val subscription_id = get(headers, SUBSCRIPTION);
-    val handler = subscription_id match {
-      case None=>
-        if( !(protocol_version eq V1_0) ) {
-          die("The subscription header is required")
-        }
-        connection_ack_handlers.get(messageId).orElse(die("Not expecting an ACK/NACK for
message id '%s'".format(messageId)))
-      case Some(id) =>
-        consumers.get(id).map(_.ack_handler).orElse(die("The subscription '%s' does not exist".format(id)))
+      val subscription_id = get(headers, SUBSCRIPTION);
+      val handler = subscription_id match {
+        case None=>
+          if( !(protocol_version eq V1_0) ) {
+            die("The subscription header is required")
+          }
+          connection_ack_handlers.get(messageId).orElse(die("Not expecting an ACK/NACK for
message id '%s'".format(messageId)))
+        case Some(id) =>
+          consumers.get(id).map(_.ack_handler).orElse(die("The subscription '%s' does not
exist".format(id)))
+      }
+
+      (messageId,handler)
+    } else {
+      val id = get(headers, ID).getOrElse(null)
+      if( credit==null && id==null) {
+        die("id header not set")
+      }
+      val handler = connection_ack_handlers.get(id).orElse(die("Not expecting an ACK/NACK
for id '%s'".format(id)))
+      (id,handler)
     }
 
     handler.foreach{ handler=>

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompClient.scala?rev=1390597&r1=1390596&r2=1390597&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompClient.scala
Wed Sep 26 16:41:06 2012
@@ -37,6 +37,7 @@ class StompClient extends ShouldMatchers
   val bufferSize = 64 * 1204
   var key_storeage: KeyStorage = null
   var bytes_written = 0L
+  var version:String = null
 
   def open(host: String, port: Int) = {
     bytes_written = 0

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1390597&r1=1390596&r2=1390597&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
Wed Sep 26 16:41:06 2012
@@ -1504,4 +1504,42 @@ class StompParallelTest extends StompTes
 
   }
 
+
+  test("STOMP 1.2 client ack.") {
+    val dest = next_id("/queue/stomp12clientack_")
+
+    connect("1.2")
+    subscribe("my-sub-name", dest, "client")
+    async_send(dest, 1)
+    async_send(dest, 2)
+    assert_received(1, "my-sub-name")
+    assert_received(2, "my-sub-name")(true)
+    disconnect()
+
+    connect("1.2")
+    subscribe("my-sub-name", dest, "client")
+    async_send(dest, 3)
+    assert_received(3, "my-sub-name")(true)
+    disconnect()
+  }
+
+  test("STOMP 1.2 client-individual ack.") {
+    val dest = next_id("/queue/stomp12clientack_")
+
+    connect("1.2")
+    subscribe("my-sub-name", dest, "client-individual")
+    async_send(dest, 1)
+    async_send(dest, 2)
+    assert_received(1, "my-sub-name")
+    assert_received(2, "my-sub-name")(true)
+    disconnect()
+
+    connect("1.2")
+    subscribe("my-sub-name", dest, "client")
+    async_send(dest, 3)
+    assert_received(1, "my-sub-name")(true)
+    assert_received(3, "my-sub-name")(true)
+    disconnect()
+  }
+
 }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala?rev=1390597&r1=1390596&r2=1390597&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala
Wed Sep 26 16:41:06 2012
@@ -44,20 +44,21 @@ class StompTestSupport extends BrokerFun
           "CONNECT\n" +
                   headers +
                   "\n")
-      case "1.1" =>
+      case "1.1" | "1.2" =>
         c.write(
           "CONNECT\n" +
-                  "accept-version:1.1\n" +
+                  "accept-version:"+version+"\n" +
                   "host:localhost\n" +
                   headers +
                   "\n")
-      case x => throw new RuntimeException("invalid version: %f".format(x))
+      case x => throw new RuntimeException("invalid version: %s".format(x))
     }
     clients ::= c
     c.receive()
   }
 
   def connect(version: String, c: StompClient = client, headers: String = "", connector:
String = null) = {
+    c.version = version
     val frame = connect_request(version, c, headers, connector)
     frame should startWith("CONNECTED\n")
     frame should include regex ("""session:.+?\n""")
@@ -140,17 +141,28 @@ class StompTestSupport extends BrokerFun
     }
     // return a func that can ack the message.
     (ack: Boolean) => {
-      val sub_regex = """(?s).*\nsubscription:([^\n]+)\n.*""".r
-      val msgid_regex = """(?s).*\nmessage-id:([^\n]+)\n.*""".r
-      val sub_regex(sub) = frame
-      val msgid_regex(msgid) = frame
-      c.write(
-        (if (ack) "ACK\n" else "NACK\n") +
-                "subscription:" + sub + "\n" +
-                "message-id:" + msgid + "\n" +
-                (if (txid != null) "transaction:" + txid + "\n" else "") +
+      if( c.version == "1.0" || c.version== "1.1" ) {
+        val sub_regex = """(?s).*\nsubscription:([^\n]+)\n.*""".r
+        val msgid_regex = """(?s).*\nmessage-id:([^\n]+)\n.*""".r
+        val sub_regex(sub) = frame
+        val msgid_regex(msgid) = frame
+        c.write(
+          (if (ack) "ACK\n" else "NACK\n") +
+                  "subscription:" + sub + "\n" +
+                  "message-id:" + msgid + "\n" +
+                  (if (txid != null) "transaction:" + txid + "\n" else "") +
+
+                  "\n")
+      } else {
+        val ack_regex = """(?s).*\nack:([^\n]+)\n.*""".r
+        val ack_regex(id) = frame
+        c.write(
+          (if (ack) "ACK\n" else "NACK\n") +
+                  "id:" + id + "\n" +
+                  (if (txid != null) "transaction:" + txid + "\n" else "") +
 
-                "\n")
+                  "\n")
+      }
     }
   }
 



Mime
View raw message