Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 58469 invoked from network); 15 Oct 2010 16:28:30 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 15 Oct 2010 16:28:30 -0000 Received: (qmail 2074 invoked by uid 500); 15 Oct 2010 16:28:30 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 2039 invoked by uid 500); 15 Oct 2010 16:28:29 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 2032 invoked by uid 99); 15 Oct 2010 16:28:29 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 15 Oct 2010 16:28:29 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED,WEIRD_QUOTING X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 15 Oct 2010 16:28:27 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id B8098238897A; Fri, 15 Oct 2010 16:27:31 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1023000 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/ apoll... Date: Fri, 15 Oct 2010 16:27:31 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20101015162731.B8098238897A@eris.apache.org> Author: chirino Date: Fri Oct 15 16:27:30 2010 New Revision: 1023000 URL: http://svn.apache.org/viewvc?rev=1023000&view=rev Log: - Consolidated the Stomp constants - Started Stomp 1.1 implementation Removed: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/Stomp.java Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1023000&r1=1022999&r2=1023000&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original) +++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Fri Oct 15 16:27:30 2010 @@ -30,6 +30,7 @@ import path.PathFilter import ReporterLevel._ import org.fusesource.hawtbuf.{Buffer, AsciiBuffer} import collection.JavaConversions +import java.util.concurrent.atomic.AtomicLong /** * @author Hiram Chirino @@ -93,6 +94,8 @@ class VirtualHost(val broker: Broker, va var transactionManager:TransactionManagerX = new TransactionManagerX val queue_id_counter = new LongCounter + val session_counter = new AtomicLong(0) + override def toString = if (config==null) "virtual-host" else "virtual-host: "+config.id /** Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1023000&r1=1022999&r2=1023000&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original) +++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Fri Oct 15 16:27:30 2010 @@ -21,11 +21,9 @@ import _root_.org.apache.activemq.apollo import java.nio.ByteBuffer import collection.mutable.{ListBuffer, HashMap} import Stomp._ -import Stomp.Headers._ import BufferConversions._ import _root_.scala.collection.JavaConversions._ -import StompFrameConstants._ import java.io.{EOFException, DataOutput, DataInput, IOException} import java.nio.channels.{SocketChannel, WritableByteChannel, ReadableByteChannel} import org.apache.activemq.apollo.transport._ @@ -48,7 +46,7 @@ object StompCodec extends Log { val frame = message.frame val rc = new MessageRecord - rc.protocol = StompConstants.PROTOCOL + rc.protocol = PROTOCOL rc.size = frame.size rc.expiration = message.expiration @@ -465,7 +463,7 @@ class StompCodec extends ProtocolCodec w // lets try to keep the content of big message outside of the JVM's garbage collection // to keep the number of GCs down when moving big messages. - def is_message = action == Commands.SEND || action == Responses.MESSAGE + def is_message = action == SEND || action == MESSAGE if( length > 1024 && memory_pool!=null && is_message) { val ma = memory_pool.alloc(length+1) Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1023000&r1=1022999&r2=1023000&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original) +++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Fri Oct 15 16:27:30 2010 @@ -30,17 +30,9 @@ import java.io.{OutputStream, DataOutput * * @author chirino */ -object StompFrameConstants { - type HeaderMap = List[(AsciiBuffer, AsciiBuffer)] - type HeaderMapBuffer = ListBuffer[(AsciiBuffer, AsciiBuffer)] - val NO_DATA = new Buffer(0); - -} - -import StompFrameConstants._ -import StompConstants._; import BufferConversions._ import Buffer._ +import Stomp._ /** * @author Hiram Chirino @@ -82,15 +74,15 @@ case class StompFrameMessage(frame:Stomp for( header <- (frame.updated_headers ::: frame.headers).reverse ) { header match { - case (Stomp.Headers.Message.MESSAGE_ID, value) => + case (MESSAGE_ID, value) => id = value - case (Stomp.Headers.Send.PRIORITY, value) => + case (PRIORITY, value) => priority = java.lang.Integer.parseInt(value).toByte - case (Stomp.Headers.Send.DESTINATION, value) => + case (DESTINATION, value) => destination = value - case (Stomp.Headers.Send.EXPIRATION_TIME, value) => + case (EXPIRATION_TIME, value) => expiration = java.lang.Long.parseLong(value) - case (Stomp.Headers.Send.PERSISTENT, value) => + case (PERSISTENT, value) => persistent = java.lang.Boolean.parseBoolean(value) case _ => } @@ -301,6 +293,139 @@ case class StompFrame(action:AsciiBuffer ).map(_._2).getOrElse(null) } + def append_headers(value:HeaderMap) = StompFrame(action, headers, content, value ::: updated_headers) + def retain = content.retain def release = content.release } + +object Stomp { + + val PROTOCOL = "stomp" + val DURABLE_PREFIX = ascii("durable:") + val DURABLE_QUEUE_KIND = ascii("stomp:sub") + + val options = new ParserOptions + options.queuePrefix = ascii("/queue/") + options.topicPrefix = ascii("/topic/") + + options.defaultDomain = Router.QUEUE_DOMAIN + + implicit def toDestination(value:AsciiBuffer):Destination = { + val d = DestinationParser.parse(value, options) + if( d==null ) { + throw new ProtocolException("Invalid stomp destiantion name: "+value); + } + d + } + + type HeaderMap = List[(AsciiBuffer, AsciiBuffer)] + type HeaderMapBuffer = ListBuffer[(AsciiBuffer, AsciiBuffer)] + val NO_DATA = new Buffer(0); + + /////////////////////////////////////////////////////////////////// + // Framing + /////////////////////////////////////////////////////////////////// + + val EMPTY_BUFFER = new Buffer(0) + val NULL: Byte = 0 + val NULL_BUFFER = new Buffer(Array(NULL)) + val NEWLINE: Byte = '\n' + val COMMA: Byte = ',' + val NEWLINE_BUFFER = new Buffer(Array(NEWLINE)) + val END_OF_FRAME_BUFFER = new Buffer(Array(NULL, NEWLINE)) + val SEPERATOR: Byte = ':' + val SEPERATOR_BUFFER = new Buffer(Array(SEPERATOR)) + + /////////////////////////////////////////////////////////////////// + // Frame Commands + /////////////////////////////////////////////////////////////////// + val STOMP = ascii("STOMP") + val CONNECT = ascii("CONNECT") + val SEND = ascii("SEND") + val DISCONNECT = ascii("DISCONNECT") + val SUBSCRIBE = ascii("SUBSCRIBE") + val UNSUBSCRIBE = ascii("UNSUBSCRIBE") + + val BEGIN_TRANSACTION = ascii("BEGIN") + val COMMIT_TRANSACTION = ascii("COMMIT") + val ABORT_TRANSACTION = ascii("ABORT") + val BEGIN = ascii("BEGIN") + val COMMIT = ascii("COMMIT") + val ABORT = ascii("ABORT") + val ACK = ascii("ACK") + + /////////////////////////////////////////////////////////////////// + // Frame Responses + /////////////////////////////////////////////////////////////////// + val CONNECTED = ascii("CONNECTED") + val ERROR = ascii("ERROR") + val MESSAGE = ascii("MESSAGE") + val RECEIPT = ascii("RECEIPT") + + /////////////////////////////////////////////////////////////////// + // Frame Headers + /////////////////////////////////////////////////////////////////// + val RECEIPT_REQUESTED = ascii("receipt") + val TRANSACTION = ascii("transaction") + val CONTENT_LENGTH = ascii("content-length") + val TRANSFORMATION = ascii("transformation") + val TRANSFORMATION_ERROR = ascii("transformation-error") + + val RECEIPT_ID = ascii("receipt-id") + + val DESTINATION = ascii("destination") + val CORRELATION_ID = ascii("correlation-id") + val REPLY_TO = ascii("reply-to") + val EXPIRATION_TIME = ascii("expires") + val PRIORITY = ascii("priority") + val TYPE = ascii("type") + val PERSISTENT = ascii("persistent") + + val MESSAGE_ID = ascii("message-id") + val PRORITY = ascii("priority") + val REDELIVERED = ascii("redelivered") + val TIMESTAMP = ascii("timestamp") + val SUBSCRIPTION = ascii("subscription") + + val ACK_MODE = ascii("ack") + val ID = ascii("id") + val SELECTOR = ascii("selector") + + val LOGIN = ascii("login") + val PASSCODE = ascii("passcode") + val CLIENT_ID = ascii("client-id") + val REQUEST_ID = ascii("request-id") + val ACCEPT_VERSION = ascii("accept-version") + val HOST = ascii("host") + + val MESSAGE_HEADER = ascii("message") + val VERSION = ascii("version") + val SESSION = ascii("session") + val RESPONSE_ID = ascii("response-id") + + /////////////////////////////////////////////////////////////////// + // Common Values + /////////////////////////////////////////////////////////////////// + val TRUE = ascii("true") + val FALSE = ascii("false") + val AUTO = ascii("auto") + val CLIENT = ascii("client") + val INDIVIDUAL = ascii("client-individual") + val V1_0 = ascii("1.0") + val V1_1 = ascii("1.1") + + val SUPPORTED_PROTOCOL_VERSIONS = Set(V1_0,V1_1) + + // public enum Transformations { + // JMS_BYTE, JMS_OBJECT_XML, JMS_OBJECT_JSON, JMS_MAP_XML, JMS_MAP_JSON + // + // public String toString() { + // return name().replaceAll("_", "-").toLowerCase() + // } + // + // public static Transformations getValue(String value) { + // return valueOf(value.replaceAll("-", "_").toUpperCase()) + // } + // } +} \ No newline at end of file Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=1023000&r1=1022999&r2=1023000&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original) +++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Fri Oct 15 16:27:30 2010 @@ -27,7 +27,6 @@ import protocol.{ProtocolFactory, Protoc import java.lang.String import Stomp._ import BufferConversions._ -import StompFrameConstants._ import java.io.IOException import org.apache.activemq.apollo.selector.SelectorParser import org.apache.activemq.apollo.filter.{BooleanExpression, FilterException} @@ -39,27 +38,6 @@ import org.apache.activemq.apollo.dto.{B /** * @author Hiram Chirino */ -object StompConstants { - - val PROTOCOL = "stomp" - val DURABLE_PREFIX = ascii("durable:") - val DURABLE_QUEUE_KIND = ascii("stomp:sub") - - val options = new ParserOptions - options.queuePrefix = ascii("/queue/") - options.topicPrefix = ascii("/topic/") - - options.defaultDomain = Router.QUEUE_DOMAIN - - implicit def toDestination(value:AsciiBuffer):Destination = { - val d = DestinationParser.parse(value, options) - if( d==null ) { - throw new ProtocolException("Invalid stomp destiantion name: "+value); - } - d - } - -} /** * Creates StompCodec objects that encode/decode the * Stomp protocol. @@ -67,10 +45,8 @@ object StompConstants { * @author Hiram Chirino */ class StompProtocolCodecFactory extends ProtocolCodecFactory.Provider { - import Stomp.Commands.CONNECT - import Stomp.Commands.STOMP - def protocol = StompConstants.PROTOCOL + def protocol = PROTOCOL def createProtocolCodec() = new StompCodec(); @@ -116,8 +92,6 @@ object StompProtocol extends StompProtoc } -import StompConstants._ - object StompProtocolHandler extends Log /** @@ -131,7 +105,7 @@ class StompProtocolHandler extends Proto protected def dispatchQueue:DispatchQueue = connection.dispatchQueue - class StompConsumer(val destination:Destination, val ackMode:AsciiBuffer, val selector:(AsciiBuffer, BooleanExpression), val binding:BindingDTO) extends BaseRetained with DeliveryConsumer { + class StompConsumer(val subscription_id:Option[AsciiBuffer], val destination:Destination, val ackMode:AsciiBuffer, val selector:(AsciiBuffer, BooleanExpression), val binding:BindingDTO) extends BaseRetained with DeliveryConsumer { val dispatchQueue = StompProtocolHandler.this.dispatchQueue dispatchQueue.retain @@ -174,7 +148,7 @@ class StompProtocolHandler extends Proto false } else { if( delivery.ack!=null) { - if( ackMode eq Headers.Subscribe.AckModeValues.AUTO ) { + if( ackMode eq AUTO ) { delivery.ack(null) } else { // switch the the queue context.. this method is in the producer's context. @@ -185,7 +159,10 @@ class StompProtocolHandler extends Proto } } } - val frame = delivery.message.asInstanceOf[StompFrameMessage].frame + var frame = delivery.message.asInstanceOf[StompFrameMessage].frame + if( subscription_id != None ) { + frame = frame.append_headers((SUBSCRIPTION, subscription_id.get)::Nil) + } frame.retain val rc = session.offer(frame) assert(rc, "offer should be accepted since it was not full") @@ -216,17 +193,7 @@ class StompProtocolHandler extends Proto session_manager = new SinkMux[StompFrame]( MapSink(connection.transportSink){x=>x}, dispatchQueue, StompFrame) connection_sink = new OverflowSink(session_manager.open(dispatchQueue)); connection_sink.refiller = ^{} - - connection.connector.broker.getDefaultVirtualHost( - queue.wrap { (host)=> - this.host=host - if( this.host.direct_buffer_pool!=null ) { - val wf = connection.transport.getProtocolCodec.asInstanceOf[StompCodec] - wf.memory_pool = this.host.direct_buffer_pool - } - connection.transport.resumeRead - } - ) + connection.transport.resumeRead } override def onTransportDisconnected() = { @@ -255,25 +222,30 @@ class StompProtocolHandler extends Proto override def onTransportCommand(command:Any) = { try { command match { - case StompFrame(Commands.SEND, _, _, _) => + case StompFrame(SEND, _, _, _) => on_stomp_send(command.asInstanceOf[StompFrame]) - case StompFrame(Commands.ACK, headers, content, _) => + case StompFrame(ACK, headers, content, _) => on_stomp_ack(command.asInstanceOf[StompFrame]) - case StompFrame(Commands.BEGIN, headers, content, _) => + case StompFrame(BEGIN, headers, content, _) => on_stomp_begin(headers) - case StompFrame(Commands.COMMIT, headers, content, _) => + case StompFrame(COMMIT, headers, content, _) => on_stomp_commit(headers) - case StompFrame(Commands.ABORT, headers, content, _) => + case StompFrame(ABORT, headers, content, _) => on_stomp_abort(headers) - case StompFrame(Commands.SUBSCRIBE, headers, content, _) => + case StompFrame(SUBSCRIBE, headers, content, _) => info("got command: %s", command) on_stomp_subscribe(headers) - case StompFrame(Commands.CONNECT, headers, _, _) => + + case StompFrame(STOMP, headers, _, _) => + info("got command: %s", command) + on_stomp_connect(headers) + case StompFrame(CONNECT, headers, _, _) => info("got command: %s", command) on_stomp_connect(headers) - case StompFrame(Commands.DISCONNECT, headers, content, _t) => + + case StompFrame(DISCONNECT, headers, content, _t) => info("got command: %s", command) connection.stop case s:StompCodec => @@ -286,13 +258,69 @@ class StompProtocolHandler extends Proto } } catch { case e:Exception => - die("Unexpected error: "+e); + die("Unexpected Error", e.toString); } } + var session_id:Option[AsciiBuffer] = None + var protocol_version:Option[AsciiBuffer] = None + def on_stomp_connect(headers:HeaderMap) = { - connection_sink.offer(StompFrame(Responses.CONNECTED)) + + protocol_version = get(headers, ACCEPT_VERSION).getOrElse(V1_0).split(COMMA).map(_.ascii).reverse.find{v=> + SUPPORTED_PROTOCOL_VERSIONS.contains(v) + } + + + protocol_version match { + case None => + val supported_versions = SUPPORTED_PROTOCOL_VERSIONS.mkString(",") + + _die((MESSAGE_HEADER, ascii("version not supported")):: + (VERSION, ascii(supported_versions))::Nil, + "Supported protocol versions are %s".format(supported_versions)) + + case Some(x) => + connection.transport.suspendRead + + val host_header = get(headers, HOST) + val cb: (VirtualHost)=>Unit = queue.wrap { (host)=> + + if(host!=null) { + this.host=host + + session_id = Some(ascii(this.host.config.id + ":"+this.host.session_counter.incrementAndGet)) + connection_sink.offer( + StompFrame(CONNECTED, List( + (VERSION, protocol_version.get), + (SESSION, session_id.get) + ))) + + if( this.host.direct_buffer_pool!=null ) { + val wf = connection.transport.getProtocolCodec.asInstanceOf[StompCodec] + wf.memory_pool = this.host.direct_buffer_pool + } + connection.transport.resumeRead + + } else { + die("Invalid virtual host: "+host_header.get) + } + } + + host_header match { + case None=> + connection.connector.broker.getDefaultVirtualHost(cb) + case Some(host)=> + connection.connector.broker.getVirtualHost(host, cb) + } + + } + + } + + def get(headers:HeaderMap, names:List[AsciiBuffer]):List[Option[AsciiBuffer]] = { + names.map(x=>get(headers, x)) } def get(headers:HeaderMap, name:AsciiBuffer):Option[AsciiBuffer] = { @@ -308,14 +336,14 @@ class StompProtocolHandler extends Proto def on_stomp_send(frame:StompFrame) = { - get(frame.headers, Headers.Send.DESTINATION) match { + get(frame.headers, DESTINATION) match { case None=> frame.release die("destination not set.") case Some(dest)=> - get(frame.headers, Headers.TRANSACTION) match { + get(frame.headers, TRANSACTION) match { case None=> perform_send(frame) case Some(txid)=> @@ -329,7 +357,7 @@ class StompProtocolHandler extends Proto def perform_send(frame:StompFrame, uow:StoreUOW=null): Unit = { - val destiantion: Destination = get(frame.headers, Headers.Send.DESTINATION).get + val destiantion: Destination = get(frame.headers, DESTINATION).get producerRoutes.get(destiantion) match { case None => // create the producer route... @@ -372,18 +400,18 @@ class StompProtocolHandler extends Proto def send_via_route(route:DeliveryProducerRoute, frame:StompFrame, uow:StoreUOW) = { var storeBatch:StoreUOW=null // User might be asking for ack that we have processed the message.. - val receipt = frame.header(Stomp.Headers.RECEIPT_REQUESTED) + val receipt = frame.header(RECEIPT_REQUESTED) if( !route.targets.isEmpty ) { // We may need to add some headers.. - var message = get( frame.headers, Stomp.Headers.Message.MESSAGE_ID) match { + var message = get( frame.headers, MESSAGE_ID) match { case None=> var updated_headers:HeaderMap=Nil; - updated_headers ::= (Stomp.Headers.Message.MESSAGE_ID, next_message_id) - StompFrameMessage(StompFrame(Stomp.Responses.MESSAGE, frame.headers, frame.content, updated_headers)) + updated_headers ::= (MESSAGE_ID, next_message_id) + StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content, updated_headers)) case Some(id)=> - StompFrameMessage(StompFrame(Stomp.Responses.MESSAGE, frame.headers, frame.content)) + StompFrameMessage(StompFrame(MESSAGE, frame.headers, frame.content)) } val delivery = new Delivery @@ -393,7 +421,7 @@ class StompProtocolHandler extends Proto if( receipt!=null ) { delivery.ack = { storeTx => - connection_sink.offer(StompFrame(Responses.RECEIPT, List((Stomp.Headers.Response.RECEIPT_ID, receipt)))) + connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt)))) } } @@ -409,20 +437,28 @@ class StompProtocolHandler extends Proto } else { // info("Dropping message. No consumers interested in message.") if( receipt!=null ) { - connection_sink.offer(StompFrame(Responses.RECEIPT, List((Stomp.Headers.Response.RECEIPT_ID, receipt)))) + connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt)))) } } frame.release } def on_stomp_subscribe(headers:HeaderMap) = { - get(headers, Headers.Subscribe.DESTINATION) match { + get(headers, DESTINATION) match { case Some(dest)=> val destination:Destination = dest + val subscription_id = get(headers, ID) + var id:AsciiBuffer = subscription_id match { + case None => + if( protocol_version.get == V1_0 ) + // in 1.0 it's ok if the client does not send us the + // the id header + dest + else + die("The id header is missing from the SUBSCRIBE frame"); + null - var id:AsciiBuffer = get(headers, Headers.Subscribe.ID) match { - case None => dest case Some(x:AsciiBuffer)=> x } @@ -434,16 +470,16 @@ class StompProtocolHandler extends Proto null } - val ack:AsciiBuffer = get(headers, Headers.Subscribe.ACK_MODE) match { - case None=> Headers.Subscribe.AckModeValues.AUTO + val ack:AsciiBuffer = get(headers, ACK_MODE) match { + case None=> AUTO case Some(x)=> x match { - case Headers.Subscribe.AckModeValues.AUTO=> Headers.Subscribe.AckModeValues.AUTO - case Headers.Subscribe.AckModeValues.CLIENT=> Headers.Subscribe.AckModeValues.CLIENT + case AUTO=>AUTO + case CLIENT=> CLIENT case ack:AsciiBuffer => die("Unsuported ack mode: "+ack); null } } - val selector = get(headers, Headers.Subscribe.SELECTOR) match { + val selector = get(headers, SELECTOR) match { case None=> null case Some(x)=> x try { @@ -481,7 +517,7 @@ class StompProtocolHandler extends Proto } } - val consumer = new StompConsumer(destination, ack, selector, binding); + val consumer = new StompConsumer(subscription_id, destination, ack, selector, binding); consumers += (id -> consumer) if( binding==null ) { @@ -516,11 +552,11 @@ class StompProtocolHandler extends Proto def on_stomp_ack(frame:StompFrame) = { val headers = frame.headers - get(headers, Headers.Ack.MESSAGE_ID) match { + get(headers, MESSAGE_ID) match { case Some(messageId)=> pendingAcks.get(messageId) match { case Some(ack) => - get(headers, Headers.TRANSACTION) match { + get(headers, TRANSACTION) match { case None=> perform_ack(frame) case Some(txid)=> @@ -540,18 +576,22 @@ class StompProtocolHandler extends Proto } def perform_ack(frame: StompFrame, uow:StoreUOW=null) = { - val msgid = get(frame.headers, Headers.Ack.MESSAGE_ID).get + val msgid = get(frame.headers, MESSAGE_ID).get pendingAcks.remove(msgid) match { case Some(ack) => ack(uow) case None => die("message allready acked: %s".format(msgid)) } } - private def die(msg:String) = { + private def die(msg:String, explained:String="") = { + info("Shutting connection down due to: "+msg) + _die((MESSAGE_HEADER, ascii(msg))::Nil, explained) + } + + private def _die(headers:HeaderMap, explained:String="") = { if( !connection.stopped ) { - info("Shutting connection down due to: "+msg) connection.transport.suspendRead - connection.transport.offer(StompFrame(Responses.ERROR, Nil, BufferContent(ascii(msg))) ) + connection.transport.offer(StompFrame(ERROR, headers, BufferContent(ascii(explained))) ) ^ { connection.stop() } >>: queue @@ -568,7 +608,7 @@ class StompProtocolHandler extends Proto def require_transaction_header[T](headers:HeaderMap)(proc:(AsciiBuffer)=>T):Option[T] = { - get(headers, Headers.TRANSACTION) match { + get(headers, TRANSACTION) match { case None=> die("transaction header not set") None case Some(txid)=> Some(proc(txid)) @@ -589,9 +629,9 @@ class StompProtocolHandler extends Proto def send_receipt(headers:HeaderMap) = { - get(headers, Stomp.Headers.RECEIPT_REQUESTED) match { + get(headers, RECEIPT_REQUESTED) match { case Some(receipt)=> - connection_sink.offer(StompFrame(Responses.RECEIPT, List((Stomp.Headers.Response.RECEIPT_ID, receipt)))) + connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt)))) case None=> } } @@ -616,9 +656,9 @@ class StompProtocolHandler extends Proto queue.foreach { frame=> frame.action match { - case Commands.SEND => + case SEND => perform_send(frame, uow) - case Commands.ACK => + case ACK => perform_ack(frame, uow) case _ => throw new java.lang.AssertionError("assertion failed: only send or ack frames are transactional") } Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1023000&r1=1022999&r2=1023000&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original) +++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Fri Oct 15 16:27:30 2010 @@ -22,20 +22,83 @@ import org.apache.activemq.apollo.util.F import org.apache.activemq.apollo.broker.{Broker, BrokerFactory} class StompTest extends FunSuiteSupport with ShouldMatchers { + var broker: Broker = null - var broker:Broker = null + + test("Stomp 1.0 CONNECT") { + val client = new StompClient + client.open("localhost", 61613) + + client.send( + "CONNECT\n" + + "\n") + val frame = client.receive() + frame should startWith("CONNECTED\n") + frame should include regex("""session:.+?\n""") + frame should include("version:1.0\n") + } - test("Stomp Connect") { + test("Stomp 1.1 CONNECT") { val client = new StompClient client.open("localhost", 61613) - client.send("""CONNECT -""") - val frame = client.receive() - frame should startWith ("CONNECTED") + client.send( + "CONNECT\n" + + "accept-version:1.0,1.1\n" + + "host:default\n" + + "\n") + val frame = client.receive() + frame should startWith("CONNECTED\n") + frame should include regex("""session:.+?\n""") + frame should include("version:1.1\n") } + test("Stomp 1.1 CONNECT /w STOMP Action") { + val client = new StompClient + client.open("localhost", 61613) + + client.send( + "STOMP\n" + + "accept-version:1.0,1.1\n" + + "host:default\n" + + "\n") + val frame = client.receive() + frame should startWith("CONNECTED\n") + frame should include regex("""session:.+?\n""") + frame should include("version:1.1\n") + } + + test("Stomp 1.1 CONNECT /w Version Fallback") { + val client = new StompClient + client.open("localhost", 61613) + + client.send( + "CONNECT\n" + + "accept-version:1.0,10.0\n" + + "host:default\n" + + "\n") + val frame = client.receive() + frame should startWith("CONNECTED\n") + frame should include regex("""session:.+?\n""") + frame should include("version:1.0\n") + } + + test("Stomp CONNECT /w invalid virtual host") { + val client = new StompClient + client.open("localhost", 61613) + + client.send( + "CONNECT\n" + + "accept-version:1.0,1.1\n" + + "host:invalid\n" + + "\n") + val frame = client.receive() + frame should startWith("ERROR\n") + frame should include regex("""message:.+?\n""") + } + + override protected def beforeAll() = { val uri = "xml:classpath:activemq-stomp.xml" info("Loading broker configuration from the classpath with URI: " + uri) Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=1023000&r1=1022999&r2=1023000&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original) +++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Fri Oct 15 16:27:30 2010 @@ -102,18 +102,18 @@ class StompRemoteConsumer extends Remote ascii("/topic/" + destination.getName().toString()); } - var frame = StompFrame(Stomp.Commands.CONNECT); + var frame = StompFrame(CONNECT); outboundSink.offer(frame); var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil - headers ::= (Stomp.Headers.Subscribe.DESTINATION, stompDestination) - headers ::= (Stomp.Headers.Subscribe.ID, ascii("stomp-sub-" + name)) + headers ::= (DESTINATION, stompDestination) + headers ::= (ID, ascii("stomp-sub-" + name)) if( persistent ) { - headers ::= (Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.CLIENT) + headers ::= (ACK_MODE, CLIENT) } - frame = StompFrame(Stomp.Commands.SUBSCRIBE, headers); + frame = StompFrame(SUBSCRIBE, headers); outboundSink.offer(frame); watchdog(messageCount) } @@ -121,17 +121,17 @@ class StompRemoteConsumer extends Remote override def onTransportCommand(command: Object) = { var frame = command.asInstanceOf[StompFrame] frame match { - case StompFrame(Responses.CONNECTED, headers, _, _) => - case StompFrame(Responses.MESSAGE, headers, content, _) => + case StompFrame(CONNECTED, headers, _, _) => + case StompFrame(MESSAGE, headers, content, _) => messageReceived(); // we client ack if persistent messages are being used. if( persistent ) { - var rc = List((Stomp.Headers.Ack.MESSAGE_ID, frame.header(Stomp.Headers.Message.MESSAGE_ID))) - outboundSink.offer(StompFrame(Stomp.Commands.ACK, rc)); + var rc = List((MESSAGE_ID, frame.header(MESSAGE_ID))) + outboundSink.offer(StompFrame(ACK, rc)); } - case StompFrame(Responses.ERROR, headers, content, _) => + case StompFrame(ERROR, headers, content, _) => onFailure(new Exception("Server reported an error: " + frame.content)); case _ => onFailure(new Exception("Unexpected stomp command: " + frame.action)); @@ -155,7 +155,7 @@ class StompRemoteConsumer extends Remote } override def doStop() = { - outboundSink.offer(StompFrame(Stomp.Commands.DISCONNECT)); + outboundSink.offer(StompFrame(DISCONNECT)); dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^ { transport.stop stop @@ -170,12 +170,12 @@ class StompRemoteProducer extends Remote def send_next: Unit = { var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil - headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination); + headers ::= (DESTINATION, stompDestination); if (property != null) { headers ::= (ascii(property), ascii(property)); } if( persistent ) { - headers ::= ((Stomp.Headers.RECEIPT_REQUESTED, ascii("x"))); + headers ::= ((RECEIPT_REQUESTED, ascii("x"))); } // var p = this.priority; // if (priorityMod > 0) { @@ -183,7 +183,7 @@ class StompRemoteProducer extends Remote // } var content = ascii(createPayload()); - frame = StompFrame(Stomp.Commands.SEND, headers, BufferContent(content)) + frame = StompFrame(SEND, headers, BufferContent(content)) drain() } @@ -222,21 +222,21 @@ class StompRemoteProducer extends Remote } else { stompDestination = ascii("/topic/" + destination.getName().toString()); } - outboundSink.offer(StompFrame(Stomp.Commands.CONNECT)); + outboundSink.offer(StompFrame(CONNECT)); send_next } override def onTransportCommand(command: Object) = { var frame = command.asInstanceOf[StompFrame] frame match { - case StompFrame(Responses.RECEIPT, headers, _, _) => + case StompFrame(RECEIPT, headers, _, _) => assert( persistent ) // we got the ack for the previous message we sent.. now send the next one. incrementMessageCount send_next - case StompFrame(Responses.CONNECTED, headers, _, _) => - case StompFrame(Responses.ERROR, headers, content, _) => + case StompFrame(CONNECTED, headers, _, _) => + case StompFrame(ERROR, headers, content, _) => onFailure(new Exception("Server reported an error: " + frame.content.utf8)); case _ => onFailure(new Exception("Unexpected stomp command: " + frame.action)); @@ -244,7 +244,7 @@ class StompRemoteProducer extends Remote } override def doStop() = { - outboundSink.offer(StompFrame(Stomp.Commands.DISCONNECT)); + outboundSink.offer(StompFrame(DISCONNECT)); dispatchQueue.dispatchAfter(5, TimeUnit.SECONDS, ^ { transport.stop stop Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=1023000&r1=1022999&r2=1023000&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala (original) +++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala Fri Oct 15 16:27:30 2010 @@ -27,7 +27,8 @@ import java.net.{ProtocolException, Inet import java.lang.String._ import java.util.concurrent.TimeUnit._ import collection.mutable.Map -import org.apache.activemq.apollo.stomp.{StompClient, Stomp} +import org.apache.activemq.apollo.stomp.StompClient +import org.apache.activemq.apollo.stomp.Stomp._ /** * @@ -289,10 +290,10 @@ object StompLoadClient { while (!done.get) { if( clientAck ) { val msg = client.receiveAscii() - val start = msg.indexOf(Stomp.Headers.Message.MESSAGE_ID) + val start = msg.indexOf(MESSAGE_ID) assert( start >= 0 ) val end = msg.indexOf("\n", start) - val msgId = msg.slice(start+Stomp.Headers.Message.MESSAGE_ID.length+1, end).ascii + val msgId = msg.slice(start+MESSAGE_ID.length+1, end).ascii client.send(""" ACK message-id:"""+msgId+"""