Author: chirino
Date: Wed Sep 26 16:41:06 2012
New Revision: 1390597
URL: http://svn.apache.org/viewvc?rev=1390597&view=rev
Log:
Fixes APLO-261 : Added support the 1.2 STOMP spec ACK style.
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompClient.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1390597&r1=1390596&r2=1390597&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
Wed Sep 26 16:41:06 2012
@@ -392,7 +392,7 @@ object Stomp {
val TIMESTAMP = ascii("timestamp")
val SUBSCRIPTION = ascii("subscription")
- val ACK_MODE = ascii("ack")
+ val ACK_HEADER = ascii("ack")
val ID = ascii("id")
val SELECTOR = ascii("selector")
val CREDIT = ascii("credit")
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1390597&r1=1390596&r2=1390597&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Wed Sep 26 16:41:06 2012
@@ -39,6 +39,7 @@ import org.fusesource.hawtdispatch.trans
import path.{LiteralPart, Path, PathParser}
import scala.Some
import org.apache.activemq.apollo.broker.SubscriptionAddress
+import java.util
case class RichBuffer(self:Buffer) extends Proxy {
@@ -161,13 +162,43 @@ class StompProtocolHandler extends Proto
def broker = connection.connector.broker
protected def dispatchQueue:DispatchQueue = connection.dispatch_queue
-
- def id(message:Message) = {
- message.asInstanceOf[StompFrameMessage].id
- }
case class InitialCreditWindow(count:Int,size:Int,auto_credit:Boolean)
+
+ //////////////////////////////////////////////////////////////////
+ //
+ // Since ack id's are re-useable once they are acked by the client,
+ // try to re-use them since the first ones generated will be the
+ // shortest ack-ids available.
+ //
+ //////////////////////////////////////////////////////////////////
+ var ack_id_counter = 1L
+ var ack_id_pool = new util.HashSet[AsciiBuffer]()
+ def checkout_ack_id:AsciiBuffer = {
+ if( ack_id_pool!=null && ack_id_pool.size()>0 ) {
+ val i = ack_id_pool.iterator();
+ val rc = i.next()
+ i.remove()
+ rc
+ } else {
+ ack_id_counter += 1
+ ascii(java.lang.Long.toHexString(ack_id_counter))
+ }
+ }
+
+ def checkin_ack_id(id:AsciiBuffer) = {
+ if( ack_id_pool!=null ) {
+ if( ack_id_pool.size() < 0xFFF ) {
+ ack_id_pool.add(id);
+ } else {
+ // if we are sending too many messages at once
+ // then it might not makes sense to pool..
+ ack_id_pool = null
+ }
+ }
+ }
+
class StompConsumer (
val subscription_id:Option[AsciiBuffer],
@@ -237,7 +268,7 @@ class StompProtocolHandler extends Proto
credit_window_source.resume
trait AckHandler {
- def track(event:(Session[Delivery], Delivery)):Unit
+ def track(session:Session[Delivery], msgid: AsciiBuffer, size:Int, ack:(DeliveryResult,
StoreUOW)=>Unit):Unit
def credit(msgid: AsciiBuffer, credit_value: (Int, Int)):Unit
def perform_ack(consumed:DeliveryResult, msgid: AsciiBuffer, uow:StoreUOW=null):Unit
def close:Unit
@@ -249,19 +280,18 @@ class StompProtocolHandler extends Proto
def close = { closed = true}
- def track(event:(Session[Delivery], Delivery)) = {
- val (session, delivery) = event
- session_manager.delivered(session, delivery.size)
+ def track(session:Session[Delivery], msgid: AsciiBuffer, size:Int, ack:(DeliveryResult,
StoreUOW)=>Unit) = {
+ session_manager.delivered(session, size)
if( closed ) {
- if( delivery.ack!=null ) {
- delivery.ack(Undelivered, null)
+ if( ack!=null ) {
+ ack(Undelivered, null)
}
} else {
- if( delivery.ack!=null ) {
- delivery.ack(Consumed, null)
+ if( ack!=null ) {
+ ack(Consumed, null)
}
if( !dead ) {
- credit_window_source.merge((1, delivery.size))
+ credit_window_source.merge((1, size))
}
}
}
@@ -290,23 +320,22 @@ class StompProtocolHandler extends Proto
consumer_acks = null
}
- def track(event:(Session[Delivery], Delivery)) = {
+ def track(session:Session[Delivery], msgid: AsciiBuffer, size:Int, ack:(DeliveryResult,
StoreUOW)=>Unit) = {
queue.assertExecuting()
- val (session, delivery) = event
if( consumer_acks == null ) {
// It can happen if we get closed.. but destination is still sending data..
- if( delivery.ack!=null ) {
- delivery.ack(Undelivered, null)
+ if( ack!=null ) {
+ ack(Undelivered, null)
}
} else {
- if( protocol_version eq V1_0 ) {
+ if( (protocol_version eq V1_0) || (protocol_version eq V1_2) ) {
// register on the connection since 1.0 acks may not include the subscription
id
- connection_ack_handlers += ( id(delivery.message) -> this )
+ connection_ack_handlers += ( msgid -> this )
}
if( initial_credit_window.auto_credit ) {
- consumer_acks += id(delivery.message) -> new TrackedAck(Some((session, delivery.size)),
delivery.ack )
+ consumer_acks += msgid -> new TrackedAck(Some((session, size)), ack )
} else {
- session_manager.delivered(session, delivery.size)
+ session_manager.delivered(session, size)
}
}
}
@@ -366,6 +395,9 @@ class StompProtocolHandler extends Proto
if( protocol_version eq V1_0 ) {
connection_ack_handlers.remove(msgid)
+ } else if( protocol_version eq V1_2 ) {
+ connection_ack_handlers.remove(msgid)
+ checkin_ack_id(msgid)
}
}
@@ -384,23 +416,22 @@ class StompProtocolHandler extends Proto
consumer_acks = null
}
- def track(event:(Session[Delivery], Delivery)) = {
+ def track(session:Session[Delivery], msgid: AsciiBuffer, size:Int, ack:(DeliveryResult,
StoreUOW)=>Unit) = {
queue.assertExecuting();
- val (session, delivery) = event
if( consumer_acks == null ) {
// It can happen if we get closed.. but destination is still sending data..
- if( delivery.ack!=null ) {
- delivery.ack(Undelivered, null)
+ if( ack!=null ) {
+ ack(Undelivered, null)
}
} else {
- if( protocol_version eq V1_0 ) {
+ if( (protocol_version eq V1_0) || (protocol_version eq V1_2) ) {
// register on the connection since 1.0 acks may not include the subscription
id
- connection_ack_handlers += ( id(delivery.message) -> this )
+ connection_ack_handlers += ( msgid -> this )
}
if( initial_credit_window.auto_credit ) {
- consumer_acks += id(delivery.message) -> new TrackedAck(Some((session, delivery.size)),
delivery.ack)
+ consumer_acks += msgid -> new TrackedAck(Some((session, size)), ack)
} else {
- session_manager.delivered(session, delivery.size)
+ session_manager.delivered(session, size)
}
}
}
@@ -435,6 +466,9 @@ class StompProtocolHandler extends Proto
if( protocol_version eq V1_0 ) {
connection_ack_handlers.remove(msgid)
+ } else if( protocol_version eq V1_2 ) {
+ connection_ack_handlers.remove(msgid)
+ checkin_ack_id(msgid)
}
}
}
@@ -450,9 +484,7 @@ class StompProtocolHandler extends Proto
val consumer_sink = sink_manager.open()
val credit_window_filter = new CreditWindowFilter[(Session[Delivery], Delivery)](consumer_sink.map
{ event =>
- ack_handler.track(event)
- val (_, delivery) = event
-
+ val (session, delivery) = event
val message = delivery.message
var frame = if( message.codec eq StompMessageCodec ) {
message.asInstanceOf[StompFrameMessage].frame
@@ -469,6 +501,17 @@ class StompProtocolHandler extends Proto
StompFrame(MESSAGE, headers, BufferContent(body))
}
+ val ack_id = if( (protocol_version eq V1_0) || (protocol_version eq V1_1) ) {
+ frame.header(MESSAGE_ID)
+ } else {
+ val ack_id = checkout_ack_id
+ // we need to add the ACK id.
+ frame = frame.append_headers((ACK_HEADER->ack_id)::Nil)
+ ack_id
+ }
+
+ ack_handler.track(session, ack_id, delivery.size, delivery.ack)
+
if( subscription_id != None ) {
frame = frame.append_headers((SUBSCRIPTION, subscription_id.get)::Nil)
}
@@ -630,7 +673,7 @@ class StompProtocolHandler extends Proto
private def queue = connection.dispatch_queue
- // uses by STOMP 1.0 clients
+ // uses by STOMP 1.0 and 1.2 clients
var connection_ack_handlers = HashMap[AsciiBuffer, StompConsumer#AckHandler]()
var protocol_version:AsciiBuffer = _
@@ -1333,7 +1376,7 @@ class StompProtocolHandler extends Proto
die("The from-seq header is only supported when you subscribe to one destination");
}
- val ack_mode = get(headers, ACK_MODE).getOrElse(ACK_MODE_AUTO)
+ val ack_mode = get(headers, ACK_HEADER).getOrElse(ACK_MODE_AUTO)
val credit_window = get(headers, CREDIT) match {
case Some(value) =>
value.toString.split(",").toList match {
@@ -1470,21 +1513,32 @@ class StompProtocolHandler extends Proto
}
}
- val messageId = get(headers, MESSAGE_ID).getOrElse(null)
- if( credit==null && messageId==null) {
- die("message id header not set")
- }
+ val (messageId,handler) = if( (protocol_version eq V1_0) || (protocol_version eq V1_1)
) {
+ val messageId = get(headers, MESSAGE_ID).getOrElse(null)
+ if( credit==null && messageId==null) {
+ die("message id header not set")
+ }
- val subscription_id = get(headers, SUBSCRIPTION);
- val handler = subscription_id match {
- case None=>
- if( !(protocol_version eq V1_0) ) {
- die("The subscription header is required")
- }
- connection_ack_handlers.get(messageId).orElse(die("Not expecting an ACK/NACK for
message id '%s'".format(messageId)))
- case Some(id) =>
- consumers.get(id).map(_.ack_handler).orElse(die("The subscription '%s' does not exist".format(id)))
+ val subscription_id = get(headers, SUBSCRIPTION);
+ val handler = subscription_id match {
+ case None=>
+ if( !(protocol_version eq V1_0) ) {
+ die("The subscription header is required")
+ }
+ connection_ack_handlers.get(messageId).orElse(die("Not expecting an ACK/NACK for
message id '%s'".format(messageId)))
+ case Some(id) =>
+ consumers.get(id).map(_.ack_handler).orElse(die("The subscription '%s' does not
exist".format(id)))
+ }
+
+ (messageId,handler)
+ } else {
+ val id = get(headers, ID).getOrElse(null)
+ if( credit==null && id==null) {
+ die("id header not set")
+ }
+ val handler = connection_ack_handlers.get(id).orElse(die("Not expecting an ACK/NACK
for id '%s'".format(id)))
+ (id,handler)
}
handler.foreach{ handler=>
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompClient.scala?rev=1390597&r1=1390596&r2=1390597&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompClient.scala
Wed Sep 26 16:41:06 2012
@@ -37,6 +37,7 @@ class StompClient extends ShouldMatchers
val bufferSize = 64 * 1204
var key_storeage: KeyStorage = null
var bytes_written = 0L
+ var version:String = null
def open(host: String, port: Int) = {
bytes_written = 0
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1390597&r1=1390596&r2=1390597&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
Wed Sep 26 16:41:06 2012
@@ -1504,4 +1504,42 @@ class StompParallelTest extends StompTes
}
+
+ test("STOMP 1.2 client ack.") {
+ val dest = next_id("/queue/stomp12clientack_")
+
+ connect("1.2")
+ subscribe("my-sub-name", dest, "client")
+ async_send(dest, 1)
+ async_send(dest, 2)
+ assert_received(1, "my-sub-name")
+ assert_received(2, "my-sub-name")(true)
+ disconnect()
+
+ connect("1.2")
+ subscribe("my-sub-name", dest, "client")
+ async_send(dest, 3)
+ assert_received(3, "my-sub-name")(true)
+ disconnect()
+ }
+
+ test("STOMP 1.2 client-individual ack.") {
+ val dest = next_id("/queue/stomp12clientack_")
+
+ connect("1.2")
+ subscribe("my-sub-name", dest, "client-individual")
+ async_send(dest, 1)
+ async_send(dest, 2)
+ assert_received(1, "my-sub-name")
+ assert_received(2, "my-sub-name")(true)
+ disconnect()
+
+ connect("1.2")
+ subscribe("my-sub-name", dest, "client")
+ async_send(dest, 3)
+ assert_received(1, "my-sub-name")(true)
+ assert_received(3, "my-sub-name")(true)
+ disconnect()
+ }
+
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala?rev=1390597&r1=1390596&r2=1390597&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala
Wed Sep 26 16:41:06 2012
@@ -44,20 +44,21 @@ class StompTestSupport extends BrokerFun
"CONNECT\n" +
headers +
"\n")
- case "1.1" =>
+ case "1.1" | "1.2" =>
c.write(
"CONNECT\n" +
- "accept-version:1.1\n" +
+ "accept-version:"+version+"\n" +
"host:localhost\n" +
headers +
"\n")
- case x => throw new RuntimeException("invalid version: %f".format(x))
+ case x => throw new RuntimeException("invalid version: %s".format(x))
}
clients ::= c
c.receive()
}
def connect(version: String, c: StompClient = client, headers: String = "", connector:
String = null) = {
+ c.version = version
val frame = connect_request(version, c, headers, connector)
frame should startWith("CONNECTED\n")
frame should include regex ("""session:.+?\n""")
@@ -140,17 +141,28 @@ class StompTestSupport extends BrokerFun
}
// return a func that can ack the message.
(ack: Boolean) => {
- val sub_regex = """(?s).*\nsubscription:([^\n]+)\n.*""".r
- val msgid_regex = """(?s).*\nmessage-id:([^\n]+)\n.*""".r
- val sub_regex(sub) = frame
- val msgid_regex(msgid) = frame
- c.write(
- (if (ack) "ACK\n" else "NACK\n") +
- "subscription:" + sub + "\n" +
- "message-id:" + msgid + "\n" +
- (if (txid != null) "transaction:" + txid + "\n" else "") +
+ if( c.version == "1.0" || c.version== "1.1" ) {
+ val sub_regex = """(?s).*\nsubscription:([^\n]+)\n.*""".r
+ val msgid_regex = """(?s).*\nmessage-id:([^\n]+)\n.*""".r
+ val sub_regex(sub) = frame
+ val msgid_regex(msgid) = frame
+ c.write(
+ (if (ack) "ACK\n" else "NACK\n") +
+ "subscription:" + sub + "\n" +
+ "message-id:" + msgid + "\n" +
+ (if (txid != null) "transaction:" + txid + "\n" else "") +
+
+ "\n")
+ } else {
+ val ack_regex = """(?s).*\nack:([^\n]+)\n.*""".r
+ val ack_regex(id) = frame
+ c.write(
+ (if (ack) "ACK\n" else "NACK\n") +
+ "id:" + id + "\n" +
+ (if (txid != null) "transaction:" + txid + "\n" else "") +
- "\n")
+ "\n")
+ }
}
}
|