activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1237540 [1/2] - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-openwire-generator/src/main/scala/org/apache/activemq/apollo/openwire/generator/ apollo-openwire/src/main/scala/org/...
Date Mon, 30 Jan 2012 04:59:17 GMT
Author: chirino
Date: Mon Jan 30 04:59:15 2012
New Revision: 1237540

URL: http://svn.apache.org/viewvc?rev=1237540&view=rev
Log:
Refactored the openwire command classes to replace String with UTF8Buffer to get more efficient serialization/deserialization.

Removed:
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BaseEndpoint.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Endpoint.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/JournalQueueAck.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/JournalTopicAck.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/JournalTrace.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/JournalTransaction.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/state/CommandVisitor.java
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
    activemq/activemq-apollo/trunk/apollo-openwire-generator/src/main/scala/org/apache/activemq/apollo/openwire/generator/ApolloMarshallingGenerator.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/BaseDataStreamMarshaller.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQBlobMessage.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQMessage.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQQueue.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempDestination.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempQueue.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempTopic.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTopic.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BaseCommand.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BrokerId.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BrokerInfo.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Command.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionControl.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionError.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionId.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionInfo.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConsumerControl.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConsumerId.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConsumerInfo.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ControlCommand.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/DestinationInfo.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/DiscoveryEvent.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/FlushCommand.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/KeepAliveInfo.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/LastPartialCommand.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageAck.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatch.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatchNotification.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessagePull.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/PartialCommand.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerAck.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerId.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerInfo.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveInfo.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveSubscriptionInfo.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ReplayCommand.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Response.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionId.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionInfo.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ShutdownInfo.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SubscriptionInfo.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/TransactionInfo.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/WireFormatInfo.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/AdvisorySupport.java
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/DestinationParser.scala Mon Jan 30 04:59:15 2012
@@ -35,6 +35,7 @@ class DestinationParser extends PathPars
   var temp_queue_prefix = "temp-queue:"
   var temp_topic_prefix = "temp-topic:"
   var destination_separator = ","
