activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1050111 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
Date Thu, 16 Dec 2010 19:28:41 GMT
Author: chirino
Date: Thu Dec 16 19:28:40 2010
New Revision: 1050111

URL: http://svn.apache.org/viewvc?rev=1050111&view=rev
Log:
Implemented the new STOMP 1.1 NACK feature and header encoding/decoding.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1050111&r1=1050110&r2=1050111&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
Thu Dec 16 19:28:40 2010
@@ -164,7 +164,7 @@ class Delivery extends BaseRetained {
    * Set if the producer requires an ack to be sent back.  Consumer
    * should execute once the message is processed.
    */
-  var ack:(StoreUOW)=>Unit = null
+  var ack:(Boolean, StoreUOW)=>Unit = null
 
   def copy() = (new Delivery).set(this)
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1050111&r1=1050110&r2=1050111&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Thu Dec 16 19:28:40 2010
@@ -66,7 +66,7 @@ class Queue(val host: VirtualHost, var i
   })
 
 
-  val ack_source = createSource(new ListEventAggregator[(Subscription#AcquiredQueueEntry,
StoreUOW)](), dispatchQueue)
+  val ack_source = createSource(new ListEventAggregator[(Subscription#AcquiredQueueEntry,
Boolean, StoreUOW)](), dispatchQueue)
   ack_source.setEventHandler(^ {drain_acks});
   ack_source.resume
 
