activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1241156 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Date Mon, 06 Feb 2012 20:26:00 GMT
Author: chirino
Date: Mon Feb  6 20:25:59 2012
New Revision: 1241156

URL: http://svn.apache.org/viewvc?rev=1241156&view=rev
Log:
Improve cross protocol conversion support.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=1241156&r1=1241155&r2=1241156&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Mon Feb  6 20:25:59 2012
@@ -18,12 +18,12 @@ package org.apache.activemq.apollo.broke
 
 import _root_.org.fusesource.hawtdispatch._
 import org.fusesource.hawtbuf._
-import protocol.Protocol
 import org.apache.activemq.apollo.filter.Filterable
 import org.apache.activemq.apollo.broker.store.StoreUOW
 import org.apache.activemq.apollo.util.Log
 import java.util.concurrent.atomic.{AtomicReference, AtomicLong}
 import org.apache.activemq.apollo.dto.DestinationDTO
+import org.apache.activemq.apollo.broker.protocol.{ProtocolFactory, Protocol}
 
 object DeliveryProducer extends Log
 
@@ -116,6 +116,8 @@ trait Message extends Filterable with Re
    */
   def protocol:Protocol
 
+  def encoded:Buffer = protocol.encode(this).buffer
+  
 }
 
 /**

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1241156&r1=1241155&r2=1241156&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Mon Feb  6 20:25:59 2012
@@ -391,7 +391,22 @@ class StompProtocolHandler extends Proto
     val consumer_sink = sink_manager.open()
     val credit_window_filter = new CreditWindowFilter[Delivery](consumer_sink.map { delivery =>
       ack_handler.track(delivery)
-      var frame = delivery.message.asInstanceOf[StompFrameMessage].frame
+
+      val message = delivery.message
+      var frame = if( message.protocol eq StompProtocol ) {
+        message.asInstanceOf[StompFrameMessage].frame
+      } else {
+        val (body, content_type) =  protocol_convert match{
+          case "body" => (message.getBodyAs(classOf[Buffer]), "protocol/"+message.protocol.id()+";conv=body")
+          case _ => (message.encoded, "protocol/"+message.protocol.id())
+        }
+        message_id_counter += 1
+        var headers =  (MESSAGE_ID -> ascii(session_id.get+message_id_counter)) :: Nil
+        headers ::= (CONTENT_TYPE -> ascii(content_type))
+        headers ::= (CONTENT_LENGTH -> ascii(body.length().toString))
+        StompFrame(MESSAGE, headers, BufferContent(body))
+      }
+
       if( subscription_id != None ) {
         frame = frame.append_headers((SUBSCRIPTION, subscription_id.get)::Nil)
       }
@@ -428,14 +443,12 @@ class StompProtocolHandler extends Proto
 
     def is_persistent = false
 
-    def match_protocol(delivery:Delivery)= delivery.message.protocol eq StompProtocol
     def match_selector(delivery:Delivery)= selector._2.matches(delivery.message)
     def match_from_seq(delivery:Delivery)= delivery.seq >= from_seq
     def match_from_tail(delivery:Delivery)= delivery.seq >= starting_seq
 
     val matchers = {
       var l = ListBuffer[(Delivery)=>Boolean]()
-      l += match_protocol
       if( from_seq > 0 ) {
         l += match_from_seq
       }
@@ -560,6 +573,7 @@ class StompProtocolHandler extends Proto
   var protocol_filters = List[ProtocolFilter]()
 
   var destination_parser = Stomp.destination_parser
+  var protocol_convert = "full"
   var temp_destination_map = HashMap[SimpleAddress, SimpleAddress]()
 
   var codec:StompCodec = _



Mime
View raw message