activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1025593 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/ apoll...
Date Wed, 20 Oct 2010 14:12:54 GMT
Author: chirino
Date: Wed Oct 20 14:12:54 2010
New Revision: 1025593

URL: http://svn.apache.org/viewvc?rev=1025593&view=rev
Log:
Implemented all the Stomp 1.1 ack modes.  Added more stomp test cases (most features are now
exercised)

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1025593&r1=1025592&r2=1025593&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Oct 20 14:12:54 2010
@@ -190,7 +190,7 @@ class Router(val host:VirtualHost) exten
     cb(queues.get(binding))
   } >>: dispatchQueue
 
-  def bind(destination:Destination, consumer:DeliveryConsumer) = retaining(consumer) {
+  def bind(destination:Destination, consumer:DeliveryConsumer, on_complete:Runnable = ^{} ) = retaining(consumer) {
 
     assert( is_topic(destination) )
 
@@ -206,6 +206,8 @@ class Router(val host:VirtualHost) exten
     )
     broadcast_consumers.put(name, consumer)
 
+    on_complete.run
+    
   } >>: dispatchQueue
 
   def unbind(destination:Destination, consumer:DeliveryConsumer) = releasing(consumer) {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1025593&r1=1025592&r2=1025593&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Oct 20 14:12:54 2010
@@ -410,14 +410,20 @@ object Stomp {
   ///////////////////////////////////////////////////////////////////
   val TRUE = ascii("true")
   val FALSE = ascii("false")
-  val AUTO = ascii("auto")
-  val CLIENT = ascii("client")
-  val INDIVIDUAL = ascii("client-individual")
+
+  val ACK_MODE_AUTO = ascii("auto")
+  val ACK_MODE_NONE = ascii("none")
+  
+  val ACK_MODE_CLIENT = ascii("client")
+  val ACK_MODE_SESSION = ascii("session")
+  
+  val ACK_MODE_MESSAGE = ascii("message")
+    
   val V1_0 = ascii("1.0")
   val V1_1 = ascii("1.1")
   val DEFAULT_HEAT_BEAT = ascii("0,0")
 
-  val SUPPORTED_PROTOCOL_VERSIONS = Set(V1_0,V1_1)
+  val SUPPORTED_PROTOCOL_VERSIONS = List(V1_1, V1_0)
 
   //	public enum Transformations {
   //		JMS_BYTE, JMS_OBJECT_XML, JMS_OBJECT_JSON, JMS_MAP_XML, JMS_MAP_JSON

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1025593&r1=1025592&r2=1025593&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Oct 20 14:12:54 2010
@@ -176,9 +176,105 @@ class StompProtocolHandler extends Proto
 
   protected def dispatchQueue:DispatchQueue = connection.dispatchQueue
 
-  class StompConsumer(val subscription_id:Option[AsciiBuffer], val destination:Destination, val ackMode:AsciiBuffer, val selector:(AsciiBuffer, BooleanExpression), val binding:BindingDTO) extends BaseRetained with DeliveryConsumer {
+
+  trait AckHandler {
+    def track(delivery:Delivery):Unit
+    def perform_ack(msgid: AsciiBuffer, uow:StoreUOW=null):Unit
+  }
+
+  class AutoAckHandler extends AckHandler {
+    def track(delivery:Delivery) = {
+      if( delivery.ack!=null ) {
+        delivery.ack(null)
+      }
+    }
+    
+    def perform_ack(msgid: AsciiBuffer, uow:StoreUOW=null) = {
+      die("The subscription ack mode does not expect ACK frames")
+    }
+  }
+
+  class SessionAckHandler extends AckHandler{
+    var consumer_acks = ListBuffer[(AsciiBuffer, (StoreUOW)=>Unit)]()
+
+    def track(delivery:Delivery) = {
+      queue {
+        if( protocol_version eq V1_0 ) {
+          // register on the connection since 1.0 acks may not include the subscription id
+          connection_ack_handlers += ( delivery.message.id-> this )
+        }
+        consumer_acks += (( delivery.message.id, delivery.ack ))
+      }
+
+    }
+
+
+    def perform_ack(msgid: AsciiBuffer, uow:StoreUOW=null) = {
+
+      // session acks ack all previously received messages..
+      var found = false
+      val (acked, not_acked) = consumer_acks.partition{ case (id, ack)=>
+        if( found ) {
+          false
+        } else {
+          if( id == msgid ) {
+            found = true
+          }
+          true
+        }
+      }
+
+      if( acked.isEmpty ) {
+        die("ACK failed, invalid message id: %s".format(msgid))
+      } else {
+        consumer_acks = not_acked
+        acked.foreach{case (id, ack)=>
+          if( ack!=null ) {
+            ack(uow)
+          }
+        }
+      }
+
+      if( protocol_version eq V1_0 ) {
+        connection_ack_handlers.remove(msgid)
+      }
+    }
+
+
+
+  }
+  class MessageAckHandler extends AckHandler {
+    var consumer_acks = HashMap[AsciiBuffer, (StoreUOW)=>Unit]()
+
+    def track(delivery:Delivery) = {
+      queue {
+        if( protocol_version eq V1_0 ) {
+          // register on the connection since 1.0 acks may not include the subscription id
+          connection_ack_handlers += ( delivery.message.id-> this )
+        }
+        consumer_acks += ( delivery.message.id -> delivery.ack )
+      }
+    }
+
+    def perform_ack(msgid: AsciiBuffer, uow:StoreUOW=null) = {
+      consumer_acks.remove(msgid) match {
+        case Some(ack) =>
+          if( ack!=null ) {
+            ack(uow)
+          }
+        case None => die("ACK failed, invalid message id: %s".format(msgid))
+      }
+
+      if( protocol_version eq V1_0 ) {
+        connection_ack_handlers.remove(msgid)
+      }
+    }
+  }
+
+  class StompConsumer(val subscription_id:Option[AsciiBuffer], val destination:Destination, val ack_handler:AckHandler, val selector:(AsciiBuffer, BooleanExpression), val binding:BindingDTO) extends BaseRetained with DeliveryConsumer {
     val dispatchQueue = StompProtocolHandler.this.dispatchQueue
 
+
     dispatchQueue.retain
     setDisposer(^{
       session_manager.release
@@ -218,18 +314,7 @@ class StompProtocolHandler extends Proto
         if( session.full ) {
           false
         } else {
-          if( delivery.ack!=null) {
-            if( ackMode eq AUTO ) {
-              delivery.ack(null)
-            } else {
-              // switch the the queue context.. this method is in the producer's context.
-              queue {
-                // we need to correlate acks from the client.. to invoke the
-                // delivery ack.
-                pendingAcks += ( delivery.message.id->delivery.ack )
-              }
-            }
-          }
+          ack_handler.track(delivery)
           var frame = delivery.message.asInstanceOf[StompFrameMessage].frame
           if( subscription_id != None ) {
             frame = frame.append_headers((SUBSCRIPTION, subscription_id.get)::Nil)
@@ -262,11 +347,12 @@ class StompProtocolHandler extends Proto
   var host:VirtualHost = null
 
   private def queue = connection.dispatchQueue
-  var pendingAcks = HashMap[AsciiBuffer, (StoreUOW)=>Unit]()
 
+  // used by STOMP 1.0 clients
+  var connection_ack_handlers = HashMap[AsciiBuffer, AckHandler]()
 
-  var session_id:Option[AsciiBuffer] = None
-  var protocol_version:Option[AsciiBuffer] = None
+  var session_id:AsciiBuffer = _
+  var protocol_version:AsciiBuffer = _
 
   var heart_beat_monitor:HeartBeatMonitor = new HeartBeatMonitor
 
@@ -313,7 +399,7 @@ class StompProtocolHandler extends Proto
           // so we know which wire format is being used.
         case frame:StompFrame=>
 
-          if( protocol_version eq None ) {
+          if( protocol_version == null ) {
 
             info("got command: %s", frame)
             frame.action match {
@@ -364,9 +450,15 @@ class StompProtocolHandler extends Proto
 
   def on_stomp_connect(headers:HeaderMap):Unit = {
 
-
-    protocol_version = get(headers, ACCEPT_VERSION).getOrElse(V1_0).split(COMMA).map(_.ascii).reverse.find{v=>
-      SUPPORTED_PROTOCOL_VERSIONS.contains(v)
+    val accept_versions = get(headers, ACCEPT_VERSION).getOrElse(V1_0).split(COMMA).map(_.ascii)
+    protocol_version = SUPPORTED_PROTOCOL_VERSIONS.find( v=> accept_versions.contains(v) ) match {
+      case Some(x) => x
+      case None=>
+        val supported_versions = SUPPORTED_PROTOCOL_VERSIONS.mkString(",")
+        _die((MESSAGE_HEADER, ascii("version not supported"))::
+            (VERSION, ascii(supported_versions))::Nil,
+            "Supported protocol versions are %s".format(supported_versions))
+        return
     }
 
     val heart_beat = get(headers, HEART_BEAT).getOrElse(DEFAULT_HEAT_BEAT)
@@ -406,52 +498,40 @@ class StompProtocolHandler extends Proto
         return
     }
 
-    protocol_version match {
-      case None =>
-        val supported_versions = SUPPORTED_PROTOCOL_VERSIONS.mkString(",")
-
-        _die((MESSAGE_HEADER, ascii("version not supported"))::
-            (VERSION, ascii(supported_versions))::Nil,
-            "Supported protocol versions are %s".format(supported_versions))
-        return
-
-      case Some(x) =>
-        connection.transport.suspendRead
-
-        val host_header = get(headers, HOST)
-        val cb: (VirtualHost)=>Unit = (host)=>
-          queue {
-            if(host!=null) {
-              this.host=host
-
-              val outbound_heart_beat_header = ascii("%d,%d".format(outbound_heartbeat,inbound_heartbeat))
-              session_id = Some(ascii(this.host.config.id + ":"+this.host.session_counter.incrementAndGet))
-
-              connection_sink.offer(
-                StompFrame(CONNECTED, List(
-                  (VERSION, protocol_version.get),
-                  (SESSION, session_id.get),
-                  (HEART_BEAT, outbound_heart_beat_header)
-                )))
-
-              if( this.host.direct_buffer_pool!=null ) {
-                val wf = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
-                wf.memory_pool = this.host.direct_buffer_pool
-              }
-              connection.transport.resumeRead
+    connection.transport.suspendRead
 
-            } else {
-              die("Invalid virtual host: "+host_header.get)
-            }
+    val host_header = get(headers, HOST)
+    val cb: (VirtualHost)=>Unit = (host)=>
+      queue {
+        if(host!=null) {
+          this.host=host
+
+          val outbound_heart_beat_header = ascii("%d,%d".format(outbound_heartbeat,inbound_heartbeat))
+          session_id = ascii(this.host.config.id + ":"+this.host.session_counter.incrementAndGet)
+
+          connection_sink.offer(
+            StompFrame(CONNECTED, List(
+              (VERSION, protocol_version),
+              (SESSION, session_id),
+              (HEART_BEAT, outbound_heart_beat_header)
+            )))
+
+          if( this.host.direct_buffer_pool!=null ) {
+            val wf = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
+            wf.memory_pool = this.host.direct_buffer_pool
           }
+          connection.transport.resumeRead
 
-        host_header match {
-          case None=>
-            connection.connector.broker.getDefaultVirtualHost(cb)
-          case Some(host)=>
-            connection.connector.broker.getVirtualHost(host, cb)
+        } else {
+          die("Invalid virtual host: "+host_header.get)
         }
+      }
 
+    host_header match {
+      case None=>
+        connection.connector.broker.getDefaultVirtualHost(cb)
+      case Some(host)=>
+        connection.connector.broker.getVirtualHost(host, cb)
     }
 
   }
@@ -485,7 +565,7 @@ class StompProtocolHandler extends Proto
             perform_send(frame)
           case Some(txid)=>
             get_or_create_tx_queue(txid){ txqueue=>
-              txqueue.add(frame)
+              txqueue.add(frame, (uow)=>{perform_send(frame, uow)} )
             }
         }
 
@@ -580,7 +660,8 @@ class StompProtocolHandler extends Proto
     frame.release
   }
 
-  def on_stomp_subscribe(headers:HeaderMap) = {
+  def on_stomp_subscribe(headers:HeaderMap):Unit = {
+    val receipt = get(headers, RECEIPT_REQUESTED)
     get(headers, DESTINATION) match {
       case Some(dest)=>
 
@@ -588,13 +669,13 @@ class StompProtocolHandler extends Proto
         val subscription_id = get(headers, ID)
         var id:AsciiBuffer = subscription_id match {
           case None =>
-            if( protocol_version.get == V1_0 )
+            if( protocol_version eq V1_0 )
               // in 1.0 it's ok if the client does not send us the
               // the id header
               dest
             else
               die("The id header is missing from the SUBSCRIBE frame");
-              null
+              return
 
           case Some(x:AsciiBuffer)=> x
         }
@@ -607,11 +688,14 @@ class StompProtocolHandler extends Proto
           null
         }
 
-        val ack:AsciiBuffer = get(headers, ACK_MODE) match {
-          case None=> AUTO
+        val ack = get(headers, ACK_MODE) match {
+          case None=> new AutoAckHandler
           case Some(x)=> x match {
-            case AUTO=>AUTO
-            case CLIENT=> CLIENT
+            case ACK_MODE_AUTO=>new AutoAckHandler
+            case ACK_MODE_NONE=>new AutoAckHandler
+            case ACK_MODE_CLIENT=> new SessionAckHandler
+            case ACK_MODE_SESSION=> new SessionAckHandler
+            case ACK_MODE_MESSAGE=> new MessageAckHandler
             case ack:AsciiBuffer => die("Unsuported ack mode: "+ack); null
           }
         }
@@ -660,7 +744,11 @@ class StompProtocolHandler extends Proto
             if( binding==null ) {
 
               // consumer is bind bound as a topic
-              host.router.bind(destination, consumer)
+              host.router.bind(destination, consumer, ^{
+                receipt.foreach{ receipt =>
+                  connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
+                }
+              })
               consumer.release
 
             } else {
@@ -670,6 +758,9 @@ class StompProtocolHandler extends Proto
                 x match {
                   case Some(queue:Queue) =>
                     queue.bind(consumer::Nil)
+                    receipt.foreach{ receipt =>
+                      connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
+                    }
                     consumer.release
                   case None => throw new RuntimeException("case not yet implemented.")
                 }
@@ -687,36 +778,55 @@ class StompProtocolHandler extends Proto
 
   }
 
-  def on_stomp_ack(frame:StompFrame) = {
+  def on_stomp_ack(frame:StompFrame):Unit = {
     val headers = frame.headers
     get(headers, MESSAGE_ID) match {
       case Some(messageId)=>
-        pendingAcks.get(messageId) match {
-          case Some(ack) =>
-            get(headers, TRANSACTION) match {
+
+        val subscription_id = get(headers, SUBSCRIPTION);
+        if( subscription_id == None && !(protocol_version eq V1_0) ) {
+          die("The subscription header is required")
+          return
+        }
+
+        val handler = subscription_id match {
+          case None=>
+
+            connection_ack_handlers.get(messageId) match {
+              case None =>
+                die("Not expecting ack for message id '%s'".format(messageId))
+                None
+              case Some(handler) =>
+                Some(handler)
+            }
+
+          case Some(id) =>
+            consumers.get(id) match {
               case None=>
-                perform_ack(frame)
-              case Some(txid)=>
-                get_or_create_tx_queue(txid){ txqueue=>
-                  txqueue.add(frame)
-                }
+                die("The subscription '%s' does not exist".format(id))
+                None
+              case Some(consumer)=>
+                Some(consumer.ack_handler)
             }
+        }
 
+        handler.foreach{ handler=>
+
+          get(headers, TRANSACTION) match {
+            case None=>
+              handler.perform_ack(messageId, null)
+            case Some(txid)=>
+              get_or_create_tx_queue(txid){ _.add(frame, (uow)=>{ handler.perform_ack(messageId, uow)} ) }
+          }
+
+          get(headers, RECEIPT_REQUESTED).foreach { receipt =>
+            connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
+          }
 
-          case None =>
-            // This can easily happen if the consumer is doing client acks on something like
-            // a non-durable topic.
-            // trace("The specified message id is not waiting for a client ack: %s", messageId)
         }
-      case None=> die("message id header not set")
-    }
-  }
 
-  def perform_ack(frame: StompFrame, uow:StoreUOW=null) = {
-    val msgid = get(frame.headers, MESSAGE_ID).get
-    pendingAcks.remove(msgid) match {
-      case Some(ack) => ack(uow)
-      case None => die("message allready acked: %s".format(msgid))
+
+      case None=> die("message id header not set")
     }
   }
 
@@ -779,10 +889,10 @@ class StompProtocolHandler extends Proto
     // TODO: eventually we want to back this /w a broker Queue which
     // can provides persistence and memory swapping.
 
-    val queue = ListBuffer[StompFrame]()
+    val queue = ListBuffer[(StompFrame, (StoreUOW)=>Unit)]()
 
-    def add(frame:StompFrame) = {
-      queue += frame
+    def add(frame:StompFrame, proc:(StoreUOW)=>Unit) = {
+      queue += ( frame->proc )
     }
 
     def commit(onComplete: => Unit) = {
@@ -793,14 +903,15 @@ class StompProtocolHandler extends Proto
         null
       }
 
-      queue.foreach { frame=>
-        frame.action match {
-          case SEND =>
-            perform_send(frame, uow)
-          case ACK =>
-            perform_ack(frame, uow)
-          case _ => throw new java.lang.AssertionError("assertion failed: only send or ack frames are transactional")
-        }
+      queue.foreach { case (frame, proc) =>
+        proc(uow)
+//        frame.action match {
+//          case SEND =>
+//            perform_send(frame, uow)
+//          case ACK =>
+//            perform_ack(frame, uow)
+//          case _ => throw new java.lang.AssertionError("assertion failed: only send or ack frames are transactional")
+//        }
       }
       if( uow!=null ) {
         uow.onComplete(^{
@@ -826,7 +937,9 @@ class StompProtocolHandler extends Proto
     if ( transactions.contains(txid) ) {
       die("transaction allready started")
     } else {
-      proc( transactions.put(txid, new TransactionQueue).get )
+      val queue = new TransactionQueue
+      transactions.put(txid, queue)
+      proc( queue )
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala?rev=1025593&r1=1025592&r2=1025593&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala Wed Oct 20 14:12:54 2010
@@ -45,14 +45,14 @@ import java.io._
       socket.close
     }
 
-    def send(frame:String) = {
+    def write(frame:String) = {
       out.write(frame.getBytes("UTF-8"))
       out.write(0)
       out.write('\n')
       out.flush
     }
 
-    def send(frame:Array[Byte]) = {
+    def write(frame:Array[Byte]) = {
       out.write(frame)
       out.write(0)
       out.write('\n')

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1025593&r1=1025592&r2=1025593&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Wed Oct 20 14:12:54 2010
@@ -19,8 +19,9 @@ package org.apache.activemq.apollo.stomp
 import org.scalatest.matchers.ShouldMatchers
 import org.apache.activemq.apollo.util.FunSuiteSupport
 import org.apache.activemq.apollo.broker.{Broker, BrokerFactory}
+import org.scalatest.BeforeAndAfterEach
 
-class StompTestSupport extends FunSuiteSupport with ShouldMatchers {
+class StompTestSupport extends FunSuiteSupport with ShouldMatchers with BeforeAndAfterEach {
   var broker: Broker = null
 
   override protected def beforeAll() = {
@@ -30,26 +31,51 @@ class StompTestSupport extends FunSuiteS
     Thread.sleep(1000); //TODO implement waitUntilStarted
   }
 
+  var client = new StompClient
+
   override protected def afterAll() = {
     broker.stop
   }
 
+  override protected def afterEach() = {
+    super.afterEach
+    client.close
+  }
+
+  def connect(version:String, c: StompClient = client) = {
+    c.open("localhost", 61613)
+    version match {
+      case "1.0"=>
+        c.write(
+          "CONNECT\n" +
+          "\n")
+      case "1.1"=>
+        c.write(
+          "CONNECT\n" +
+          "accept-version:1.1\n" +
+          "host:default\n" +
+          "\n")
+      case x=> throw new RuntimeException("invalid version: %f".format(x))
+    }
+    val frame = c.receive()
+    frame should startWith("CONNECTED\n")
+    frame should include regex("""session:.+?\n""")
+    frame should include("version:"+version+"\n")
+    c
+  }
+
+  def wait_for_receipt(id:String, c: StompClient = client): Unit = {
+    val frame = c.receive()
+    frame should startWith("RECEIPT\n")
+    frame should include("receipt-id:"+id+"\n")
+  }
 
 }
 
 class Stomp10ConnectTest extends StompTestSupport {
 
   test("Stomp 1.0 CONNECT") {
-    val client = new StompClient
-    client.open("localhost", 61613)
-
-    client.send(
-      "CONNECT\n" +
-      "\n")
-    val frame = client.receive()
-    frame should startWith("CONNECTED\n")
-    frame should include regex("""session:.+?\n""")
-    frame should include("version:1.0\n")
+    connect("1.0")
   }
 
 }
@@ -57,25 +83,14 @@ class Stomp10ConnectTest extends StompTe
 class Stomp11ConnectTest extends StompTestSupport {
 
   test("Stomp 1.1 CONNECT") {
-    val client = new StompClient
-    client.open("localhost", 61613)
-
-    client.send(
-      "CONNECT\n" +
-      "accept-version:1.0,1.1\n" +
-      "host:default\n" +
-      "\n")
-    val frame = client.receive()
-    frame should startWith("CONNECTED\n")
-    frame should include regex("""session:.+?\n""")
-    frame should include("version:1.1\n")
+    connect("1.1")
   }
 
   test("Stomp 1.1 CONNECT /w STOMP Action") {
-    val client = new StompClient
+
     client.open("localhost", 61613)
 
-    client.send(
+    client.write(
       "STOMP\n" +
       "accept-version:1.0,1.1\n" +
       "host:default\n" +
@@ -87,10 +102,10 @@ class Stomp11ConnectTest extends StompTe
   }
 
   test("Stomp 1.1 CONNECT /w valid version fallback") {
-    val client = new StompClient
+
     client.open("localhost", 61613)
 
-    client.send(
+    client.write(
       "CONNECT\n" +
       "accept-version:1.0,10.0\n" +
       "host:default\n" +
@@ -102,10 +117,10 @@ class Stomp11ConnectTest extends StompTe
   }
 
   test("Stomp 1.1 CONNECT /w invalid version fallback") {
-    val client = new StompClient
+
     client.open("localhost", 61613)
 
-    client.send(
+    client.write(
       "CONNECT\n" +
       "accept-version:9.0,10.0\n" +
       "host:default\n" +
@@ -117,10 +132,10 @@ class Stomp11ConnectTest extends StompTe
   }
 
   test("Stomp CONNECT /w invalid virtual host") {
-    val client = new StompClient
+
     client.open("localhost", 61613)
 
-    client.send(
+    client.write(
       "CONNECT\n" +
       "accept-version:1.0,1.1\n" +
       "host:invalid\n" +
@@ -135,10 +150,10 @@ class Stomp11ConnectTest extends StompTe
 class Stomp11HeartBeatTest extends StompTestSupport {
 
   test("Stomp 1.1 Broker sends heart-beat") {
-    val client = new StompClient
+
     client.open("localhost", 61613)
 
-    client.send(
+    client.write(
       "CONNECT\n" +
       "accept-version:1.1\n" +
       "host:default\n" +
@@ -164,10 +179,10 @@ class Stomp11HeartBeatTest extends Stomp
   test("Stomp 1.1 Broker times out idle connection") {
     StompProtocolHandler.inbound_heartbeat = 1000L
     try {
-      val client = new StompClient
+  
       client.open("localhost", 61613)
 
-      client.send(
+      client.write(
         "CONNECT\n" +
         "accept-version:1.1\n" +
         "host:default\n" +
@@ -192,4 +207,421 @@ class Stomp11HeartBeatTest extends Stomp
     }
   }
 
+}
+
+class StompDestinationTest extends StompTestSupport {
+
+  test("Queue order preserved") {
+    connect("1.1")
+
+    def put(id:Int) = {
+      client.write(
+        "SEND\n" +
+        "destination:/queue/example\n" +
+        "\n" +
+        "message:"+id+"\n")
+    }
+    put(1)
+    put(2)
+    put(3)
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/example\n" +
+      "id:0\n" +
+      "\n")
+
+    def get(id:Int) = {
+      val frame = client.receive()
+      println(frame)
+      frame should startWith("MESSAGE\n")
+      frame should include ("subscription:0\n")
+      frame should endWith regex("\n\nmessage:"+id+"\n")
+    }
+    get(1)
+    get(2)
+    get(3)
+  }
+
+  test("Topic drops messages sent before before subscription is established") {
+    connect("1.1")
+
+    def put(id:Int) = {
+      client.write(
+        "SEND\n" +
+        "destination:/topic/updates\n" +
+        "\n" +
+        "message:"+id+"\n")
+    }
+    put(1)
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/topic/updates\n" +
+      "id:0\n" +
+      "receipt:0\n" +
+      "\n")
+    wait_for_receipt("0")
+
+    put(2)
+    put(3)
+
+    def get(id:Int) = {
+      val frame = client.receive()
+      println(frame)
+      frame should startWith("MESSAGE\n")
+      frame should include ("subscription:0\n")
+      frame should endWith regex("\n\nmessage:"+id+"\n")
+    }
+
+    // note that the put(1) message gets dropped.
+    get(2)
+    get(3)
+  }
+
+  test("Topic /w Durable sub retains messages.") {
+    connect("1.1")
+
+    def put(id:Int) = {
+      client.write(
+        "SEND\n" +
+        "destination:/topic/updates\n" +
+        "\n" +
+        "message:"+id+"\n")
+    }
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/topic/updates\n" +
+      "id:durable:my-sub-name\n" +
+      "receipt:0\n" +
+      "\n")
+    wait_for_receipt("0")
+    client.close
+
+    // Close him out.. since his id started /w durable: then
+    // the topic subscription will be persistent across client
+    // connections.
+
+    connect("1.1")
+    put(1)
+    put(2)
+    put(3)
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/topic/updates\n" +
+      "id:durable:my-sub-name\n" +
+      "\n")
+
+    def get(id:Int) = {
+      val frame = client.receive()
+      println(frame)
+      frame should startWith("MESSAGE\n")
+      frame should include ("subscription:durable:my-sub-name\n")
+      frame should endWith regex("\n\nmessage:"+id+"\n")
+    }
+
+    get(1)
+    get(2)
+    get(3)
+  }
+
+  test("Queue and a selector") {
+    connect("1.1")
+
+    def put(id:Int, color:String) = {
+      client.write(
+        "SEND\n" +
+        "destination:/queue/selected\n" +
+        "color:"+color+"\n" +
+        "\n" +
+        "message:"+id+"\n")
+    }
+    put(1, "red")
+    put(2, "blue")
+    put(3, "red")
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/selected\n" +
+      "selector:color='red'\n" +
+      "id:0\n" +
+      "\n")
+
+    def get(id:Int) = {
+      val frame = client.receive()
+      println(frame)
+      frame should startWith("MESSAGE\n")
+      frame should endWith regex("\n\nmessage:"+id+"\n")
+    }
+    get(1)
+    get(3)
+  }
+
+  test("Topic and a selector") {
+    connect("1.1")
+
+    def put(id:Int, color:String) = {
+      client.write(
+        "SEND\n" +
+        "destination:/topic/selected\n" +
+        "color:"+color+"\n" +
+        "\n" +
+        "message:"+id+"\n")
+    }
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/topic/selected\n" +
+      "selector:color='red'\n" +
+      "id:0\n" +
+      "receipt:0\n" +
+      "\n")
+    wait_for_receipt("0")
+
+    put(1, "red")
+    put(2, "blue")
+    put(3, "red")
+
+    def get(id:Int) = {
+      val frame = client.receive()
+      println(frame)
+      frame should startWith("MESSAGE\n")
+      frame should endWith regex("\n\nmessage:"+id+"\n")
+    }
+    get(1)
+    get(3)
+  }
+}
+
+class StompTransactionTest extends StompTestSupport {
+  
+
+  test("Queue and a transacted send") {
+    connect("1.1")
+
+    def put(id:Int, tx:String=null) = {
+      client.write(
+        "SEND\n" +
+        "destination:/queue/transacted\n" +
+        { if(tx!=null) { "transaction:"+tx+"\n" } else { "" } }+
+        "\n" +
+        "message:"+id+"\n")
+    }
+
+    put(1)
+    client.write(
+      "BEGIN\n" +
+      "transaction:x\n" +
+      "\n")
+    put(2, "x")
+    put(3)
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/transacted\n" +
+      "id:0\n" +
+      "\n")
+
+    def get(id:Int) = {
+      val frame = client.receive()
+      println(frame)
+      frame should startWith("MESSAGE\n")
+      frame should endWith regex("\n\nmessage:"+id+"\n")
+    }
+    get(1)
+    get(3)
+
+    client.write(
+      "COMMIT\n" +
+      "transaction:x\n" +
+      "\n")
+
+    get(2)
+    
+  }
+
+  test("Topic and a transacted send") {
+    connect("1.1")
+
+    def put(id:Int, tx:String=null) = {
+      client.write(
+        "SEND\n" +
+        "destination:/topic/transacted\n" +
+        { if(tx!=null) { "transaction:"+tx+"\n" } else { "" } }+
+        "\n" +
+        "message:"+id+"\n")
+    }
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/topic/transacted\n" +
+      "id:0\n" +
+      "receipt:0\n" +
+      "\n")
+    wait_for_receipt("0")
+
+    put(1)
+    client.write(
+      "BEGIN\n" +
+      "transaction:x\n" +
+      "\n")
+    put(2, "x")
+    put(3)
+
+    def get(id:Int) = {
+      val frame = client.receive()
+      println(frame)
+      frame should startWith("MESSAGE\n")
+      frame should endWith regex("\n\nmessage:"+id+"\n")
+    }
+
+    get(1)
+    get(3)
+
+    client.write(
+      "COMMIT\n" +
+      "transaction:x\n" +
+      "\n")
+
+    get(2)
+
+  }
+
+}
+
+
+class StompAckModeTest extends StompTestSupport {
+
+  test("ack:session redelivers on client disconnect") {
+    connect("1.1")
+
+    def put(id:Int) = {
+      client.write(
+        "SEND\n" +
+        "destination:/queue/ackmode-session\n" +
+        "\n" +
+        "message:"+id+"\n")
+    }
+    put(1)
+    put(2)
+    put(3)
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/ackmode-session\n" +
+      "ack:session\n" +
+      "id:0\n" +
+      "\n")
+
+    def get(id:Int) = {
+      val frame = client.receive()
+      println(frame)
+      frame should startWith("MESSAGE\n")
+      frame should include ("subscription:0\n")
+      frame should include regex("message-id:.+?\n")
+      frame should endWith regex("\n\nmessage:"+id+"\n")
+
+      val p = """(?s).*?\nmessage-id:(.+?)\n.*""".r
+      frame match {
+        case p(x) => x
+        case _=> null
+      }
+    }
+
+    get(1)
+    val mid = get(2)
+    get(3)
+
+    // Ack the first 2 messages..
+    client.write(
+      "ACK\n" +
+      "subscription:0\n" +
+      "message-id:"+mid+"\n" +
+      "receipt:0\n"+
+      "\n")
+
+    wait_for_receipt("0")
+    client.close
+
+    connect("1.1")
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/ackmode-session\n" +
+      "ack:session\n" +
+      "id:0\n" +
+      "\n")
+    get(3)
+    
+    
+  }
+
+
+  test("ack:message redelivers on client disconnect") {
+    connect("1.1")
+
+    def put(id:Int) = {
+      client.write(
+        "SEND\n" +
+        "destination:/queue/ackmode-message\n" +
+        "\n" +
+        "message:"+id+"\n")
+    }
+    put(1)
+    put(2)
+    put(3)
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/ackmode-message\n" +
+      "ack:message\n" +
+      "id:0\n" +
+      "\n")
+
+    def get(id:Int) = {
+      val frame = client.receive()
+      println(frame)
+      frame should startWith("MESSAGE\n")
+      frame should include ("subscription:0\n")
+      frame should include regex("message-id:.+?\n")
+      frame should endWith regex("\n\nmessage:"+id+"\n")
+
+      val p = """(?s).*?\nmessage-id:(.+?)\n.*""".r
+      frame match {
+        case p(x) => x
+        case _=> null
+      }
+    }
+
+    get(1)
+    val mid = get(2)
+    get(3)
+
+    // Ack only message 2 (messages 1 and 3 are expected to be redelivered below)..
+    client.write(
+      "ACK\n" +
+      "subscription:0\n" +
+      "message-id:"+mid+"\n" +
+      "receipt:0\n"+
+      "\n")
+
+    wait_for_receipt("0")
+    client.close
+
+    connect("1.1")
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/ackmode-message\n" +
+      "ack:session\n" +
+      "id:0\n" +
+      "\n")
+    get(1)
+    get(3)
+
+
+  }
+
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=1025593&r1=1025592&r2=1025593&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
Wed Oct 20 14:12:54 2010
@@ -186,7 +186,7 @@ object StompLoadClient {
       try {
         val connectUri = new URI(uri)
         client.open(connectUri.getHost(), connectUri.getPort())
-        client.send("""CONNECT
+        client.write("""CONNECT
 
 """)
         client.receive("CONNECTED")
@@ -235,7 +235,7 @@ object StompLoadClient {
           this.client=client
           var i =0
           while (!done.get) {
-            client.send(content)
+            client.write(content)
             if( syncSend ) {
               // waits for the reply..
               client.skip
@@ -272,7 +272,7 @@ object StompLoadClient {
       while (!done.get) {
         connect {
           val headers = Map[AsciiBuffer, AsciiBuffer]()
-          client.send(
+          client.write(
             "SUBSCRIBE\n" +
              (if(!durable) {""} else {"id:durable:mysub-"+id+"\n"}) + 
              (if(selector==null) {""} else {"selector: "+selector+"\n"}) +
@@ -294,7 +294,7 @@ object StompLoadClient {
           assert( start >= 0 )
           val end = msg.indexOf("\n", start)
           val msgId = msg.slice(start+MESSAGE_ID.length+1, end).ascii
-          client.send("""
+          client.write("""
 ACK
 message-id:"""+msgId+"""
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala?rev=1025593&r1=1025592&r2=1025593&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompRemoteClients.scala
Wed Oct 20 14:12:54 2010
@@ -52,7 +52,7 @@ class StompRemoteConsumer extends Remote
     headers ::= (ID, ascii("stomp-sub-" + name))
 
     if( persistent ) {
-      headers ::= (ACK_MODE, CLIENT)
+      headers ::= (ACK_MODE, ACK_MODE_CLIENT)
     }
 
     frame = StompFrame(SUBSCRIBE, headers);



Mime
View raw message