@@ -431,8 +431,12 @@ class Queue(val host: VirtualHost, var i
 
   def drain_acks = {
     ack_source.getData.foreach {
-      case (entry, tx) =>
-        entry.ack(tx)
+      case (entry, consumed, tx) =>
+        if( consumed ) {
+          entry.ack(tx)
+        } else {
+          entry.nack
+        }
     }
     messages.refiller.run
   }
@@ -1035,8 +1039,8 @@ class QueueEntry(val queue:Queue, val se
 
                 val acquiredQueueEntry = sub.acquire(entry)
                 val acquiredDelivery = delivery.copy
-                acquiredDelivery.ack = (tx)=> {
-                  queue.ack_source.merge((acquiredQueueEntry, tx))
+                acquiredDelivery.ack = (consumed, tx)=> {
+                  queue.ack_source.merge((acquiredQueueEntry, consumed, tx))
                 }
 
                 assert(sub.offer(acquiredDelivery), "sub should have accepted, it had reported
not full earlier.")

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1050111&r1=1050110&r2=1050111&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
Thu Dec 16 19:28:40 2010
@@ -570,7 +570,7 @@ case class DeliveryProducerRoute(val rou
   // Dispatch.
   //
 
-  var pendingAck: (StoreUOW)=>Unit = null
+  var pendingAck: (Boolean, StoreUOW)=>Unit = null
   var overflow:Delivery=null
   var overflowSessions = List[DeliverySession]()
   var refiller:Runnable=null
@@ -621,11 +621,11 @@ case class DeliveryProducerRoute(val rou
       if (delivery.uow != null) {
         val ack = pendingAck
         delivery.uow.setDisposer(^ {
-          ack(null)
+          ack(true, null)
         })
 
       } else {
-        pendingAck(null)
+        pendingAck(true, null)
       }
       pendingAck==null
     }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1050111&r1=1050110&r2=1050111&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
Thu Dec 16 19:28:40 2010
@@ -64,7 +64,7 @@ object StompCodec extends Log {
     if( !frame.updated_headers.isEmpty ) {
       for( (key, value) <- frame.updated_headers ) {
         key.writeTo(os)
-        os.write(SEPERATOR)
+        os.write(COLON)
         value.writeTo(os)
         os.write(NEWLINE)
       }
@@ -82,7 +82,7 @@ object StompCodec extends Log {
     } else {
       for( (key, value) <- frame.headers ) {
         key.writeTo(os)
-        os.write(SEPERATOR)
+        os.write(COLON)
         value.writeTo(os)
         os.write(NEWLINE)
       }
@@ -122,7 +122,7 @@ object StompCodec extends Log {
     var line = read_line
     while( line.length() > 0 ) {
       try {
-          val seperatorIndex = line.indexOf(SEPERATOR)
+          val seperatorIndex = line.indexOf(COLON)
           if( seperatorIndex<0 ) {
               throw new IOException("Header line missing seperator.")
           }
@@ -225,7 +225,7 @@ class StompCodec extends ProtocolCodec w
     if( !frame.updated_headers.isEmpty ) {
       for( (key, value) <- frame.updated_headers ) {
         key.writeTo(os)
-        os.write(SEPERATOR)
+        os.write(COLON)
         value.writeTo(os)
         os.write(NEWLINE)
       }
@@ -244,7 +244,7 @@ class StompCodec extends ProtocolCodec w
     } else {
       for( (key, value) <- frame.headers ) {
         key.writeTo(os)
-        os.write(SEPERATOR)
+        os.write(COLON)
         value.writeTo(os)
         os.write(NEWLINE)
       }
@@ -431,7 +431,7 @@ class StompCodec extends ProtocolCodec w
         }
 
         try {
-            val seperatorIndex = line.indexOf(SEPERATOR)
+            val seperatorIndex = line.indexOf(COLON)
             if( seperatorIndex<0 ) {
                 throw new IOException("Header line missing seperator [" + ascii(line) + "]")
             }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=1050111&r1=1050110&r2=1050111&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
Thu Dec 16 19:28:40 2010
@@ -337,8 +337,14 @@ object Stomp {
   val COMMA: Byte = ','
   val NEWLINE_BUFFER = new Buffer(Array(NEWLINE))
   val END_OF_FRAME_BUFFER = new Buffer(Array(NULL, NEWLINE))
-  val SEPERATOR: Byte = ':'
-  val SEPERATOR_BUFFER = new Buffer(Array(SEPERATOR))
+  val COLON: Byte = ':'
+  val COLON_BUFFER = new Buffer(Array(COLON))
+  val ESCAPE:Byte = '\\'
+
+  val ESCAPE_ESCAPE_SEQ = ascii("""\\""")
+  val COLON_ESCAPE_SEQ = ascii("""\c""")
+  val NEWLINE_ESCAPE_SEQ = ascii("""\n""")
+
 
   ///////////////////////////////////////////////////////////////////
   // Frame Commands
@@ -357,6 +363,7 @@ object Stomp {
   val COMMIT = ascii("COMMIT")
   val ABORT = ascii("ABORT")
   val ACK = ascii("ACK")
+  val NACK = ascii("NACK")
 
   ///////////////////////////////////////////////////////////////////
   // Frame Responses
@@ -372,6 +379,7 @@ object Stomp {
   val RECEIPT_REQUESTED = ascii("receipt")
   val TRANSACTION = ascii("transaction")
   val CONTENT_LENGTH = ascii("content-length")
+  val CONTENT_TYPE = ascii("content-type")
   val TRANSFORMATION = ascii("transformation")
   val TRANSFORMATION_ERROR = ascii("transformation-error")
 
@@ -428,6 +436,9 @@ object Stomp {
 
   val SUPPORTED_PROTOCOL_VERSIONS = List(V1_1, V1_0)
 
+  val TEXT_PLAIN = ascii("text/plain")
+
+
   //	public enum Transformations {
   //		JMS_BYTE, JMS_OBJECT_XML, JMS_OBJECT_JSON, JMS_MAP_XML, JMS_MAP_JSON
   //

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1050111&r1=1050110&r2=1050111&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Thu Dec 16 19:28:40 2010
@@ -17,10 +17,9 @@
 package org.apache.activemq.apollo.stomp
 
 import _root_.org.fusesource.hawtbuf._
-import collection.mutable.{ListBuffer, HashMap}
 import org.fusesource.hawtdispatch._
 
-import AsciiBuffer._
+import Buffer._
 import org.apache.activemq.apollo.broker._
 import java.lang.String
 import protocol.{HeartBeatMonitor, ProtocolHandler}
@@ -37,6 +36,7 @@ import scala.util.continuations._
 import org.apache.activemq.apollo.dto._
 import org.apache.activemq.apollo.transport.tcp.SslTransport
 import java.security.cert.X509Certificate
+import collection.mutable.{ArrayBuffer, ListBuffer, HashMap}
 
 object StompProtocolHandler extends Log {
 
@@ -65,29 +65,65 @@ class StompProtocolHandler extends Proto
 
   def protocol = "stomp"
 
+  /** Decodes the header escapes in value (\\ => \, \c => :, \n => newline) and reads the result as UTF-8. */
+  def decode_header(value:Buffer):String = {
+    val rc = new ByteArrayOutputStream(value.length)
+    val pos = new Buffer(value)
+    val max = value.offset + value.length
+    // Move the head and shrink length together so pos spans exactly the undecoded rest of
+    // value. NOTE(review): assumes startsWith is bounded by length - confirm against hawtbuf.
+    def skip(n:Int) = { pos.offset += n; pos.length -= n }
+    while( pos.offset < max ) {
+      if( pos.startsWith(ESCAPE_ESCAPE_SEQ) ) {
+        rc.write(ESCAPE); skip(2)
+      } else if( pos.startsWith(COLON_ESCAPE_SEQ) ) {
+        rc.write(COLON); skip(2)
+      } else if( pos.startsWith(NEWLINE_ESCAPE_SEQ) ) {
+        rc.write(NEWLINE); skip(2)
+      } else {
+        rc.write(pos.data(pos.offset)); skip(1) // not an escape sequence: copy the byte as-is
+      }
+    }
+    new String(rc.toByteArray, "UTF-8")
+  }
+
+  /** Escapes value for use as a header: \ => \\, : => \c, newline => \n (applied to its UTF-8 bytes). */
+  def encode_header(value:String) = {
+    val data = value.getBytes("UTF-8")
+    val rc = new ByteArrayOutputStream(data.length)
+    data.foreach {
+      case ESCAPE  => rc.write(ESCAPE_ESCAPE_SEQ)
+      case COLON   => rc.write(COLON_ESCAPE_SEQ)
+      case NEWLINE => rc.write(NEWLINE_ESCAPE_SEQ) // must differ from COLON so decode_header can invert it
+      case c       => rc.write(c)
+    }
+    rc.toBuffer.ascii
+  }
+
   override protected def log = StompProtocolHandler
 
   protected def dispatchQueue:DispatchQueue = connection.dispatchQueue
 
   trait AckHandler {
     def track(delivery:Delivery):Unit
-    def perform_ack(msgid: AsciiBuffer, uow:StoreUOW=null):Unit
+    def perform_ack(consumed:Boolean, msgid: AsciiBuffer, uow:StoreUOW=null):Unit
   }
 
   class AutoAckHandler extends AckHandler {
     def track(delivery:Delivery) = {
       if( delivery.ack!=null ) {
-        delivery.ack(null)
+        delivery.ack(true, null)
       }
     }
 
-    def perform_ack(msgid: AsciiBuffer, uow:StoreUOW=null) = {
-      async_die("The subscription ack mode does not expect ACK frames")
+    def perform_ack(consumed:Boolean, msgid: AsciiBuffer, uow:StoreUOW=null) = {
+      async_die("The subscription ack mode does not expect ACK or NACK frames")
     }
+
   }
 
   class SessionAckHandler extends AckHandler{
-    var consumer_acks = ListBuffer[(AsciiBuffer, (StoreUOW)=>Unit)]()
+    var consumer_acks = ListBuffer[(AsciiBuffer, (Boolean, StoreUOW)=>Unit)]()
 
     def track(delivery:Delivery) = {
       queue.apply {
@@ -101,7 +137,7 @@ class StompProtocolHandler extends Proto
     }
 
 
-    def perform_ack(msgid: AsciiBuffer, uow:StoreUOW=null) = {
+    def perform_ack(consumed:Boolean, msgid: AsciiBuffer, uow:StoreUOW=null) = {
 
       // session acks ack all previously recieved messages..
       var found = false
@@ -122,7 +158,7 @@ class StompProtocolHandler extends Proto
         consumer_acks = not_acked
         acked.foreach{case (id, ack)=>
           if( ack!=null ) {
-            ack(uow)
+            ack(consumed, uow)
           }
         }
       }
@@ -132,11 +168,9 @@ class StompProtocolHandler extends Proto
       }
     }
 
-
-
   }
   class MessageAckHandler extends AckHandler {
-    var consumer_acks = HashMap[AsciiBuffer, (StoreUOW)=>Unit]()
+    var consumer_acks = HashMap[AsciiBuffer, (Boolean, StoreUOW)=>Unit]()
 
     def track(delivery:Delivery) = {
       queue.apply {
@@ -148,11 +182,11 @@ class StompProtocolHandler extends Proto
       }
     }
 
-    def perform_ack(msgid: AsciiBuffer, uow:StoreUOW=null) = {
+    def perform_ack(consumed:Boolean, msgid: AsciiBuffer, uow:StoreUOW=null) = {
       consumer_acks.remove(msgid) match {
         case Some(ack) =>
           if( ack!=null ) {
-            ack(uow)
+            ack(consumed, uow)
           }
         case None => async_die("ACK failed, invalid message id: %s".format(msgid))
       }
@@ -163,7 +197,7 @@ class StompProtocolHandler extends Proto
     }
   }
 
-  class StompConsumer(val subscription_id:Option[AsciiBuffer], val destination:Destination,
val ack_handler:AckHandler, val selector:(AsciiBuffer, BooleanExpression), val binding:BindingDTO)
extends BaseRetained with DeliveryConsumer {
+  class StompConsumer(val subscription_id:Option[AsciiBuffer], val destination:Destination,
val ack_handler:AckHandler, val selector:(String, BooleanExpression), val binding:BindingDTO)
extends BaseRetained with DeliveryConsumer {
     val dispatchQueue = StompProtocolHandler.this.dispatchQueue
 
 
@@ -285,7 +319,7 @@ class StompProtocolHandler extends Proto
     } else {
       debug("Shutting connection down due to: "+msg)
     }
-    die((MESSAGE_HEADER, ascii(msg))::Nil, "")
+    die((MESSAGE_HEADER, encode_header(msg))::Nil, "")
   }
 
   private def die[T](headers:HeaderMap, body:String):T = {
@@ -293,7 +327,13 @@ class StompProtocolHandler extends Proto
       dead = true
       waiting_on = "shutdown"
       connection.transport.resumeRead
-      connection_sink.offer(StompFrame(ERROR, headers, BufferContent(ascii(body))) )
+
+      if( body.isEmpty ) {
+        connection_sink.offer(StompFrame(ERROR, headers, BufferContent(EMPTY_BUFFER)) )
+      } else {
+        connection_sink.offer(StompFrame(ERROR, (CONTENT_TYPE, TEXT_PLAIN)::headers, BufferContent(utf8(body)))
)
+      }
+
       // TODO: if there are too many open connections we should just close the connection
       // without waiting for the error to get sent to the client.
       queue.after(die_delay, TimeUnit.MILLISECONDS) {
@@ -375,7 +415,6 @@ class StompProtocolHandler extends Proto
                 on_stomp_send(frame)
               case ACK =>
                 on_stomp_ack(frame)
-
               case BEGIN =>
                 on_stomp_begin(frame.headers)
               case COMMIT =>
@@ -386,6 +425,8 @@ class StompProtocolHandler extends Proto
                 on_stomp_subscribe(frame.headers)
               case UNSUBSCRIBE =>
                 on_stomp_unsubscribe(frame.headers)
+              case NACK =>
+                on_stomp_nack(frame)
 
               case DISCONNECT =>
                 connection.stop
@@ -402,6 +443,7 @@ class StompProtocolHandler extends Proto
     }  catch {
       case e: Break =>
       case e:Exception =>
+        e.printStackTrace
         async_die("Internal Server Error", e);
     }
   }
@@ -427,8 +469,8 @@ class StompProtocolHandler extends Proto
         security_context.certificates = Option(t.getPeerX509Certificates).getOrElse(Array[X509Certificate]())
       case _ => None
     }
-    security_context.user = get(headers, LOGIN).map(_.toString).getOrElse(null)
-    security_context.password = get(headers, PASSCODE).map(_.toString).getOrElse(null)
+    security_context.user = get(headers, LOGIN).map(decode_header _).getOrElse(null)
+    security_context.password = get(headers, PASSCODE).map(decode_header _).getOrElse(null)
 
     val accept_versions = get(headers, ACCEPT_VERSION).getOrElse(V1_0).split(COMMA).map(_.ascii)
     protocol_version = SUPPORTED_PROTOCOL_VERSIONS.find( v=> accept_versions.contains(v)
) match {
@@ -479,7 +521,7 @@ class StompProtocolHandler extends Proto
 
     def send_connected = {
       val outbound_heart_beat_header = ascii("%d,%d".format(outbound_heartbeat,inbound_heartbeat))
-      session_id = ascii(this.host.config.id + ":"+this.host.session_counter.incrementAndGet)
+      session_id = encode_header(this.host.config.id + ":"+this.host.session_counter.incrementAndGet)
       if( connection_sink==null ) {
         weird(headers)
       }
@@ -623,7 +665,7 @@ class StompProtocolHandler extends Proto
     // Do we need to add the user id?
     if( host.authenticator!=null && config.add_user_header!=null ) {
       host.authenticator.user_name(security_context).foreach{ name=>
-        rc ::= (ascii(config.add_user_header), ascii(name))
+        rc ::= (encode_header(config.add_user_header), encode_header(name))
       }
     }
 
@@ -651,7 +693,7 @@ class StompProtocolHandler extends Proto
       delivery.uow = uow
 
       if( receipt!=null ) {
-        delivery.ack = { storeTx =>
+        delivery.ack = { (consumed, uow) =>
           dispatchQueue <<| ^{
             connection_sink.offer(StompFrame(RECEIPT, List((RECEIPT_ID, receipt))))
           }
@@ -712,7 +754,8 @@ class StompProtocolHandler extends Proto
       case None=> null
       case Some(x)=> x
         try {
-          (x, SelectorParser.parse(x.utf8.toString))
+          val s = decode_header(x)
+          (s, SelectorParser.parse(s))
         } catch {
           case e:FilterException =>
             die("Invalid selector expression: "+e.getMessage)
@@ -735,8 +778,8 @@ class StompProtocolHandler extends Proto
         rc.name = DestinationParser.encode_path(destination.name)
         // TODO:
         // rc.client_id =
-        rc.subscription_id = if( persistent ) id.toString else null
-        rc.filter = if (selector == null) null else selector._1.toString
+        rc.subscription_id = if( persistent ) decode_header(id) else null
+        rc.filter = if (selector == null) null else selector._1
         rc
       } else {
         val rc = new QueueBindingDTO
@@ -838,7 +881,14 @@ class StompProtocolHandler extends Proto
   }
 
   def on_stomp_ack(frame:StompFrame):Unit = {
-    val headers = frame.headers
+    on_stomp_ack(frame.headers, true)
+  }
+
+  def on_stomp_nack(frame:StompFrame):Unit = {
+    on_stomp_ack(frame.headers, false)
+  }
+
+  def on_stomp_ack(headers:HeaderMap, consumed:Boolean):Unit = {
     val messageId = get(headers, MESSAGE_ID).getOrElse(die("message id header not set"))
 
     val subscription_id = get(headers, SUBSCRIPTION);
@@ -847,7 +897,7 @@ class StompProtocolHandler extends Proto
         if( !(protocol_version eq V1_0) ) {
           die("The subscription header is required")
         }
-        connection_ack_handlers.get(messageId).orElse(die("Not expecting ack for message
id '%s'".format(messageId)))
+        connection_ack_handlers.get(messageId).orElse(die("Not expecting an ACK/NACK for
message id '%s'".format(messageId)))
       case Some(id) =>
         consumers.get(id).map(_.ack_handler).orElse(die("The subscription '%s' does not exist".format(id)))
     }
@@ -855,16 +905,17 @@ class StompProtocolHandler extends Proto
     handler.foreach{ handler=>
       get(headers, TRANSACTION) match {
         case None=>
-          handler.perform_ack(messageId, null)
+          handler.perform_ack(consumed, messageId, null)
         case Some(txid)=>
           get_or_create_tx_queue(txid).add{ uow=>
-            handler.perform_ack(messageId, uow)
+            handler.perform_ack(consumed, messageId, uow)
           }
       }
       send_receipt(headers)
     }
   }
 
+
   override def onTransportFailure(error: IOException) = {
     if( !connection.stopped ) {
       suspendRead("shutdown")



Mime
View raw message