activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r734228 [1/2] - in /activemq/activemq-cpp/trunk/src: main/ main/activemq/connector/openwire/ main/activemq/connector/stomp/ main/activemq/connector/stomp/commands/ main/activemq/connector/stomp/marshal/ main/activemq/transport/ main/activem...
Date Tue, 13 Jan 2009 20:14:45 GMT
Author: tabish
Date: Tue Jan 13 12:13:51 2009
New Revision: 734228

URL: http://svn.apache.org/viewvc?rev=734228&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-100

Some refactoring work that leads to V3 and addition of Failover.

Added:
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/stomp/commands/
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/stomp/commands/CommandConstants.cpp   (with props)
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/stomp/commands/CommandConstants.h   (with props)
Removed:
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireCommandReader.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireCommandReader.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireCommandWriter.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireCommandWriter.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompCommandReader.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompCommandReader.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompCommandWriter.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompCommandWriter.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompFrame.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/marshal/
    activemq/activemq-cpp/trunk/src/main/activemq/transport/CommandReader.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/CommandWriter.h
    activemq/activemq-cpp/trunk/src/test/activemq/connector/stomp/StompCommandReaderTest.cpp
    activemq/activemq-cpp/trunk/src/test/activemq/connector/stomp/StompCommandReaderTest.h
    activemq/activemq-cpp/trunk/src/test/activemq/connector/stomp/StompCommandWriterTest.cpp
    activemq/activemq-cpp/trunk/src/test/activemq/connector/stomp/StompCommandWriterTest.h
Modified:
    activemq/activemq-cpp/trunk/src/main/Makefile.am
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/AbortCommand.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/AbstractCommand.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/AckCommand.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/BeginCommand.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/BytesMessageCommand.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/CommitCommand.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/ConnectCommand.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/ConnectedCommand.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/DisconnectCommand.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/ErrorCommand.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/MessageCommand.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/ReceiptCommand.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/StompCommand.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/StompMessage.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/SubscribeCommand.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/TextMessageCommand.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/UnsubscribeCommand.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormat.h
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/stomp/StompFrame.h
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/stomp/StompWireFormat.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/stomp/StompWireFormat.h
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/stomp/marshal/MarshalException.h
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/stomp/marshal/Marshalable.h
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/stomp/marshal/Marshaler.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/stomp/marshal/Marshaler.h
    activemq/activemq-cpp/trunk/src/test/Makefile.am
    activemq/activemq-cpp/trunk/src/test/activemq/connector/stomp/StompConnectorTest.cpp
    activemq/activemq-cpp/trunk/src/test/activemq/connector/stomp/StompFrameTest.h
    activemq/activemq-cpp/trunk/src/test/activemq/connector/stomp/commands/AbortCommandTest.h
    activemq/activemq-cpp/trunk/src/test/activemq/connector/stomp/commands/AckCommandTest.h
    activemq/activemq-cpp/trunk/src/test/activemq/connector/stomp/commands/BeginCommandTest.h
    activemq/activemq-cpp/trunk/src/test/activemq/connector/stomp/commands/BytesMessageCommandTest.h
    activemq/activemq-cpp/trunk/src/test/activemq/connector/stomp/commands/CommitCommandTest.h
    activemq/activemq-cpp/trunk/src/test/activemq/connector/stomp/commands/ConnectCommandTest.h
    activemq/activemq-cpp/trunk/src/test/activemq/connector/stomp/commands/ConnectedCommandTest.h
    activemq/activemq-cpp/trunk/src/test/activemq/connector/stomp/commands/DisconnectCommandTest.h
    activemq/activemq-cpp/trunk/src/test/activemq/connector/stomp/commands/ErrorCommandTest.h
    activemq/activemq-cpp/trunk/src/test/activemq/connector/stomp/commands/MessageCommandTest.h
    activemq/activemq-cpp/trunk/src/test/activemq/connector/stomp/commands/ReceiptCommandTest.h
    activemq/activemq-cpp/trunk/src/test/activemq/connector/stomp/commands/SubscribeCommandTest.h
    activemq/activemq-cpp/trunk/src/test/activemq/connector/stomp/commands/TextMessageCommandTest.h
    activemq/activemq-cpp/trunk/src/test/activemq/connector/stomp/commands/UnsubscribeCommandTest.h
    activemq/activemq-cpp/trunk/src/test/activemq/connector/stomp/marshal/MarshalerTest.h
    activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQConnectionTest.cpp
    activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQSessionTest.cpp
    activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp
    activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.h
    activemq/activemq-cpp/trunk/src/test/testRegistry.cpp

Modified: activemq/activemq-cpp/trunk/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/Makefile.am?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/src/main/Makefile.am Tue Jan 13 12:13:51 2009
@@ -41,8 +41,6 @@
     activemq/core/ActiveMQProducer.cpp \
     activemq/core/ActiveMQConnectionFactory.cpp \
     activemq/core/ActiveMQSessionExecutor.cpp \
-    activemq/connector/openwire/OpenWireCommandReader.cpp \
-    activemq/connector/openwire/OpenWireCommandWriter.cpp \
     activemq/connector/openwire/OpenWireConnector.cpp \
     activemq/connector/openwire/OpenWireConnectorFactory.cpp \
     activemq/connector/ConnectorFactoryMap.cpp \
@@ -71,6 +69,8 @@
     activemq/wireformat/stomp/StompWireFormat.cpp \
     activemq/wireformat/stomp/StompWireFormatFactory.cpp \
     activemq/wireformat/stomp/StompResponseBuilder.cpp \
+    activemq/wireformat/stomp/marshal/Marshaler.cpp \
+    activemq/wireformat/stomp/commands/CommandConstants.cpp \
     decaf/internal/DecafRuntime.cpp \
     decaf/internal/AprPool.cpp \
     decaf/internal/util/ByteArrayAdapter.cpp \
@@ -145,10 +145,7 @@
     decaf/nio/ShortBuffer.cpp \
     activemq/connector/stomp/StompSessionManager.cpp \
     activemq/connector/stomp/commands/CommandConstants.cpp \
-    activemq/connector/stomp/StompCommandReader.cpp \
-    activemq/connector/stomp/StompCommandWriter.cpp \
     activemq/connector/stomp/StompConnectorFactory.cpp \
-    activemq/connector/stomp/marshal/Marshaler.cpp \
     activemq/connector/stomp/StompConnector.cpp \
     activemq/connector/stomp/StompSelector.cpp \
     activemq/connector/stomp/StompConnectionNegotiator.cpp
@@ -193,8 +190,6 @@
     activemq/connector/ConnectorResourceListener.h \
     activemq/connector/ConnectorFactory.h \
     activemq/connector/BaseConnectorResource.h \
-    activemq/connector/openwire/OpenWireCommandReader.h \
-    activemq/connector/openwire/OpenWireCommandWriter.h \
     activemq/connector/openwire/OpenWireConsumerInfo.h \
     activemq/connector/openwire/OpenWireProducerInfo.h \
     activemq/connector/openwire/OpenWireSessionInfo.h \
@@ -213,8 +208,6 @@
     activemq/transport/Response.h \
     activemq/transport/Command.h \
     activemq/transport/CommandListener.h \
-    activemq/transport/CommandReader.h \
-    activemq/transport/CommandWriter.h \
     activemq/transport/CommandIOException.h \
     activemq/transport/IOTransport.h \
     activemq/transport/mock/MockTransport.h \
@@ -239,6 +232,11 @@
     activemq/wireformat/stomp/StompWireFormat.h \
     activemq/wireformat/stomp/StompWireFormatFactory.h \
     activemq/wireformat/stomp/StompResponseBuilder.h \
+    activemq/wireformat/stomp/StompFrame.h \
+    activemq/wireformat/stomp/marshal/Marshalable.h \
+    activemq/wireformat/stomp/marshal/MarshalException.h \
+    activemq/wireformat/stomp/marshal/Marshaler.h \
+    activemq/wireformat/stomp/commands/CommandConstants.h \
     activemq/wireformat/openwire/OpenWireFormat.h \
     activemq/wireformat/openwire/OpenWireFormatNegotiator.h \
     activemq/wireformat/openwire/OpenWireResponseBuilder.h \
@@ -435,7 +433,6 @@
     activemq/connector/stomp/StompSessionManager.h \
     activemq/connector/stomp/StompQueue.h \
     activemq/connector/stomp/StompConnectorFactory.h \