+  var sanitize_destinations = false
 
   def copy(other:DestinationParser) = {
     super.copy(other)
@@ -59,20 +60,32 @@ class DestinationParser extends PathPars
         }
         dest match {
           case d:QueueDestinationDTO =>
-            if( queue_prefix!=null ) {
-              rc.append(queue_prefix)
+            if ( d.temp() ) {
+              if( temp_queue_prefix!=null ) {
+                rc.append(temp_queue_prefix)
+              }
+            } else {
+              if( queue_prefix!=null ) {
+                rc.append(queue_prefix)
+              }
             }
-            rc.append(encode_path(dest.path.toIterable))
+            rc.append(encode_path_iter(dest.path.toIterable, sanitize_destinations))
           case d:DurableSubscriptionDestinationDTO =>
             if( dsub_prefix!=null ) {
               rc.append(dsub_prefix)
             }
-            rc.append(d.subscription_id)
+            rc.append(unsanitize_destination_part(d.subscription_id))
           case d:TopicDestinationDTO =>
-            if( topic_prefix!=null ) {
-              rc.append(topic_prefix)
+            if ( d.temp() ) {
+              if( temp_topic_prefix!=null ) {
+                rc.append(temp_topic_prefix)
+              }
+            } else {
+              if( topic_prefix!=null ) {
+                rc.append(topic_prefix)
+              }
             }
-            rc.append(encode_path(dest.path.toIterable))
+            rc.append(encode_path_iter(dest.path.toIterable, sanitize_destinations))
           case _ =>
             throw new Exception("Uknown destination type: "+dest.getClass);
         }
@@ -125,19 +138,19 @@ class DestinationParser extends PathPars
   def decode_single_destination(value: String, unqualified: (String) => DestinationDTO): DestinationDTO = {
     if (queue_prefix != null && value.startsWith(queue_prefix)) {
       var name = value.substring(queue_prefix.length)
-      return new QueueDestinationDTO(parts(name))
+      return new QueueDestinationDTO(parts(name, sanitize_destinations))
     } else if (topic_prefix != null && value.startsWith(topic_prefix)) {
       var name = value.substring(topic_prefix.length)
-      return new TopicDestinationDTO(parts(name))
+      return new TopicDestinationDTO(parts(name, sanitize_destinations))
     } else if (dsub_prefix != null && value.startsWith(dsub_prefix)) {
       var name = sanitize_destination_part(value.substring(dsub_prefix.length))
       return new DurableSubscriptionDestinationDTO(name).direct();
     } else if (temp_topic_prefix != null && value.startsWith(temp_topic_prefix)) {
       var name = value.substring(temp_topic_prefix.length)
-      return new TopicDestinationDTO(parts(name)).temp(true)
+      return new TopicDestinationDTO(parts(name, sanitize_destinations)).temp(true)
     } else if (temp_queue_prefix != null && value.startsWith(temp_queue_prefix)) {
       var name = value.substring(temp_queue_prefix.length)
-      return new QueueDestinationDTO(parts(name)).temp(true)
+      return new QueueDestinationDTO(parts(name, sanitize_destinations)).temp(true)
     } else if (unqualified != null) {
       return unqualified(value)
     } else {

Modified: activemq/activemq-apollo/trunk/apollo-openwire-generator/src/main/scala/org/apache/activemq/apollo/openwire/generator/ApolloMarshallingGenerator.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire-generator/src/main/scala/org/apache/activemq/apollo/openwire/generator/ApolloMarshallingGenerator.scala?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire-generator/src/main/scala/org/apache/activemq/apollo/openwire/generator/ApolloMarshallingGenerator.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire-generator/src/main/scala/org/apache/activemq/apollo/openwire/generator/ApolloMarshallingGenerator.scala Mon Jan 30 04:59:15 2012
@@ -390,6 +390,8 @@ class ApolloMarshallingGenerator extends
       out.println("        info." + setter + "(dataIn.readInt());")
     } else if (property_type == "long") {
       out.println("        info." + setter + "(tightUnmarshalLong(wireFormat, dataIn, bs));")
+    } else if (property_type == "UTF8Buffer") {
+      out.println("        info." + setter + "(tightUnmarshalString(dataIn, bs));")
     } else if (property_type == "String") {
       out.println("        info." + setter + "(tightUnmarshalString(dataIn, bs));")
     } else if (property_type == "byte[]") {
@@ -457,6 +459,8 @@ class ApolloMarshallingGenerator extends
         baseSize += 4
       } else if (property_type == "long") {
         out.println("        rc += tightMarshalLong1(wireFormat, " + getter + ", bs);")
+      } else if (property_type == "UTF8Buffer") {
+        out.println("        rc += tightMarshalString1(" + getter + ", bs);")
       } else if (property_type == "String") {
         out.println("        rc += tightMarshalString1(" + getter + ", bs);")
       } else if (property_type == "byte[]") {
@@ -505,6 +509,8 @@ class ApolloMarshallingGenerator extends
         out.println("        dataOut.writeInt(" + getter + ");")
       } else if (property_type == "long") {
         out.println("        tightMarshalLong2(wireFormat, " + getter + ", dataOut, bs);")
+      } else if (property_type == "UTF8Buffer") {
+        out.println("        tightMarshalString2(" + getter + ", dataOut, bs);")
       } else if (property_type == "String") {
         out.println("        tightMarshalString2(" + getter + ", dataOut, bs);")
       } else if (property_type == "byte[]") {
@@ -552,6 +558,8 @@ class ApolloMarshallingGenerator extends
         out.println("        dataOut.writeInt(" + getter + ");")
       } else if (property_type == "long") {
         out.println("        looseMarshalLong(wireFormat, " + getter + ", dataOut);")
+      } else if (property_type == "UTF8Buffer") {
+        out.println("        looseMarshalString(" + getter + ", dataOut);")
       } else if (property_type == "String") {
         out.println("        looseMarshalString(" + getter + ", dataOut);")
       } else if (property_type == "byte[]") {
@@ -609,6 +617,8 @@ class ApolloMarshallingGenerator extends
       out.println("        info." + setter + "(dataIn.readInt());")
     } else if (property_type == "long") {
       out.println("        info." + setter + "(looseUnmarshalLong(wireFormat, dataIn));")
+    } else if (property_type == "UTF8Buffer") {
+      out.println("        info." + setter + "(looseUnmarshalString(dataIn));")
     } else if (property_type == "String") {
       out.println("        info." + setter + "(looseUnmarshalString(dataIn));")
     } else if (property_type == "byte[]") {

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala Mon Jan 30 04:59:15 2012
@@ -27,6 +27,7 @@ import org.apache.activemq.apollo.util._
 import java.util.Map.Entry
 import org.apache.activemq.apollo.broker._
 import org.fusesource.hawtdispatch._
+import org.fusesource.hawtbuf.UTF8Buffer
 
 /**
  * <p>
@@ -58,7 +59,7 @@ class DestinationAdvisoryRouterListener(
   final val advisoryProducerId = new ProducerId
   final val messageIdGenerator = new LongSequenceGenerator
 
-  advisoryProducerId.setConnectionId(ID_GENERATOR.generateId)
+  advisoryProducerId.setConnectionId(new UTF8Buffer(ID_GENERATOR.generateId))
 
 
   class ProducerRoute extends DeliveryProducerRoute(router) {
@@ -160,7 +161,7 @@ class DestinationAdvisoryRouterListener(
     //set the data structure
     message.setDataStructure(command);
     message.setPersistent(false);
-    message.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
+    message.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE_BUFFER);
     message.setMessageId(new MessageId(advisoryProducerId, messageIdGenerator.getNextSequenceId()));
 //    message.setTargetConsumerId(targetConsumerId);
     message.setDestination(topic);

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationConverter.scala Mon Jan 30 04:59:15 2012
@@ -19,6 +19,7 @@ package org.apache.activemq.apollo.openw
 import org.apache.activemq.apollo.dto.{TopicDestinationDTO, QueueDestinationDTO, DestinationDTO}
 import org.apache.activemq.apollo.broker.DestinationParser
 import org.apache.activemq.apollo.openwire.command._
+import java.lang.String
 
 /**
  * <p>
@@ -29,74 +30,71 @@ import org.apache.activemq.apollo.openwi
 object DestinationConverter {
 
   val OPENWIRE_PARSER = new DestinationParser();
+  OPENWIRE_PARSER.queue_prefix = ActiveMQDestination.QUEUE_QUALIFIED_PREFIX
+  OPENWIRE_PARSER.topic_prefix = ActiveMQDestination.TOPIC_QUALIFIED_PREFIX
+  OPENWIRE_PARSER.temp_queue_prefix = ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX
+  OPENWIRE_PARSER.temp_topic_prefix = ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX
+  OPENWIRE_PARSER.dsub_prefix = null
   OPENWIRE_PARSER.path_separator = "."
   OPENWIRE_PARSER.any_child_wildcard = "*"
   OPENWIRE_PARSER.any_descendant_wildcard = ">"
+  OPENWIRE_PARSER.sanitize_destinations = true
 
-  //  = Pattern.compile("[ a-zA-Z0-9\\_\\-\\%\\~]")
-  
   def to_destination_dto(dest: ActiveMQDestination, handler:OpenwireProtocolHandler): Array[DestinationDTO] = {
-
-    if( !dest.isComposite ) {
-      import ActiveMQDestination._
-      var name = dest.getPhysicalName
-      Array(dest.getDestinationType match {
-        case QUEUE_TYPE =>
-          var path_parts = OPENWIRE_PARSER.parts(name).map(OPENWIRE_PARSER.sanitize_destination_part(_, true))
-          new QueueDestinationDTO(path_parts)
-        case TOPIC_TYPE =>
-          var path_parts = OPENWIRE_PARSER.parts(name).map(OPENWIRE_PARSER.sanitize_destination_part(_, true))
-          new TopicDestinationDTO(path_parts)
-        case TEMP_QUEUE_TYPE =>
-          val (connectionid, rest)= name.splitAt(name.lastIndexOf(':'))
-          val real_path = ("temp" :: handler.broker.id :: OPENWIRE_PARSER.sanitize_destination_part(connectionid) :: OPENWIRE_PARSER.sanitize_destination_part(rest.substring(1)) :: Nil).toArray
-          new QueueDestinationDTO( real_path ).temp(true)
-        case TEMP_TOPIC_TYPE =>
-          val (connectionid, rest)= name.splitAt(name.lastIndexOf(':'))
-          val real_path = ("temp" :: handler.broker.id :: OPENWIRE_PARSER.sanitize_destination_part(connectionid) :: OPENWIRE_PARSER.sanitize_destination_part(rest.substring(1)) :: Nil).toArray
-          new TopicDestinationDTO( real_path ).temp(true)
-      })
-    } else {
-      dest.getCompositeDestinations.map { c =>
-        to_destination_dto(c, handler)(0)
+    def fallback(value:String) = {
+      OPENWIRE_PARSER.decode_single_destination(dest.getQualifiedPrefix+value, null)
+    }
+    val rc = OPENWIRE_PARSER.decode_multi_destination(dest.getPhysicalName.toString, fallback)
+    rc.foreach { dest =>
+      if( dest.temp() ) {
+        import collection.JavaConversions._
+        // Put it back together...
+        val name = dest.path.map(OPENWIRE_PARSER.unsanitize_destination_part(_)).mkString(OPENWIRE_PARSER.path_separator)
+
+        val (connectionid, rest) = name.splitAt(name.lastIndexOf(':'))
+        val real_path = ("temp" :: handler.broker.id :: OPENWIRE_PARSER.sanitize_destination_part(connectionid) :: OPENWIRE_PARSER.sanitize_destination_part(rest.substring(1)) :: Nil).toArray
+        dest.path = java.util.Arrays.asList(real_path:_*)
       }
     }
+    rc
   }
 
-  def to_activemq_destination(dest:Array[DestinationDTO]):ActiveMQDestination = {
+  def to_activemq_destination(dests:Array[DestinationDTO]):ActiveMQDestination = {
     import collection.JavaConversions._
-
-    val rc = dest.flatMap { dest =>
-
+    var wrapper: (String)=> ActiveMQDestination = null
+    
+    val rc = OPENWIRE_PARSER.encode_destination(dests.flatMap{ dest=>
       val temp = dest.path.headOption == Some("temp")
       dest match {
         case dest:QueueDestinationDTO =>
           if( temp ) {
-            Some(new ActiveMQTempQueue(dest.path.toList.drop(2).map(OPENWIRE_PARSER.unsanitize_destination_part(_)).mkString(":")))
+            if(wrapper==null) 
+              wrapper = (x)=>new ActiveMQTempQueue(x)
+            var path: Array[String] = Array(dest.path.toList.drop(2).map(OPENWIRE_PARSER.unsanitize_destination_part(_)).mkString(":"))
+            Some(new QueueDestinationDTO(path).temp(true))
           } else {
-            val name = OPENWIRE_PARSER.encode_path(asScalaBuffer(dest.path).toList.map(OPENWIRE_PARSER.unsanitize_destination_part(_)))
-            Some(new ActiveMQQueue(name))
+            if(wrapper==null) 
+              wrapper = (x)=>new ActiveMQQueue(x)
+            Some(dest)
           }
         case dest:TopicDestinationDTO =>
           if( temp ) {
-            Some(new ActiveMQTempTopic(dest.path.toList.drop(2).map(OPENWIRE_PARSER.unsanitize_destination_part(_)).mkString(":")))
+            if(wrapper==null) 
+              wrapper = (x)=>new ActiveMQTempTopic(x)
+            var path: Array[String] = Array(dest.path.toList.drop(2).map(OPENWIRE_PARSER.unsanitize_destination_part(_)).mkString(":"))
+            Some(new TopicDestinationDTO(path).temp(true))
           } else {
-            val name = OPENWIRE_PARSER.encode_path(asScalaBuffer(dest.path).toList.map(OPENWIRE_PARSER.unsanitize_destination_part(_)))
-            Some(new ActiveMQTopic(name))
+            if(wrapper==null) 
+              wrapper = (x)=>new ActiveMQTopic(x)
+            Some(dest)
           }
         case _ => None 
       }
-    }
+    })
 
-    if( rc.length == 0) {
+    if ( wrapper==null )
       null
-    } else if( rc.length == 1) {
-      rc(0)
-    } else {
-      val c = new ActiveMQQueue()
-      c.setCompositeDestinations(rc)
-      c
-    }
-
+    else
+      wrapper(rc)
   }
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Mon Jan 30 04:59:15 2012
@@ -40,6 +40,7 @@ import org.apache.activemq.apollo.broker
 import protocol._
 import security.SecurityContext
 import DestinationConverter._
+import Buffer._
 
 object OpenwireProtocolHandler extends Log {
   def unit:Unit = {}
@@ -412,9 +413,9 @@ class OpenwireProtocolHandler extends Pr
 
     // Give the client some info about this broker.
     val brokerInfo = new BrokerInfo();
-    brokerInfo.setBrokerId(new BrokerId(host.config.id));
-    brokerInfo.setBrokerName(host.config.id);
-    brokerInfo.setBrokerURL(host.broker.get_connect_address);
+    brokerInfo.setBrokerId(new BrokerId(utf8(host.config.id)));
+    brokerInfo.setBrokerName(utf8(host.config.id));
+    brokerInfo.setBrokerURL(utf8(host.broker.get_connect_address));
     connection_session.offer(brokerInfo);
   }
 
@@ -427,8 +428,8 @@ class OpenwireProtocolHandler extends Pr
     if (connection_context==null) {
       new ConnectionContext(info).attach
 
-      security_context.user = info.getUserName
-      security_context.password = info.getPassword
+      security_context.user = Option(info.getUserName).map(_.toString).getOrElse(null)
+      security_context.password = Option(info.getPassword).map(_.toString).getOrElse(null)
       security_context.session_id = Some(OPENWIRE_PARSER.sanitize_destination_part(info.getConnectionId.toString))
 
       reset {
@@ -847,7 +848,7 @@ class OpenwireProtocolHandler extends Pr
         case null=> null
         case x=>
           try {
-            SelectorParser.parse(x)
+            SelectorParser.parse(x.toString)
           } catch {
             case e:FilterException =>
               fail("Invalid selector expression: "+e.getMessage)
@@ -863,7 +864,7 @@ class OpenwireProtocolHandler extends Pr
         subscription_id += info.getSubscriptionName
 
         val rc = new DurableSubscriptionDestinationDTO(subscription_id)
-        rc.selector = info.getSelector
+        rc.selector = Option(info.getSelector).map(_.toString).getOrElse(null)
 
         destination.foreach { _ match {
           case x:TopicDestinationDTO=>

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/BaseDataStreamMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/BaseDataStreamMarshaller.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/BaseDataStreamMarshaller.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/codec/BaseDataStreamMarshaller.java Mon Jan 30 04:59:15 2012
@@ -20,6 +20,7 @@ import org.apache.activemq.apollo.openwi
 import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtbuf.DataByteArrayInputStream;
 import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+import org.fusesource.hawtbuf.UTF8Buffer;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
@@ -31,8 +32,8 @@ public abstract class BaseDataStreamMars
     static {
         Constructor constructor = null;
         try {
-            constructor = StackTraceElement.class.getConstructor(new Class[] {String.class, String.class,
-                                                                              String.class, int.class});
+            constructor = StackTraceElement.class.getConstructor(new Class[] {UTF8Buffer.class, UTF8Buffer.class,
+                                                                              UTF8Buffer.class, int.class});
         } catch (Throwable e) {
         }
         STACK_TRACE_ELEMENT_CONSTRUCTOR = constructor;
@@ -185,8 +186,8 @@ public abstract class BaseDataStreamMars
     protected Throwable tightUnmarsalThrowable(OpenWireFormat wireFormat, DataByteArrayInputStream dataIn, BooleanStream bs)
         throws IOException {
         if (bs.readBoolean()) {
-            String clazz = tightUnmarshalString(dataIn, bs);
-            String message = tightUnmarshalString(dataIn, bs);
+            UTF8Buffer clazz = tightUnmarshalString(dataIn, bs);
+            UTF8Buffer message = tightUnmarshalString(dataIn, bs);
             Throwable o = createThrowable(clazz, message);
             if (wireFormat.isStackTraceEnabled()) {
                 if (STACK_TRACE_ELEMENT_CONSTRUCTOR != null) {
@@ -222,11 +223,11 @@ public abstract class BaseDataStreamMars
         }
     }
 
-    private Throwable createThrowable(String className, String message) {
+    private Throwable createThrowable(UTF8Buffer className, UTF8Buffer message) {
         try {
-            Class clazz = Class.forName(className, false, BaseDataStreamMarshaller.class.getClassLoader());
-            Constructor constructor = clazz.getConstructor(new Class[] {String.class});
-            return (Throwable)constructor.newInstance(new Object[] {message});
+            Class clazz = Class.forName(className.toString(), false, BaseDataStreamMarshaller.class.getClassLoader());
+            Constructor constructor = clazz.getConstructor(new Class[] {UTF8Buffer.class});
+            return (Throwable)constructor.newInstance(new Object[] {message.toString()});
         } catch (Throwable e) {
             return new Throwable(className + ": " + message);
         }
@@ -240,16 +241,16 @@ public abstract class BaseDataStreamMars
         } else {
             int rc = 0;
             bs.writeBoolean(true);
-            rc += tightMarshalString1(o.getClass().getName(), bs);
-            rc += tightMarshalString1(o.getMessage(), bs);
+            rc += tightMarshalString1(new UTF8Buffer(o.getClass().getName()), bs);
+            rc += tightMarshalString1(new UTF8Buffer(o.getMessage()), bs);
             if (wireFormat.isStackTraceEnabled()) {
                 rc += 2;
                 StackTraceElement[] stackTrace = o.getStackTrace();
                 for (int i = 0; i < stackTrace.length; i++) {
                     StackTraceElement element = stackTrace[i];
-                    rc += tightMarshalString1(element.getClassName(), bs);
-                    rc += tightMarshalString1(element.getMethodName(), bs);
-                    rc += tightMarshalString1(element.getFileName(), bs);
+                    rc += tightMarshalString1(new UTF8Buffer(element.getClassName()), bs);
+                    rc += tightMarshalString1(new UTF8Buffer(element.getMethodName()), bs);
+                    rc += tightMarshalString1(new UTF8Buffer(element.getFileName()), bs);
                     rc += 4;
                 }
                 rc += tightMarshalThrowable1(wireFormat, o.getCause(), bs);
@@ -261,16 +262,16 @@ public abstract class BaseDataStreamMars
     protected void tightMarshalThrowable2(OpenWireFormat wireFormat, Throwable o, DataByteArrayOutputStream dataOut,
                                           BooleanStream bs) throws IOException {
         if (bs.readBoolean()) {
-            tightMarshalString2(o.getClass().getName(), dataOut, bs);
-            tightMarshalString2(o.getMessage(), dataOut, bs);
+            tightMarshalString2(new UTF8Buffer(o.getClass().getName()), dataOut, bs);
+            tightMarshalString2(new UTF8Buffer(o.getMessage()), dataOut, bs);
             if (wireFormat.isStackTraceEnabled()) {
                 StackTraceElement[] stackTrace = o.getStackTrace();
                 dataOut.writeShort(stackTrace.length);
                 for (int i = 0; i < stackTrace.length; i++) {
                     StackTraceElement element = stackTrace[i];
-                    tightMarshalString2(element.getClassName(), dataOut, bs);
-                    tightMarshalString2(element.getMethodName(), dataOut, bs);
-                    tightMarshalString2(element.getFileName(), dataOut, bs);
+                    tightMarshalString2(new UTF8Buffer(element.getClassName()), dataOut, bs);
+                    tightMarshalString2(new UTF8Buffer(element.getMethodName()), dataOut, bs);
+                    tightMarshalString2(new UTF8Buffer(element.getFileName()), dataOut, bs);
                     dataOut.writeInt(element.getLineNumber());
                 }
                 tightMarshalThrowable2(wireFormat, o.getCause(), dataOut, bs);
@@ -279,68 +280,44 @@ public abstract class BaseDataStreamMars
     }
 
     @SuppressWarnings("deprecation")
-    protected String tightUnmarshalString(DataByteArrayInputStream dataIn, BooleanStream bs) throws IOException {
+    protected UTF8Buffer tightUnmarshalString(DataByteArrayInputStream dataIn, BooleanStream bs) throws IOException {
         if (bs.readBoolean()) {
-            if (bs.readBoolean()) {
-                int size = dataIn.readShort();
-                byte data[] = new byte[size];
-                dataIn.readFully(data);
-                // Yes deprecated, but we know what we are doing.
-                // This allows us to create a String from a ASCII byte array. (no UTF-8 decoding)
-                return new String(data, 0);
-            } else {
-                return dataIn.readUTF();
-            }
+            boolean ascii = bs.readBoolean(); // ignored for now.
+            int size = dataIn.readShort();
+            return dataIn.readBuffer(size).utf8();
         } else {
             return null;
         }
     }
 
-    protected int tightMarshalString1(String value, BooleanStream bs) throws IOException {
+    protected int tightMarshalString1(UTF8Buffer value, BooleanStream bs) throws IOException {
         bs.writeBoolean(value != null);
         if (value != null) {
 
-            int strlen = value.length();
-            int utflen = 0;
-            char[] charr = new char[strlen];
-            int c = 0;
-            boolean isOnlyAscii = true;
-
-            value.getChars(0, strlen, charr, 0);
-
-            for (int i = 0; i < strlen; i++) {
-                c = charr[i];
-                if ((c >= 0x0001) && (c <= 0x007F)) {
-                    utflen++;
-                } else if (c > 0x07FF) {
-                    utflen += 3;
-                    isOnlyAscii = false;
-                } else {
-                    isOnlyAscii = false;
-                    utflen += 2;
-                }
-            }
+            boolean ascii = false;
+//  we could check to see if its' really ascii.. for now punt.
+//            boolean ascii = true;
+//            int last = value.offset+value.length;
+//            for (int i = value.offset; i < last; i++) {
+//                if( (value.data[i] & 0x80) !=0 ) {
+//                    ascii = false;
+//                }
+//            }
 
-            if (utflen >= Short.MAX_VALUE) {
-                throw new IOException("Encountered a String value that is too long to encode.");
-            }
-            bs.writeBoolean(isOnlyAscii);
-            return utflen + 2;
+            bs.writeBoolean(ascii);
+            return value.length() + 2;
 
         } else {
             return 0;
         }
     }
 
-    protected void tightMarshalString2(String value, DataByteArrayOutputStream dataOut, BooleanStream bs) throws IOException {
+    protected void tightMarshalString2(UTF8Buffer value, DataByteArrayOutputStream dataOut, BooleanStream bs) throws IOException {
         if (bs.readBoolean()) {
             // If we verified it only holds ascii values
-            if (bs.readBoolean()) {
-                dataOut.writeShort(value.length());
-                dataOut.writeBytes(value);
-            } else {
-                dataOut.writeUTF(value);
-            }
+            bs.readBoolean();
+            dataOut.writeShort(value.length);
+            dataOut.write(value);
         }
     }
 
@@ -506,8 +483,8 @@ public abstract class BaseDataStreamMars
     protected Throwable looseUnmarsalThrowable(OpenWireFormat wireFormat, DataByteArrayInputStream dataIn)
         throws IOException {
         if (dataIn.readBoolean()) {
-            String clazz = looseUnmarshalString(dataIn);
-            String message = looseUnmarshalString(dataIn);
+            UTF8Buffer clazz = looseUnmarshalString(dataIn);
+            UTF8Buffer message = looseUnmarshalString(dataIn);
             Throwable o = createThrowable(clazz, message);
             if (wireFormat.isStackTraceEnabled()) {
                 if (STACK_TRACE_ELEMENT_CONSTRUCTOR != null) {
@@ -547,16 +524,16 @@ public abstract class BaseDataStreamMars
         throws IOException {
         dataOut.writeBoolean(o != null);
         if (o != null) {
-            looseMarshalString(o.getClass().getName(), dataOut);
-            looseMarshalString(o.getMessage(), dataOut);
+            looseMarshalString(new UTF8Buffer(o.getClass().getName()), dataOut);
+            looseMarshalString(new UTF8Buffer(o.getMessage()), dataOut);
             if (wireFormat.isStackTraceEnabled()) {
                 StackTraceElement[] stackTrace = o.getStackTrace();
                 dataOut.writeShort(stackTrace.length);
                 for (int i = 0; i < stackTrace.length; i++) {
                     StackTraceElement element = stackTrace[i];
-                    looseMarshalString(element.getClassName(), dataOut);
-                    looseMarshalString(element.getMethodName(), dataOut);
-                    looseMarshalString(element.getFileName(), dataOut);
+                    looseMarshalString(new UTF8Buffer(element.getClassName()), dataOut);
+                    looseMarshalString(new UTF8Buffer(element.getMethodName()), dataOut);
+                    looseMarshalString(new UTF8Buffer(element.getFileName()), dataOut);
                     dataOut.writeInt(element.getLineNumber());
                 }
                 looseMarshalThrowable(wireFormat, o.getCause(), dataOut);
@@ -564,18 +541,20 @@ public abstract class BaseDataStreamMars
         }
     }
 
-    protected String looseUnmarshalString(DataByteArrayInputStream dataIn) throws IOException {
+    protected UTF8Buffer looseUnmarshalString(DataByteArrayInputStream dataIn) throws IOException {
         if (dataIn.readBoolean()) {
-            return dataIn.readUTF();
+            int size = dataIn.readShort();
+            return dataIn.readBuffer(size).utf8();
         } else {
             return null;
         }
     }
 
-    protected void looseMarshalString(String value, DataByteArrayOutputStream dataOut) throws IOException {
+    protected void looseMarshalString(UTF8Buffer value, DataByteArrayOutputStream dataOut) throws IOException {
         dataOut.writeBoolean(value != null);
         if (value != null) {
-            dataOut.writeUTF(value);
+            dataOut.writeShort(value.length);
+            dataOut.write(value);
         }
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQBlobMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQBlobMessage.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQBlobMessage.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQBlobMessage.java Mon Jan 30 04:59:15 2012
@@ -18,6 +18,7 @@ package org.apache.activemq.apollo.openw
 
 import org.apache.activemq.apollo.openwire.support.OpenwireException;
 import org.apache.activemq.apollo.openwire.support.blob.BlobUploader;
+import org.fusesource.hawtbuf.UTF8Buffer;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -31,11 +32,11 @@ import java.net.URL;
 public class ActiveMQBlobMessage extends ActiveMQMessage {
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_BLOB_MESSAGE;
 
-    public static final String BINARY_MIME_TYPE = "application/octet-stream";
+    public static final UTF8Buffer BINARY_MIME_TYPE = new UTF8Buffer("application/octet-stream");
 
-    private String remoteBlobUrl;
-    private String mimeType;
-    private String name;
+    private UTF8Buffer remoteBlobUrl;
+    private UTF8Buffer mimeType;
+    private UTF8Buffer name;
     private boolean deletedByBroker;
 
     private transient BlobUploader blobUploader;
@@ -63,11 +64,11 @@ public class ActiveMQBlobMessage extends
     /**
      * @openwire:property version=3 cache=false
      */
-    public String getRemoteBlobUrl() {
+    public UTF8Buffer getRemoteBlobUrl() {
         return remoteBlobUrl;
     }
 
-    public void setRemoteBlobUrl(String remoteBlobUrl) {
+    public void setRemoteBlobUrl(UTF8Buffer remoteBlobUrl) {
         this.remoteBlobUrl = remoteBlobUrl;
         url = null;
     }
@@ -78,18 +79,18 @@ public class ActiveMQBlobMessage extends
      * 
      * @openwire:property version=3 cache=true
      */
-    public String getMimeType() {
+    public UTF8Buffer getMimeType() {
         if (mimeType == null) {
             return BINARY_MIME_TYPE;
         }
         return mimeType;
     }
 
-    public void setMimeType(String mimeType) {
+    public void setMimeType(UTF8Buffer mimeType) {
         this.mimeType = mimeType;
     }
 
-    public String getName() {
+    public UTF8Buffer getName() {
         return name;
     }
 
@@ -99,7 +100,7 @@ public class ActiveMQBlobMessage extends
      * 
      * @openwire:property version=3 cache=false
      */
-    public void setName(String name) {
+    public void setName(UTF8Buffer name) {
         this.name = name;
     }
 
@@ -114,7 +115,7 @@ public class ActiveMQBlobMessage extends
         this.deletedByBroker = deletedByBroker;
     }
 
-    public String getJMSXMimeType() {
+    public UTF8Buffer getJMSXMimeType() {
         return getMimeType();
     }
 
@@ -129,7 +130,7 @@ public class ActiveMQBlobMessage extends
     public URL getURL() throws OpenwireException {
         if (url == null && remoteBlobUrl != null) {
             try {
-                url = new URL(remoteBlobUrl);
+                url = new URL(remoteBlobUrl.toString());
             } catch (MalformedURLException e) {
                 throw new OpenwireException(e);
             }
@@ -139,7 +140,7 @@ public class ActiveMQBlobMessage extends
 
     public void setURL(URL url) {
         this.url = url;
-        remoteBlobUrl = url != null ? url.toExternalForm() : null;
+        remoteBlobUrl = url != null ? new UTF8Buffer(url.toExternalForm()) : null;
     }
 
     public org.apache.activemq.apollo.openwire.support.blob.BlobUploader getBlobUploader() {

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQDestination.java Mon Jan 30 04:59:15 2012
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
+import org.fusesource.hawtbuf.UTF8Buffer;
+
 import java.util.*;
 
 /**
@@ -42,23 +44,15 @@ abstract public class ActiveMQDestinatio
 
   private static final long serialVersionUID = -3885260014960795889L;
 
-  protected String physicalName;
-
-  protected transient ActiveMQDestination[] compositeDestinations;
-  protected transient String[] destinationPaths;
+  protected UTF8Buffer physicalName;
 
   public ActiveMQDestination() {
   }
 
   protected ActiveMQDestination(String name) {
-      setPhysicalName(name);
+      setPhysicalName(new UTF8Buffer(name));
   }
 
-  public ActiveMQDestination(ActiveMQDestination composites[]) {
-      setCompositeDestinations(composites);
-  }
-
-
   // static helper methods for working with destinations
   // -------------------------------------------------------------------------
   public static ActiveMQDestination createDestination(String name, byte defaultType) {
@@ -114,114 +108,18 @@ abstract public class ActiveMQDestinatio
           return getClass().getName().compareTo(that.getClass().getName());
       }
   }
-
-  public boolean isComposite() {
-      return compositeDestinations != null;
-  }
-
-  public ActiveMQDestination[] getCompositeDestinations() {
-      return compositeDestinations;
-  }
-
-  public void setCompositeDestinations(ActiveMQDestination[] destinations) {
-      this.compositeDestinations = destinations;
-      this.destinationPaths = null;
-
-      StringBuffer sb = new StringBuffer();
-      for (int i = 0; i < destinations.length; i++) {
-          if (i != 0) {
-              sb.append(COMPOSITE_SEPERATOR);
-          }
-          if (getDestinationType() == destinations[i].getDestinationType()) {
-              sb.append(destinations[i].getPhysicalName());
-          } else {
-              sb.append(destinations[i].getQualifiedName());
-          }
-      }
-      physicalName = sb.toString();
-  }
-
-  public String getQualifiedName() {
-      if (isComposite()) {
-          return physicalName;
-      }
-      return getQualifiedPrefix() + physicalName;
-  }
-
-  protected abstract String getQualifiedPrefix();
+    
+  public abstract String getQualifiedPrefix();
 
   /**
    * @openwire:property version=1
    */
-  public String getPhysicalName() {
+  public UTF8Buffer getPhysicalName() {
       return physicalName;
   }
 
-  public void setPhysicalName(String physicalName) {
-      final int len = physicalName.length();
-      // options offset
-      int p = -1;
-      boolean composite = false;
-      for (int i = 0; i < len; i++) {
-          char c = physicalName.charAt(i);
-          if (c == '?') {
-              p = i;
-              break;
-          }
-          if (c == COMPOSITE_SEPERATOR) {
-              // won't be wild card
-              composite = true;
-          }
-      }
-      // Strip off any options
-      if (p >= 0) {
-          String optstring = physicalName.substring(p + 1);
-          physicalName = physicalName.substring(0, p);
-      }
+  public void setPhysicalName(UTF8Buffer physicalName) {
       this.physicalName = physicalName;
-      this.destinationPaths = null;
-      if (composite) {
-          // Check to see if it is a composite.
-          Set<String> l = new HashSet<String>();
-          StringTokenizer iter = new StringTokenizer(physicalName, "" + COMPOSITE_SEPERATOR);
-          while (iter.hasMoreTokens()) {
-              String name = iter.nextToken().trim();
-              if (name.length() == 0) {
-                  continue;
-              }
-              l.add(name);
-          }
-          compositeDestinations = new ActiveMQDestination[l.size()];
-          int counter = 0;
-          for (String dest : l) {
-              compositeDestinations[counter++] = createDestination(dest);
-          }
-      }
-  }
-
-  public ActiveMQDestination createDestination(String name) {
-      return createDestination(name, getDestinationType());
-  }
-
-  public String[] getDestinationPaths() {
-
-      if (destinationPaths != null) {
-          return destinationPaths;
-      }
-
-      List<String> l = new ArrayList<String>();
-      StringTokenizer iter = new StringTokenizer(physicalName, PATH_SEPERATOR);
-      while (iter.hasMoreTokens()) {
-          String name = iter.nextToken().trim();
-          if (name.length() == 0) {
-              continue;
-          }
-          l.add(name);
-      }
-
-      destinationPaths = new String[l.size()];
-      l.toArray(destinationPaths);
-      return destinationPaths;
   }
 
   public abstract byte getDestinationType();
@@ -254,7 +152,7 @@ abstract public class ActiveMQDestinatio
   }
 
   public String toString() {
-      return getQualifiedName();
+      return physicalName.toString();
   }
 
   public String getDestinationTypeAsString() {

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQMessage.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQMessage.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQMessage.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQMessage.java Mon Jan 30 04:59:15 2012
@@ -20,7 +20,7 @@ import org.apache.activemq.apollo.util.C
 import org.apache.activemq.apollo.openwire.support.OpenwireException;
 import org.apache.activemq.apollo.openwire.support.Settings;
 import org.apache.activemq.apollo.openwire.support.TypeConversionSupport;
-import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+import static org.fusesource.hawtbuf.Buffer.*;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
@@ -35,13 +35,10 @@ public class ActiveMQMessage extends Mes
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_MESSAGE;
     private static final Map<String, PropertySetter> JMS_PROPERTY_SETERS = new HashMap<String, PropertySetter>();
 
-    protected transient Callback acknowledgeCallback;
-
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
     }
 
-
     public Message copy() {
         ActiveMQMessage copy = new ActiveMQMessage();
         copy(copy);
@@ -50,7 +47,6 @@ public class ActiveMQMessage extends Mes
 
     protected void copy(ActiveMQMessage copy) {
         super.copy(copy);
-        copy.acknowledgeCallback = acknowledgeCallback;
     }
 
     public int hashCode() {
@@ -76,180 +72,11 @@ public class ActiveMQMessage extends Mes
         return thisMsg != null && oMsg != null && oMsg.equals(thisMsg);
     }
 
-    public void acknowledge() throws OpenwireException {
-        if (acknowledgeCallback != null) {
-            try {
-                acknowledgeCallback.execute();
-            } catch (OpenwireException e) {
-                throw e;
-            } catch (Throwable e) {
-                throw new OpenwireException(e);
-            }
-        }
-    }
-
     public void clearBody() throws OpenwireException {
         setContent(null);
         readOnlyBody = false;
     }
 
-    public String getJMSMessageID() {
-        MessageId messageId = this.getMessageId();
-        if (messageId == null) {
-            return null;
-        }
-        return messageId.toString();
-    }
-
-    /**
-     * Seems to be invalid because the parameter doesn't initialize MessageId
-     * instance variables ProducerId and ProducerSequenceId
-     *
-     * @param value
-     * @throws OpenwireException
-     */
-    public void setJMSMessageID(String value) throws OpenwireException {
-        if (value != null) {
-            try {
-                MessageId id = new MessageId(value);
-                this.setMessageId(id);
-            } catch (NumberFormatException e) {
-                // we must be some foreign JMS provider or strange user-supplied
-                // String
-                // so lets set the IDs to be 1
-                MessageId id = new MessageId();
-                id.setTextView(value);
-                this.setMessageId(messageId);
-            }
-        } else {
-            this.setMessageId(null);
-        }
-    }
-
-    /**
-     * This will create an object of MessageId. For it to be valid, the instance
-     * variable ProducerId and producerSequenceId must be initialized.
-     *
-     * @param producerId
-     * @param producerSequenceId
-     * @throws OpenwireException
-     */
-    public void setJMSMessageID(ProducerId producerId, long producerSequenceId) throws OpenwireException {
-        MessageId id = null;
-        try {
-            id = new MessageId(producerId, producerSequenceId);
-            this.setMessageId(id);
-        } catch (Throwable e) {
-            throw new OpenwireException("Invalid message id '" + id + "', reason: " + e.getMessage(), e);
-        }
-    }
-
-    public long getJMSTimestamp() {
-        return this.getTimestamp();
-    }
-
-    public void setJMSTimestamp(long timestamp) {
-        this.setTimestamp(timestamp);
-    }
-
-    public String getJMSCorrelationID() {
-        return this.getCorrelationId();
-    }
-
-    public void setJMSCorrelationID(String correlationId) {
-        this.setCorrelationId(correlationId);
-    }
-
-    public byte[] getJMSCorrelationIDAsBytes() throws OpenwireException {
-        return encodeString(this.getCorrelationId());
-    }
-
-    public void setJMSCorrelationIDAsBytes(byte[] correlationId) throws OpenwireException {
-        this.setCorrelationId(decodeString(correlationId));
-    }
-
-    public String getJMSXMimeType() {
-        return "jms/message";
-    }
-
-    protected static String decodeString(byte[] data) throws OpenwireException {
-        try {
-            if (data == null) {
-                return null;
-            }
-            return new String(data, "UTF-8");
-        } catch (UnsupportedEncodingException e) {
-            throw new OpenwireException("Invalid UTF-8 encoding: " + e.getMessage());
-        }
-    }
-
-    protected static byte[] encodeString(String data) throws OpenwireException {
-        try {
-            if (data == null) {
-                return null;
-            }
-            return data.getBytes("UTF-8");
-        } catch (UnsupportedEncodingException e) {
-            throw new OpenwireException("Invalid UTF-8 encoding: " + e.getMessage());
-        }
-    }
-
-    public ActiveMQDestination getJMSReplyTo() {
-        return this.getReplyTo();
-    }
-
-    public void setJMSReplyTo(ActiveMQDestination destination) throws OpenwireException {
-        this.setReplyTo(destination);
-    }
-
-    public ActiveMQDestination getJMSDestination() {
-        return this.getDestination();
-    }
-
-    public void setJMSDestination(ActiveMQDestination destination) throws OpenwireException {
-        this.setDestination(destination);
-    }
-
-    public int getJMSDeliveryMode() {
-        return this.isPersistent() ? 2 : 1;
-    }
-
-    public void setJMSDeliveryMode(int mode) {
-        this.setPersistent(mode == 2);
-    }
-
-    public boolean getJMSRedelivered() {
-        return this.isRedelivered();
-    }
-
-    public void setJMSRedelivered(boolean redelivered) {
-        this.setRedelivered(redelivered);
-    }
-
-    public String getJMSType() {
-        return this.getType();
-    }
-
-    public void setJMSType(String type) {
-        this.setType(type);
-    }
-
-    public long getJMSExpiration() {
-        return this.getExpiration();
-    }
-
-    public void setJMSExpiration(long expiration) {
-        this.setExpiration(expiration);
-    }
-
-    public int getJMSPriority() {
-        return this.getPriority();
-    }
-
-    public void setJMSPriority(int priority) {
-        this.setPriority((byte) priority);
-    }
-
     public void clearProperties() {
         super.clearProperties();
         readOnlyProperties = false;
@@ -292,7 +119,7 @@ public class ActiveMQMessage extends Mes
                 if (rc == null) {
                     throw new OpenwireException("Property JMSXGroupID cannot be set from a " + value.getClass().getName() + ".");
                 }
-                message.setGroupID(rc);
+                message.setGroupID(utf8(rc));
             }
         });
         JMS_PROPERTY_SETERS.put("JMSXGroupSeq", new PropertySetter() {
@@ -310,7 +137,7 @@ public class ActiveMQMessage extends Mes
                 if (rc == null) {
                     throw new OpenwireException("Property JMSCorrelationID cannot be set from a " + value.getClass().getName() + ".");
                 }
-                ((ActiveMQMessage) message).setJMSCorrelationID(rc);
+                message.setCorrelationId(utf8(rc));
             }
         });
         JMS_PROPERTY_SETERS.put("JMSDeliveryMode", new PropertySetter() {
@@ -320,12 +147,11 @@ public class ActiveMQMessage extends Mes
                     Boolean bool = (Boolean) TypeConversionSupport.convert(value, Boolean.class);
                     if (bool == null) {
                         throw new OpenwireException("Property JMSDeliveryMode cannot be set from a " + value.getClass().getName() + ".");
-                    }
-                    else {
+                    } else {
                         rc = bool.booleanValue() ? 2 : 1;
                     }
                 }
-                ((ActiveMQMessage) message).setJMSDeliveryMode(rc);
+                message.setPersistent(rc.intValue()==2);
             }
         });
         JMS_PROPERTY_SETERS.put("JMSExpiration", new PropertySetter() {
@@ -334,7 +160,7 @@ public class ActiveMQMessage extends Mes
                 if (rc == null) {
                     throw new OpenwireException("Property JMSExpiration cannot be set from a " + value.getClass().getName() + ".");
                 }
-                ((ActiveMQMessage) message).setJMSExpiration(rc.longValue());
+                message.setExpiration(rc.longValue());
             }
         });
         JMS_PROPERTY_SETERS.put("JMSPriority", new PropertySetter() {
@@ -343,7 +169,7 @@ public class ActiveMQMessage extends Mes
                 if (rc == null) {
                     throw new OpenwireException("Property JMSPriority cannot be set from a " + value.getClass().getName() + ".");
                 }
-                ((ActiveMQMessage) message).setJMSPriority(rc.intValue());
+                message.setPriority((byte) rc.intValue());
             }
         });
         JMS_PROPERTY_SETERS.put("JMSRedelivered", new PropertySetter() {
@@ -352,7 +178,7 @@ public class ActiveMQMessage extends Mes
                 if (rc == null) {
                     throw new OpenwireException("Property JMSRedelivered cannot be set from a " + value.getClass().getName() + ".");
                 }
-                ((ActiveMQMessage) message).setJMSRedelivered(rc.booleanValue());
+                message.setRedelivered(rc.booleanValue());
             }
         });
         JMS_PROPERTY_SETERS.put("JMSReplyTo", new PropertySetter() {
@@ -370,7 +196,7 @@ public class ActiveMQMessage extends Mes
                 if (rc == null) {
                     throw new OpenwireException("Property JMSTimestamp cannot be set from a " + value.getClass().getName() + ".");
                 }
-                ((ActiveMQMessage) message).setJMSTimestamp(rc.longValue());
+                message.setTimestamp(rc.longValue());
             }
         });
         JMS_PROPERTY_SETERS.put("JMSType", new PropertySetter() {
@@ -379,7 +205,7 @@ public class ActiveMQMessage extends Mes
                 if (rc == null) {
                     throw new OpenwireException("Property JMSType cannot be set from a " + value.getClass().getName() + ".");
                 }
-                ((ActiveMQMessage) message).setJMSType(rc);
+                message.setType(utf8(rc));
             }
         });
     }
@@ -411,16 +237,6 @@ public class ActiveMQMessage extends Mes
         }
     }
 
-    public void setProperties(Map properties) throws OpenwireException {
-        for (Iterator iter = properties.entrySet().iterator(); iter.hasNext();) {
-            Map.Entry entry = (Map.Entry) iter.next();
-
-            // Lets use the object property method as we may contain standard
-            // extension headers like JMSXGroupID
-            setObjectProperty((String) entry.getKey(), entry.getValue());
-        }
-    }
-
     protected void checkValidObject(Object value) throws OpenwireException {
 
         boolean valid = value instanceof Boolean || value instanceof Byte || value instanceof Short || value instanceof Integer || value instanceof Long;
@@ -442,11 +258,7 @@ public class ActiveMQMessage extends Mes
         if (name == null) {
             throw new NullPointerException("Property name cannot be null");
         }
-//        try {
         return createFilterable().getProperty(name);
-//        } catch (FilterException e) {
-//            throw new JMSException(e);
-//        }
     }
 
     public boolean getBooleanProperty(String name) throws OpenwireException {
@@ -609,14 +421,6 @@ public class ActiveMQMessage extends Mes
         return false;
     }
 
-    public Callback getAcknowledgeCallback() {
-        return acknowledgeCallback;
-    }
-
-    public void setAcknowledgeCallback(Callback acknowledgeCallback) {
-        this.acknowledgeCallback = acknowledgeCallback;
-    }
-
     /**
      * Send operation event listener. Used to get the message ready to be sent.
      */
@@ -625,7 +429,4 @@ public class ActiveMQMessage extends Mes
         setReadOnlyProperties(true);
     }
 
-    public Response visit(CommandVisitor visitor) throws Exception {
-        return visitor.processMessage(this);
-    }
 }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQQueue.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQQueue.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQQueue.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQQueue.java Mon Jan 30 04:59:15 2012
