activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1050420 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/ apoll...
Date Fri, 17 Dec 2010 15:25:36 GMT
Author: chirino
Date: Fri Dec 17 15:25:36 2010
New Revision: 1050420

URL: http://svn.apache.org/viewvc?rev=1050420&view=rev
Log:
Got queue browsers working from stomp.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1050420&r1=1050419&r2=1050420&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Fri Dec 17 15:25:36 2010
@@ -538,9 +538,7 @@ class Queue(val host: VirtualHost, var i
     for (consumer <- values) {
       all_subscriptions.get(consumer) match {
         case Some(subscription) =>
-          all_subscriptions -= consumer
           subscription.close
-          addCapacity( -tune_consumer_buffer )
         case None =>
       }
 
@@ -1011,7 +1009,7 @@ class QueueEntry(val queue:Queue, val se
           } else {
             if (sub.offer(delivery)) {
               // advance: accepted...
-              advancing == sub
+              advancing += sub
             } else {
               // hold back: flow controlled
               heldBack += sub
@@ -1316,6 +1314,8 @@ class Subscription(val queue:Queue, val 
 
   // This opens up the consumer
   pos = queue.head_entry;
+  assert(pos!=null)
+
   session = consumer.connect(this)
   session.refiller = pos
   queue.head_entry ::= this
@@ -1327,22 +1327,27 @@ class Subscription(val queue:Queue, val 
   }
 
   def close() = {
-    pos -= this
-    pos = null
-
-    // nack all the acquired entries.
-    var next = acquired.getHead
-    while( next !=null ) {
-      val cur = next;
-      next = next.getNext
-      cur.nack // this unlinks the entry.
-    }
-
-    session.refiller = NOOP
-    session.close
-    session = null
+    if(pos!=null) {
+      pos -= this
+      pos = null
+
+      queue.all_subscriptions -= consumer
+      queue.addCapacity( - queue.tune_consumer_buffer )
+
+      // nack all the acquired entries.
+      var next = acquired.getHead
+      while( next !=null ) {
+        val cur = next;
+        next = next.getNext
+        cur.nack // this unlinks the entry.
+      }
+
+      session.refiller = NOOP
+      session.close
+      session = null
 
-    queue.trigger_swap
+      queue.trigger_swap
+    } else {}
   }
 
   /**
@@ -1352,9 +1357,6 @@ class Subscription(val queue:Queue, val 
   def advance(value:QueueEntry):Unit = {
 
     assert(value!=null)
-    if( pos == null ) {
-      assert(pos!=null)
-    }
 
     advanced_size += pos.size
 
@@ -1363,6 +1365,9 @@ class Subscription(val queue:Queue, val 
 
     if( tail_parked ) {
       tail_parkings += 0
+      if( browser ) {
+        close
+      }
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1050420&r1=1050419&r2=1050420&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
Fri Dec 17 15:25:36 2010
@@ -187,17 +187,24 @@ class SinkMux[T](val downstream:Sink[T],
     }
   }
 
-  def open(producer_queue:DispatchQueue):Sink[T] = {
+  def open(producer_queue:DispatchQueue,allow_overflow:Boolean=false):Sink[T] = {
     val session = createSession(producer_queue, session_max_credits)
     sessions ::= session
-    session
+    if( allow_overflow ) {
+      new OverflowSink(session)
+    } else {
+      session
+    }
   }
 
-  def close(session:Sink[T]) = {
-    val s = session.asInstanceOf[Session[T]]
-    sessions = sessions.filterNot( _ == s )
-    s.producer_queue {
-      s.close
+  def close(session:Sink[T]):Unit = {
+    session match {
+      case s:OverflowSink[T] => close(s.downstream)
+      case s:Session[T] =>
+        sessions = sessions.filterNot( _ == s )
+        s.producer_queue {
+          s.close
+        }
     }
   }
 
@@ -267,10 +274,12 @@ class Session[T](val producer_queue:Disp
   }
 
   def close = {
-    assert(getCurrentQueue eq producer_queue)
-    credit_adder.release
-    downstream.release
-    closed=true
+    if( !closed ) {
+      closed=true
+      assert(getCurrentQueue eq producer_queue)
+      credit_adder.release
+      downstream.release
+    }
   }
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1050420&r1=1050419&r2=1050420&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
Fri Dec 17 15:25:36 2010
@@ -416,11 +416,15 @@ object Stomp {
   val SESSION = ascii("session")
   val RESPONSE_ID = ascii("response-id")
 
+  val BROWSER = ascii("browser")
+  val EXCLUSIVE = ascii("exclusive")
+
   ///////////////////////////////////////////////////////////////////
   // Common Values
   ///////////////////////////////////////////////////////////////////
   val TRUE = ascii("true")
   val FALSE = ascii("false")
+  val END = ascii("end")
 
   val ACK_MODE_AUTO = ascii("auto")
   val ACK_MODE_NONE = ascii("none")

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1050420&r1=1050419&r2=1050420&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Fri Dec 17 15:25:36 2010
@@ -197,16 +197,18 @@ class StompProtocolHandler extends Proto
     }
   }
 
-  class StompConsumer(val subscription_id:Option[AsciiBuffer], val destination:Destination,
val ack_handler:AckHandler, val selector:(String, BooleanExpression), val binding:BindingDTO)
extends BaseRetained with DeliveryConsumer {
-    val dispatchQueue = StompProtocolHandler.this.dispatchQueue
+  class StompConsumer(
 
+    val subscription_id:Option[AsciiBuffer],
+    val destination:Destination,
+    val ack_handler:AckHandler,
+    val selector:(String, BooleanExpression),
+    val binding:BindingDTO,
+    override val browser:Boolean
 
+  ) extends BaseRetained with DeliveryConsumer {
 
-    dispatchQueue.retain
-    setDisposer(^{
-      session_manager.release
-      dispatchQueue.release
-    })
+    val dispatchQueue = StompProtocolHandler.this.dispatchQueue
 
     override def connection = Some(StompProtocolHandler.this.connection)
 
@@ -225,16 +227,48 @@ class StompProtocolHandler extends Proto
     }
 
     def connect(p:DeliveryProducer) = new DeliverySession {
+
+      // This session object should only be used from the dispatch queue context
+      // of the producer.
+
       retain
 
       def producer = p
       def consumer = StompConsumer.this
+      var closed = false
 
       val session = session_manager.open(producer.dispatchQueue)
 
       def close = {
-        session_manager.close(session)
-        release
+        assert(getCurrentQueue == producer.dispatchQueue)
+        if( !closed ) {
+          closed = true
+          if( browser ) {
+            // Then send the end of browse message.
+            var frame = StompFrame(MESSAGE, (BROWSER, END)::Nil, BufferContent(EMPTY_BUFFER))
+            if( subscription_id != None ) {
+              frame = frame.append_headers((SUBSCRIPTION, subscription_id.get)::Nil)
+            }
+
+            if( session.full ) {
+              // session is full so use an overflow sink so to hold the message,
+              // and then trigger closing the session once it empties out.
+              val sink = new OverflowSink(session)
+              sink.refiller = ^{
+                session_manager.close(session)
+                release
+              }
+              sink.offer(frame)
+            } else {
+              session.offer(frame)
+              session_manager.close(session)
+              release
+            }
+          } else {
+            session_manager.close(session)
+            release
+          }
+        }
       }
 
       // Delegate all the flow control stuff to the session
@@ -736,6 +770,7 @@ class StompProtocolHandler extends Proto
 
     val topic = destination.domain == Router.TOPIC_DOMAIN
     var persistent = get(headers, PERSISTENT).map( _ == TRUE ).getOrElse(false)
+    var browser = get(headers, BROWSER).map( _ == TRUE ).getOrElse(false)
 
     val ack = get(headers, ACK_MODE) match {
       case None=> new AutoAckHandler
@@ -788,7 +823,7 @@ class StompProtocolHandler extends Proto
       }
     }
 
-    val consumer = new StompConsumer(subscription_id, destination, ack, selector, binding);
+    val consumer = new StompConsumer(subscription_id, destination, ack, selector, binding,
browser);
     consumers += (id -> consumer)
 
     if( binding==null ) {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1050420&r1=1050419&r2=1050420&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Fri Dec 17 15:25:36 2010
@@ -225,6 +225,61 @@ class Stomp11HeartBeatTest extends Stomp
 
 class StompDestinationTest extends StompTestSupport {
 
+  test("Queue browsers don't consume the messages") {
+    connect("1.1")
+
+    def put(id:Int) = {
+      client.write(
+        "SEND\n" +
+        "destination:/queue/browsing\n" +
+        "receipt:0\n" +
+        "\n" +
+        "message:"+id+"\n")
+      wait_for_receipt("0")
+    }
+
+    put(1)
+    put(2)
+    put(3)
+
+    // create a browser subscription.
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/browsing\n" +
+      "browser:true\n" +
+      "id:0\n" +
+      "\n")
+
+    def get(sub:Int, id:Int) = {
+      val frame = client.receive()
+      info(frame)
+      frame should startWith("MESSAGE\n")
+      frame should include ("subscription:%d\n".format(sub))
+      frame should endWith regex("\n\nmessage:%d\n".format(id))
+    }
+    get(0,1)
+    get(0,2)
+    get(0,3)
+
+    // Should get a browse end message
+    val frame = client.receive()
+    frame should startWith("MESSAGE\n")
+    frame should include ("subscription:0\n")
+    frame should include ("browser:end\n")
+
+    // create a regular subscription.
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/browsing\n" +
+      "id:1\n" +
+      "\n")
+
+    get(1,1)
+    get(1,2)
+    get(1,3)
+
+  }
+
   test("Queue order preserved") {
     connect("1.1")
 
@@ -409,6 +464,7 @@ class StompDestinationTest extends Stomp
     get(1)
     get(3)
   }
+
 }
 
 class StompSslDestinationTest extends StompDestinationTest {

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1050420&r1=1050419&r2=1050420&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Fri Dec
17 15:25:36 2010
@@ -718,6 +718,21 @@ ack mode to consume reliable messages. A
 client which have not been acked when the client disconnects will get
 redelivered to another subscribed client.
 
+### Browsing Subscriptions
+
+A normal subscription on a queue will consume messages so that no
+other subscription will get a copy of the message.  If you want to
+browse all the messages on a queue in a non-destructive fashion, you
+can create browsing subscription.  To make a subscription an browsing
+subscription, just add the `browser:true` header.  For example:
+
+    SUBSCRIBE
+    id:mysub
+    browser:true
+    destination:/queue/foo
+    
+    ^@
+
 ### Topic Durable Subscriptions
 
 A durable subscription is a queue which is subscribed to a topic so that
@@ -728,7 +743,8 @@ durable subscription and since it's back
 will have the topic's messages load balanced across them.
 
 To create or reattach to a a durable subscription with STOMP, you uniquely name
-the durable subscription using the `id` header on the `SUBSCRIBE` frame and
+the durable subscription using the `id` header on the `
+SCRIBE` frame and
 also adding a `persistent:true` header. Example:
 
     SUBSCRIBE



Mime
View raw message