-    activemq/connector/stomp/StompFrame.h \
     activemq/connector/stomp/commands/DisconnectCommand.h \
     activemq/connector/stomp/commands/AbortCommand.h \
     activemq/connector/stomp/commands/ErrorCommand.h \
@@ -455,11 +452,6 @@
     activemq/connector/stomp/commands/MessageCommand.h \
     activemq/connector/stomp/commands/CommitCommand.h \
     activemq/connector/stomp/StompCommandListener.h \
-    activemq/connector/stomp/marshal/Marshalable.h \
-    activemq/connector/stomp/marshal/MarshalException.h \
-    activemq/connector/stomp/marshal/Marshaler.h \
-    activemq/connector/stomp/StompCommandReader.h \
-    activemq/connector/stomp/StompCommandWriter.h \
     activemq/connector/stomp/StompConnectorException.h \
     activemq/connector/stomp/StompConnectionNegotiator.h \
     activemq/connector/stomp/StompProducerInfo.h \

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp Tue Jan 13 12:13:51 2009
@@ -123,21 +123,11 @@
     this->brokerInfo = NULL;
     this->brokerWireFormatInfo = NULL;
     this->properties.copy( &properties );
-    this->wireFormat = dynamic_cast<OpenWireFormat*>(
-        wireFormatFactory.createWireFormat( properties ) );
-    this->transport = this->wireFormat->createNegotiator( transport );
+    this->transport = transport;
 
     // Observe the transport for events.
     this->transport->setCommandListener( this );
     this->transport->setTransportExceptionListener( this );
-
-    // Setup the Reader and Writer with a Wire Format pointer.
-    this->reader.setOpenWireFormat( wireFormat );
-    this->writer.setOpenWireFormat( wireFormat );
-
-    // Setup the reader and writer in the transport.
-    this->transport->setCommandReader( &reader );
-    this->transport->setCommandWriter( &writer );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -146,8 +136,6 @@
 
         close();
 
-        delete transport;
-        delete wireFormat;
         delete brokerInfo;
         delete brokerWireFormatInfo;
     }

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h Tue Jan 13 12:13:51 2009
@@ -52,14 +52,9 @@
 #include <activemq/core/ActiveMQConstants.h>
 #include <activemq/util/LongSequenceGenerator.h>
 
-#include <activemq/connector/openwire/OpenWireCommandReader.h>
-#include <activemq/connector/openwire/OpenWireCommandWriter.h>
 #include <activemq/connector/openwire/OpenWireConsumerInfo.h>
 #include <activemq/connector/openwire/OpenWireProducerInfo.h>
 
-#include <activemq/wireformat/openwire/OpenWireFormat.h>
-#include <activemq/wireformat/openwire/OpenWireFormatNegotiator.h>
-
 #include <activemq/commands/ActiveMQTempDestination.h>
 #include <activemq/commands/BrokerInfo.h>
 #include <activemq/commands/ConnectionInfo.h>
@@ -97,12 +92,6 @@
         transport::Transport* transport;
 
         /**
-         * The OpenWireFormat class that controls Protocol versions and
-         * marshaling details.
-         */
-        wireformat::openwire::OpenWireFormat* wireFormat;
-
-        /**
          * Connection Information for this connection to the Broker
          */
         commands::ConnectionInfo connectionInfo;
@@ -139,16 +128,6 @@
         cms::ExceptionListener* exceptionListener;
 
         /**
-         * This Connector's Command Reader
-         */
-        OpenWireCommandReader reader;
-
-        /**
-         * This Connector's Command Writer
-         */
-        OpenWireCommandWriter writer;
-
-        /**
          * Next available Producer Id
          */
         util::LongSequenceGenerator producerIds;

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp Tue Jan 13 12:13:51 2009
@@ -78,10 +78,6 @@
     this->transport->setCommandListener( this );
     this->transport->setTransportExceptionListener( this );
 
-    // Setup the reader and writer in the transport.
-    this->transport->setCommandReader( &reader );
-    this->transport->setCommandWriter( &writer );
-
     // Register ourself for those commands that we process
     addCmdListener( CommandConstants::ERROR_CMD, this );
 }

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h Tue Jan 13 12:13:51 2009
@@ -23,8 +23,6 @@
 #include <activemq/transport/Transport.h>
 #include <activemq/transport/CommandListener.h>
 #include <activemq/transport/TransportExceptionListener.h>
-#include <activemq/connector/stomp/StompCommandReader.h>
-#include <activemq/connector/stomp/StompCommandWriter.h>
 #include <activemq/connector/stomp/StompCommandListener.h>
 #include <activemq/connector/stomp/StompSessionManager.h>
 #include <activemq/connector/stomp/commands/CommandConstants.h>
@@ -85,16 +83,6 @@
         cms::ExceptionListener* exceptionListener;
 
         /**
-         * This Connector's Command Reader
-         */
-        StompCommandReader reader;
-
-        /**
-         * This Connector's Command Writer
-         */
-        StompCommandWriter writer;
-
-        /**
          * Map to hold StompCommandListeners
          */
         CmdListenerMap cmdListenerMap;

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/AbortCommand.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/AbortCommand.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/AbortCommand.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/AbortCommand.h Tue Jan 13 12:13:51 2009
@@ -40,7 +40,7 @@
                 initialize( getFrame() );
         }
 
-        AbortCommand( StompFrame* frame ) :
+        AbortCommand( wireformat::stomp::StompFrame* frame ) :
             AbstractCommand<transport::Command>(frame) {
                 validate( getFrame() );
         }
@@ -62,7 +62,7 @@
          * frame with data appropriate for the command type.
          * @param frame the Frame to init
          */