@@ -21,43 +21,40 @@ import org.apache.activemq.apollo.openwi
 import org.fusesource.hawtbuf.AsciiBuffer;
 
 /**
- * 
- * @org.apache.xbean.XBean element="queue" description="An ActiveMQ Queue
- *                         Destination"
- * 
  * @openwire:marshaller code="100"
  * @version $Revision: 1.5 $
  */
 public class ActiveMQQueue extends ActiveMQDestination {
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_QUEUE;
-    private static final long serialVersionUID = -3885260014960795889L;
 
     public ActiveMQQueue() {
     }
 
     public ActiveMQQueue(String name) {
-        super(name);
+        super(strip(name));
+    }
+    private static String strip(String name) {
+        if(name.startsWith(QUEUE_QUALIFIED_PREFIX)) {
+            return name.substring(QUEUE_QUALIFIED_PREFIX.length());
+        } else {
+            return name;
+        }
     }
-
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
     }
 
-    public boolean isQueue() {
-        return true;
+    public String getQualifiedPrefix() {
+        return QUEUE_QUALIFIED_PREFIX;
     }
 
-    public String getQueueName() throws OpenwireException {
-        return getPhysicalName();
+    public boolean isQueue() {
+        return true;
     }
 
     public byte getDestinationType() {
         return QUEUE_TYPE;
     }
 
-    protected String getQualifiedPrefix() {
-        return QUEUE_QUALIFIED_PREFIX;
-    }
-
 }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempDestination.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempDestination.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempDestination.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempDestination.java Mon Jan 30 04:59:15 2012
@@ -16,19 +16,12 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 /**
  * @openwire:marshaller
  * @version $Revision: 1.5 $
  */
 public abstract class ActiveMQTempDestination extends ActiveMQDestination {
 
-    private static final Log LOG = LogFactory.getLog(ActiveMQTempDestination.class);
-//    protected transient String connectionId;
-//    protected transient int sequenceId;
-
     public ActiveMQTempDestination() {
     }
 
@@ -44,44 +37,4 @@ public abstract class ActiveMQTempDestin
         return true;
     }
 
-    public void setPhysicalName(String physicalName) {
-        super.setPhysicalName(physicalName);
-//        if ( destination.name()!=null ) {
-//            // Parse off the sequenceId off the end.
-//            // this can fail if the temp destination is
-//            // generated by another JMS system via the JMS<->JMS Bridge
-//        	String value = physicalName.toString();
-//            int p = value.lastIndexOf(":");
-//            if (p >= 0) {
-//                String seqStr = value.substring(p + 1).trim();
-//                if (seqStr != null && seqStr.length() > 0) {
-//                    try {
-//                        sequenceId = Integer.parseInt(seqStr);
-//                    } catch (NumberFormatException e) {
-//                        LOG.debug("Did not parse sequence Id from " + physicalName);
-//                    }
-//                    // The rest should be the connection id.
-//                    connectionId = value.substring(0, p);
-//                }
-//            }
-//        }
-    }
-//
-//    public void delete() throws JMSException {
-//        if (connection != null) {
-//            connection.deleteTempDestination(this);
-//        }
-//    }
-//
-//    public String getConnectionId() {
-//        return connectionId;
-//    }
-//
-//    public void setConnectionId(String connectionId) {
-//        this.connectionId = connectionId;
-//    }
-//
-//    public int getSequenceId() {
-//        return sequenceId;
-//    }
 }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempQueue.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempQueue.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempQueue.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempQueue.java Mon Jan 30 04:59:15 2012
