activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1210707 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/ apoll...
Date Mon, 05 Dec 2011 23:33:12 GMT
Author: chirino
Date: Mon Dec  5 23:33:12 2011
New Revision: 1210707

URL: http://svn.apache.org/viewvc?rev=1210707&view=rev
Log:
Fixes APLO-99, APLO-100, and APLO-101 Adding more options to `SUBSCRIBE` so you can implement
Kafka style durable pub/sub over a queue.

Support a `browser-end:false` option on a `SUBSCRIBE` frame to allow a Queue browser to wait
for new messages instead of closing the subscription.
Support a `include-seq` and `from-seq` option on the `SUBSCRIBE` frame to finely control from
where in a queue a subscription starts from.
Support Kafka style durable pub/sub.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1210707&r1=1210706&r2=1210707&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
Mon Dec  5 23:33:12 2011
@@ -60,6 +60,10 @@ trait DeliveryConsumer extends Retained 
 
   def receive_buffer_size = 64*1024
 
+  def close_on_drain = browser
+  def start_from_tail = false
+  def set_starting_seq(seq:Long) = {}
+
   def browser = false
   def exclusive = false
   def dispatch_queue:DispatchQueue;
@@ -181,6 +185,11 @@ class Delivery {
   var message: Message = null
 
   /**
+   * The sequence id the destination assigned the message
+   */
+  var seq:Long = -1
+
+  /**
    * The id the store assigned the message
    */
   var storeKey:Long = -1
@@ -211,6 +220,7 @@ class Delivery {
 
   def set(other:Delivery) = {
     size = other.size
+    seq = other.seq
     message = other.message
     storeKey = other.storeKey
     storeLocator = other.storeLocator

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1210707&r1=1210706&r2=1210707&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Mon Dec  5 23:33:12 2011
@@ -479,6 +479,7 @@ class Queue(val router: LocalRouter, val
         val entry = tail_entry
         tail_entry = new QueueEntry(Queue.this, next_message_seq)
         val queueDelivery = delivery.copy
+        queueDelivery.seq = entry.seq
         entry.init(queueDelivery)
         
         if( tune_persistent ) {
@@ -1730,8 +1731,13 @@ class Subscription(val queue:Queue, val 
   // This opens up the consumer
   def open() = {
     consumer.retain
-    pos = queue.head_entry;
+    if(consumer.start_from_tail) {
+      pos = queue.tail_entry;
+    } else {
+      pos = queue.head_entry;
+    }
     assert(pos!=null)
+    consumer.set_starting_seq(pos.seq)
 
     session = consumer.connect(this)
     session.refiller = dispatch_queue.runnable {
@@ -1739,7 +1745,7 @@ class Subscription(val queue:Queue, val 
         pos.run
       }
     }
-    queue.head_entry ::= this
+    pos ::= this
 
     queue.all_subscriptions += consumer -> this
     queue.consumer_counter += 1
@@ -1752,7 +1758,7 @@ class Subscription(val queue:Queue, val 
     if( queue.service_state.is_started ) {
       // kick off the initial dispatch.
       refill_prefetch
-      queue.dispatch_queue << queue.head_entry
+      queue.dispatch_queue << pos
     }
     queue.check_idle
   }
@@ -1805,7 +1811,7 @@ class Subscription(val queue:Queue, val 
 
     if( tail_parked ) {
       tail_parkings += 0
-      if( browser ) {
+      if( session.consumer.close_on_drain ) {
         close
       }
     }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1210707&r1=1210706&r2=1210707&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
Mon Dec  5 23:33:12 2011
@@ -405,9 +405,12 @@ object Stomp {
   val REDIRECT_HEADER = ascii("redirect")
 
   val BROWSER = ascii("browser")
+  val BROWSER_END = ascii("browser-end")
   val EXCLUSIVE = ascii("exclusive")
   val USER_ID = ascii("user-id")
   val TEMP = ascii("temp")
+  val INCLUDE_SEQ = ascii("include-seq")
+  val FROM_SEQ = ascii("from-seq")
 
   ///////////////////////////////////////////////////////////////////
   // Common Values

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1210707&r1=1210706&r2=1210707&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Mon Dec  5 23:33:12 2011
@@ -129,7 +129,7 @@ class StompProtocolHandler extends Proto
 
   protected def dispatchQueue:DispatchQueue = connection.dispatch_queue
 
-  class StompConsumer(
+  class StompConsumer (
 
     val subscription_id:Option[AsciiBuffer],
     val destination:Array[DestinationDTO],
@@ -137,7 +137,10 @@ class StompProtocolHandler extends Proto
     val selector:(String, BooleanExpression),
     override val browser:Boolean,
     override val exclusive:Boolean,
-    val initial_credit_window:(Int,Int, Boolean)
+    val initial_credit_window:(Int,Int, Boolean),
+    val include_seq:Option[AsciiBuffer],
+    val from_seq:Long,
+    override val close_on_drain:Boolean
   ) extends BaseRetained with DeliveryConsumer {
 
 ////  The following comes in handy if we need to debug the
@@ -163,6 +166,13 @@ class StompProtocolHandler extends Proto
 //      r.release
 //    }
 
+    override def start_from_tail = from_seq == -1
+
+    var starting_seq:Long = 0L
+    override def set_starting_seq(seq: Long):Unit = {
+      starting_seq=seq
+    }
+
     val ack_source = createSource(new EventAggregator[(Int, Int), (Int, Int)] {
       def mergeEvent(previous:(Int, Int), event:(Int, Int)) = {
         if( previous == null ) {
@@ -341,6 +351,9 @@ class StompProtocolHandler extends Proto
         val value = ascii(delivery.redeliveries.toString())
         frame = frame.append_headers((header, value)::Nil)
       }
+      if( include_seq.isDefined ) {
+        frame = frame.append_headers((include_seq.get, ascii(delivery.seq.toString))::Nil)
+      }
       frame
     }, Delivery)
 
@@ -362,16 +375,34 @@ class StompProtocolHandler extends Proto
 
     def is_persistent = false
 
-    def matches(delivery:Delivery) = {
-      if( delivery.message.protocol eq StompProtocol ) {
-        if( selector!=null ) {
-          selector._2.matches(delivery.message)
-        } else {
-          true
-        }
-      } else {
-        false
+    def match_protocol(delivery:Delivery)= delivery.message.protocol eq StompProtocol
+    def match_selector(delivery:Delivery)= selector._2.matches(delivery.message)
+    def match_from_seq(delivery:Delivery)= delivery.seq >= from_seq
+    def match_from_tail(delivery:Delivery)= delivery.seq >= starting_seq
+
+    val matchers = {
+      var l = ListBuffer[(Delivery)=>Boolean]()
+      l += match_protocol
+      if( from_seq > 0 ) {
+        l += match_from_seq
+      }
+      if( start_from_tail ) {
+        l += match_from_tail
+      }
+      if( selector!=null ) {
+        l += match_selector 
+      }
+      l.toArray
+    }
+
+    def matches(delivery:Delivery):Boolean = {
+      var i=0;
+      while( i < matchers.length ) {
+        if(!matchers(i)(delivery))
+          return false
+        i+=1
       }
+      true
     }
 
     class StompConsumerSession(val producer:DeliveryProducer) extends DeliverySession with
SessionSinkFilter[Delivery] {
@@ -389,7 +420,7 @@ class StompProtocolHandler extends Proto
         assert(producer.dispatch_queue.isExecuting)
         if( !closed ) {
           closed = true
-          if( browser ) {
+          if( browser && close_on_drain ) {
             // Then send the end of browse message.
             val headers:HeaderMap = List(DESTINATION->EMPTY, MESSAGE_ID->EMPTY, BROWSER->END)
             var frame = StompFrame(MESSAGE, headers, BufferContent(EMPTY_BUFFER))
@@ -1047,7 +1078,21 @@ class StompProtocolHandler extends Proto
 //    val topic = destination.isInstanceOf[TopicDestinationDTO]
     var persistent = get(headers, PERSISTENT).map( _ == TRUE ).getOrElse(false)
     var browser = get(headers, BROWSER).map( _ == TRUE ).getOrElse(false)
-    var exclusive = get(headers, EXCLUSIVE).map( _ == TRUE ).getOrElse(false)
+    var browser_end = browser && get(headers, BROWSER_END).map( _ == TRUE ).getOrElse(true)
+    var exclusive = !browser && get(headers, EXCLUSIVE).map( _ == TRUE ).getOrElse(false)
+    var include_seq = get(headers, INCLUDE_SEQ)
+    val from_seq_opt = get(headers, FROM_SEQ)
+    
+    
+    def is_multi_destination = if( destination.length > 1 ) {
+      true
+    } else {
+      val path = destination_parser.decode_path(destination(0).path)
+      PathParser.containsWildCards(path)
+    }
+    if( from_seq_opt.isDefined && is_multi_destination ) {
+      die("The from-seq header is only supported when you subscribe to one destination");
+    }
 
     val ack_mode = get(headers, ACK_MODE).getOrElse(ACK_MODE_AUTO)
     val credit_window = get(headers, CREDIT) match {
@@ -1096,7 +1141,8 @@ class StompProtocolHandler extends Proto
       }
     }
 
-    val consumer = new StompConsumer(subscription_id, destination, ack_mode, selector, browser,
exclusive, credit_window);
+    val from_seq = from_seq_opt.map(_.toString.toLong).getOrElse(0L)
+    val consumer = new StompConsumer(subscription_id, destination, ack_mode, selector, browser,
exclusive, credit_window, include_seq, from_seq, browser_end);
     consumers += (id -> consumer)
 
     reset {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1210707&r1=1210706&r2=1210707&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Mon Dec  5 23:33:12 2011
@@ -251,6 +251,174 @@ class Stomp11HeartBeatTest extends Stomp
 
 class StompDestinationTest extends StompTestSupport {
 
+  test("Setting `from-seq` header to -1 results in subscription starting at end of the queue.")
{
+    connect("1.1")
+
+    def send(id:Int) = {
+      client.write(
+        "SEND\n" +
+        "destination:/queue/from-seq-end\n" +
+        "receipt:0\n"+
+        "\n" +
+        "message:"+id+"\n")
+      wait_for_receipt("0")
+    }
+
+    send(1)
+    send(2)
+    send(3)
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/from-seq-end\n" +
+      "receipt:0\n"+
+      "browser:true\n"+
+      "browser-end:false\n"+
+      "id:0\n" +
+      "from-seq:-1\n"+
+      "\n")
+    wait_for_receipt("0")
+
+    send(4)
+
+    def get(seq:Long) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should include("message:"+seq+"\n")
+    }
+    get(4)
+  }
+
+  test("The `browser-end:false` can be used to continously browse a queue.") {
+    connect("1.1")
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/browsing-continous\n" +
+      "browser:true\n"+
+      "browser-end:false\n"+
+      "receipt:0\n"+
+      "id:0\n" +
+      "\n")
+    wait_for_receipt("0")
+
+    def send(id:Int) = client.write(
+      "SEND\n" +
+      "destination:/queue/browsing-continous\n" +
+      "\n" +
+      "message:"+id+"\n")
+
+    send(1)
+    send(2)
+
+    def get(seq:Long) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      expect(true)(frame.contains("message:"+seq+"\n"))
+    }
+    get(1)
+    get(2)
+  }
+
+  test("Message sequence headers are added when `include-seq` is used.") {
+    connect("1.1")
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/seq_queue\n" +
+      "receipt:0\n"+
+      "id:0\n" +
+      "include-seq:seq\n"+
+      "\n")
+    wait_for_receipt("0")
+
+    def send(id:Int) = client.write(
+      "SEND\n" +
+      "destination:/queue/seq_queue\n" +
+      "\n" +
+      "message:"+id+"\n")
+
+    send(1)
+    send(2)
+
+    def get(seq:Long) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      expect(true)(frame.contains("seq:"+seq+"\n"))
+    }
+    get(1)
+    get(2)
+  }
+
+  test("The `from-seq` header can be used to resume delivery from a given point in a queue.")
{
+    connect("1.1")
+
+    def send(id:Int) = {
+      client.write(
+        "SEND\n" +
+        "destination:/queue/from_queue\n" +
+        "receipt:0\n"+
+        "\n" +
+        "message:"+id+"\n")
+      wait_for_receipt("0")
+    }
+
+    send(1)
+    send(2)
+    send(3)
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/from_queue\n" +
+      "receipt:0\n"+
+      "browser:true\n"+
+      "id:0\n" +
+      "include-seq:seq\n"+
+      "from-seq:2\n"+
+      "\n")
+    wait_for_receipt("0")
+
+    def get(seq:Long) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should include("seq:"+seq+"\n")
+    }
+    get(2)
+    get(3)
+  }
+
+
+  test("The `from-seq` header is not supported with wildcard or composite destinations.")
{
+    connect("1.1")
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/some,/queue/other\n" +
+      "browser:true\n"+
+      "id:0\n" +
+      "include-seq:seq\n"+
+      "from-seq:2\n"+
+      "\n")
+
+    var frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include("message:The from-seq header is only supported when you subscribe
to one destination")
+
+    client.close
+    connect("1.1")
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/some.*\n" +
+      "browser:true\n"+
+      "id:0\n" +
+      "include-seq:seq\n"+
+      "from-seq:2\n"+
+      "\n")
+
+    frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include("message:The from-seq header is only supported when you subscribe
to one destination")
+  }
+
   test("Selector Syntax") {
     connect("1.1")
 

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1210707&r1=1210706&r2=1210707&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Mon Dec
 5 23:33:12 2011
@@ -1353,10 +1353,114 @@ Example:
 
     MESSAGE
     subscription:mysub
+    destination:
+    message-id:
     browser:end
     
     ^@
 
+If you want the browsing subscription to remain active and continue to 
+listen for message once the last message on the queue is reached, you
+should add the `browser-end:false` header to the `SUBSCRIBE` frame.  When
+the `browser-end:false` header is added the subscription will not be
+sent the "end of browse" message previously described.
+
+### Queue Message Sequences
+
+As messages are added to a queue in a broker, they are assigned an incrementing
+sequence number.  Messages delivered to subscribers can be updated to include
+the sequence number if the `include-seq` header is added to the `SUBSCRIBE`
+frame.  This header should be set to a header name which will be added
+messages delivered to hold value of the sequence number.
+
+Example:
+
+    SUBSCRIBE
+    id:mysub
+    destination:/queue/foo
+    include-seq:seq
+    
+    ^@
+
+Then you can expect to receive messages like:
+
+    MESSAGE
+    subscription:mysub
+    destination:/queue/foo
+    seq:1
+    
+    Hello
+    ^@
+    MESSAGE
+    subscription:mysub
+    destination:/queue/foo
+    seq:2
+    
+    World
+    ^@
+
+Furthermore, you configure the `SUBSCRIBE` frame so that the subscription
+only receives messages that have a sequence id that is equal to or greater 
+than a requested value by using the `from-seq` header.  Example:
+
+    SUBSCRIBE
+    id:mysub
+    destination:/queue/foo
+    from-seq:10
+    
+    ^@
+
+If the `from-seq` is set to `-1`, then the subscription will receive messages
+from the tail of the queue.  In other words, it will only receive new messages sent 
+to the queue.
+
+Note: You can only use the `from-seq` header with normal destinations.  If you 
+attempt to use it with a wildcard or composite destination then the connection
+will be closed due to invalid usage.
+
+### Using Queue Browsers and Sequences to Implement Durable Pub/Sub
+
+You can combine the Queue Browser and and Queue Message Sequence features
+to implement durable pub/sub in a way which results in even better performance
+than the Topic Durable Subscriptions feature can provide.  
+
+To use this approach, your subscribing application must be able to keep track 
+of the last sequence number processed from the destination.  The application 
+would typically store this as part of the unit of work it performs to process 
+the message.
+
+In this scenario, you use multiple queue browsers against queue.  The browsing
+subscriptions will use the `include-seq`, `from-seq`, and `browser-end` so that
+they can resume receiving messages from the queue from the last known sequence.
+
+Example:
+
+    SUBSCRIBE
+    id:mysub
+    destination:/queue/foo
+    browser:true
+    browser-end:false
+    include-seq:seq
+    from-seq:503
+    
+    ^@
+
+If you are starting a new consumer, you can either set `from-seq:0` to 
+receive a copy of all the messages that has been sent to the queue or you
+can use `from-seq:-1` to skip over all the message that exist in the queue
+and only receive new messages.
+
+Since the consuming application records the last sequence that was processed,
+you can use the default auto acknowledge mode but still avoid message loss.
+
+Since this approach does not consume the messages from the queue, you should
+either:
+
+* Send messages to the queue with an expiration time so that they are automatically
+  delete once the expiration time is reached.
+* Periodically run a normal consumer application which can cursor the queue
+  and delete messages are are deemed no longer needed.
+
 ### Exclusive Subscriptions
 
 We maintain the order of messages in queues and dispatch them to



Mime
View raw message