activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1237540 [2/2] - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-openwire-generator/src/main/scala/org/apache/activemq/apollo/openwire/generator/ apollo-openwire/src/main/scala/org/...
Date Mon, 30 Jan 2012 04:59:17 GMT
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionInfo.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionInfo.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConnectionInfo.java Mon Jan 30 04:59:15 2012
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
-import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+import org.fusesource.hawtbuf.UTF8Buffer;
 
 /**
  * 
@@ -28,10 +28,10 @@ public class ConnectionInfo extends Base
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONNECTION_INFO;
 
     protected ConnectionId connectionId;
-    protected String clientId;
-    protected String clientIp;
-    protected String userName;
-    protected String password;
+    protected UTF8Buffer clientId;
+    protected UTF8Buffer clientIp;
+    protected UTF8Buffer userName;
+    protected UTF8Buffer password;
     protected BrokerId[] brokerPath;
     protected boolean brokerMasterConnector;
     protected boolean manageable;
@@ -85,11 +85,11 @@ public class ConnectionInfo extends Base
     /**
      * @openwire:property version=1
      */
-    public String getClientId() {
+    public UTF8Buffer getClientId() {
         return clientId;
     }
 
-    public void setClientId(String clientId) {
+    public void setClientId(UTF8Buffer clientId) {
         this.clientId = clientId;
     }
 
@@ -102,22 +102,22 @@ public class ConnectionInfo extends Base
     /**
      * @openwire:property version=1
      */
-    public String getPassword() {
+    public UTF8Buffer getPassword() {
         return password;
     }
 
-    public void setPassword(String password) {
+    public void setPassword(UTF8Buffer password) {
         this.password = password;
     }
 
     /**
      * @openwire:property version=1
      */
-    public String getUserName() {
+    public UTF8Buffer getUserName() {
         return userName;
     }
 
-    public void setUserName(String userName) {
+    public void setUserName(UTF8Buffer userName) {
         this.userName = userName;
     }
 
@@ -134,10 +134,6 @@ public class ConnectionInfo extends Base
         this.brokerPath = brokerPath;
     }
 
-    public Response visit(CommandVisitor visitor) throws Exception {
-        return visitor.processAddConnection(this);
-    }
-
     /**
      * @openwire:property version=1
      */
@@ -233,11 +229,11 @@ public class ConnectionInfo extends Base
     /**
      * @openwire:property version=8
      */
-    public String getClientIp() {
+    public UTF8Buffer getClientIp() {
         return clientIp;
     }
 
-    public void setClientIp(String clientIp) {
+    public void setClientIp(UTF8Buffer clientIp) {
         this.clientIp = clientIp;
     }
 }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConsumerControl.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConsumerControl.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConsumerControl.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConsumerControl.java Mon Jan 30 04:59:15 2012
@@ -50,10 +50,6 @@ public class ConsumerControl extends Bas
         return DATA_STRUCTURE_TYPE;
     }
 
-    public Response visit(org.apache.activemq.apollo.openwire.support.state.CommandVisitor visitor) throws Exception {
-        return visitor.processConsumerControl(this);
-    }
-
     /**
      * @openwire:property version=1
      * @return Returns the close.

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConsumerId.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConsumerId.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConsumerId.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConsumerId.java Mon Jan 30 04:59:15 2012
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
+import org.fusesource.hawtbuf.UTF8Buffer;
+
 /**
  * @openwire:marshaller code="122"
  */
@@ -23,7 +25,7 @@ public class ConsumerId implements DataS
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONSUMER_ID;
 
-    protected String connectionId;
+    protected UTF8Buffer connectionId;
     protected long sessionId;
     protected long value;
 
@@ -85,11 +87,11 @@ public class ConsumerId implements DataS
     /**
      * @openwire:property version=1
      */
-    public String getConnectionId() {
+    public UTF8Buffer getConnectionId() {
         return connectionId;
     }
 
-    public void setConnectionId(String connectionId) {
+    public void setConnectionId(UTF8Buffer connectionId) {
         this.connectionId = connectionId;
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConsumerInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConsumerInfo.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConsumerInfo.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ConsumerInfo.java Mon Jan 30 04:59:15 2012
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.activemq.apollo.filter.BooleanExpression;
+import org.fusesource.hawtbuf.UTF8Buffer;
 
 /**
  * @openwire:marshaller code="5"
@@ -40,8 +41,8 @@ public class ConsumerInfo extends BaseCo
     protected int maximumPendingMessageLimit;
     protected boolean browser;
     protected boolean dispatchAsync;
-    protected String selector;
-    protected String subscriptionName;
+    protected UTF8Buffer selector;
+    protected UTF8Buffer subscriptionName;
     protected boolean noLocal;
     protected boolean exclusive;
     protected boolean retroactive;
@@ -206,11 +207,11 @@ public class ConsumerInfo extends BaseCo
      * 
      * @openwire:property version=1
      */
-    public String getSelector() {
+    public UTF8Buffer getSelector() {
         return selector;
     }
 
-    public void setSelector(String selector) {
+    public void setSelector(UTF8Buffer selector) {
         this.selector = selector;
     }
 
@@ -219,11 +220,11 @@ public class ConsumerInfo extends BaseCo
      * 
      * @openwire:property version=1
      */
-    public String getSubscriptionName() {
+    public UTF8Buffer getSubscriptionName() {
         return subscriptionName;
     }
 
-    public void setSubscriptionName(String durableSubscriptionId) {
+    public void setSubscriptionName(UTF8Buffer durableSubscriptionId) {
         this.subscriptionName = durableSubscriptionId;
     }
 
@@ -232,7 +233,7 @@ public class ConsumerInfo extends BaseCo
      * @return
      * @see getSubscriptionName
      */
-    public String getSubcriptionName() {
+    public UTF8Buffer getSubcriptionName() {
         return subscriptionName;
     }
 
@@ -241,7 +242,7 @@ public class ConsumerInfo extends BaseCo
      * @see setSubscriptionName
      * @param durableSubscriptionId
      */
-    public void setSubcriptionName(String durableSubscriptionId) {
+    public void setSubcriptionName(UTF8Buffer durableSubscriptionId) {
         this.subscriptionName = durableSubscriptionId;
     }
 
@@ -344,10 +345,6 @@ public class ConsumerInfo extends BaseCo
         this.additionalPredicate = additionalPredicate;
     }
 
-    public Response visit(org.apache.activemq.apollo.openwire.support.state.CommandVisitor visitor) throws Exception {
-        return visitor.processAddConsumer(this);
-    }
-
     /**
      * @openwire:property version=1
      * @return Returns the networkSubscription.

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ControlCommand.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ControlCommand.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ControlCommand.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ControlCommand.java Mon Jan 30 04:59:15 2012
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
-import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+import org.fusesource.hawtbuf.UTF8Buffer;
 
 /**
  * Used to start and stop transports as well as terminating clients.
@@ -29,7 +29,7 @@ public class ControlCommand extends Base
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.CONTROL_COMMAND;
 
-    private String command;
+    private UTF8Buffer command;
 
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
@@ -38,15 +38,12 @@ public class ControlCommand extends Base
     /**
      * @openwire:property version=1
      */
-    public String getCommand() {
+    public UTF8Buffer getCommand() {
         return command;
     }
 
-    public void setCommand(String command) {
+    public void setCommand(UTF8Buffer command) {
         this.command = command;
     }
 
-    public Response visit(CommandVisitor visitor) throws Exception {
-        return visitor.processControlCommand(this);
-    }
 }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/DestinationInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/DestinationInfo.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/DestinationInfo.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/DestinationInfo.java Mon Jan 30 04:59:15 2012
@@ -18,8 +18,6 @@ package org.apache.activemq.apollo.openw
 
 import java.io.IOException;
 
-import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
-
 /**
  * Used to create and destroy destinations on the broker.
  * 
@@ -117,13 +115,4 @@ public class DestinationInfo extends Bas
         this.brokerPath = brokerPath;
     }
 
-    public Response visit(CommandVisitor visitor) throws Exception {
-        if (isAddOperation()) {
-            return visitor.processAddDestination(this);
-        } else if (isRemoveOperation()) {
-            return visitor.processRemoveDestination(this);
-        }
-        throw new IOException("Unknown operation type: " + getOperationType());
-    }
-
 }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/DiscoveryEvent.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/DiscoveryEvent.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/DiscoveryEvent.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/DiscoveryEvent.java Mon Jan 30 04:59:15 2012
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
+import org.fusesource.hawtbuf.UTF8Buffer;
+
 /**
  * Represents a discovery event containing the details of the service
  * 
@@ -26,13 +28,13 @@ public class DiscoveryEvent implements D
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.DISCOVERY_EVENT;
 
-    protected String serviceName;
-    protected String brokerName;
+    protected UTF8Buffer serviceName;
+    protected UTF8Buffer brokerName;
 
     public DiscoveryEvent() {
     }
 
-    public DiscoveryEvent(String serviceName) {
+    public DiscoveryEvent(UTF8Buffer serviceName) {
         this.serviceName = serviceName;
     }
 
@@ -43,22 +45,22 @@ public class DiscoveryEvent implements D
     /**
      * @openwire:property version=1
      */
-    public String getServiceName() {
+    public UTF8Buffer getServiceName() {
         return serviceName;
     }
 
-    public void setServiceName(String serviceName) {
+    public void setServiceName(UTF8Buffer serviceName) {
         this.serviceName = serviceName;
     }
 
     /**
      * @openwire:property version=1
      */
-    public String getBrokerName() {
+    public UTF8Buffer getBrokerName() {
         return brokerName;
     }
 
-    public void setBrokerName(String name) {
+    public void setBrokerName(UTF8Buffer name) {
         this.brokerName = name;
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/FlushCommand.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/FlushCommand.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/FlushCommand.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/FlushCommand.java Mon Jan 30 04:59:15 2012
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
-import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
-
 /**
  * An indication to the transport layer that a flush is required.
  * 
@@ -32,8 +30,4 @@ public class FlushCommand extends BaseCo
         return DATA_STRUCTURE_TYPE;
     }
 
-    public Response visit(CommandVisitor visitor) throws Exception {
-        return visitor.processFlush(this);
-    }
-
 }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/KeepAliveInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/KeepAliveInfo.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/KeepAliveInfo.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/KeepAliveInfo.java Mon Jan 30 04:59:15 2012
@@ -17,7 +17,6 @@
 package org.apache.activemq.apollo.openwire.command;
 
 import org.apache.activemq.apollo.util.IntrospectionSupport;
-import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
 
 /**
  * @openwire:marshaller code="10"
@@ -26,9 +25,6 @@ public class KeepAliveInfo extends BaseC
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.KEEP_ALIVE_INFO;
 
-    private transient Endpoint from;
-    private transient Endpoint to;
-
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
     }
@@ -57,33 +53,6 @@ public class KeepAliveInfo extends BaseC
         return false;
     }
 
-    /**
-     * The endpoint within the transport where this message came from.
-     */
-    public Endpoint getFrom() {
-        return from;
-    }
-
-    public void setFrom(Endpoint from) {
-        this.from = from;
-    }
-
-    /**
-     * The endpoint within the transport where this message is going to - null
-     * means all endpoints.
-     */
-    public Endpoint getTo() {
-        return to;
-    }
-
-    public void setTo(Endpoint to) {
-        this.to = to;
-    }
-
-    public Response visit(CommandVisitor visitor) throws Exception {
-        return visitor.processKeepAlive(this);
-    }
-
     public boolean isMarshallAware() {
         return false;
     }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/LastPartialCommand.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/LastPartialCommand.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/LastPartialCommand.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/LastPartialCommand.java Mon Jan 30 04:59:15 2012
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
-import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
-
 /**
  * Represents the end marker of a stream of {@link PartialCommand} instances.
  * 
@@ -34,18 +32,4 @@ public class LastPartialCommand extends 
         return DATA_STRUCTURE_TYPE;
     }
 
-    public Response visit(CommandVisitor visitor) throws Exception {
-        throw new IllegalStateException("The transport layer should filter out LastPartialCommand instances but received: " + this);
-    }
-
-    /**
-     * Lets copy across any transient fields from this command 
-     * to the complete command when it is unmarshalled on the other end
-     *
-     * @param completeCommand the newly unmarshalled complete command
-     */
-    public void configure(Command completeCommand) {
-        // copy across the transient properties added by the low level transport
-        completeCommand.setFrom(getFrom());
-    }
 }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Message.java Mon Jan 30 04:59:15 2012
@@ -32,6 +32,7 @@ import org.apache.activemq.apollo.openwi
 import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtbuf.ByteArrayInputStream;
 import org.fusesource.hawtbuf.ByteArrayOutputStream;
+import org.fusesource.hawtbuf.UTF8Buffer;
 
 /**
  * Represents an ActiveMQ message
@@ -58,16 +59,16 @@ public abstract class Message extends Ba
     protected long arrival;
     protected long brokerInTime;
     protected long brokerOutTime;
-    protected String correlationId;
+    protected UTF8Buffer correlationId;
     protected ActiveMQDestination replyTo;
     protected boolean persistent;
-    protected String type;
+    protected UTF8Buffer type;
     protected byte priority;
-    protected String groupID;
+    protected UTF8Buffer groupID;
     protected int groupSequence;
     protected ConsumerId targetConsumerId;
     protected boolean compressed;
-    protected String userID;
+    protected UTF8Buffer userID;
 
     protected Buffer content;
     protected Buffer marshalledProperties;
@@ -303,11 +304,11 @@ public abstract class Message extends Ba
     /**
      * @openwire:property version=1
      */
-    public String getGroupID() {
+    public UTF8Buffer getGroupID() {
         return groupID;
     }
 
-    public void setGroupID(String groupID) {
+    public void setGroupID(UTF8Buffer groupID) {
         this.groupID = groupID;
     }
 
@@ -325,11 +326,11 @@ public abstract class Message extends Ba
     /**
      * @openwire:property version=1
      */
-    public String getCorrelationId() {
+    public UTF8Buffer getCorrelationId() {
         return correlationId;
     }
 
-    public void setCorrelationId(String correlationId) {
+    public void setCorrelationId(UTF8Buffer correlationId) {
         this.correlationId = correlationId;
     }
 
@@ -391,11 +392,11 @@ public abstract class Message extends Ba
     /**
      * @openwire:property version=1
      */
-    public String getType() {
+    public UTF8Buffer getType() {
         return type;
     }
 
-    public void setType(String type) {
+    public void setType(UTF8Buffer type) {
         this.type = type;
     }
 
@@ -554,11 +555,11 @@ public abstract class Message extends Ba
      * 
      * @openwire:property version=1
      */
-    public String getUserID() {
+    public UTF8Buffer getUserID() {
         return userID;
     }
 
-    public void setUserID(String jmsxUserID) {
+    public void setUserID(UTF8Buffer jmsxUserID) {
         this.userID = jmsxUserID;
     }
 
@@ -828,7 +829,7 @@ public abstract class Message extends Ba
         return new Filterable() {
             public <T> T getBodyAs(Class<T> type) throws FilterException {
                 try {
-                    if( type == String.class ) {
+                    if( type == UTF8Buffer.class ) {
                         if ( Message.this instanceof ActiveMQTextMessage ) {
                             return type.cast(((ActiveMQTextMessage)Message.this).getText());
                         }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageAck.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageAck.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageAck.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageAck.java Mon Jan 30 04:59:15 2012
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
-import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
-
 /**
  * @openwire:marshaller code="22"
  * @version $Revision: 1.11 $
@@ -211,10 +209,6 @@ public class MessageAck extends BaseComm
         this.messageCount = messageCount;
     }
 
-    public Response visit(CommandVisitor visitor) throws Exception {
-        return visitor.processMessageAck(this);
-    }
-
     /**
      * The cause of a poison ack, if a message listener
      * throws an exception it will be recorded here

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatch.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatch.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatch.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatch.java Mon Jan 30 04:59:15 2012
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
-import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
-
 /**
  * 
  * @openwire:marshaller code="21"
@@ -31,11 +29,6 @@ public class MessageDispatch extends Bas
     protected Message message;
     protected int redeliveryCounter;
 
-    protected transient long deliverySequenceId;
-    protected transient Object consumer;
-    protected transient Runnable transmitCallback;
-    protected transient Throwable rollbackCause;
-
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
     }
@@ -77,14 +70,6 @@ public class MessageDispatch extends Bas
         this.message = message;
     }
 
-    public long getDeliverySequenceId() {
-        return deliverySequenceId;
-    }
-
-    public void setDeliverySequenceId(long deliverySequenceId) {
-        this.deliverySequenceId = deliverySequenceId;
-    }
-
     /**
      * @openwire:property version=1
      */
@@ -96,31 +81,4 @@ public class MessageDispatch extends Bas
         this.redeliveryCounter = deliveryCounter;
     }
 
-    public Object getConsumer() {
-        return consumer;
-    }
-
-    public void setConsumer(Object consumer) {
-        this.consumer = consumer;
-    }
-
-    public Response visit(CommandVisitor visitor) throws Exception {
-        return visitor.processMessageDispatch(this);
-    }
-
-    public Runnable getTransmitCallback() {
-        return transmitCallback;
-    }
-
-    public void setTransmitCallback(Runnable transmitCallback) {
-        this.transmitCallback = transmitCallback;
-    }
-
-    public Throwable getRollbackCause() {
-        return rollbackCause;
-    }
-
-    public void setRollbackCause(Throwable rollbackCause) {
-        this.rollbackCause = rollbackCause;
-    }
 }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatchNotification.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatchNotification.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatchNotification.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessageDispatchNotification.java Mon Jan 30 04:59:15 2012
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
-import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
-
 /**
  * @openwire:marshaller code="90"
  */
@@ -72,10 +70,6 @@ public class MessageDispatchNotification
         this.deliverySequenceId = deliverySequenceId;
     }
 
-    public Response visit(CommandVisitor visitor) throws Exception {
-        return visitor.processMessageDispatchNotification(this);
-    }
-
     /**
      * @openwire:property version=1
      */

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessagePull.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessagePull.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessagePull.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/MessagePull.java Mon Jan 30 04:59:15 2012
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
+import org.fusesource.hawtbuf.UTF8Buffer;
+
 /**
  * Used to pull messages on demand.
  * 
@@ -30,16 +32,12 @@ public class MessagePull extends BaseCom
     protected ActiveMQDestination destination;
     protected long timeout;
     private MessageId messageId;
-    private String correlationId;
+    private UTF8Buffer correlationId;
 
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
     }
 
-    public Response visit(org.apache.activemq.apollo.openwire.support.state.CommandVisitor visitor) throws Exception {
-        return visitor.processMessagePull(this);
-    }
-
     /**
      * Configures a message pull from the consumer information
      */
@@ -87,11 +85,11 @@ public class MessagePull extends BaseCom
      *
      * @openwire:property version=3
      */
-    public String getCorrelationId() {
+    public UTF8Buffer getCorrelationId() {
         return correlationId;
     }
 
-    public void setCorrelationId(String correlationId) {
+    public void setCorrelationId(UTF8Buffer correlationId) {
         this.correlationId = correlationId;
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/PartialCommand.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/PartialCommand.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/PartialCommand.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/PartialCommand.java Mon Jan 30 04:59:15 2012
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
-import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
-
 /**
  * Represents a partial command; a large command that has been split up into
  * pieces.
@@ -32,9 +30,6 @@ public class PartialCommand implements C
     private int commandId;
     private byte[] data;
 
-    private transient Endpoint from;
-    private transient Endpoint to;
-
     public PartialCommand() {
     }
 
@@ -66,26 +61,6 @@ public class PartialCommand implements C
         this.data = data;
     }
 
-    public Endpoint getFrom() {
-        return from;
-    }
-
-    public void setFrom(Endpoint from) {
-        this.from = from;
-    }
-
-    public Endpoint getTo() {
-        return to;
-    }
-
-    public void setTo(Endpoint to) {
-        this.to = to;
-    }
-
-    public Response visit(CommandVisitor visitor) throws Exception {
-        throw new IllegalStateException("The transport layer should filter out PartialCommand instances but received: " + this);
-    }
-
     public boolean isResponseRequired() {
         return false;
     }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerAck.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerAck.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerAck.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerAck.java Mon Jan 30 04:59:15 2012
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
-import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
-
 /**
  * A ProducerAck command is sent by a broker to a producer to let it know it has
  * received and processed messages that it has produced. The producer will be
@@ -52,10 +50,6 @@ public class ProducerAck extends BaseCom
         return DATA_STRUCTURE_TYPE;
     }
 
-    public Response visit(CommandVisitor visitor) throws Exception {
-        return visitor.processProducerAck(this);
-    }
-
     /**
      * The producer id that this ack message is destined for.
      * 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerId.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerId.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerId.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerId.java Mon Jan 30 04:59:15 2012
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
+import org.fusesource.hawtbuf.UTF8Buffer;
+
 /**
  * @openwire:marshaller code="123"
  */
@@ -23,7 +25,7 @@ public class ProducerId implements DataS
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PRODUCER_ID;
 
-    protected String connectionId;
+    protected UTF8Buffer connectionId;
     protected long sessionId;
     protected long value;
 
@@ -92,7 +94,7 @@ public class ProducerId implements DataS
             sessionKey = sessionKey.substring(0, p);
         }
         // The rest is the value
-        connectionId = sessionKey;
+        connectionId = new UTF8Buffer(sessionKey);
     }
 
     public String toString() {
@@ -109,11 +111,11 @@ public class ProducerId implements DataS
     /**
      * @openwire:property version=1 cache=true
      */
-    public String getConnectionId() {
+    public UTF8Buffer getConnectionId() {
         return connectionId;
     }
 
-    public void setConnectionId(String connectionId) {
+    public void setConnectionId(UTF8Buffer connectionId) {
         this.connectionId = connectionId;
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerInfo.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerInfo.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ProducerInfo.java Mon Jan 30 04:59:15 2012
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
-import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
-
 /**
  * 
  * @openwire:marshaller code="6"
@@ -101,10 +99,6 @@ public class ProducerInfo extends BaseCo
         this.brokerPath = brokerPath;
     }
 
-    public Response visit(CommandVisitor visitor) throws Exception {
-        return visitor.processAddProducer(this);
-    }
-
     /**
      * If the broker should dispatch messages from this producer async. Since
      * sync dispatch could potentally block the producer thread, this could be

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveInfo.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveInfo.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveInfo.java Mon Jan 30 04:59:15 2012
@@ -18,8 +18,6 @@ package org.apache.activemq.apollo.openw
 
 import java.io.IOException;
 
-import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
-
 /**
  * Removes a consumer, producer, session or connection.
  * 
@@ -65,21 +63,6 @@ public class RemoveInfo extends BaseComm
         this.lastDeliveredSequenceId = lastDeliveredSequenceId;
     }
 
-    public Response visit(CommandVisitor visitor) throws Exception {
-        switch (objectId.getDataStructureType()) {
-        case ConnectionId.DATA_STRUCTURE_TYPE:
-            return visitor.processRemoveConnection(this, (ConnectionId)objectId, lastDeliveredSequenceId);
-        case SessionId.DATA_STRUCTURE_TYPE:
-            return visitor.processRemoveSession(this, (SessionId)objectId, lastDeliveredSequenceId);
-        case ConsumerId.DATA_STRUCTURE_TYPE:
-            return visitor.processRemoveConsumer(this, (ConsumerId)objectId, lastDeliveredSequenceId);
-        case ProducerId.DATA_STRUCTURE_TYPE:
-            return visitor.processRemoveProducer(this, (ProducerId)objectId);
-        default:
-            throw new IOException("Unknown remove command type: " + objectId.getDataStructureType());
-        }
-    }
-
     /**
      * Returns true if this event is for a removed connection
      */

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveSubscriptionInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveSubscriptionInfo.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveSubscriptionInfo.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/RemoveSubscriptionInfo.java Mon Jan 30 04:59:15 2012
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
-import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
+import org.fusesource.hawtbuf.UTF8Buffer;
 
 /**
  * @openwire:marshaller code="9"
@@ -27,8 +27,8 @@ public class RemoveSubscriptionInfo exte
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.REMOVE_SUBSCRIPTION_INFO;
 
     protected ConnectionId connectionId;
-    protected String clientId;
-    protected String subscriptionName;
+    protected UTF8Buffer clientId;
+    protected UTF8Buffer subscriptionName;
 
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
@@ -48,27 +48,23 @@ public class RemoveSubscriptionInfo exte
     /**
      * @openwire:property version=1
      */
-    public String getSubscriptionName() {
+    public UTF8Buffer getSubscriptionName() {
         return subscriptionName;
     }
 
-    public void setSubscriptionName(String subscriptionName) {
+    public void setSubscriptionName(UTF8Buffer subscriptionName) {
         this.subscriptionName = subscriptionName;
     }
 
     /**
      * @openwire:property version=1
      */
-    public String getClientId() {
+    public UTF8Buffer getClientId() {
         return clientId;
     }
 
-    public void setClientId(String clientId) {
+    public void setClientId(UTF8Buffer clientId) {
         this.clientId = clientId;
     }
 
-    public Response visit(CommandVisitor visitor) throws Exception {
-        return visitor.processRemoveSubscription(this);
-    }
-
 }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ReplayCommand.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ReplayCommand.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ReplayCommand.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ReplayCommand.java Mon Jan 30 04:59:15 2012
@@ -82,10 +82,6 @@ public class ReplayCommand extends BaseC
         this.lastAckNumber = lastSequenceNumber;
     }
 
-    public Response visit(org.apache.activemq.apollo.openwire.support.state.CommandVisitor visitor) throws Exception {
-        return null;
-    }
-
     /**
      * Is used to specify the first sequence number to be replayed
      * 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Response.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Response.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Response.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/Response.java Mon Jan 30 04:59:15 2012
@@ -48,7 +48,4 @@ public class Response extends BaseComman
         return false;
     }
 
-    public Response visit(org.apache.activemq.apollo.openwire.support.state.CommandVisitor visitor) throws Exception {
-        return null;
-    }
 }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionId.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionId.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionId.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionId.java Mon Jan 30 04:59:15 2012
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
+import org.fusesource.hawtbuf.UTF8Buffer;
+
 /**
  * 
  * @openwire:marshaller code="121"
@@ -24,7 +26,7 @@ public class SessionId implements DataSt
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.SESSION_ID;
 
-    protected String connectionId;
+    protected UTF8Buffer connectionId;
     protected long value;
 
     protected transient int hashCode;
@@ -86,11 +88,11 @@ public class SessionId implements DataSt
     /**
      * @openwire:property version=1 cache=true
      */
-    public String getConnectionId() {
+    public UTF8Buffer getConnectionId() {
         return connectionId;
     }
 
-    public void setConnectionId(String connectionId) {
+    public void setConnectionId(UTF8Buffer connectionId) {
         this.connectionId = connectionId;
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionInfo.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionInfo.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SessionInfo.java Mon Jan 30 04:59:15 2012
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
-import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
-
 /**
  * 
  * @openwire:marshaller code="4"
@@ -62,8 +60,4 @@ public class SessionInfo extends BaseCom
         return command;
     }
 
-    public Response visit(CommandVisitor visitor) throws Exception {
-        return visitor.processAddSession(this);
-    }
-
 }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ShutdownInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ShutdownInfo.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ShutdownInfo.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/ShutdownInfo.java Mon Jan 30 04:59:15 2012
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.apollo.openwire.command;
 
-import org.apache.activemq.apollo.openwire.support.state.CommandVisitor;
-
 /**
  * 
  * @openwire:marshaller code="11"
@@ -30,10 +28,6 @@ public class ShutdownInfo extends BaseCo
         return DATA_STRUCTURE_TYPE;
     }
 
-    public Response visit(CommandVisitor visitor) throws Exception {
-        return visitor.processShutdown(this);
-    }
-
     public boolean isShutdownInfo() {
         return true;
     }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SubscriptionInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SubscriptionInfo.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SubscriptionInfo.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/SubscriptionInfo.java Mon Jan 30 04:59:15 2012
@@ -18,6 +18,7 @@ package org.apache.activemq.apollo.openw
 
 
 import org.apache.activemq.apollo.util.IntrospectionSupport;
+import org.fusesource.hawtbuf.UTF8Buffer;
 
 /**
  * Used to represent a durable subscription.
@@ -31,9 +32,9 @@ public class SubscriptionInfo implements
 
     protected ActiveMQDestination subscribedDestination;
     protected ActiveMQDestination destination;
-    protected String clientId;
-    protected String subscriptionName;
-    protected String selector;
+    protected UTF8Buffer clientId;
+    protected UTF8Buffer subscriptionName;
+    protected UTF8Buffer selector;
 
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
@@ -42,11 +43,11 @@ public class SubscriptionInfo implements
     /**
      * @openwire:property version=1
      */
-    public String getClientId() {
+    public UTF8Buffer getClientId() {
         return clientId;
     }
 
-    public void setClientId(String clientId) {
+    public void setClientId(UTF8Buffer clientId) {
         this.clientId = clientId;
     }
 
@@ -67,22 +68,22 @@ public class SubscriptionInfo implements
     /**
      * @openwire:property version=1
      */
-    public String getSelector() {
+    public UTF8Buffer getSelector() {
         return selector;
     }
 
-    public void setSelector(String selector) {
+    public void setSelector(UTF8Buffer selector) {
         this.selector = selector;
     }
 
     /**
      * @openwire:property version=1
      */
-    public String getSubscriptionName() {
+    public UTF8Buffer getSubscriptionName() {
         return subscriptionName;
     }
 
-    public void setSubscriptionName(String subscriptionName) {
+    public void setSubscriptionName(UTF8Buffer subscriptionName) {
         this.subscriptionName = subscriptionName;
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/TransactionInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/TransactionInfo.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/TransactionInfo.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/TransactionInfo.java Mon Jan 30 04:59:15 2012
@@ -86,27 +86,4 @@ public class TransactionInfo extends Bas
         this.type = type;
     }
 
-    public Response visit(org.apache.activemq.apollo.openwire.support.state.CommandVisitor visitor) throws Exception {
-        switch (type) {
-        case TransactionInfo.BEGIN:
-            return visitor.processBeginTransaction(this);
-        case TransactionInfo.END:
-            return visitor.processEndTransaction(this);
-        case TransactionInfo.PREPARE:
-            return visitor.processPrepareTransaction(this);
-        case TransactionInfo.COMMIT_ONE_PHASE:
-            return visitor.processCommitTransactionOnePhase(this);
-        case TransactionInfo.COMMIT_TWO_PHASE:
-            return visitor.processCommitTransactionTwoPhase(this);
-        case TransactionInfo.ROLLBACK:
-            return visitor.processRollbackTransaction(this);
-        case TransactionInfo.RECOVER:
-            return visitor.processRecoverTransactions(this);
-        case TransactionInfo.FORGET:
-            return visitor.processForgetTransaction(this);
-        default:
-            throw new IOException("Transaction info type unknown: " + type);
-        }
-    }
-
 }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/WireFormatInfo.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/WireFormatInfo.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/WireFormatInfo.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/command/WireFormatInfo.java Mon Jan 30 04:59:15 2012
@@ -44,8 +44,6 @@ public class WireFormatInfo implements C
     protected Buffer marshalledProperties;
 
     protected transient Map<String, Object> properties;
-    private transient Endpoint from;
-    private transient Endpoint to;
 
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
@@ -92,29 +90,6 @@ public class WireFormatInfo implements C
         this.marshalledProperties = marshalledProperties;
     }
 
-    /**
-     * The endpoint within the transport where this message came from.
-     */
-    public Endpoint getFrom() {
-        return from;
-    }
-
-    public void setFrom(Endpoint from) {
-        this.from = from;
-    }
-
-    /**
-     * The endpoint within the transport where this message is going to - null
-     * means all endpoints.
-     */
-    public Endpoint getTo() {
-        return to;
-    }
-
-    public void setTo(Endpoint to) {
-        this.to = to;
-    }
-
     // ////////////////////
     // 
     // Implementation Methods.
@@ -291,10 +266,6 @@ public class WireFormatInfo implements C
         setProperty("CacheSize", new Integer(cacheSize));
     }
 
-    public Response visit(org.apache.activemq.apollo.openwire.support.state.CommandVisitor visitor) throws Exception {
-        return visitor.processWireFormat(this);
-    }
-
     public String toString() {
         Map<String, Object> p = null;
         try {

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/AdvisorySupport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/AdvisorySupport.java?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/AdvisorySupport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/support/advisory/AdvisorySupport.java Mon Jan 30 04:59:15 2012
@@ -18,6 +18,8 @@ package org.apache.activemq.apollo.openw
 
 import org.apache.activemq.apollo.openwire.command.ActiveMQDestination;
 import org.apache.activemq.apollo.openwire.command.ActiveMQTopic;
+import org.fusesource.hawtbuf.UTF8Buffer;
+import static org.fusesource.hawtbuf.Buffer.*;
 
 public final class AdvisorySupport {
 
@@ -46,6 +48,7 @@ public final class AdvisorySupport {
     public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker";
     public static final String AGENT_TOPIC = "ActiveMQ.Agent";
     public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
+    public static final UTF8Buffer ADIVSORY_MESSAGE_TYPE_BUFFER = utf8("Advisory");
     public static final String MSG_PROPERTY_ORIGIN_BROKER_ID="originBrokerId";
     public static final String MSG_PROPERTY_ORIGIN_BROKER_NAME="originBrokerName";
     public static final String MSG_PROPERTY_ORIGIN_BROKER_URL="originBrokerURL";
@@ -168,172 +171,52 @@ public final class AdvisorySupport {
     }
 
     public static boolean isDestinationAdvisoryTopic(ActiveMQDestination destination) {
-        if (destination.isComposite()) {
-            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
-            for (int i = 0; i < compositeDestinations.length; i++) {
-                if (isDestinationAdvisoryTopic(compositeDestinations[i])) {
-                    return true;
-                }
-            }
-            return false;
-        } else {
-            return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC) || destination.equals(QUEUE_ADVISORY_TOPIC)
-                   || destination.equals(TOPIC_ADVISORY_TOPIC);
-        }
+        return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC) || destination.equals(QUEUE_ADVISORY_TOPIC)
+               || destination.equals(TOPIC_ADVISORY_TOPIC);
     }
 
     public static boolean isAdvisoryTopic(ActiveMQDestination destination) {
-        if (destination.isComposite()) {
-            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
-            for (int i = 0; i < compositeDestinations.length; i++) {
-                if (isAdvisoryTopic(compositeDestinations[i])) {
-                    return true;
-                }
-            }
-            return false;
-        } else {
-            return destination.isTopic() && destination.getPhysicalName().startsWith(ADVISORY_TOPIC_PREFIX);
-        }
+        return destination.isTopic() && destination.getPhysicalName().startsWith(utf8(ADVISORY_TOPIC_PREFIX));
     }
 
     public static boolean isConnectionAdvisoryTopic(ActiveMQDestination destination) {
-        if (destination.isComposite()) {
-            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
-            for (int i = 0; i < compositeDestinations.length; i++) {
-                if (isConnectionAdvisoryTopic(compositeDestinations[i])) {
-                    return true;
-                }
-            }
-            return false;
-        } else {
-            return destination.equals(CONNECTION_ADVISORY_TOPIC);
-        }
+        return destination.equals(CONNECTION_ADVISORY_TOPIC);
     }
 
     public static boolean isProducerAdvisoryTopic(ActiveMQDestination destination) {
-        if (destination.isComposite()) {
-            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
-            for (int i = 0; i < compositeDestinations.length; i++) {
-                if (isProducerAdvisoryTopic(compositeDestinations[i])) {
-                    return true;
-                }
-            }
-            return false;
-        } else {
-            return destination.isTopic() && destination.getPhysicalName().startsWith(PRODUCER_ADVISORY_TOPIC_PREFIX);
-        }
+        return destination.isTopic() && destination.getPhysicalName().startsWith(utf8(PRODUCER_ADVISORY_TOPIC_PREFIX));
     }
 
     public static boolean isConsumerAdvisoryTopic(ActiveMQDestination destination) {
-        if (destination.isComposite()) {
-            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
-            for (int i = 0; i < compositeDestinations.length; i++) {
-                if (isConsumerAdvisoryTopic(compositeDestinations[i])) {
-                    return true;
-                }
-            }
-            return false;
-        } else {
-            return destination.isTopic() && destination.getPhysicalName().startsWith(CONSUMER_ADVISORY_TOPIC_PREFIX);
-        }
+        return destination.isTopic() && destination.getPhysicalName().startsWith(utf8(CONSUMER_ADVISORY_TOPIC_PREFIX));
     }
     
     public static boolean isSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
-        if (destination.isComposite()) {
-            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
-            for (int i = 0; i < compositeDestinations.length; i++) {
-                if (isSlowConsumerAdvisoryTopic(compositeDestinations[i])) {
-                    return true;
-                }
-            }
-            return false;
-        } else {
-            return destination.isTopic() && destination.getPhysicalName().startsWith(SLOW_CONSUMER_TOPIC_PREFIX);
-        }
+        return destination.isTopic() && destination.getPhysicalName().startsWith(utf8(SLOW_CONSUMER_TOPIC_PREFIX));
     }
     
     public static boolean isFastProducerAdvisoryTopic(ActiveMQDestination destination) {
-        if (destination.isComposite()) {
-            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
-            for (int i = 0; i < compositeDestinations.length; i++) {
-                if (isFastProducerAdvisoryTopic(compositeDestinations[i])) {
-                    return true;
-                }
-            }
-            return false;
-        } else {
-            return destination.isTopic() && destination.getPhysicalName().startsWith(FAST_PRODUCER_TOPIC_PREFIX);
-        }
+        return destination.isTopic() && destination.getPhysicalName().startsWith(utf8(FAST_PRODUCER_TOPIC_PREFIX));
     }
     
     public static boolean isMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
-        if (destination.isComposite()) {
-            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
-            for (int i = 0; i < compositeDestinations.length; i++) {
-                if (isMessageConsumedAdvisoryTopic(compositeDestinations[i])) {
-                    return true;
-                }
-            }
-            return false;
-        } else {
-            return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_CONSUMED_TOPIC_PREFIX);
-        }
+        return destination.isTopic() && destination.getPhysicalName().startsWith(utf8(MESSAGE_CONSUMED_TOPIC_PREFIX));
     }
     
     public static boolean isMasterBrokerAdvisoryTopic(ActiveMQDestination destination) {
-        if (destination.isComposite()) {
-            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
-            for (int i = 0; i < compositeDestinations.length; i++) {
-                if (isMasterBrokerAdvisoryTopic(compositeDestinations[i])) {
-                    return true;
-                }
-            }
-            return false;
-        } else {
-            return destination.isTopic() && destination.getPhysicalName().startsWith(MASTER_BROKER_TOPIC_PREFIX);
-        }
+        return destination.isTopic() && destination.getPhysicalName().startsWith(utf8(MASTER_BROKER_TOPIC_PREFIX));
     }
     
     public static boolean isMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
-        if (destination.isComposite()) {
-            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
-            for (int i = 0; i < compositeDestinations.length; i++) {
-                if (isMessageDeliveredAdvisoryTopic(compositeDestinations[i])) {
-                    return true;
-                }
-            }
-            return false;
-        } else {
-            return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DELIVERED_TOPIC_PREFIX);
-        }
+        return destination.isTopic() && destination.getPhysicalName().startsWith(utf8(MESSAGE_DELIVERED_TOPIC_PREFIX));
     }
     
     public static boolean isMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
-        if (destination.isComposite()) {
-            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
-            for (int i = 0; i < compositeDestinations.length; i++) {
-                if (isMessageDiscardedAdvisoryTopic(compositeDestinations[i])) {
-                    return true;
-                }
-            }
-            return false;
-        } else {
-            return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DISCAREDED_TOPIC_PREFIX);
-        }
+        return destination.isTopic() && destination.getPhysicalName().startsWith(utf8(MESSAGE_DISCAREDED_TOPIC_PREFIX));
     }
     
     public static boolean isFullAdvisoryTopic(ActiveMQDestination destination) {
-        if (destination.isComposite()) {
-            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
-            for (int i = 0; i < compositeDestinations.length; i++) {
-                if (isFullAdvisoryTopic(compositeDestinations[i])) {
-                    return true;
-                }
-            }
-            return false;
-        } else {
-            return destination.isTopic() && destination.getPhysicalName().startsWith(FULL_TOPIC_PREFIX);
-        }
+        return destination.isTopic() && destination.getPhysicalName().startsWith(utf8(FULL_TOPIC_PREFIX));
     }
 
     /**

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Mon Jan 30 04:59:15 2012
@@ -33,6 +33,7 @@ import org.apache.activemq.apollo.util._
 import java.util.concurrent.TimeUnit
 import java.util.Map.Entry
 import path.PathParser
+import path.PathParser._
 import scala.util.continuations._
 import java.security.cert.X509Certificate
 import collection.mutable.{ListBuffer, HashMap}
@@ -1035,12 +1036,41 @@ class StompProtocolHandler extends Proto
 
   var message_id_counter = 0;
 
+  def encode_destination(value: Array[DestinationDTO]): String = {
+    if (value == null) {
+      null
+    } else {
+      val rc = new StringBuilder
+      value.foreach { dest =>
+        if (rc.length != 0 ) {
+          assert( destination_parser.destination_separator!=null )
+          rc.append(destination_parser.destination_separator)
+        }
+        import collection.JavaConversions._
+        dest match {
+          case d:QueueDestinationDTO =>
+            rc.append(destination_parser.queue_prefix)
+            rc.append(destination_parser.encode_path_iter(dest.path.toIterable, false))
+          case d:DurableSubscriptionDestinationDTO =>
+            rc.append(destination_parser.dsub_prefix)
+            rc.append(destination_parser.unsanitize_destination_part(d.subscription_id))
+          case d:TopicDestinationDTO =>
+            rc.append(destination_parser.topic_prefix)
+            rc.append(destination_parser.encode_path_iter(dest.path.toIterable, false))
+          case _ =>
+            throw new Exception("Uknown destination type: "+dest.getClass);
+        }
+      }
+      rc.toString
+    }
+  }
+
   def updated_headers(destination: Array[DestinationDTO], headers:HeaderMap) = {
     var rc:HeaderMap=Nil
 
     // Do we need to re-write the destination names?
     if( destination.find(_.temp()).isDefined ) {
-      rc ::= (DESTINATION -> encode_header(destination_parser.encode_destination(destination)))
+      rc ::= (DESTINATION -> encode_header(encode_destination(destination)))
     }
     get(headers, REPLY_TO).foreach { value=>
       // we may need to translate local temp destination names to broker destination names
@@ -1048,7 +1078,7 @@ class StompProtocolHandler extends Proto
         try {
           val dests: Array[DestinationDTO] = value
           if (dests.find(_.temp()).isDefined) {
-            rc ::= (REPLY_TO -> encode_header(destination_parser.encode_destination(dests)))
+            rc ::= (REPLY_TO -> encode_header(encode_destination(dests)))
           }
         } catch {
           case _=> // the translation is a best effort thing.

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.scala?rev=1237540&r1=1237539&r2=1237540&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/path/PathParser.scala Mon Jan 30 04:59:15 2012
@@ -171,16 +171,21 @@ class PathParser {
     return new Path(subject.toList.map(decode_part(_)))
   }
 
-  def parts(subject: String): Array[String] = {
-    if(path_separator!=null) {
+  def parts(subject: String, sanitize:Boolean=false): Array[String] = {
+    val rc = if(path_separator!=null) {
       subject.split(Pattern.quote(path_separator))
     } else {
       Array(subject)
     }
+    if (sanitize) {
+      rc.map(sanitize_destination_part(_, true))
+    } else {
+      rc
+    }
   }
 
-  def decode_path(subject: String): Path = {
-    return decode_path(parts(subject))
+  def decode_path(subject: String, sanitize:Boolean=false): Path = {
+    return decode_path(parts(subject, sanitize))
   }
 
   def regex_map[T](text:String, pattern: Pattern)(func: Either[CharSequence, Matcher] => T) = {
@@ -240,25 +245,34 @@ class PathParser {
     * Converts the path back to the string representation.
     * @return
     */
-  def encode_path(path: Path): String = encode_path(path_parts(path))
+  def encode_path(path: Path, unsanitize_destinations:Boolean=false): String = encode_path_iter(path_parts(path))
 
-  def path_parts(path: Path):Array[String] = {
+  def path_parts(path: Path, unsanitize_destinations:Boolean=false):Array[String] = {
     (path.parts.map( _ match {
       case RootPart => ""
       case AnyChildPart => any_child_wildcard
       case AnyDescendantPart => any_descendant_wildcard
       case RegexChildPart(_, original) => original
-      case LiteralPart(value) => value
+      case LiteralPart(value) =>
+        if(unsanitize_destinations) {
+          unsanitize_destination_part(value)
+        } else {
+          value
+        }
     })).toArray
   }
 
-  def encode_path(parts: Iterable[String]): String = {
+  def encode_path_iter(parts: Iterable[String], unsanitize_destinations:Boolean=false): String = {
     var buffer: StringBuffer = new StringBuffer
     for (p <- parts) {
       if ( buffer.length() != 0) {
         buffer.append(path_separator)
       }
-      buffer.append(p)
+      if(unsanitize_destinations) {
+        buffer.append(unsanitize_destination_part(p))
+      } else {
+        buffer.append(p)
+      }
     }
     return buffer.toString
   }



Mime
View raw message