@@ -27,37 +27,36 @@ import org.fusesource.hawtbuf.AsciiBuffe
 public class ActiveMQTempQueue extends ActiveMQTempDestination {
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEMP_QUEUE;
-    private static final long serialVersionUID = 6683049467527633867L;
 
     public ActiveMQTempQueue() {
     }
 
     public ActiveMQTempQueue(String name) {
-        super(name);
+        super(strip(name));
     }
 
-    public ActiveMQTempQueue(ConnectionId connectionId, long sequenceId) {
-        super(connectionId.getValue(), sequenceId);
+    private static String strip(String name) {
+        if(name.startsWith(TEMP_QUEUE_QUALIFED_PREFIX)) {
+            return name.substring(TEMP_QUEUE_QUALIFED_PREFIX.length());
+        } else {
+            return name;
+        }
     }
 
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
     }
 
-    public boolean isQueue() {
-        return true;
+    public String getQualifiedPrefix() {
+        return TEMP_QUEUE_QUALIFED_PREFIX;
     }
 
-    public String getQueueName() throws OpenwireException {
-        return getPhysicalName();
+    public boolean isQueue() {
+        return true;
     }
 
     public byte getDestinationType() {
         return TEMP_QUEUE_TYPE;
     }
 
-    protected String getQualifiedPrefix() {
-        return TEMP_QUEUE_QUALIFED_PREFIX;
-    }
-
 }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempTopic.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempTopic.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempTopic.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTempTopic.java Mon Jan 30 04:59:15 2012
@@ -33,31 +33,31 @@ public class ActiveMQTempTopic extends A
     }
 
     public ActiveMQTempTopic(String name) {
-        super(name);
+        super(strip(name));
     }
 
-    public ActiveMQTempTopic(ConnectionId connectionId, long sequenceId) {
-        super(connectionId.getValue(), sequenceId);
+    private static String strip(String name) {
+        if(name.startsWith(TEMP_TOPIC_QUALIFED_PREFIX)) {
+            return name.substring(TEMP_TOPIC_QUALIFED_PREFIX.length());
+        } else {
+            return name;
+        }
     }
 
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
     }
 
-    public boolean isTopic() {
-        return true;
+    public String getQualifiedPrefix() {
+        return TEMP_TOPIC_QUALIFED_PREFIX;
     }
 
-    public String getTopicName() throws OpenwireException {
-        return getPhysicalName();
+    public boolean isTopic() {
+        return true;
     }
 
     public byte getDestinationType() {
         return TEMP_TOPIC_TYPE;
     }
 
-    protected String getQualifiedPrefix() {
-        return TEMP_TOPIC_QUALIFED_PREFIX;
-    }
-
 }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTopic.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTopic.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTopic.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ActiveMQTopic.java Mon Jan 30 04:59:15 2012
@@ -21,41 +21,41 @@ import org.apache.activemq.apollo.openwi
 import org.fusesource.hawtbuf.AsciiBuffer;
 
 /**
- * @org.apache.xbean.XBean element="topic" description="An ActiveMQ Topic
- *                         Destination"
  * @openwire:marshaller code="101"
  * @version $Revision: 1.5 $
  */
 public class ActiveMQTopic extends ActiveMQDestination {
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TOPIC;
-    private static final long serialVersionUID = 7300307405896488588L;
 
     public ActiveMQTopic() {
     }
 
     public ActiveMQTopic(String name) {
-        super(name);
+        super(strip(name));
+    }
+    private static String strip(String name) {
+        if(name.startsWith(TOPIC_QUALIFIED_PREFIX)) {
+            return name.substring(TOPIC_QUALIFIED_PREFIX.length());
+        } else {
+            return name;
+        }
     }
 
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
     }
 
-    public boolean isTopic() {
-        return true;
+    public String getQualifiedPrefix() {
+        return TOPIC_QUALIFIED_PREFIX;
     }
 
-    public String getTopicName() throws OpenwireException {
-        return getPhysicalName();
+    public boolean isTopic() {
+        return true;
     }
 
     public byte getDestinationType() {
         return TOPIC_TYPE;
     }
 
-    protected String getQualifiedPrefix() {
-        return TOPIC_QUALIFIED_PREFIX;
-    }
-
 }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BaseCommand.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BaseCommand.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BaseCommand.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BaseCommand.java Mon Jan 30 04:59:15 2012
@@ -31,9 +31,6 @@ public abstract class BaseCommand implem
     protected int commandId;
     protected boolean responseRequired;
     
-    private transient Endpoint from;
-    private transient Endpoint to;
-    
     public void copy(BaseCommand copy) {
         copy.commandId = commandId;
         copy.responseRequired = responseRequired;
@@ -106,27 +103,6 @@ public abstract class BaseCommand implem
         return false;
     }
 
-    /**
-     * The endpoint within the transport where this message came from.
-     */
-    public Endpoint getFrom() {
-        return from;
-    }
-
-    public void setFrom(Endpoint from) {
-        this.from = from;
-    }
-
-    /**
-     * The endpoint within the transport where this message is going to - null means all endpoints.
-     */
-    public Endpoint getTo() {
-        return to;
-    }
 
-    public void setTo(Endpoint to) {
-        this.to = to;
-    }
-    
     
 }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BrokerId.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BrokerId.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BrokerId.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BrokerId.java Mon Jan 30 04:59:15 2012
@@ -16,18 +16,20 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
+import org.fusesource.hawtbuf.UTF8Buffer;
+
 /**
  * @openwire:marshaller code="124"
  */
 public class BrokerId implements DataStructure {
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.BROKER_ID;
-    protected String value;
+    protected UTF8Buffer value;
 
     public BrokerId() {
     }
 
-    public BrokerId(String brokerId) {
+    public BrokerId(UTF8Buffer brokerId) {
         this.value = brokerId;
     }
 
@@ -51,17 +53,17 @@ public class BrokerId implements DataStr
     }
 
     public String toString() {
-        return value;
+        return value.toString();
     }
 
     /**
      * @openwire:property version=1
      */
-    public String getValue() {
+    public UTF8Buffer getValue() {
         return value;
     }
 
-    public void setValue(String brokerId) {
+    public void setValue(UTF8Buffer brokerId) {
         this.value = brokerId;
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BrokerInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BrokerInfo.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BrokerInfo.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/BrokerInfo.java Mon Jan 30 04:59:15 2012
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
-import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+import org.fusesource.hawtbuf.UTF8Buffer;
 
 /**
  * When a client connects to a broker, the broker send the client a BrokerInfo
@@ -30,17 +30,17 @@ import org.apache.activemq.apollo.openwi
 public class BrokerInfo extends BaseCommand {
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.BROKER_INFO;
     BrokerId brokerId;
-    String brokerURL;
+    UTF8Buffer brokerURL;
     boolean slaveBroker;
     boolean masterBroker;
     boolean faultTolerantConfiguration;
     boolean networkConnection;
     boolean duplexConnection;
     BrokerInfo peerBrokerInfos[];
-    String brokerName;
+    UTF8Buffer brokerName;
     long connectionId;
-    String brokerUploadUrl;
-    String networkProperties;
+    UTF8Buffer brokerUploadUrl;
+    UTF8Buffer networkProperties;
 
     public boolean isBrokerInfo() {
         return true;
@@ -64,11 +64,11 @@ public class BrokerInfo extends BaseComm
     /**
      * @openwire:property version=1
      */
-    public String getBrokerURL() {
+    public UTF8Buffer getBrokerURL() {
         return brokerURL;
     }
 
-    public void setBrokerURL(String brokerURL) {
+    public void setBrokerURL(UTF8Buffer brokerURL) {
         this.brokerURL = brokerURL;
     }
 
@@ -86,18 +86,14 @@ public class BrokerInfo extends BaseComm
     /**
      * @openwire:property version=1
      */
-    public String getBrokerName() {
+    public UTF8Buffer getBrokerName() {
         return brokerName;
     }
 
-    public void setBrokerName(String brokerName) {
+    public void setBrokerName(UTF8Buffer brokerName) {
         this.brokerName = brokerName;
     }
 
-    public Response visit(CommandVisitor visitor) throws Exception {
-        return visitor.processBrokerInfo(this);
-    }
-
     /**
      * @openwire:property version=1
      */
@@ -187,11 +183,11 @@ public class BrokerInfo extends BaseComm
      * 
      * @openwire:property version=3
      */
-    public String getBrokerUploadUrl() {
+    public UTF8Buffer getBrokerUploadUrl() {
         return brokerUploadUrl;
     }
 
-    public void setBrokerUploadUrl(String brokerUploadUrl) {
+    public void setBrokerUploadUrl(UTF8Buffer brokerUploadUrl) {
         this.brokerUploadUrl = brokerUploadUrl;
     }
 
@@ -199,14 +195,14 @@ public class BrokerInfo extends BaseComm
      * @openwire:property version=3 cache=false
      * @return the networkProperties
      */
-    public String getNetworkProperties() {
+    public UTF8Buffer getNetworkProperties() {
         return this.networkProperties;
     }
 
     /**
      * @param networkProperties the networkProperties to set
      */
-    public void setNetworkProperties(String networkProperties) {
+    public void setNetworkProperties(UTF8Buffer networkProperties) {
         this.networkProperties = networkProperties;
     }
 }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Command.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Command.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Command.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Command.java Mon Jan 30 04:59:15 2012
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
-import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
-
 /**
  * The Command Pattern so that we can send and receive commands on the different
  * transports
@@ -53,21 +51,4 @@ public interface Command extends DataStr
 
     boolean isShutdownInfo();
 
-    Response visit(CommandVisitor visitor) throws Exception;
-
-    /**
-     * The endpoint within the transport where this message came from which
-     * could be null if the transport only supports a single endpoint.
-     */
-    Endpoint getFrom();
-
-    void setFrom(Endpoint from);
-
-    /**
-     * The endpoint within the transport where this message is going to - null
-     * means all endpoints.
-     */
-    Endpoint getTo();
-
-    void setTo(Endpoint to);
 }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionControl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionControl.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionControl.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionControl.java Mon Jan 30 04:59:15 2012
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
-import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+import org.fusesource.hawtbuf.UTF8Buffer;
 
 /**
  * Used to start and stop transports as well as terminating clients.
@@ -25,24 +25,22 @@ import org.apache.activemq.apollo.openwi
  * @version $Revision: 1.1 $
  */
 public class ConnectionControl extends BaseCommand {
+    public static final UTF8Buffer EMPTY = new UTF8Buffer("");
+
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONNECTION_CONTROL;
     protected boolean suspend;
     protected boolean resume;
     protected boolean close;
     protected boolean exit;
     protected boolean faultTolerant;
-    protected String connectedBrokers="";
-    protected String reconnectTo = "";
+    protected UTF8Buffer connectedBrokers = EMPTY;
+    protected UTF8Buffer reconnectTo = EMPTY;
     protected boolean rebalanceConnection;
 
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
     }
 
-    public Response visit(CommandVisitor visitor) throws Exception {
-        return visitor.processConnectionControl(this);
-    }
-
     /**
      * @openwire:property version=1
      * @return Returns the close.
@@ -122,14 +120,14 @@ public class ConnectionControl extends B
      * @openwire:property version=6 cache=false
      * @return connected brokers.
      */
-    public String getConnectedBrokers() {
+    public UTF8Buffer getConnectedBrokers() {
         return this.connectedBrokers;
     }
 
     /**
      * @param connectedBrokers the connectedBrokers to set
      */
-    public void setConnectedBrokers(String connectedBrokers) {
+    public void setConnectedBrokers(UTF8Buffer connectedBrokers) {
         this.connectedBrokers = connectedBrokers;
     }
 
@@ -137,14 +135,14 @@ public class ConnectionControl extends B
      *  @openwire:property version=6 cache=false
      * @return the reconnectTo
      */
-    public String getReconnectTo() {
+    public UTF8Buffer getReconnectTo() {
         return this.reconnectTo;
     }
 
     /**
      * @param reconnectTo the reconnectTo to set
      */
-    public void setReconnectTo(String reconnectTo) {
+    public void setReconnectTo(UTF8Buffer reconnectTo) {
         this.reconnectTo = reconnectTo;
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionError.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionError.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionError.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionError.java Mon Jan 30 04:59:15 2012
@@ -31,10 +31,6 @@ public class ConnectionError extends Bas
         return DATA_STRUCTURE_TYPE;
     }
 
-    public Response visit(org.apache.activemq.apollo.openwire.support.state.CommandVisitor visitor) throws Exception {
-        return visitor.processConnectionError(this);
-    }
-
     /**
      * @openwire:property version=1
      */

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionId.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionId.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionId.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionId.java Mon Jan 30 04:59:15 2012
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
+import org.fusesource.hawtbuf.UTF8Buffer;
+
 /**
  * @openwire:marshaller code="120"
  */
@@ -23,12 +25,12 @@ public class ConnectionId implements Dat
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONNECTION_ID;
 
-    protected String value;
+    protected UTF8Buffer value;
 
     public ConnectionId() {
     }
 
-    public ConnectionId(String connectionId) {
+    public ConnectionId(UTF8Buffer connectionId) {
         this.value = connectionId;
     }
 
@@ -68,17 +70,17 @@ public class ConnectionId implements Dat
     }
 
     public String toString() {
-        return value;
+        return value.toString();
     }
 
     /**
      * @openwire:property version=1
      */
-    public String getValue() {
+    public UTF8Buffer getValue() {
         return value;
     }
 
-    public void setValue(String connectionId) {
+    public void setValue(UTF8Buffer connectionId) {
         this.value = connectionId;
     }
 



Mime
View raw message