activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1023000 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/ apoll...
Date Fri, 15 Oct 2010 16:27:31 GMT
Author: chirino
Date: Fri Oct 15 16:27:30 2010
New Revision: 1023000

URL: http://svn.apache.org/viewvc?rev=1023000&view=rev
Log:
- Consolidated the Stomp constants
- Started Stomp 1.1 implementation

Removed:
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/Stomp.java
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1023000&r1=1022999&r2=1023000&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
Fri Oct 15 16:27:30 2010
@@ -30,6 +30,7 @@ import path.PathFilter
 import ReporterLevel._
 import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
 import collection.JavaConversions
+import java.util.concurrent.atomic.AtomicLong
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -93,6 +94,8 @@ class VirtualHost(val broker: Broker, va
   var transactionManager:TransactionManagerX = new TransactionManagerX
   val queue_id_counter = new LongCounter
 
+  val session_counter = new AtomicLong(0)
+
   override def toString = if (config==null) "virtual-host" else "virtual-host: "+config.id
 
   /**

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1023000&r1=1022999&r2=1023000&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
Fri Oct 15 16:27:30 2010
@@ -21,11 +21,9 @@ import _root_.org.apache.activemq.apollo
 import java.nio.ByteBuffer
 import collection.mutable.{ListBuffer, HashMap}
 import Stomp._
-import Stomp.Headers._
 
 import BufferConversions._
 import _root_.scala.collection.JavaConversions._
-import StompFrameConstants._
 import java.io.{EOFException, DataOutput, DataInput, IOException}
 import java.nio.channels.{SocketChannel, WritableByteChannel, ReadableByteChannel}
 import org.apache.activemq.apollo.transport._
@@ -48,7 +46,7 @@ object StompCodec extends Log {
     val frame = message.frame
 
     val rc = new MessageRecord
-    rc.protocol = StompConstants.PROTOCOL
+    rc.protocol = PROTOCOL
     rc.size = frame.size
     rc.expiration = message.expiration
 
@@ -465,7 +463,7 @@ class StompCodec extends ProtocolCodec w
 
           // lets try to keep the content of big message outside of the JVM's garbage collection
           // to keep the number of GCs down when moving big messages.
-          def is_message = action == Commands.SEND || action == Responses.MESSAGE
+          def is_message = action == SEND || action == MESSAGE
           if( length > 1024 && memory_pool!=null && is_message) {
 
             val ma = memory_pool.alloc(length+1)

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1023000&r1=1022999&r2=1023000&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
Fri Oct 15 16:27:30 2010
@@ -30,17 +30,9 @@ import java.io.{OutputStream, DataOutput
  *
  * @author <a href="http://hiramchirino.com">chirino</a>
  */
-object StompFrameConstants {
-  type HeaderMap = List[(AsciiBuffer, AsciiBuffer)]
-  type HeaderMapBuffer = ListBuffer[(AsciiBuffer, AsciiBuffer)]
-  val NO_DATA = new Buffer(0);
-
-}
-
-import StompFrameConstants._
-import StompConstants._;
 import BufferConversions._
 import Buffer._
+import Stomp._
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -82,15 +74,15 @@ case class StompFrameMessage(frame:Stomp
 
   for( header <- (frame.updated_headers ::: frame.headers).reverse ) {
     header match {
-      case (Stomp.Headers.Message.MESSAGE_ID, value) =>
+      case (MESSAGE_ID, value) =>
         id = value
-      case (Stomp.Headers.Send.PRIORITY, value) =>
+      case (PRIORITY, value) =>
         priority = java.lang.Integer.parseInt(value).toByte
-      case (Stomp.Headers.Send.DESTINATION, value) =>
+      case (DESTINATION, value) =>
         destination = value
-      case (Stomp.Headers.Send.EXPIRATION_TIME, value) =>
+      case (EXPIRATION_TIME, value) =>
         expiration = java.lang.Long.parseLong(value)
-      case (Stomp.Headers.Send.PERSISTENT, value) =>
+      case (PERSISTENT, value) =>
         persistent = java.lang.Boolean.parseBoolean(value)
       case _ =>
     }
@@ -301,6 +293,139 @@ case class StompFrame(action:AsciiBuffer
     ).map(_._2).getOrElse(null)
   }
 
+  def append_headers(value:HeaderMap) = StompFrame(action, headers, content, value ::: updated_headers)
+
   def retain = content.retain
   def release = content.release
 }
+
+object Stomp {
+
+  val PROTOCOL = "stomp"
+  val DURABLE_PREFIX = ascii("durable:")
+  val DURABLE_QUEUE_KIND = ascii("stomp:sub")
+
+  val options = new ParserOptions
+  options.queuePrefix = ascii("/queue/")
+  options.topicPrefix = ascii("/topic/")
+
+  options.defaultDomain = Router.QUEUE_DOMAIN
+
+  implicit def toDestination(value:AsciiBuffer):Destination = {
+    val d = DestinationParser.parse(value, options)
+    if( d==null ) {
+      throw new ProtocolException("Invalid stomp destiantion name: "+value);
+    }
+    d
+  }
+
+  type HeaderMap = List[(AsciiBuffer, AsciiBuffer)]
+  type HeaderMapBuffer = ListBuffer[(AsciiBuffer, AsciiBuffer)]
+  val NO_DATA = new Buffer(0);
+
+  ///////////////////////////////////////////////////////////////////
+  // Framing
+  ///////////////////////////////////////////////////////////////////
+
+  val EMPTY_BUFFER = new Buffer(0)
+  val NULL: Byte = 0
+  val NULL_BUFFER = new Buffer(Array(NULL))
+  val NEWLINE: Byte = '\n'
+  val COMMA: Byte = ','
+  val NEWLINE_BUFFER = new Buffer(Array(NEWLINE))
+  val END_OF_FRAME_BUFFER = new Buffer(Array(NULL, NEWLINE))
+  val SEPERATOR: Byte = ':'
+  val SEPERATOR_BUFFER = new Buffer(Array(SEPERATOR))
+
+  ///////////////////////////////////////////////////////////////////
+  // Frame Commands
+  ///////////////////////////////////////////////////////////////////
+  val STOMP = ascii("STOMP")
+  val CONNECT = ascii("CONNECT")
+  val SEND = ascii("SEND")
+  val DISCONNECT = ascii("DISCONNECT")
+  val SUBSCRIBE = ascii("SUBSCRIBE")
+  val UNSUBSCRIBE = ascii("UNSUBSCRIBE")
+
+  val BEGIN_TRANSACTION = ascii("BEGIN")
+  val COMMIT_TRANSACTION = ascii("COMMIT")
+  val ABORT_TRANSACTION = ascii("ABORT")
+  val BEGIN = ascii("BEGIN")
+  val COMMIT = ascii("COMMIT")
+  val ABORT = ascii("ABORT")
+  val ACK = ascii("ACK")
+
+  ///////////////////////////////////////////////////////////////////
+  // Frame Responses
+  ///////////////////////////////////////////////////////////////////
+  val CONNECTED = ascii("CONNECTED")
+  val ERROR = ascii("ERROR")
+  val MESSAGE = ascii("MESSAGE")
+  val RECEIPT = ascii("RECEIPT")
+
+  ///////////////////////////////////////////////////////////////////
+  // Frame Headers
+  ///////////////////////////////////////////////////////////////////
+  val RECEIPT_REQUESTED = ascii("receipt")
+  val TRANSACTION = ascii("transaction")
+  val CONTENT_LENGTH = ascii("content-length")
+  val TRANSFORMATION = ascii("transformation")
+  val TRANSFORMATION_ERROR = ascii("transformation-error")
+
+  val RECEIPT_ID = ascii("receipt-id")
+
+  val DESTINATION = ascii("destination")
+  val CORRELATION_ID = ascii("correlation-id")
+  val REPLY_TO = ascii("reply-to")
+  val EXPIRATION_TIME = ascii("expires")
+  val PRIORITY = ascii("priority")
+  val TYPE = ascii("type")
+  val PERSISTENT = ascii("persistent")
+
+  val MESSAGE_ID = ascii("message-id")
+  val PRORITY = ascii("priority")
+  val REDELIVERED = ascii("redelivered")
+  val TIMESTAMP = ascii("timestamp")
+  val SUBSCRIPTION = ascii("subscription")
+
+  val ACK_MODE = ascii("ack")
+  val ID = ascii("id")
+  val SELECTOR = ascii("selector")
+
+  val LOGIN = ascii("login")
+  val PASSCODE = ascii("passcode")
+  val CLIENT_ID = ascii("client-id")
+  val REQUEST_ID = ascii("request-id")
+  val ACCEPT_VERSION = ascii("accept-version")
+  val HOST = ascii("host")
+
+  val MESSAGE_HEADER = ascii("message")
+  val VERSION = ascii("version")
+  val SESSION = ascii("session")
+  val RESPONSE_ID = ascii("response-id")
+
+  ///////////////////////////////////////////////////////////////////
+  // Common Values
+  ///////////////////////////////////////////////////////////////////
+  val TRUE = ascii("true")
+  val FALSE = ascii("false")
+  val AUTO = ascii("auto")
+  val CLIENT = ascii("client")
+  val INDIVIDUAL = ascii("client-individual")
+  val V1_0 = ascii("1.0")
+  val V1_1 = ascii("1.1")
+
+  val SUPPORTED_PROTOCOL_VERSIONS = Set(V1_0,V1_1)
+
+  //	public enum Transformations {
+  //		JMS_BYTE, JMS_OBJECT_XML, JMS_OBJECT_JSON, JMS_MAP_XML, JMS_MAP_JSON
+  //
+  //		public String toString() {
+  //			return name().replaceAll("_", "-").toLowerCase()
+  //		}
+  //
+  //		public static Transformations getValue(String value) {
+  //			return valueOf(value.replaceAll("-", "_").toUpperCase())
+  //		}
+  //	}
+}
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1023000&r1=1022999&r2=1023000&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
Fri Oct 15 16:27:30 2010
@@ -27,7 +27,6 @@ import protocol.{ProtocolFactory, Protoc
 import java.lang.String
 import Stomp._
 import BufferConversions._
-import StompFrameConstants._
 import java.io.IOException
 import org.apache.activemq.apollo.selector.SelectorParser
 import org.apache.activemq.apollo.filter.{BooleanExpression, FilterException}
@@ -39,27 +38,6 @@ import org.apache.activemq.apollo.dto.{B
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-object StompConstants {
-
-  val PROTOCOL = "stomp"
-  val DURABLE_PREFIX = ascii("durable:")
-  val DURABLE_QUEUE_KIND = ascii("stomp:sub")
-
-  val options = new ParserOptions
-  options.queuePrefix = ascii("/queue/")
-  options.topicPrefix = ascii("/topic/")
-
-  options.defaultDomain = Router.QUEUE_DOMAIN
-
-  implicit def toDestination(value:AsciiBuffer):Destination = {
-    val d = DestinationParser.parse(value, options)
-    if( d==null ) {
-      throw new ProtocolException("Invalid stomp destiantion name: "+value);
-    }
-    d
-  }
-
-}
 /**
  * Creates StompCodec objects that encode/decode the
  * <a href="http://activemq.apache.org/stomp/">Stomp</a> protocol.
@@ -67,10 +45,8 @@ object StompConstants {
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 class StompProtocolCodecFactory extends ProtocolCodecFactory.Provider {
-  import Stomp.Commands.CONNECT
-  import Stomp.Commands.STOMP
 
-  def protocol = StompConstants.PROTOCOL
+  def protocol = PROTOCOL
 
   def createProtocolCodec() = new StompCodec();
 
@@ -116,8 +92,6 @@ object StompProtocol extends StompProtoc
 
 }
 
-import StompConstants._
-
 object StompProtocolHandler extends Log
 
 /**
@@ -131,7 +105,7 @@ class StompProtocolHandler extends Proto
   
   protected def dispatchQueue:DispatchQueue = connection.dispatchQueue
 
-  class StompConsumer(val destination:Destination, val ackMode:AsciiBuffer, val selector:(AsciiBuffer,
BooleanExpression), val binding:BindingDTO) extends BaseRetained with DeliveryConsumer {
+  class StompConsumer(val subscription_id:Option[AsciiBuffer], val destination:Destination,
val ackMode:AsciiBuffer, val selector:(AsciiBuffer, BooleanExpression), val binding:BindingDTO)
extends BaseRetained with DeliveryConsumer {
     val dispatchQueue = StompProtocolHandler.this.dispatchQueue
 
     dispatchQueue.retain
@@ -174,7 +148,7 @@ class StompProtocolHandler extends Proto
           false
         } else {
           if( delivery.ack!=null) {
-            if( ackMode eq Headers.Subscribe.AckModeValues.AUTO ) {
+            if( ackMode eq AUTO ) {
               delivery.ack(null)
             } else {
               // switch the the queue context.. this method is in the producer's context.
@@ -185,7 +159,10 @@ class StompProtocolHandler extends Proto
               }
             }
           }
-          val frame = delivery.message.asInstanceOf[StompFrameMessage].frame
+          var frame = delivery.message.asInstanceOf[StompFrameMessage].frame
+          if( subscription_id != None ) {
+            frame = frame.append_headers((SUBSCRIPTION, subscription_id.get)::Nil)
+          }
           frame.retain
           val rc = session.offer(frame)
           assert(rc, "offer should be accepted since it was not full")
@@ -216,17 +193,7 @@ class StompProtocolHandler extends Proto
     session_manager = new SinkMux[StompFrame]( MapSink(connection.transportSink){x=>x},
dispatchQueue, StompFrame)
     connection_sink = new OverflowSink(session_manager.open(dispatchQueue));
     connection_sink.refiller = ^{}
-    
-    connection.connector.broker.getDefaultVirtualHost(
-      queue.wrap { (host)=>
-        this.host=host
-        if( this.host.direct_buffer_pool!=null ) {
-          val wf = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
-          wf.memory_pool = this.host.direct_buffer_pool
-        }
-        connection.transport.resumeRead
-      }
-    )
+    connection.transport.resumeRead
   }
 
   override def onTransportDisconnected() = {
@@ -255,25 +222,30 @@ class StompProtocolHandler extends Proto
   override def onTransportCommand(command:Any) = {
     try {
       command match {
-        case StompFrame(Commands.SEND, _, _, _) =>
+        case StompFrame(SEND, _, _, _) =>
           on_stomp_send(command.asInstanceOf[StompFrame])
-        case StompFrame(Commands.ACK, headers, content, _) =>
+        case StompFrame(ACK, headers, content, _) =>
           on_stomp_ack(command.asInstanceOf[StompFrame])
 
-        case StompFrame(Commands.BEGIN, headers, content, _) =>
+        case StompFrame(BEGIN, headers, content, _) =>
           on_stomp_begin(headers)
-        case StompFrame(Commands.COMMIT, headers, content, _) =>
+        case StompFrame(COMMIT, headers, content, _) =>
           on_stomp_commit(headers)
-        case StompFrame(Commands.ABORT, headers, content, _) =>
+        case StompFrame(ABORT, headers, content, _) =>
           on_stomp_abort(headers)
 
-        case StompFrame(Commands.SUBSCRIBE, headers, content, _) =>
+        case StompFrame(SUBSCRIBE, headers, content, _) =>
           info("got command: %s", command)
           on_stomp_subscribe(headers)
-        case StompFrame(Commands.CONNECT, headers, _, _) =>
+
+        case StompFrame(STOMP, headers, _, _) =>
+          info("got command: %s", command)
+          on_stomp_connect(headers)
+        case StompFrame(CONNECT, headers, _, _) =>
           info("got command: %s", command)
           on_stomp_connect(headers)
-        case StompFrame(Commands.DISCONNECT, headers, content, _t) =>
+
+        case StompFrame(DISCONNECT, headers, content, _t) =>
           info("got command: %s", command)
           connection.stop
         case s:StompCodec =>
@@ -286,13 +258,69 @@ class StompProtocolHandler extends Proto
       }
     }  catch {
       case e:Exception =>
-        die("Unexpected error: "+e);  
+        die("Unexpected Error", e.toString);
     }
   }
 
 
+  var session_id:Option[AsciiBuffer] = None
+  var protocol_version:Option[AsciiBuffer] = None
+
   def on_stomp_connect(headers:HeaderMap) = {
-    connection_sink.offer(StompFrame(Responses.CONNECTED))
+
+    protocol_version = get(headers, ACCEPT_VERSION).getOrElse(V1_0).split(COMMA).map(_.ascii).reverse.find{v=>
+      SUPPORTED_PROTOCOL_VERSIONS.contains(v)
+    }
+
+
+    protocol_version match {
+      case None =>
+        val supported_versions = SUPPORTED_PROTOCOL_VERSIONS.mkString(",")
+
+        _die((MESSAGE_HEADER, ascii("version not supported"))::
+            (VERSION, ascii(supported_versions))::Nil,
+            "Supported protocol versions are %s".format(supported_versions))
+
+      case Some(x) =>
+        connection.transport.suspendRead
+
+        val host_header = get(headers, HOST)
+        val cb: (VirtualHost)=>Unit = queue.wrap { (host)=>
+
+            if(host!=null) {
+              this.host=host
+
+              session_id = Some(ascii(this.host.config.id + ":"+this.host.session_counter.incrementAndGet))
+              connection_sink.offer(
+                StompFrame(CONNECTED, List(
+                  (VERSION, protocol_version.get),
+                  (SESSION, session_id.get)
+                )))
+
+              if( this.host.direct_buffer_pool!=null ) {
+                val wf = connection.transport.getProtocolCodec.asInstanceOf[StompCodec]
+                wf.memory_pool = this.host.direct_buffer_pool
+              }
+              connection.transport.resumeRead
+
+            } else {
+              die("Invalid virtual host: "+host_header.get)
+            }
+          }
+
+        host_header match {
+          case None=>
+            connection.connector.broker.getDefaultVirtualHost(cb)
+          case Some(host)=>
+            connection.connector.broker.getVirtualHost(host, cb)
+        }
+
+    }
+
+  }
+
+  def get(headers:HeaderMap, names:List[AsciiBuffer]):List[Option[AsciiBuffer]] = {
+    names.map(x=>get(headers, x))
   }
 
   def get(headers:HeaderMap, name:AsciiBuffer):Option[AsciiBuffer] = {
@@ -308,14 +336,14 @@ class StompProtocolHandler extends Proto
 
   def on_stomp_send(frame:StompFrame) = {
 
-    get(frame.headers, Headers.Send.DESTINATION) match {
+    get(frame.headers, DESTINATION) match {
       case None=>
         frame.release
         die("destination not set.")
 
       case Some(dest)=>
 
-        get(frame.headers, Headers.TRANSACTION) match {
+        get(frame.headers, TRANSACTION) match {
           case None=>
             perform_send(frame)
           case Some(txid)=>
@@ -329,7 +357,7 @@ class StompProtocolHandler extends Proto
 
   def perform_send(frame:StompFrame, uow:StoreUOW=null): Unit = {
 
-    val destiantion: Destination = get(frame.headers, Headers.Send.DESTINATION).get
+    val destiantion: Destination = get(frame.headers, DESTINATION).get
     producerRoutes.get(destiantion) match {
       case None =>
         // create the producer route...
@@ -372,18 +400,18 @@ class StompProtocolHandler extends Proto
   def send_via_route(route:DeliveryProducerRoute, frame:StompFrame, uow:StoreUOW) = {
     var storeBatch:StoreUOW=null
     // User might be asking for ack that we have processed the message..
-    val receipt = frame.header(Stomp.Headers.RECEIPT_REQUESTED)
+    val receipt = frame.header(RECEIPT_REQUESTED)
 
     if( !route.targets.isEmpty ) {
 
       // We may need to add some headers..
-      var message = get( frame.headers, Stomp.Headers.Message.MESSAGE_ID) match {
+      var message = get( frame.headers, MESSAGE_ID) match {
         case None=>
           var updated_headers:HeaderMap=Nil;
-          updated_headers ::= (Stomp.Headers.Message.MESSAGE_ID, next_message_id)
-          StompFrameMessage(StompFrame(Stomp.Responses.MESSAGE, frame.headers, frame.content,
updated_headers))
+          updated_headers ::= (MESSAGE_ID, next_message_id)
+          StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content, updated_headers))
         case Some(id)=>
-          StompFrameMessage(StompFrame(Stomp.Responses.MESSAGE, frame.headers, frame.content))
+          StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content))
       }
 
       val delivery = new Delivery
@@ -393,7 +421,7 @@ class StompProtocolHandler extends Proto
 
       if( receipt!=null ) {
         delivery.ack = { storeTx =>
-          connection_sink.offer(StompFrame(Responses.RECEIPT, List((Stomp.Headers.Response.RECEIPT_ID,
receipt))))
+          connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
         }
       }
 
@@ -409,20 +437,28 @@ class StompProtocolHandler extends Proto
     } else {
       // info("Dropping message.  No consumers interested in message.")
       if( receipt!=null ) {
-        connection_sink.offer(StompFrame(Responses.RECEIPT, List((Stomp.Headers.Response.RECEIPT_ID,
receipt))))
+        connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
       }
     }
     frame.release
   }
 
   def on_stomp_subscribe(headers:HeaderMap) = {
-    get(headers, Headers.Subscribe.DESTINATION) match {
+    get(headers, DESTINATION) match {
       case Some(dest)=>
 
         val destination:Destination = dest
+        val subscription_id = get(headers, ID)
+        var id:AsciiBuffer = subscription_id match {
+          case None =>
+            if( protocol_version.get == V1_0 )
+              // in 1.0 it's ok if the client does not send us the
+              // the id header
+              dest
+            else
+              die("The id header is missing from the SUBSCRIBE frame");
+              null
 
-        var id:AsciiBuffer = get(headers, Headers.Subscribe.ID) match {
-          case None => dest
           case Some(x:AsciiBuffer)=> x
         }
 
@@ -434,16 +470,16 @@ class StompProtocolHandler extends Proto
           null
         }
 
-        val ack:AsciiBuffer = get(headers, Headers.Subscribe.ACK_MODE) match {
-          case None=> Headers.Subscribe.AckModeValues.AUTO
+        val ack:AsciiBuffer = get(headers, ACK_MODE) match {
+          case None=> AUTO
           case Some(x)=> x match {
-            case Headers.Subscribe.AckModeValues.AUTO=> Headers.Subscribe.AckModeValues.AUTO
-            case Headers.Subscribe.AckModeValues.CLIENT=> Headers.Subscribe.AckModeValues.CLIENT
+            case AUTO=>AUTO
+            case CLIENT=> CLIENT
             case ack:AsciiBuffer => die("Unsuported ack mode: "+ack); null
           }
         }
 
-        val selector = get(headers, Headers.Subscribe.SELECTOR) match {
+        val selector = get(headers, SELECTOR) match {
           case None=> null
           case Some(x)=> x
             try {
@@ -481,7 +517,7 @@ class StompProtocolHandler extends Proto
               }
             }
 
-            val consumer = new StompConsumer(destination, ack, selector, binding);
+            val consumer = new StompConsumer(subscription_id, destination, ack, selector,
binding);
             consumers += (id -> consumer)
 
             if( binding==null ) {
@@ -516,11 +552,11 @@ class StompProtocolHandler extends Proto
 
   def on_stomp_ack(frame:StompFrame) = {
     val headers = frame.headers
-    get(headers, Headers.Ack.MESSAGE_ID) match {
+    get(headers, MESSAGE_ID) match {
       case Some(messageId)=>
         pendingAcks.get(messageId) match {
           case Some(ack) =>
-            get(headers, Headers.TRANSACTION) match {
+            get(headers, TRANSACTION) match {
               case None=>
                 perform_ack(frame)
               case Some(txid)=>
@@ -540,18 +576,22 @@ class StompProtocolHandler extends Proto
   }
 
   def perform_ack(frame: StompFrame, uow:StoreUOW=null) = {
-    val msgid = get(frame.headers, Headers.Ack.MESSAGE_ID).get
+    val msgid = get(frame.headers, MESSAGE_ID).get
     pendingAcks.remove(msgid) match {
       case Some(ack) => ack(uow)
       case None => die("message allready acked: %s".format(msgid))
     }
   }
 
-  private def die(msg:String) = {
+  private def die(msg:String, explained:String="") = {
+    info("Shutting connection down due to: "+msg)
+    _die((MESSAGE_HEADER, ascii(msg))::Nil, explained)
+  }
+
+  private def _die(headers:HeaderMap, explained:String="") = {
     if( !connection.stopped ) {
-      info("Shutting connection down due to: "+msg)
       connection.transport.suspendRead
-      connection.transport.offer(StompFrame(Responses.ERROR, Nil, BufferContent(ascii(msg)))
)
+      connection.transport.offer(StompFrame(ERROR, headers, BufferContent(ascii(explained)))
)
       ^ {
         connection.stop()
       } >>: queue
@@ -568,7 +608,7 @@ class StompProtocolHandler extends Proto
 
 
   def require_transaction_header[T](headers:HeaderMap)(proc:(AsciiBuffer)=>T):Option[T]
= {
-    get(headers, Headers.TRANSACTION) match {
+    get(headers, TRANSACTION) match {
       case None=> die("transaction header not set")
       None
       case Some(txid)=> Some(proc(txid))
@@ -589,9 +629,9 @@ class StompProtocolHandler extends Proto
 
 
   def send_receipt(headers:HeaderMap) = {
-    get(headers, Stomp.Headers.RECEIPT_REQUESTED) match {
+    get(headers, RECEIPT_REQUESTED) match {
       case Some(receipt)=>
-        connection_sink.offer(StompFrame(Responses.RECEIPT, List((Stomp.Headers.Response.RECEIPT_ID,
receipt))))
+        connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
       case None=>
     }
   }
@@ -616,9 +656,9 @@ class StompProtocolHandler extends Proto
 
       queue.foreach { frame=>
         frame.action match {
-          case Commands.SEND =>
+          case SEND =>
             perform_send(frame, uow)
-          case Commands.ACK =>
+          case ACK =>
             perform_ack(frame, uow)
           case _ => throw new java.lang.AssertionError("assertion failed: only send or
ack frames are transactional")
         }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1023000&r1=1022999&r2=1023000&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Fri Oct 15 16:27:30 2010
@@ -22,20 +22,83 @@ import org.apache.activemq.apollo.util.F
 import org.apache.activemq.apollo.broker.{Broker, BrokerFactory}
 
 class StompTest extends FunSuiteSupport with ShouldMatchers {
+  var broker: Broker = null
 
-  var broker:Broker = null
+
+  test("Stomp 1.0 CONNECT") {
+    val client = new StompClient
+    client.open("localhost", 61613)
+
+    client.send(
+      "CONNECT\n" +
+      "\n")
+    val frame = client.receive()
+    frame should startWith("CONNECTED\n")
+    frame should include regex("""session:.+?\n""")
+    frame should include("version:1.0\n")
+  }
 
 
-  test("Stomp Connect") {
+  test("Stomp 1.1 CONNECT") {
     val client = new StompClient
     client.open("localhost", 61613)
-    client.send("""CONNECT
 
-""")
-   val frame = client.receive()
-   frame should startWith ("CONNECTED")
+    client.send(
+      "CONNECT\n" +
+      "accept-version:1.0,1.1\n" +
+      "host:default\n" +
+      "\n")
+    val frame = client.receive()
+    frame should startWith("CONNECTED\n")
+    frame should include regex("""session:.+?\n""")
+    frame should include("version:1.1\n")
   }
 
+  test("Stomp 1.1 CONNECT /w STOMP Action") {
+    val client = new StompClient
+    client.open("localhost", 61613)
+
+    client.send(
+      "STOMP\n" +
+      "accept-version:1.0,1.1\n" +
+      "host:default\n" +
+      "\n")
+    val frame = client.receive()
+    frame should startWith("CONNECTED\n")
+    frame should include regex("""session:.+?\n""")
+    frame should include("version:1.1\n")
+  }
+
+  test("Stomp 1.1 CONNECT /w Version Fallback") {
+    val client = new StompClient
+    client.open("localhost", 61613)
+
+    client.send(
+      "CONNECT\n" +
+      "accept-version:1.0,10.0\n" +
+      "host:default\n" +
+      "\n")
+    val frame = client.receive()
+    frame should startWith("CONNECTED\n")
+    frame should include regex("""session:.+?\n""")
+    frame should include("version:1.0\n")
+  }
+
+  test("Stomp CONNECT /w invalid virtual host") {
+    val client = new StompClient
+    client.open("localhost", 61613)
+
+    client.send(
+      "CONNECT\n" +
+      "accept-version:1.0,1.1\n" +
+      "host:invalid\n" +
+      "\n")
+    val frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include regex("""message:.+?\n""")
+  }
+
+
   override protected def beforeAll() = {
     val uri = "xml:classpath:activemq-stomp.xml"
     info("Loading broker configuration from the classpath with URI: " + uri)

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=1023000&r1=1022999&r2=1023000&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
Fri Oct 15 16:27:30 2010
@@ -102,18 +102,18 @@ class StompRemoteConsumer extends Remote
       ascii("/topic/" + destination.getName().toString());
     }
 
-    var frame = StompFrame(Stomp.Commands.CONNECT);
+    var frame = StompFrame(CONNECT);
     outboundSink.offer(frame);
 
     var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
-    headers ::= (Stomp.Headers.Subscribe.DESTINATION, stompDestination)
-    headers ::= (Stomp.Headers.Subscribe.ID, ascii("stomp-sub-" + name))
+    headers ::= (DESTINATION, stompDestination)
+    headers ::= (ID, ascii("stomp-sub-" + name))
 
     if( persistent ) {
-      headers ::= (Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.CLIENT)
+      headers ::= (ACK_MODE, CLIENT)
     }
 
-    frame = StompFrame(Stomp.Commands.SUBSCRIBE, headers);
+    frame = StompFrame(SUBSCRIBE, headers);
     outboundSink.offer(frame);
     watchdog(messageCount)
   }
@@ -121,17 +121,17 @@ class StompRemoteConsumer extends Remote
   override def onTransportCommand(command: Object) = {
     var frame = command.asInstanceOf[StompFrame]
     frame match {
-      case StompFrame(Responses.CONNECTED, headers, _, _) =>
-      case StompFrame(Responses.MESSAGE, headers, content, _) =>
+      case StompFrame(CONNECTED, headers, _, _) =>
+      case StompFrame(MESSAGE, headers, content, _) =>
           messageReceived();
 
           // we client ack if persistent messages are being used.
           if( persistent ) {
-            var rc = List((Stomp.Headers.Ack.MESSAGE_ID, frame.header(Stomp.Headers.Message.MESSAGE_ID)))
-            outboundSink.offer(StompFrame(Stomp.Commands.ACK, rc));
+            var rc = List((MESSAGE_ID, frame.header(MESSAGE_ID)))
+            outboundSink.offer(StompFrame(ACK, rc));
           }
 
-      case StompFrame(Responses.ERROR, headers, content, _) =>
+      case StompFrame(ERROR, headers, content, _) =>
         onFailure(new Exception("Server reported an error: " + frame.content));
       case _ =>
         onFailure(new Exception("Unexpected stomp command: " + frame.action));
@@ -155,7 +155,7 @@ class StompRemoteConsumer extends Remote
   }
 
   override def doStop() = {
-    outboundSink.offer(StompFrame(Stomp.Commands.DISCONNECT));
+    outboundSink.offer(StompFrame(DISCONNECT));
     dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^ {
         transport.stop
         stop
@@ -170,12 +170,12 @@ class StompRemoteProducer extends Remote
 
   def send_next: Unit = {
       var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
-      headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination);
+      headers ::= (DESTINATION, stompDestination);
       if (property != null) {
         headers ::= (ascii(property), ascii(property));
       }
       if( persistent ) {
-        headers ::= ((Stomp.Headers.RECEIPT_REQUESTED, ascii("x")));
+        headers ::= ((RECEIPT_REQUESTED, ascii("x")));
       }
       //    var p = this.priority;
       //    if (priorityMod > 0) {
@@ -183,7 +183,7 @@ class StompRemoteProducer extends Remote
       //    }
 
       var content = ascii(createPayload());
-      frame = StompFrame(Stomp.Commands.SEND, headers, BufferContent(content))
+      frame = StompFrame(SEND, headers, BufferContent(content))
       drain()
   }
 
@@ -222,21 +222,21 @@ class StompRemoteProducer extends Remote
     } else {
       stompDestination = ascii("/topic/" + destination.getName().toString());
     }
-    outboundSink.offer(StompFrame(Stomp.Commands.CONNECT));
+    outboundSink.offer(StompFrame(CONNECT));
     send_next
   }
 
   override def onTransportCommand(command: Object) = {
     var frame = command.asInstanceOf[StompFrame]
     frame match {
-      case StompFrame(Responses.RECEIPT, headers, _, _) =>
+      case StompFrame(RECEIPT, headers, _, _) =>
         assert( persistent )
         // we got the ack for the previous message we sent.. now send the next one.
         incrementMessageCount
         send_next
 
-      case StompFrame(Responses.CONNECTED, headers, _, _) =>
-      case StompFrame(Responses.ERROR, headers, content, _) =>
+      case StompFrame(CONNECTED, headers, _, _) =>
+      case StompFrame(ERROR, headers, content, _) =>
         onFailure(new Exception("Server reported an error: " + frame.content.utf8));
       case _ =>
         onFailure(new Exception("Unexpected stomp command: " + frame.action));
@@ -244,7 +244,7 @@ class StompRemoteProducer extends Remote
   }
 
   override def doStop() = {
-    outboundSink.offer(StompFrame(Stomp.Commands.DISCONNECT));
+    outboundSink.offer(StompFrame(DISCONNECT));
     dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^ {
         transport.stop
         stop

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=1023000&r1=1022999&r2=1023000&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
Fri Oct 15 16:27:30 2010
@@ -27,7 +27,8 @@ import java.net.{ProtocolException, Inet
 import java.lang.String._
 import java.util.concurrent.TimeUnit._
 import collection.mutable.Map
-import org.apache.activemq.apollo.stomp.{StompClient, Stomp}
+import org.apache.activemq.apollo.stomp.StompClient
+import org.apache.activemq.apollo.stomp.Stomp._
 
 /**
  *
@@ -289,10 +290,10 @@ object StompLoadClient {
       while (!done.get) {
         if( clientAck ) {
           val msg = client.receiveAscii()
-          val start = msg.indexOf(Stomp.Headers.Message.MESSAGE_ID)
+          val start = msg.indexOf(MESSAGE_ID)
           assert( start >= 0 )
           val end = msg.indexOf("\n", start)
-          val msgId = msg.slice(start+Stomp.Headers.Message.MESSAGE_ID.length+1, end).ascii
+          val msgId = msg.slice(start+MESSAGE_ID.length+1, end).ascii
           client.send("""
 ACK
 message-id:"""+msgId+"""



Mime
View raw message