-        virtual void initialize( StompFrame& frame ) {
+        virtual void initialize( wireformat::stomp::StompFrame& frame ) {
             frame.setCommand( CommandConstants::toString(
                 CommandConstants::ABORT ) );
         }
@@ -73,7 +73,7 @@
          * @param frame the Frame to validate
          * @returns true if frame is valid
          */
-        virtual bool validate( const StompFrame& frame ) const {
+        virtual bool validate( const wireformat::stomp::StompFrame& frame ) const {
 
             if((frame.getCommand() ==
                 CommandConstants::toString( CommandConstants::ABORT ) ) &&

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/AbstractCommand.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/AbstractCommand.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/AbstractCommand.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/AbstractCommand.h Tue Jan 13 12:13:51 2009
@@ -19,7 +19,7 @@
 #define ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_ABSTRACTCOMMAND_H_
 
 #include <activemq/util/Config.h>
-#include <activemq/connector/stomp/StompFrame.h>
+#include <activemq/wireformat/stomp/StompFrame.h>
 #include <activemq/connector/stomp/commands/StompCommand.h>
 #include <activemq/transport/Command.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
@@ -47,11 +47,11 @@
     protected:
 
         // Frame that contains the actual message
-        StompFrame* frame;
+        wireformat::stomp::StompFrame* frame;
 
     protected:
 
-        StompFrame& getFrame() {
+        wireformat::stomp::StompFrame& getFrame() {
             if( frame == NULL ){
                 throw decaf::lang::exceptions::NullPointerException(
                     __FILE__, __LINE__,
@@ -61,7 +61,7 @@
             return *frame;
         }
 
-        const StompFrame& getFrame() const {
+        const wireformat::stomp::StompFrame& getFrame() const {
             if( frame == NULL ){
                 throw decaf::lang::exceptions::NullPointerException(
                     __FILE__, __LINE__,
@@ -99,7 +99,7 @@
          * frame with data appropriate for the command type.
          * @param frame the Frame to init
          */
-        virtual void initialize( StompFrame& frame ) = 0;
+        virtual void initialize( wireformat::stomp::StompFrame& frame ) = 0;
 
         /**
          * Inheritors are required to override this method to validate
@@ -107,7 +107,7 @@
          * @param frame the Frame to validate
          * @returns true if frame is valid
          */
-        virtual bool validate( const StompFrame& frame ) const = 0;
+        virtual bool validate( const wireformat::stomp::StompFrame& frame ) const = 0;
 
         /**
          * Returns a provider-specific string that provides information
@@ -142,9 +142,9 @@
     public:
 
         AbstractCommand(){
-            frame = new StompFrame;
+            frame = new wireformat::stomp::StompFrame;
         }
-        AbstractCommand( StompFrame* frame ){
+        AbstractCommand( wireformat::stomp::StompFrame* frame ){
             this->frame = frame;
         }
         virtual ~AbstractCommand(){
@@ -263,20 +263,20 @@
          * @throws MarshalException if the command is not
          * in a state that can be marshaled.
          */
-        virtual const StompFrame& marshal()
-            throw ( marshal::MarshalException ) {
+        virtual const wireformat::stomp::StompFrame& marshal()
+            throw ( wireformat::stomp::marshal::MarshalException ) {
 
             try{
                 if( frame == NULL || !validate( *frame ) ){
-                    throw marshal::MarshalException(
+                    throw wireformat::stomp::marshal::MarshalException(
                         __FILE__, __LINE__,
                         "AbstractCommand::marshal() - frame invalid" );
                 }
 
                 return getFrame();
             }
-            AMQ_CATCH_RETHROW( marshal::MarshalException )
-            AMQ_CATCHALL_THROW( marshal::MarshalException )
+            AMQ_CATCH_RETHROW( wireformat::stomp::marshal::MarshalException )
+            AMQ_CATCHALL_THROW( wireformat::stomp::marshal::MarshalException )
         }
 
         /**

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/AckCommand.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/AckCommand.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/AckCommand.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/AckCommand.h Tue Jan 13 12:13:51 2009
@@ -44,7 +44,7 @@
                 initialize( getFrame() );
         }
 
-        AckCommand( StompFrame* frame ) :
+        AckCommand( wireformat::stomp::StompFrame* frame ) :
             AbstractCommand<transport::Command>( frame ) {
                 validate( getFrame() );
         }
@@ -87,7 +87,7 @@
          * frame with data appropriate for the command type.
          * @param frame Frame to init
          */
-        virtual void initialize( StompFrame& frame ) {
+        virtual void initialize( wireformat::stomp::StompFrame& frame ) {
             frame.setCommand( CommandConstants::toString(
                 CommandConstants::ACK ) );
         }
@@ -98,7 +98,7 @@
          * @param frame Frame to validate
          * @returns true if frame is valid
          */
-        virtual bool validate( const StompFrame& frame ) const {
+        virtual bool validate( const wireformat::stomp::StompFrame& frame ) const {
 
             // Make sure the message is an ACK message.
             bool isAck = frame.getCommand() ==

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/BeginCommand.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/BeginCommand.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/BeginCommand.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/BeginCommand.h Tue Jan 13 12:13:51 2009
@@ -45,7 +45,7 @@
                 initialize( getFrame() );
         }
 
-        BeginCommand( StompFrame* frame ) :
+        BeginCommand( wireformat::stomp::StompFrame* frame ) :
             AbstractCommand<transport::Command>( frame ) {
                 validate( getFrame() );
         }
@@ -67,7 +67,7 @@
          * frame with data appropriate for the command type.
          * @param frame Frame to init
          */
-        virtual void initialize( StompFrame& frame ) {
+        virtual void initialize( wireformat::stomp::StompFrame& frame ) {
             frame.setCommand( CommandConstants::toString(
                 CommandConstants::BEGIN ) );
         }
@@ -78,7 +78,7 @@
          * @param frame Frame to validate
          * @returns true if frame is valid
          */
-        virtual bool validate( const StompFrame& frame ) const {
+        virtual bool validate( const wireformat::stomp::StompFrame& frame ) const {
 
             if((frame.getCommand() ==
                 CommandConstants::toString( CommandConstants::BEGIN )) &&

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/BytesMessageCommand.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/BytesMessageCommand.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/BytesMessageCommand.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/BytesMessageCommand.h Tue Jan 13 12:13:51 2009
@@ -89,7 +89,7 @@
          * Constructor for initialization in read-only mode.
          * @param frame The stomp frame that was received from the broker.
          */
-        BytesMessageCommand( StompFrame* frame ) :
+        BytesMessageCommand( wireformat::stomp::StompFrame* frame ) :
             StompMessage< cms::BytesMessage >( frame ),
             dataInputStream(&inputStream),
             dataOutputStream(&outputStream) {
@@ -130,7 +130,8 @@
          * @throws MarshalException if the command is not
          * in a state that can be marshaled.
          */
-        virtual const StompFrame& marshal() throw ( marshal::MarshalException ) {
+        virtual const wireformat::stomp::StompFrame& marshal()
+            throw ( wireformat::stomp::marshal::MarshalException ) {
 
             try{
                 // Before we send out the frame tag it with the content length
@@ -143,8 +144,8 @@
 
                 return StompMessage<cms::BytesMessage>::marshal();
             }
-            AMQ_CATCH_RETHROW( marshal::MarshalException )
-            AMQ_CATCHALL_THROW( marshal::MarshalException )
+            AMQ_CATCH_RETHROW( wireformat::stomp::marshal::MarshalException )
+            AMQ_CATCHALL_THROW( wireformat::stomp::marshal::MarshalException )
         }
 
         /**

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/CommitCommand.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/CommitCommand.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/CommitCommand.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/CommitCommand.h Tue Jan 13 12:13:51 2009
@@ -40,7 +40,7 @@
                 initialize( getFrame() );
         }
 
-        CommitCommand( StompFrame* frame ) :
+        CommitCommand( wireformat::stomp::StompFrame* frame ) :
             AbstractCommand<transport::Command>( frame ) {
                 validate( getFrame() );
         }
@@ -62,7 +62,7 @@
          * frame with data appropriate for the command type.
          * @param frame Frame to init
          */
-        virtual void initialize( StompFrame& frame ) {
+        virtual void initialize( wireformat::stomp::StompFrame& frame ) {
             frame.setCommand( CommandConstants::toString(
                 CommandConstants::COMMIT ) );
         }
@@ -73,7 +73,7 @@
          * @param frame Frame to validate
          * @returns true if frame is valid
          */
-        virtual bool validate( const StompFrame& frame ) const {
+        virtual bool validate( const wireformat::stomp::StompFrame& frame ) const {
             if((frame.getCommand() ==
                 CommandConstants::toString( CommandConstants::COMMIT )) &&
                (frame.getProperties().hasProperty(

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/ConnectCommand.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/ConnectCommand.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/ConnectCommand.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/ConnectCommand.h Tue Jan 13 12:13:51 2009
@@ -39,7 +39,7 @@
                 initialize( getFrame() );
         }
 
-        ConnectCommand( StompFrame* frame ) :
+        ConnectCommand( wireformat::stomp::StompFrame* frame ) :
             AbstractCommand<transport::Command>( frame ) {
                 validate( getFrame() );
         }
@@ -135,7 +135,7 @@
          * frame with data appropriate for the command type.
          * @param frame Frame to init
          */
-        virtual void initialize( StompFrame& frame ) {
+        virtual void initialize( wireformat::stomp::StompFrame& frame ) {
             frame.setCommand( CommandConstants::toString(
                 CommandConstants::CONNECT ) );
         }
@@ -146,7 +146,7 @@
          * @param frame Frame to validate
          * @returns true if frame is valid
          */
-        virtual bool validate( const StompFrame& frame ) const {
+        virtual bool validate( const wireformat::stomp::StompFrame& frame ) const {
             if( frame.getCommand() ==
                 CommandConstants::toString( CommandConstants::CONNECT ) ) {
                 return true;

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/ConnectedCommand.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/ConnectedCommand.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/ConnectedCommand.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/ConnectedCommand.h Tue Jan 13 12:13:51 2009
@@ -40,7 +40,7 @@
                 initialize( getFrame() );
         }
 
-        ConnectedCommand( StompFrame* frame ) :
+        ConnectedCommand( wireformat::stomp::StompFrame* frame ) :
             AbstractCommand< transport::Command >( frame ) {
                 validate( getFrame() );
         }
@@ -88,7 +88,7 @@
          * frame with data appropriate for the command type.
          * @param frame Frame to init
          */
-        virtual void initialize( StompFrame& frame ) {
+        virtual void initialize( wireformat::stomp::StompFrame& frame ) {
             frame.setCommand( CommandConstants::toString(
                 CommandConstants::CONNECTED ) );
         }
@@ -99,7 +99,7 @@
          * @param frame Frame to validate
          * @returns true if frame is valid
          */
-        virtual bool validate( const StompFrame& frame ) const {
+        virtual bool validate( const wireformat::stomp::StompFrame& frame ) const {
             if( frame.getCommand() ==
                 CommandConstants::toString( CommandConstants::CONNECTED ) ) {
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/DisconnectCommand.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/DisconnectCommand.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/DisconnectCommand.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/DisconnectCommand.h Tue Jan 13 12:13:51 2009
@@ -40,7 +40,7 @@
                 initialize( getFrame() );
         }
 
-        DisconnectCommand( StompFrame* frame ) :
+        DisconnectCommand( wireformat::stomp::StompFrame* frame ) :
             AbstractCommand<transport::Command>( frame ) {
                 validate( getFrame() );
         }
@@ -62,7 +62,7 @@
          * frame with data appropriate for the command type.
          * @param frame Frame to init
          */
-        virtual void initialize( StompFrame& frame ) {
+        virtual void initialize( wireformat::stomp::StompFrame& frame ) {
             frame.setCommand( CommandConstants::toString(
                 CommandConstants::DISCONNECT ) );
         }
@@ -73,7 +73,7 @@
          * @param frame Frame to validate
          * @returns true if frame is valid
          */
-        virtual bool validate( const StompFrame& frame ) const {
+        virtual bool validate( const wireformat::stomp::StompFrame& frame ) const {
             if( frame.getCommand() ==
                 CommandConstants::toString( CommandConstants::DISCONNECT ) ) {
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/ErrorCommand.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/ErrorCommand.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/ErrorCommand.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/ErrorCommand.h Tue Jan 13 12:13:51 2009
@@ -39,7 +39,7 @@
                 initialize( getFrame() );
         }
 
-        ErrorCommand( StompFrame* frame ) :
+        ErrorCommand( wireformat::stomp::StompFrame* frame ) :
             AbstractCommand<transport::Command>( frame ) {
                 validate( getFrame() );
         }
@@ -104,7 +104,7 @@
          * frame with data appropriate for the command type.
          * @param frame Frame to init
          */
-        virtual void initialize( StompFrame& frame ) {
+        virtual void initialize( wireformat::stomp::StompFrame& frame ) {
             frame.setCommand( CommandConstants::toString(
                 CommandConstants::ERROR_CMD ) );
         }
@@ -115,7 +115,7 @@
          * @param frame Frame to validate
          * @returns true if frame is valid
          */
-        virtual bool validate( const StompFrame& frame ) const {
+        virtual bool validate( const wireformat::stomp::StompFrame& frame ) const {
             if((frame.getCommand() ==
                 CommandConstants::toString( CommandConstants::ERROR_CMD ) ) &&
                (frame.getProperties().hasProperty(

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/MessageCommand.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/MessageCommand.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/MessageCommand.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/MessageCommand.h Tue Jan 13 12:13:51 2009
@@ -40,7 +40,7 @@
                 initialize( getFrame() );
         }
 
-        MessageCommand( StompFrame* frame ) :
+        MessageCommand( wireformat::stomp::StompFrame* frame ) :
             StompMessage< cms::Message >( frame ) {
                 validate( getFrame() );
         }

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/ReceiptCommand.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/ReceiptCommand.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/ReceiptCommand.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/ReceiptCommand.h Tue Jan 13 12:13:51 2009
@@ -40,7 +40,7 @@
                 initialize( getFrame() );
         }
 
-        ReceiptCommand( StompFrame* frame ) :
+        ReceiptCommand( wireformat::stomp::StompFrame* frame ) :
             AbstractCommand<transport::Response>( frame ) {
                 validate( getFrame() );
         }
@@ -82,7 +82,7 @@
          * frame with data appropriate for the command type.
          * @param frame Frame to init
          */
-        virtual void initialize( StompFrame& frame ) {
+        virtual void initialize( wireformat::stomp::StompFrame& frame ) {
             frame.setCommand( CommandConstants::toString(
                 CommandConstants::RECEIPT ) );
         }
@@ -93,7 +93,7 @@
          * @param frame Frame to validate
          * @returns true if frame is valid
          */
-        virtual bool validate( const StompFrame& frame ) const {
+        virtual bool validate( const wireformat::stomp::StompFrame& frame ) const {
             if((frame.getCommand() ==
                 CommandConstants::toString( CommandConstants::RECEIPT )) &&
                (frame.getProperties().hasProperty(

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/StompCommand.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/StompCommand.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/StompCommand.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/StompCommand.h Tue Jan 13 12:13:51 2009
@@ -19,8 +19,8 @@
 #define _ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_STOMPCOMMAND_H_
 
 #include <activemq/connector/stomp/commands/CommandConstants.h>
-#include <activemq/connector/stomp/marshal/Marshalable.h>
-#include <activemq/connector/stomp/marshal/MarshalException.h>
+#include <activemq/wireformat/stomp/marshal/Marshalable.h>
+#include <activemq/wireformat/stomp/marshal/MarshalException.h>
 #include <activemq/util/Config.h>
 
 namespace activemq{
@@ -28,7 +28,7 @@
 namespace stomp{
 namespace commands{
 
-    class AMQCPP_API StompCommand : public marshal::Marshalable {
+    class AMQCPP_API StompCommand : public wireformat::stomp::marshal::Marshalable {
     protected:
 
         /**
@@ -36,7 +36,7 @@
          * frame with data appropriate for the command type.
          * @param frame Frame to init
          */
-        virtual void initialize( StompFrame& frame ) = 0;
+        virtual void initialize( wireformat::stomp::StompFrame& frame ) = 0;
 
         /**
          * Inheritors are required to override this method to validate
@@ -44,7 +44,7 @@
          * @param frame Frame to validate
          * @returns true if frame is valid
          */
-        virtual bool validate( const StompFrame& frame ) const = 0;
+        virtual bool validate( const wireformat::stomp::StompFrame& frame ) const = 0;
 
     public:
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/StompMessage.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/StompMessage.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/StompMessage.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/StompMessage.h Tue Jan 13 12:13:51 2009
@@ -77,7 +77,7 @@
             replyTo( NULL) {
         }
 
-        StompMessage( StompFrame* frame ) :
+        StompMessage( wireformat::stomp::StompFrame* frame ) :
             AbstractCommand< transport::Command >( frame ),
             ackHandler( NULL ),
             dest( NULL ),
@@ -731,18 +731,18 @@
          * frame with data appropriate for the command type.
          * @param frame Frame to init
          */
-        virtual void initialize( StompFrame& frame ) {
+        virtual void initialize( wireformat::stomp::StompFrame& frame ) {
             frame.setCommand( CommandConstants::toString(
                 CommandConstants::SEND ) );
         }
 
         /**
          * Inheritors are required to override this method to validate
-         * the passed stomp frame before it is marshalled or unmarshaled
+         * the passed stomp frame before it is marshaled or un-marshaled
          * @param frame Frame to validate
          * @returns true if frame is valid
          */
-        virtual bool validate( const StompFrame& frame ) const {
+        virtual bool validate( const wireformat::stomp::StompFrame& frame ) const {
 
             if(frame.getCommand() ==
                CommandConstants::toString( CommandConstants::SEND ) ) {

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/SubscribeCommand.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/SubscribeCommand.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/SubscribeCommand.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/SubscribeCommand.h Tue Jan 13 12:13:51 2009
@@ -41,12 +41,12 @@
                 initialize( getFrame() );
         }
 
-        SubscribeCommand( StompFrame* frame ) :
+        SubscribeCommand( wireformat::stomp::StompFrame* frame ) :
             AbstractCommand< transport::Command >( frame ) {
                 validate( getFrame() );
         }
 
-        virtual ~SubscribeCommand(void) {}
+        virtual ~SubscribeCommand() {}
 
         /**
          * Clone the StompCommand and return the new copy.
@@ -332,7 +332,7 @@
          * frame with data appropriate for the command type.
          * @param frame Frame to init
          */
-        virtual void initialize( StompFrame& frame ) {
+        virtual void initialize( wireformat::stomp::StompFrame& frame ) {
 
             frame.setCommand( CommandConstants::toString(
                 CommandConstants::SUBSCRIBE ) );
@@ -350,7 +350,7 @@
          * @param frame Frame to validate
          * @returns frame true if frame is valid
          */
-        virtual bool validate( const StompFrame& frame ) const {
+        virtual bool validate( const wireformat::stomp::StompFrame& frame ) const {
 
             if((frame.getCommand() ==
                 CommandConstants::toString( CommandConstants::SUBSCRIBE )) &&

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/TextMessageCommand.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/TextMessageCommand.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/TextMessageCommand.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/TextMessageCommand.h Tue Jan 13 12:13:51 2009
@@ -37,7 +37,7 @@
                 initialize( getFrame() );
         }
 
-        TextMessageCommand( StompFrame* frame ) :
+        TextMessageCommand( wireformat::stomp::StompFrame* frame ) :
             StompMessage< cms::TextMessage >( frame ) {
                 validate( getFrame() );
         }

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/UnsubscribeCommand.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/UnsubscribeCommand.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/UnsubscribeCommand.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/commands/UnsubscribeCommand.h Tue Jan 13 12:13:51 2009
@@ -40,7 +40,7 @@
                 initialize( getFrame() );
         }
 
-        UnsubscribeCommand( StompFrame* frame ) :
+        UnsubscribeCommand( wireformat::stomp::StompFrame* frame ) :
             AbstractCommand< transport::Command >( frame ) {
                 validate( getFrame() );
         }
@@ -83,7 +83,7 @@
          * frame with data appropriate for the command type.
          * @param frame Frame to init
          */
-        virtual void initialize( StompFrame& frame ) {
+        virtual void initialize( wireformat::stomp::StompFrame& frame ) {
             frame.setCommand( CommandConstants::toString(
                 CommandConstants::UNSUBSCRIBE ) );
         }
@@ -94,7 +94,7 @@
          * @param frame Frame to validate
          * @returns true if frame is valid
          */
-        virtual bool validate( const StompFrame& frame ) const {
+        virtual bool validate( const wireformat::stomp::StompFrame& frame ) const {
             if((frame.getCommand() ==
                 CommandConstants::toString( CommandConstants::UNSUBSCRIBE )) &&
                (frame.getProperties().hasProperty(

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.cpp?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.cpp Tue Jan 13 12:13:51 2009
@@ -49,7 +49,24 @@
 
         WireFormat* wireFormat = this->createWireFormat( properties );
 
-        return this->doCreate( location, wireFormat, properties );
+        // Create the initial Transport, then wrap it in the normal Filters
+        Transport* transport = doCreateComposite( location, wireFormat, properties );
+
+        // If there is a negotiator need then we create and wrap here.
+        if( wireFormat->hasNegotiator() ) {
+            transport = wireFormat->createNegotiator( transport );
+        }
+
+        // Create the Transport for response correlator
+        transport = new ResponseCorrelator( transport );
+
+        // If command tracing was enabled, wrap the transport with a logging transport.
+        if( properties.getProperty( "transport.commandTracingEnabled", "false" ) == "true" ) {
+            // Create the Transport for response correlator
+            transport = new LoggingTransport( transport );
+        }
+
+        return transport;
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -66,32 +83,12 @@
 
         WireFormat* wireFormat = this->createWireFormat( properties );
 
-        return doCreateComposite( location, wireFormat, properties );
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-Transport* AbstractTransportFactory::doCreate( const decaf::net::URI& location,
-                                               WireFormat* wireFormat,
-                                               const decaf::util::Properties& properties )
-    throw ( exceptions::ActiveMQException ) {
-
-    try{
-
         // Create the initial Transport, then wrap it in the normal Filters
-        Transport* transport = doCreateComposite(
-            location, wireFormat, properties );
-
-        // Create the Transport for response correlator
-        transport = new ResponseCorrelator( transport );
+        Transport* transport = doCreateComposite( location, wireFormat, properties );
 
-        // If command tracing was enabled, wrap the transport with a logging transport.
-        if( properties.getProperty( "transport.commandTracingEnabled", "false" ) == "true" ) {
-            // Create the Transport for response correlator
-            transport = new LoggingTransport( transport );
+        // If there is a negotiator need then we create and wrap here.
+        if( wireFormat->hasNegotiator() ) {
+            transport = wireFormat->createNegotiator( transport );
         }
 
         return transport;

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.h Tue Jan 13 12:13:51 2009
@@ -61,21 +61,10 @@
     protected:
 
         /**
-         * Creates a fully configured Transport instance which could be a chain
-         * of filters and transports.
-         * @param location - URI location to connect to.
-         * @param wireformat - the assigned WireFormat for the new Transport.
-         * @param properties - Properties to apply to the transport.
-         * @throws ActiveMQexception if an error occurs
-         */
-        virtual Transport* doCreate( const decaf::net::URI& location,
-                                     wireformat::WireFormat* wireFormat,
-                                     const decaf::util::Properties& properties )
-            throw ( exceptions::ActiveMQException );
-
-        /**
          * Creates a slimed down Transport instance which can be used in composite
-         * transport instances.
+         * transport instances or as the basis for a fully wrapped Transport.  This
+         * method must be implemented by the actual TransportFactory that extends this
+         * abstract base class.
          * @param location - URI location to connect to.
          * @param wireformat - the assigned WireFormat for the new Transport.
          * @param properties - Properties to apply to the transport.

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp Tue Jan 13 12:13:51 2009
@@ -16,17 +16,17 @@
  */
 
 #include "IOTransport.h"
-#include "CommandReader.h"
-#include "CommandWriter.h"
 
 #include <decaf/util/concurrent/Concurrent.h>
 #include <decaf/lang/exceptions/UnsupportedOperationException.h>
+#include <activemq/wireformat/WireFormat.h>
 #include <activemq/exceptions/ActiveMQException.h>
 #include <activemq/util/Config.h>
 
 using namespace activemq;
 using namespace activemq::transport;
 using namespace activemq::exceptions;
+using namespace activemq::wireformat;
 using namespace decaf::lang;
 using namespace decaf::util::concurrent;
 
@@ -36,14 +36,25 @@
 ////////////////////////////////////////////////////////////////////////////////
 IOTransport::IOTransport(){
 
-    listener = NULL;
-    reader = NULL;
-    writer = NULL;
-    exceptionListener = NULL;
-    inputStream = NULL;
-    outputStream = NULL;
-    closed = false;
-    thread = NULL;
+    this->listener = NULL;
+    this->exceptionListener = NULL;
+    this->inputStream = NULL;
+    this->outputStream = NULL;
+    this->closed = false;
+    this->thread = NULL;
+    this->wireFormat = NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+IOTransport::IOTransport( WireFormat* wireFormat ) {
+
+    this->listener = NULL;
+    this->exceptionListener = NULL;
+    this->inputStream = NULL;
+    this->outputStream = NULL;
+    this->closed = false;
+    this->thread = NULL;
+    this->wireFormat = wireFormat;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -79,7 +90,7 @@
                 "IOTransport::oneway() - attempting to write NULL command" );
         }
 
-        // Make sure we have an output strema to write to.
+        // Make sure we have an output stream to write to.
         if( outputStream == NULL ){
             throw CommandIOException(
                 __FILE__, __LINE__,
@@ -88,7 +99,7 @@
 
         synchronized( outputStream ){
             // Write the command to the output stream.
-            writer->writeCommand( command );
+            this->wireFormat->marshal( command, this->outputStream );
         }
     }
     AMQ_CATCH_RETHROW( CommandIOException )
@@ -114,18 +125,13 @@
         }
 
         // Make sure all variables that we need have been set.
-        if( inputStream == NULL || outputStream == NULL ||
-            reader == NULL || writer == NULL ){
+        if( inputStream == NULL || outputStream == NULL || wireFormat == NULL ){
             throw ActiveMQException(
                 __FILE__, __LINE__,
                 "IOTransport::start() - "
-                "IO sreams and reader/writer must be set before calling start" );
+                "IO streams and wireFormat instances must be set before calling start" );
         }
 
-        // Init the Command Reader and Writer with the Streams
-        reader->setInputStream( inputStream );
-        writer->setOutputStream( outputStream );
-
         // Start the polling thread.
         thread = new Thread( this );
         thread->start();
@@ -153,7 +159,6 @@
         // (which is likely).  Otherwise, the join that
         // follows will block forever.
         if( inputStream != NULL ){
-
             inputStream->close();
             inputStream = NULL;
         }
@@ -167,10 +172,12 @@
 
         // Close the output stream.
         if( outputStream != NULL ){
-
             outputStream->close();
             outputStream = NULL;
         }
+
+        // Clear the WireFormat so we can't use it anymore
+        this->wireFormat = NULL;
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -185,12 +192,11 @@
         while( !closed ){
 
             // Read the next command from the input stream.
-            Command* command = reader->readCommand();
+            Command* command = wireFormat->unmarshal( this->inputStream );
 
             // Notify the listener.
             fire( command );
         }
-
     }
     catch( exceptions::ActiveMQException& ex ){
         ex.setMark( __FILE__, __LINE__ );

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h Tue Jan 13 12:13:51 2009
@@ -22,19 +22,24 @@
 #include <activemq/transport/Transport.h>
 #include <activemq/transport/TransportExceptionListener.h>
 #include <activemq/transport/CommandListener.h>
-#include <activemq/exceptions/ActiveMQException.h>
 #include <activemq/transport/Command.h>
+#include <activemq/exceptions/ActiveMQException.h>
 
 #include <decaf/lang/Runnable.h>
 #include <decaf/lang/Thread.h>
+#include <decaf/io/DataInputStream.h>
+#include <decaf/io/DataOutputStream.h>
 #include <decaf/util/logging/LoggerDefines.h>
 
 namespace activemq{
+namespace wireformat{
+    class WireFormat;
+}
 namespace transport{
 
     /**
      * Implementation of the Transport interface that performs
-     * marshalling of commands to IO streams.  This class does not
+     * marshaling of commands to IO streams.  This class does not
      * implement the request method, it only handles oneway messages.
      * A thread polls on the input stream for in-coming commands.  When
      * a command is received, the command listener is notified.  The
@@ -57,14 +62,9 @@
         CommandListener* listener;
 
         /**
-         * Reads commands from the input stream.
-         */
-        CommandReader* reader;
-
-        /**
-         * Writes commands to the output stream.
+         * WireFormat instance to use to Encode / Decode.
          */
-        CommandWriter* writer;
+        wireformat::WireFormat* wireFormat;
 
         /**
          * Listener of exceptions from this transport.
@@ -74,12 +74,12 @@
         /**
          * The input stream for incoming commands.
          */
-        decaf::io::InputStream* inputStream;
+        decaf::io::DataInputStream* inputStream;
 
         /**
          * The output stream for out-going commands.
          */
-        decaf::io::OutputStream* outputStream;
+        decaf::io::DataOutputStream* outputStream;
 
         /**
          * The polling thread.
@@ -94,7 +94,7 @@
     private:
 
         /**
-         * Notify the excpetion listener
+         * Notify the exception listener
          * @param ex the exception to send
          */
         void fire( decaf::lang::Exception& ex ){
@@ -134,7 +134,19 @@
 
     public:
 
+        /**
+         * Default Constructor
+         */
         IOTransport();
+
+        /**
+         * Create an instance of this Transport and assign its WireFormat instance
+         * at creation time.
+         * @param wireFormat
+         *        Data encoder / decoder to use when reading and writing.
+         */
+        IOTransport( wireformat::WireFormat* wireFormat );
+
         virtual ~IOTransport();
 
         /**
@@ -177,19 +189,11 @@
         }
 
         /**
-         * Sets the command reader.
-         * @param reader the object that will be used for reading command objects.
-         */
-        virtual void setCommandReader( CommandReader* reader ){
-            this->reader = reader;
-        }
-
-        /**
-         * Sets the command writer.
-         * @param writer the object that will be used for writing command objects.
+         * Sets the WireFormat instance to use.
+         * @param WireFormat the object used to encode / decode commands.
          */
-        virtual void setCommandWriter( CommandWriter* writer ){
-            this->writer = writer;
+        virtual void setWireFormat( wireformat::WireFormat* wireFormat ){
+            this->wireFormat = wireFormat;
         }
 
         /**
@@ -204,7 +208,7 @@
          * Sets the input stream for in-coming commands.
          * @param is The input stream.
          */
-        virtual void setInputStream( decaf::io::InputStream* is ){
+        virtual void setInputStream( decaf::io::DataInputStream* is ){
             this->inputStream = is;
         }
 
@@ -212,7 +216,7 @@
          * Sets the output stream for out-going commands.
          * @param os The output stream.
          */
-        virtual void setOutputStream( decaf::io::OutputStream* os ){
+        virtual void setOutputStream( decaf::io::DataOutputStream* os ){
             this->outputStream = os;
         }
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h Tue Jan 13 12:13:51 2009
@@ -30,12 +30,13 @@
 #include <typeinfo>
 
 namespace activemq{
+namespace wireformat{
+    class WireFormat;
+}
 namespace transport{
 
     // Forward declarations.
     class CommandListener;
-    class CommandReader;
-    class CommandWriter;
     class TransportExceptionListener;
 
     /**
@@ -44,10 +45,9 @@
      * messages will be delivered to the specified listener object upon
      * receipt.  A user of the Transport can set an exception listener
      * to be notified of errors that occurs in Threads that the Transport
-     * layer runs.  Since a Transport doesn't know the Wire Format of the
-     * Commands it reads and writes, its up to the managing object to
-     * provide object(s) that implement the CommandReader and CommandWriter
-     * interfaces.
+     * layer runs.  Transports should be given an instance of a WireFormat
+     * object when created so that they can turn the built in Commands to /
+     * from the required wire format encoding.
      */
     class AMQCPP_API Transport : public cms::Startable,
                                  public cms::Closeable {
@@ -102,16 +102,10 @@
         virtual void setCommandListener( CommandListener* listener ) = 0;
 
         /**
-         * Sets the command reader.
-         * @param reader the object that will be used for reading command objects.
+         * Sets the WireFormat instance to use.
+         * @param WireFormat the object used to encode / decode commands.
          */
-        virtual void setCommandReader( CommandReader* reader ) = 0;
-
-        /**
-         * Sets the command writer.
-         * @param writer the object that will be used for writing command objects.
-         */
-        virtual void setCommandWriter( CommandWriter* writer ) = 0;
+        virtual void setWireFormat( wireformat::WireFormat* wireFormat ) = 0;
 
         /**
          * Sets the observer of asynchronous exceptions from this transport.

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.cpp?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.cpp Tue Jan 13 12:13:51 2009
@@ -48,9 +48,8 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void TransportFilter::onTransportException(
-    Transport* source AMQCPP_UNUSED,
-    const decaf::lang::Exception& ex ) {
+void TransportFilter::onTransportException( Transport* source AMQCPP_UNUSED,
+                                            const decaf::lang::Exception& ex ) {
 
     fire( ex );
 }

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h Tue Jan 13 12:13:51 2009
@@ -165,22 +165,6 @@
         }
 
         /**
-         * Sets the command reader.
-         * @param reader the object that will be used for reading command objects.
-         */
-        virtual void setCommandReader( CommandReader* reader ){
-            next->setCommandReader( reader );
-        }
-
-        /**
-         * Sets the command writer.
-         * @param writer the object that will be used for writing command objects.
-         */
-        virtual void setCommandWriter( CommandWriter* writer ){
-            next->setCommandWriter( writer );
-        }
-
-        /**
          * Sets the observer of asynchronous exceptions from this transport.
          * @param listener the listener of transport exceptions.
          */
@@ -189,6 +173,14 @@
         }
 
         /**
+         * Sets the WireFormat instance to use.
+         * @param WireFormat the object used to encode / decode commands.
+         */
+        virtual void setWireFormat( wireformat::WireFormat* wireFormat ) {
+            next->setWireFormat( wireFormat );
+        }
+
+        /**
          * Starts this transport object and creates the thread for
          * polling on the input stream for commands.  If this object
          * has been closed, throws an exception.  Before calling start,

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.h Tue Jan 13 12:13:51 2009
@@ -230,9 +230,11 @@
             outgoingCommandListener = listener;
         }
 
-        // Not impl is needed in this transport for these methods.
-        virtual void setCommandReader( CommandReader* reader AMQCPP_UNUSED){}
-        virtual void setCommandWriter( CommandWriter* writer AMQCPP_UNUSED){}
+        /**
+         * Sets the WireFormat instance to use.
+         * @param WireFormat the object used to encode / decode commands.
+         */
+        virtual void setWireFormat( wireformat::WireFormat* wireFormat AMQCPP_UNUSED ) {}
 
         virtual void setTransportExceptionListener(
             TransportExceptionListener* listener )

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.cpp?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.cpp Tue Jan 13 12:13:51 2009
@@ -39,10 +39,8 @@
                             Transport* next, const bool own )
 :   TransportFilter( next, own ),
     socket( NULL ),
-    loggingInputStream( NULL ),
-    loggingOutputStream( NULL ),
-    bufferedInputStream( NULL ),
-    bufferedOutputStream( NULL )
+    dataInputStream( NULL ),
+    dataOutputStream( NULL )
 {
     this->initialize( uri, properties );
 }
@@ -52,10 +50,8 @@
                             Transport* next, const bool own )
 :   TransportFilter( next, own ),
     socket( NULL ),
-    loggingInputStream( NULL ),
-    loggingOutputStream( NULL ),
-    bufferedInputStream( NULL ),
-    bufferedOutputStream( NULL )
+    dataInputStream( NULL ),
+    dataOutputStream( NULL )
 {
     if( !properties.hasProperty( "transport.uri" ) ) {
         throw ActiveMQException(
@@ -74,32 +70,18 @@
 
         try{
             close();
-        } catch( cms::CMSException& ex ){ /* Absorb */ }
+        }
+        AMQ_CATCH_NOTHROW( ActiveMQException )
+        AMQ_CATCH_NOTHROW( Exception )
+        AMQ_CATCHALL_NOTHROW()
 
         if( socket != NULL ) {
             delete socket;
             socket = NULL;
         }
 
-        if( loggingInputStream != NULL ) {
-            delete loggingInputStream;
-            loggingInputStream = NULL;
-        }
-
-        if( loggingOutputStream != NULL ) {
-            delete loggingOutputStream;
-            loggingOutputStream = NULL;
-        }
-
-        if( bufferedInputStream != NULL ) {
-            delete bufferedInputStream;
-            bufferedInputStream = NULL;
-        }
-
-        if( bufferedOutputStream != NULL ) {
-            delete bufferedOutputStream;
-            bufferedOutputStream = NULL;
-        }
+        delete this->dataInputStream;
+        delete this->dataOutputStream;
     }
     AMQ_CATCH_NOTHROW( ActiveMQException )
     AMQ_CATCH_NOTHROW( Exception )
@@ -150,20 +132,31 @@
 
         // If tcp tracing was enabled, wrap the iostreams with logging streams
         if( properties.getProperty( "transport.tcpTracingEnabled", "false" ) == "true" ) {
-            loggingInputStream = new LoggingInputStream( inputStream );
-            loggingOutputStream = new LoggingOutputStream( outputStream );
-
-            inputStream = loggingInputStream;
-            outputStream = loggingOutputStream;
-        }
 
-        // Now wrap the input/output streams with buffered streams
-        bufferedInputStream = new BufferedInputStream(inputStream);
-        bufferedOutputStream = new BufferedOutputStream(outputStream);
+            // Wrap with logging stream, we don't own the wrapped streams
+            inputStream = new LoggingInputStream( inputStream );
+            outputStream = new LoggingOutputStream( outputStream );
+
+            // Now wrap with the Buffered streams, we own the source streams
+            inputStream = new BufferedInputStream( inputStream, true );
+            outputStream = new BufferedOutputStream( outputStream, true );
+
+        } else {
+
+            // Wrap with the Buffered streams, we don't own the source streams
+            inputStream = new BufferedInputStream( inputStream );
+            outputStream = new BufferedOutputStream( outputStream );
+        }
+
+        // Now wrap the Buffered Streams with DataInput based streams.  We own
+        // the Source streams, all the streams in the chain that we own are
+        // destroyed when these are.
+        this->dataInputStream = new DataInputStream( inputStream, true );
+        this->dataOutputStream = new DataOutputStream( outputStream, true );
 
         // Give the IOTransport the streams.
-        ioTransport->setInputStream( bufferedInputStream );
-        ioTransport->setOutputStream( bufferedOutputStream );
+        ioTransport->setInputStream( dataInputStream );
+        ioTransport->setOutputStream( dataOutputStream );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.h Tue Jan 13 12:13:51 2009
@@ -27,6 +27,8 @@
 #include <activemq/io/LoggingOutputStream.h>
 #include <decaf/io/BufferedInputStream.h>
 #include <decaf/io/BufferedOutputStream.h>
+#include <decaf/io/DataInputStream.h>
+#include <decaf/io/DataOutputStream.h>
 
 namespace activemq{
 namespace transport{
@@ -35,7 +37,7 @@
     /**
      * Implements a TCP/IP based transport filter, this transport
      * is meant to wrap an instance of an IOTransport.  The lower
-     * level transport should take care of manaing stream reads
+     * level transport should take care of managing stream reads
      * and writes.
      */
     class AMQCPP_API TcpTransport : public TransportFilter {
@@ -46,11 +48,15 @@
          */
         decaf::net::Socket* socket;
 
-        io::LoggingInputStream* loggingInputStream;
-        io::LoggingOutputStream* loggingOutputStream;
+        /**
+         * Input Stream for Reading in Messages
+         */
+        decaf::io::DataInputStream* dataInputStream;
 
-        decaf::io::BufferedInputStream* bufferedInputStream;
-        decaf::io::BufferedOutputStream* bufferedOutputStream;
+        /**
+         * Output Stream for Writing out Messages.
+         */
+        decaf::io::DataOutputStream* dataOutputStream;
 
     public:
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.cpp?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.cpp Tue Jan 13 12:13:51 2009
@@ -33,7 +33,7 @@
     throw ( exceptions::ActiveMQException ) {
 
     try {
-        return new TcpTransport( location, properties, new IOTransport() );
+        return new TcpTransport( location, properties, new IOTransport( wireFormat ) );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )

Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormat.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormat.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormat.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormat.h Tue Jan 13 12:13:51 2009
@@ -18,6 +18,8 @@
 #ifndef _ACTIVEMQ_WIREFORMAT_WIREFORMAT_H_
 #define _ACTIVEMQ_WIREFORMAT_WIREFORMAT_H_
 
+#include <activemq/wireformat/WireFormatNegotiator.h>
+
 #include <decaf/io/DataInputStream.h>
 #include <decaf/io/DataOutputStream.h>
 #include <decaf/io/IOException.h>
@@ -31,8 +33,6 @@
 namespace activemq{
 namespace wireformat{
 
-    class WireFormatNegotiator;
-
     /**
      * Provides a mechanism to marshal commands into and out of packets
      * or into and out of streams, Channels and Datagrams.

Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/stomp/StompFrame.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/stomp/StompFrame.h?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/stomp/StompFrame.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/stomp/StompFrame.h Tue Jan 13 12:13:51 2009
@@ -15,8 +15,8 @@
  * limitations under the License.
  */
 
-#ifndef _ACTIVEMQ_CONNECTOR_STOMP_STOMPFRAMEWRAPPER_H_
-#define _ACTIVEMQ_CONNECTOR_STOMP_STOMPFRAMEWRAPPER_H_
+#ifndef _ACTIVEMQ_WIREFORMAT_STOMP_STOMPFRAMEWRAPPER_H_
+#define _ACTIVEMQ_WIREFORMAT_STOMP_STOMPFRAMEWRAPPER_H_
 
 #include <string>
 #include <string.h>
@@ -25,7 +25,7 @@
 #include <activemq/util/Config.h>
 
 namespace activemq{
-namespace connector{
+namespace wireformat{
 namespace stomp{
 
     /**
@@ -142,4 +142,4 @@
 
 }}}
 
-#endif /*_ACTIVEMQ_CONNECTOR_STOMP_STOMPFRAMEWRAPPER_H_*/
+#endif /*_ACTIVEMQ_WIREFORMAT_STOMP_STOMPFRAMEWRAPPER_H_*/

Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/stomp/StompWireFormat.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/stomp/StompWireFormat.cpp?rev=734228&r1=734227&r2=734228&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/stomp/StompWireFormat.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/stomp/StompWireFormat.cpp Tue Jan 13 12:13:51 2009
@@ -17,10 +17,18 @@
 
 #include "StompWireFormat.h"
 
+#include <activemq/wireformat/stomp/StompFrame.h>
+#include <activemq/wireformat/stomp/commands/CommandConstants.h>
+#include <decaf/lang/Character.h>
+#include <decaf/lang/Integer.h>
+#include <decaf/io/IOException.h>
+#include <memory>
+
 using namespace std;
 using namespace activemq;
 using namespace activemq::wireformat;
 using namespace activemq::wireformat::stomp;
+using namespace activemq::wireformat::stomp::commands;
 using namespace decaf;
 using namespace decaf::io;
 using namespace decaf::lang;
@@ -38,13 +46,87 @@
 void StompWireFormat::marshal( transport::Command* command, decaf::io::DataOutputStream* out )
     throw ( decaf::io::IOException ) {
 
+    try{
+
+        if( out == NULL ) {
+            throw decaf::io::IOException(
+                __FILE__, __LINE__,
+                "StompCommandWriter::writeCommand - "
+                "output stream is NULL" );
+        }
+
+        const StompFrame& frame = marshaler.marshal( command );
+
+        // Write the command.
+        const string& cmdString = frame.getCommand();
+        out->write( (unsigned char*)cmdString.c_str(), 0, cmdString.length() );
+        out->writeByte( '\n' );
+
+        // Write all the headers.
+        vector< pair<string,string> > headers = frame.getProperties().toArray();
+        for( std::size_t ix=0; ix < headers.size(); ++ix ) {
+            string& name = headers[ix].first;
+            string& value = headers[ix].second;
+
+            out->write( (unsigned char*)name.c_str(), 0, name.length() );
+            out->writeByte( ':' );
+            out->write( (unsigned char*)value.c_str(), 0, value.length() );
+            out->writeByte( '\n' );
+        }
+
+        // Finish the header section with a form feed.
+        out->writeByte( '\n' );
+
+        // Write the body.
+        const std::vector<unsigned char>& body = frame.getBody();
+        if( body.size() > 0 ) {
+            out->write( &body[0], 0, body.size() );
+        }
+
+        if( ( frame.getBodyLength() == 0 ) ||
+            ( frame.getProperties().getProperty(
+                  CommandConstants::toString(
+                      CommandConstants::HEADER_CONTENTLENGTH ), "" ) != "" ) ) {
+
+            out->writeByte( '\0' );
+        }
+
+        out->writeByte( '\n' );
+
+        // Flush the stream.
+        out->flush();
+    }
+    AMQ_CATCH_RETHROW( decaf::io::IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( decaf::lang::Exception, decaf::io::IOException )
+    AMQ_CATCHALL_THROW( decaf::io::IOException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 transport::Command* StompWireFormat::unmarshal( decaf::io::DataInputStream* in )
     throw ( decaf::io::IOException ) {
 
-    return NULL;
+    auto_ptr<StompFrame> frame;
+
+    try{
+
+        // Create a new Frame for reading to.
+        frame.reset( new StompFrame() );
+
+        // Read the command header.
+        readStompCommandHeader( *( frame.get() ), in );
+
+        // Read the headers.
+        readStompHeaders( *( frame.get() ), in );
+
+        // Read the body.
+        readStompBody( *( frame.get() ), in );
+
+        // Return the Command, caller must delete it.
+        return marshaler.marshal( frame.release() );
+    }
+    AMQ_CATCH_RETHROW( decaf::io::IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( decaf::lang::Exception, decaf::io::IOException )
+    AMQ_CATCHALL_THROW( decaf::io::IOException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -54,3 +136,221 @@
     throw UnsupportedOperationException( __FILE__, __LINE__,
         "No Negotiator is required to use this WireFormat." );
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void StompWireFormat::readStompCommandHeader( StompFrame& frame, decaf::io::DataInputStream* in )
+   throw ( decaf::io::IOException ) {
+
+    try{
+
+        while( true ) {
+
+            // The command header is formatted
+            // just like any other stomp header.
+            readStompHeaderLine( in );
+
+            // Ignore all white space before the command.
+            long long offset = -1;
+            for( size_t ix = 0; ix < buffer.size()-1; ++ix ) {
+
+                // Find the first non whitespace character
+                if( !Character::isWhitespace(buffer[ix]) ){
+                    offset = (long long)ix;
+                    break;
+                }
+            }
+
+            if( offset >= 0 ) {
+                // Set the command in the frame - copy the memory.
+                frame.setCommand( reinterpret_cast<char*>(&buffer[(size_t)offset]) );
+                break;
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( decaf::io::IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( decaf::lang::Exception, decaf::io::IOException )
+    AMQ_CATCHALL_THROW( decaf::io::IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompWireFormat::readStompHeaders( StompFrame& frame, decaf::io::DataInputStream* in )
+    throw ( decaf::io::IOException ) {
+
+    try{
+
+        // Read the command;
+        bool endOfHeaders = false;
+
+        while( !endOfHeaders ) {
+
+            // Read in the next header line.
+            std::size_t numChars = readStompHeaderLine( in );
+
+            if( numChars == 0 ) {
+
+                // should never get here
+                throw decaf::io::IOException(
+                    __FILE__, __LINE__,
+                    "StompWireFormat::readStompHeaders: no characters read" );
+            }
+
+            // Check for an empty line to demark the end of the header section.
+            // if its not the end then we have a header to process, so parse it.
+            if( numChars == 1 && buffer[0] == '\0' ) {
+
+                endOfHeaders = true;
+
+            } else {
+
+                // Search through this line to separate the key/value pair.
+                for( size_t ix = 0; ix < buffer.size(); ++ix ) {
+
+                    // If found the key/value separator...
+                    if( buffer[ix] == ':' ) {
+
+                        // Null-terminate the key.
+                        buffer[ix] = '\0';
+
+                        const char* key = reinterpret_cast<char*>(&buffer[0]);
+                        const char* value = reinterpret_cast<char*>(&buffer[ix+1]);
+
+                        // Assign the header key/value pair.
+                        frame.getProperties().setProperty(key, value);
+
+                        // Break out of the for loop.
+                        break;
+                    }
+                }
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( decaf::io::IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( decaf::lang::Exception, decaf::io::IOException )
+    AMQ_CATCHALL_THROW( decaf::io::IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::size_t StompWireFormat::readStompHeaderLine( decaf::io::DataInputStream* in )
+    throw ( decaf::io::IOException ) {
+
+    try{
+
+        // Clear any data from the buffer.
+        buffer.clear();
+
+        std::size_t count = 0;
+
+        while( true ) {
+
+            // Read the next char from the stream.
+            buffer.push_back( in->readByte() );
+
+            // Increment the position pointer.
+            count++;
+
+            // If we reached the line terminator, return the total number
+            // of characters read.
+            if( buffer[count-1] == '\n' )
+            {
+                // Overwrite the line feed with a null character.
+                buffer[count-1] = '\0';
+
+                return count;
+            }
+        }
+
+        // If we get here something bad must have happened.
+        throw decaf::io::IOException(
+            __FILE__, __LINE__,
+            "StompWireFormat::readStompHeaderLine: "
+            "Unrecoverable, error condition");
+    }
+    AMQ_CATCH_RETHROW( decaf::io::IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( decaf::lang::Exception, decaf::io::IOException )
+    AMQ_CATCHALL_THROW( decaf::io::IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompWireFormat::readStompBody( StompFrame& frame, decaf::io::DataInputStream* in )
+   throw ( decaf::io::IOException ) {
+
+    try{
+
+        // Clear any data from the buffer.
+        buffer.clear();
+
+        unsigned int content_length = 0;
+
+        if(frame.getProperties().hasProperty(
+            commands::CommandConstants::toString(
+                commands::CommandConstants::HEADER_CONTENTLENGTH))) {
+
+            string length =
+                frame.getProperties().getProperty(
+                    commands::CommandConstants::toString(
+                        commands::CommandConstants::HEADER_CONTENTLENGTH));
+
+            content_length = (unsigned int)Integer::parseInt( length );
+         }
+
+         if( content_length != 0 ) {
+            // For this case its assumed that content length indicates how
+            // much to read.  We reserve space in the buffer for it to
+            // minimize the number of reallocs that might occur.  We are
+            // assuming that content length doesn't count the trailing null
+            // that indicates the end of frame.  The reserve won't do anything
+            // if the buffer already has that much capacity.  The resize call
+            // basically sets the end iterator to the correct location since
+            // this is a char vector and we already reserve enough space.
+            // Resize doesn't realloc the vector smaller if content_length
+            // is less than capacity of the buffer, it just move the end
+            // iterator.  Reserve adds the benefit that the mem is set to
+            // zero.  Over time as larger messages come in thsi will cause
+            // us to adapt to that size so that future messages that are
+            // around that size won't alloc any new memory.
+
+            buffer.reserve( (std::size_t)content_length );
+            buffer.resize( (std::size_t)content_length );
+
+            // Read the Content Length now
+            in->read( &buffer[0], 0, content_length );
+
+            // Content Length read, now pop the end terminator off (\0\n).
+            if( in->readByte() != '\0' ) {
+
+                throw decaf::io::IOException(
+                    __FILE__, __LINE__,
+                    "StompWireFormat::readStompBody: "
+                    "Read Content Length, and no trailing null");
+            }
+
+        } else {
+
+            // Content length was either zero, or not set, so we read until the
+            // first null is encounted.
+            while( true ) {
+
+                char byte = in->readByte();
+
+                buffer.push_back(byte);
+
+                content_length++;
+
+                if( byte != '\0' )
+                {
+                    continue;
+                }
+
+                break;  // Read null and newline we are done.
+            }
+        }
+
+        if( content_length != 0 ) {
+            // Set the body contents in the frame - copy the memory
+            frame.getBody() = buffer;
+        }
+    }
+    AMQ_CATCH_RETHROW( decaf::io::IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( decaf::lang::Exception, decaf::io::IOException )
+    AMQ_CATCHALL_THROW( decaf::io::IOException )
+}



Mime
View raw message