activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nmitt...@apache.org
Subject svn commit: r418749 [4/17] - in /incubator/activemq/trunk/activemq-cpp: ./ src/ src/main/ src/main/activemq/ src/main/activemq/concurrent/ src/main/activemq/connector/ src/main/activemq/connector/openwire/ src/main/activemq/connector/stomp/ src/main/ac...
Date Mon, 03 Jul 2006 11:51:54 GMT
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompFrame.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompFrame.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompFrame.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompFrame.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CONNECTOR_STOMP_STOMPFRAMEWRAPPER_H_
+#define ACTIVEMQ_CONNECTOR_STOMP_STOMPFRAMEWRAPPER_H_
+ 
+#include <string>
+#include <map>
+#include <activemq/util/SimpleProperties.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+
+    /**
+     * A Stomp-level message frame that encloses all messages
+     * to and from the broker.
+     */
+    class StompFrame{           
+    public:
+    
+        /**
+         * Default constructor.
+         */
+        StompFrame(void){
+            body = NULL;
+            bodyLength = 0;
+        }
+        
+        /**
+         * Destruction - frees the memory pool.
+         */
+        virtual ~StompFrame(void) { delete body; }
+        
+        /**
+         * Clonse this message exactly, returns a new instance that the
+         * caller is required to delete.
+         * @return new copy of this message
+         */
+        virtual StompFrame* clone(void) const {
+            StompFrame* frame = new StompFrame();
+            frame->command = command;
+            frame->properties = properties;
+            char* cpyBody = new char[bodyLength];
+            memcpy(cpyBody, body, bodyLength);
+            frame->setBody(cpyBody, bodyLength);
+            return frame;
+        }
+        
+        /**
+         * Sets the command for this stomp frame.
+         * @param command The command to be set.
+         */
+        void setCommand( const std::string& cmd ){
+            this->command = cmd;
+        }
+        
+        /**
+         * Accessor for this frame's command field.
+         */
+        const std::string& getCommand(void) const{
+            return command;
+        }
+        
+        /**
+         * Gets access to the header properties for this frame.
+         */
+        util::Properties& getProperties(void){ return properties; }
+        const util::Properties& getProperties(void) const { 
+            return properties;
+        }
+        
+        /**
+         * Accessor for the body data of this frame.
+         * @return char pointer to body data
+         */
+        const char* getBody(void) const{
+            return body;
+        }
+        
+        /**
+         * Return the number of bytes contained in this frames body
+         * @return Body bytes length.
+         */
+        int getBodyLength(void) const{ return bodyLength; }
+        
+        /**
+         * Sets the body data of this frame as a byte sequence.
+         * @param bytes The byte buffer to be set in the body.
+         * @param numBytes The number of bytes in the buffer.
+         */
+        void setBody( const char* bytes, const int numBytes ){
+            body = bytes;
+            bodyLength = numBytes;
+        }   
+        
+    private:
+
+        // String Name of this command.
+        std::string command;
+
+        // Properties of the Stomp Message
+        util::SimpleProperties properties;
+
+        // Byte data of Body.
+        const char* body;
+        int bodyLength;     
+        
+    };
+    
+}}}
+
+#endif /*ACTIVEMQ_CONNECTOR_STOMP_STOMPFRAMEWRAPPER_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompProducerInfo.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompProducerInfo.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompProducerInfo.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompProducerInfo.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_STOMPPRODUCERINFO_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_STOMPPRODUCERINFO_H_
+
+#include <activemq/connector/ProducerInfo.h>
+#include <cms/Destination.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+
+    class StompProducerInfo : public ProducerInfo
+    {
+    private:
+    
+        // Producer Id
+        unsigned int producerId;
+
+        // Default Destination
+        cms::Destination* dest;
+        
+        // Session that this producer is attached to - we do not own this
+        const SessionInfo* session;
+        
+    public:
+
+    	StompProducerInfo(void) { dest = NULL; }
+    	virtual ~StompProducerInfo(void) { delete dest; }
+
+        /**
+         * Retrieves the default destination that this producer
+         * sends its messages to.
+         * @return Destionation, owned by this object
+         */
+        virtual const cms::Destination& getDestination(void) const {
+            return *dest;
+        }
+
+        /**
+         * Sets the Default Destination for this Producer
+         * @param reference to a destination, copied internally
+         */
+        virtual void setDestination( const cms::Destination& dest ) {
+            this->dest = dest.clone();
+        }
+
+        /**
+         * Gets the ID that is assigned to this Producer
+         * @return value of the Producer Id.
+         */
+        virtual unsigned int getProducerId(void) const {
+            return producerId;
+        }
+        
+        /**
+         * Sets the ID that is assigned to this Producer
+         * @return string value of the Producer Id.
+         */
+        virtual void setProducerId( const unsigned int id ) {
+            this->producerId = id;
+        }
+
+        /**
+         * Gets the Session Info that this consumer is attached too
+         * @return SessionnInfo pointer
+         */
+        virtual const SessionInfo* getSessionInfo(void) const {
+            return session;
+        }
+
+        /**
+         * Gets the Session Info that this consumer is attached too
+         * @return SessionnInfo pointer
+         */
+        virtual void setSessionInfo( const SessionInfo* session ) {
+            this->session = session;
+        }
+
+    };
+
+}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_STOMPPRODUCERINFO_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompQueue.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompQueue.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompQueue.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompQueue.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_STOMPQUEUE_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_STOMPQUEUE_H_
+
+#include <activemq/connector/stomp/StompDestination.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+#include <cms/Queue.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+
+    class StompQueue : public StompDestination<cms::Queue>
+    {
+    public:
+
+    	StompQueue(void) : StompDestination< cms::Queue >() {}
+
+        StompQueue(const std::string& name) : 
+            StompDestination< cms::Queue >( name, cms::Destination::QUEUE )
+        {}
+
+    	virtual ~StompQueue(void) {}
+
+        /**
+         * Gets the name of this queue.
+         * @return The queue name.
+         */
+        virtual std::string getQueueName(void) const 
+            throw( cms::CMSException ) {
+                return toString();
+        }
+
+        /**
+         * Creates a new instance of this destination type that is a
+         * copy of this one, and returns it.
+         * @returns cloned copy of this object
+         */
+        virtual cms::Destination* clone(void) const {
+            return new StompQueue( toString() );
+        }
+
+    protected:
+
+        /**
+         * Retrieves the proper Stomp Prefix for the specified type
+         * of Destination
+         * @return string prefix
+         */
+        virtual std::string getPrefix(void) const {
+            return commands::CommandConstants::queuePrefix;
+        }
+        
+    };
+
+}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_STOMPQUEUE_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSelector.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSelector.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSelector.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSelector.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,31 @@
+#ifndef ACTIVEMQ_CONNECTOR_STOMP_STOMPSELECTOR_H_
+#define ACTIVEMQ_CONNECTOR_STOMP_STOMPSELECTOR_H_
+
+#include <cms/Message.h>
+#include <string>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+    
+    /**
+     * Since the stomp protocol doesn't have a consumer-based selector
+     * mechanism, we have to do the selector logic on the client
+     * side.  This class provides the selector algorithm that is
+     * needed to determine if a given message is to be selected for
+     * a given consumer's selector string.
+     */
+    class StompSelector{
+    public:
+    
+        static bool isSelected( const std::string& selector,
+            cms::Message* msg )
+        {
+            return true;
+        }
+        
+    };
+    
+}}}
+
+#endif /*ACTIVEMQ_CONNECTOR_STOMP_STOMPSELECTOR_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionInfo.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionInfo.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionInfo.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionInfo.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_STOMPSESSIONINFO_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_STOMPSESSIONINFO_H_
+
+#include <activemq/connector/SessionInfo.h>
+#include <cms/Session.h>
+#include <string>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+
+    class StompSessionInfo : public connector::SessionInfo
+    {
+    private:
+    
+        // Acknowledge Mode of this Session
+        cms::Session::AcknowledgeMode ackMode;
+        
+        // The id of the connection to the broker 
+        // (given to us by the broker)
+        std::string connectionId;
+        
+        // The unique session id
+        unsigned int sessionId;
+        
+        // Info for this sessions current transaction
+        const TransactionInfo* transaction;
+        
+    public:
+
+        /**
+         * Constructor
+         */
+        StompSessionInfo(void)
+        {
+            sessionId = 0;
+            ackMode = cms::Session::AutoAcknowledge;
+        }
+
+        /**
+         * Destructor
+         */
+        virtual ~StompSessionInfo(void) {}
+
+        /**
+         * Gets the Connection Id of the Connection that this consumer is
+         * using to receive its messages.
+         * @return string value of the connection id
+         */
+        virtual const std::string& getConnectionId(void) const{
+            return connectionId;
+        }
+   
+        /**
+         * Sets the Connection Id of the Connection that this consumer is
+         * using to receive its messages.
+         * @param string value of the connection id
+         */
+        virtual void setConnectionId( const std::string& id ){
+            connectionId = id;
+        }
+        
+        /**
+         * Gets the Sessions Id value
+         * @return id for this session
+         */
+        virtual unsigned int getSessionId(void) const {
+            return sessionId;
+        }
+
+        /**
+         * Sets the Session Id for this Session
+         * @param integral id value for this session
+         */
+        virtual void setSessionId( const unsigned int id ) {
+            this->sessionId = id;
+        }
+
+        /**
+         * Sets the Ack Mode of this Session Info object
+         * @param Ack Mode
+         */
+        virtual void setAckMode(cms::Session::AcknowledgeMode ackMode) {
+            this->ackMode = ackMode;
+        }
+        
+        /**
+         * Gets the Ack Mode of this Session
+         * @return Ack Mode
+         */
+        virtual cms::Session::AcknowledgeMode getAckMode(void) const {
+            return ackMode;
+        }
+        
+        /**
+         * Gets the currently active transaction info, if this session is
+         * transacted, returns NULL when not transacted.  You must call
+         * getAckMode and see if the session is transacted.
+         * @return Transaction Id of current Transaction
+         */
+        virtual const TransactionInfo* getTransactionInfo(void) const {
+            return transaction;
+        }
+        
+        /**
+         * Sets the current transaction info for this session, this is nit
+         * used when the session is not transacted.
+         * @param Transaction Id
+         */        
+        virtual void setTransactionInfo( const TransactionInfo* transaction ) {
+            this->transaction = transaction;
+        }
+
+   };
+
+}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_STOMPSESSIONINFO_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp Mon Jul  3 04:51:36 2006
@@ -0,0 +1,326 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "StompSessionManager.h"
+
+#include <activemq/core/ActiveMQMessage.h>
+#include <activemq/concurrent/Concurrent.h>
+#include <activemq/connector/stomp/StompSessionInfo.h>
+#include <activemq/connector/stomp/StompConsumerInfo.h>
+#include <activemq/connector/stomp/commands/SubscribeCommand.h>
+#include <activemq/connector/stomp/commands/UnsubscribeCommand.h>
+#include <activemq/connector/stomp/StompSelector.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::exceptions;
+using namespace activemq::transport;
+using namespace activemq::connector;
+using namespace activemq::connector::stomp;
+using namespace activemq::connector::stomp::commands;
+
+////////////////////////////////////////////////////////////////////////////////
+StompSessionManager::StompSessionManager( const std::string& connectionId, 
+                                          Transport* transport )
+{
+    if( transport == NULL )
+    {
+        throw NullPointerException( 
+            __FILE__, __LINE__,
+            "StompSessionManager::StompSessionManager" );
+    }
+
+    this->transport = transport;
+    this->connectionId = connectionId;
+    this->nextSessionId = 0;
+    this->nextConsumerId = 0;
+    this->messageListener = NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+StompSessionManager::~StompSessionManager(void)
+{
+    // NOTE - I am not cleaning out the ConsumerInfo objects in the
+    // map becaise it is really the job of the consumer ot remove itself
+    // when it is destructed.  If it doesn't then we would have problems,
+    // but if it does, but it's deleted after this object then we would
+    // still have problems.  
+}
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned int StompSessionManager::getNextSessionId(void)
+{
+    synchronized(&mutex)
+    {
+        return nextSessionId++;
+    }
+    
+    return 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned int StompSessionManager::getNextConsumerId(void)
+{
+    synchronized(&mutex)
+    {
+        return nextConsumerId++;
+    }
+
+    return 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+connector::SessionInfo* StompSessionManager::createSession(
+    cms::Session::AcknowledgeMode ackMode) 
+        throw ( exceptions::ActiveMQException )
+{
+    try
+    {
+        SessionInfo* session = new StompSessionInfo();
+        
+        // Init data
+        session->setAckMode(ackMode);
+        session->setConnectionId( connectionId );
+        session->setSessionId( getNextSessionId() );
+        
+        return session;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompSessionManager::removeSession( 
+    connector::SessionInfo* session )
+        throw ( exceptions::ActiveMQException )
+{
+    // NO-op
+}
+    
+////////////////////////////////////////////////////////////////////////////////
+connector::ConsumerInfo* StompSessionManager::createConsumer(
+    cms::Destination* destination, 
+    SessionInfo* session,
+    const std::string& selector)
+        throw( ConnectorException )
+{
+    try
+    {
+        // Delegate to the createDurableConsumer method, just pas the
+        // appropriate params so that a regular consumer is created on
+        // the broker side.
+        return createDurableConsumer( 
+            destination, session, "", selector, false );    
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+connector::ConsumerInfo* StompSessionManager::createDurableConsumer(
+    cms::Destination* destination, 
+    SessionInfo* session,
+    const std::string& name,
+    const std::string& selector,
+    bool noLocal )
+        throw ( ConnectorException )
+{
+    try
+    {
+        synchronized(&mutex)
+        {
+            // Find the right mapping to consumers        
+            ConsumerMap& consumerMap = 
+                destinationMap[ destination->toString() ];
+            
+            // We only need to send a sub request if there are no active 
+            // consumers on this destination.  
+            if( consumerMap.empty() )
+            {
+                // Send the request to the Broker
+                SubscribeCommand cmd;
+                
+                if( session->getAckMode() == cms::Session::ClientAcknowledge )
+                {
+                    cmd.setAckMode( CommandConstants::ACK_CLIENT );
+                }
+                cmd.setDestination( destination->toProviderString() );
+                cmd.setNoLocal( noLocal );
+
+                if( name != "" )
+                {
+                    cmd.setSubscriptionName( name );
+                }
+                
+                // The Selector is set on the first subscribe on this dest,
+                // and if another consumer is created on this destination
+                // that specifies a selector it will be ignored.  While 
+                // this is not ideal, is the only way to handle the fact
+                // that activemq stomp doesn't support multiple sessions.
+                if( selector != "" )
+                {
+                    cmd.setMessageSelector( selector );
+                }
+        
+                // Fire the message        
+                transport->oneway( &cmd );
+            }
+             
+            // Initialize a new Consumer info Message
+            ConsumerInfo* consumer = new StompConsumerInfo();
+            
+            consumer->setConsumerId( getNextConsumerId() );
+            consumer->setDestination( *destination );
+            consumer->setMessageSelector( selector );
+            consumer->setSessionInfo( session );
+    
+            // Store this consumer for later message dispatching.        
+            consumerMap.insert( 
+                make_pair( consumer->getConsumerId(), consumer ) );
+            
+            return consumer;
+        }
+        
+        return NULL;
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompSessionManager::removeConsumer(
+    connector::ConsumerInfo* consumer)
+        throw( ConnectorException )
+{
+    try
+    {
+        synchronized(&mutex)
+        {
+            DestinationMap::iterator itr = 
+                destinationMap.find( consumer->getDestination().toString() );
+                
+            if( itr == destinationMap.end() )
+            {
+                // Already removed from the map
+                return;
+            }
+            
+            ConsumerMap& consumers = itr->second;
+            
+            // Remove from the map.
+            consumers.erase( consumer->getConsumerId() );
+            
+            // If there are no more on this destination then we unsubscribe
+            if( consumers.empty() )
+            {
+                UnsubscribeCommand cmd;
+                
+                cmd.setDestination( 
+                    consumer->getDestination().toProviderString() );
+                
+                // Send the message
+                transport->oneway( &cmd );
+            }    
+        }
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompSessionManager::onStompCommand( commands::StompCommand* command ) 
+    throw ( StompConnectorException )
+{
+    try
+    {
+        cms::Message* message = dynamic_cast< cms::Message*>( command );
+
+        if( message == NULL )
+        {
+            throw StompConnectorException(
+                __FILE__, __LINE__,
+                "StompSessionManager::onStompCommand - Invalid Command" );
+        }
+
+        if( messageListener == NULL )
+        {
+            throw StompConnectorException(
+                __FILE__, __LINE__,
+                "StompSessionManager::onStompCommand - "
+                "No Message Listener Registered." );
+        }
+                
+        synchronized(&mutex)
+        {
+            DestinationMap::iterator itr = 
+                destinationMap.find( message->getCMSDestination().toString() );
+
+            if( itr == destinationMap.end() )
+            {
+                throw StompConnectorException(
+                    __FILE__, __LINE__,
+                    "StompSessionManager::onStompCommand - "
+                    "Received a Message that doesn't have a listener" );
+            }
+
+            // If we only have 1 consumer, we don't need to clone the original
+            // message.
+            if(itr->second.size() == 1)
+            {
+                ConsumerInfo* consumerInfo = itr->second.begin()->second;
+                
+                if( StompSelector::isSelected( 
+                        consumerInfo->getMessageSelector(),
+                        message ) )
+                {                    
+                    ActiveMQMessage* msg = 
+                        dynamic_cast< ActiveMQMessage* >( message );
+                    messageListener->onConsumerMessage( consumerInfo, msg );
+                }
+                
+                return;
+            }
+
+            // We have more than one consumer of this message - we have to
+            // clone the message for each consumer so they don't destroy each other's
+            // message.
+            ConsumerMap::iterator c_itr = itr->second.begin();
+            
+            for(; c_itr != itr->second.end(); ++c_itr )
+            {
+                ConsumerInfo* consumerInfo = c_itr->second;
+                
+                if( StompSelector::isSelected( 
+                        consumerInfo->getMessageSelector(),
+                        message ) )
+                {
+                    ActiveMQMessage* msg = 
+                        dynamic_cast< ActiveMQMessage* >( message->clone() );
+                    messageListener->onConsumerMessage( consumerInfo, msg );
+                }
+            }
+            
+            // We got here which means that we sent copies, so remove
+            // the original.
+            delete command;
+        }
+    }
+    AMQ_CATCH_RETHROW( StompConnectorException )
+    AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, StompConnectorException )
+    AMQ_CATCHALL_THROW( StompConnectorException )
+}

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_STOMPSESSIONMANAGER_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_STOMPSESSIONMANAGER_H_
+
+#include <activemq/connector/SessionInfo.h>
+#include <activemq/connector/ConsumerInfo.h>
+#include <activemq/transport/Transport.h>
+#include <activemq/concurrent/Mutex.h>
+#include <activemq/connector/ConnectorException.h>
+#include <activemq/connector/stomp/StompCommandListener.h>
+#include <activemq/connector/ConsumerMessageListener.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+    
+    /**
+     * The Stomp Session Manager is responsible for managing multiple
+     * Client Sessions.  The management involves routing messages to 
+     * sessions.  If more than one ActiveMQConsumer is created that is
+     * listening to the same Topic or Queue, then the messages that are
+     * received must be delivered to each of those sessions, and copied
+     * so that a transactional session can manage the lifetime of the
+     * message.
+     */
+    class StompSessionManager : public StompCommandListener
+    {
+    private:
+    
+        // Map Types
+        typedef std::map<unsigned int, ConsumerInfo*>  ConsumerMap;
+        typedef std::map<std::string, ConsumerMap>     DestinationMap;        
+
+    private:
+    
+        // Next id to be used for a Session Id
+        unsigned int nextSessionId;
+        
+        // Next id to be used for a Consumer Id
+        unsigned int nextConsumerId;
+        
+        // Mutex to protect ids.
+        concurrent::Mutex mutex;
+        
+        // Mapping of a Session to all the consumer's
+        DestinationMap destinationMap;
+        
+        // Transport that we use to find a transport for sending
+        // commands
+        transport::Transport* transport;
+        
+        // Consumer Message listener, we notify this whenever we receive 
+        // a new StompMessage type command.
+        ConsumerMessageListener* messageListener;
+        
+        // The global connection id
+        std::string connectionId;
+        
+    public:
+
+    	StompSessionManager( const std::string& connectionId, 
+                             transport::Transport* transport );
+    	virtual ~StompSessionManager(void);
+
+        /**
+         * Creates a new Session and returns a SessionInfo object whose
+         * lifetime is the property of the caller.
+         * @param the ackMode of the session.
+         * @return new SessionInfo object
+         */
+        virtual connector::SessionInfo* createSession(
+            cms::Session::AcknowledgeMode ackMode)
+                throw ( exceptions::ActiveMQException );
+
+        /**
+         * removes the specified Session from the Manager, all data that
+         * is associated with session consumers is now lost.  The session
+         * is not deleted here, it is the owner's responsibility.
+         * @param the session info for the session to remove.
+         */
+        virtual void removeSession( connector::SessionInfo* session )
+            throw ( exceptions::ActiveMQException );
+        
+        /**
+         * Creates a new consumer to the specified session, will subscribe
+         * to the destination if another consumer hasn't already been 
+         * subbed to that destination.  The returned consumer is the 
+         * owned by the caller and not deleted by this class.
+         * @param destination to subscribe to
+         * @param session to associate with
+         * @param selector string
+         * @return new ConsumerInfo object.
+         */
+        virtual connector::ConsumerInfo* createConsumer(
+            cms::Destination* destination, 
+            SessionInfo* session,
+            const std::string& selector)
+                throw( ConnectorException );
+
+        /**
+         * Creates a new durable consumer to the specified session, will 
+         * subscribe to the destination if another consumer hasn't already 
+         * been subbed to that destination.  The returned consumer is the 
+         * owned by the caller and not deleted by this class.
+         * @param Topic to subscribe to
+         * @param session to associate with
+         * @param Subscription Name
+         * @param selector string
+         * @param Should we be notified of messages we send.
+         * @return new ConsumerInfo object.
+         */
+        virtual connector::ConsumerInfo* createDurableConsumer(
+            cms::Destination* destination, 
+            SessionInfo* session,
+            const std::string& name,
+            const std::string& selector,
+            bool noLocal )
+                throw ( ConnectorException );
+
+        /**
+         * Removes the Consumer from the session, will unsubscrive if the
+         * consumer is the only one listeneing on this destination.  The
+         * Consumer is not deleted, just unassociated from the Manager
+         * caller is responsible for managing the lifetime.
+         * @param the ConsumerInfo for the consumer to remove
+         * @throws ConnectorException
+         */            
+        virtual void removeConsumer( connector::ConsumerInfo* consumer )
+            throw( ConnectorException );
+
+        /** 
+         * Sets the listener of consumer messages.
+         * @param listener the observer.
+         */
+        virtual void setConsumerMessageListener(
+            ConsumerMessageListener* listener )
+        {
+            this->messageListener = listener;
+        }
+
+    public:   // StompCommand Listener
+    
+        /**
+         * Process the Stomp Command
+         * @param command to process
+         * @throw ConnterException
+         */
+        virtual void onStompCommand( commands::StompCommand* command ) 
+            throw ( StompConnectorException );    
+            
+    protected:
+    
+        /**
+         * Gets the Next Session Id
+         * @return unique session id
+         */
+        virtual unsigned int getNextSessionId(void);
+        
+        /**
+         * Gets the Next Session Id
+         * @return unique session id
+         */
+        virtual unsigned int getNextConsumerId(void);
+
+    };
+
+}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_STOMPSESSIONMANAGER_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTopic.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTopic.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTopic.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTopic.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_STOMPTOPIC_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_STOMPTOPIC_H_
+
+#include <activemq/connector/stomp/StompDestination.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+#include <cms/Topic.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+    
+    class StompTopic : public StompDestination<cms::Topic>
+    {
+    public:
+
+        StompTopic(void) : StompDestination<cms::Topic>() {}
+
+        StompTopic(const std::string& name) : 
+            StompDestination< cms::Topic >( name, cms::Destination::TOPIC )
+        {}
+
+        virtual ~StompTopic(void) {}
+
+        /**
+         * Gets the name of this queue.
+         * @return The queue name.
+         */
+        virtual std::string getTopicName(void) const 
+            throw( cms::CMSException ) {
+                return toString();
+        }
+
+        /**
+         * Creates a new instance of this destination type that is a
+         * copy of this one, and returns it.
+         * @returns cloned copy of this object
+         */
+        virtual cms::Destination* clone(void) const {
+            return new StompTopic( toString() );
+        }
+
+    protected:
+
+        /**
+         * Retrieves the proper Stomp Prefix for the specified type
+         * of Destination
+         * @return string prefix
+         */
+        virtual std::string getPrefix(void) const {
+            return commands::CommandConstants::topicPrefix;
+        }
+
+    };
+
+}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_STOMPTOPIC_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTransactionInfo.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTransactionInfo.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTransactionInfo.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompTransactionInfo.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,88 @@
+/*
+* Copyright 2006 The Apache Software Foundation or its licensors, as
+* applicable.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef ACTIVEMQ_CONNECTOR_STOMPTRANSACTIONINFO_H_
+#define ACTIVEMQ_CONNECTOR_STOMPTRANSACTIONINFO_H_
+
+#include <activemq/connector/TransactionInfo.h>
+#include <activemq/connector/SessionInfo.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+
+    class StompTransactionInfo : public connector::TransactionInfo
+    {
+    private:
+    
+        // Transaction Id
+        unsigned int transactionId;
+        
+        // Session Info - We do not own this
+        const SessionInfo* session;
+        
+    public:
+
+        /**
+         * TransactionInfo Constructor
+         */
+        StompTransactionInfo(void) {
+            transactionId = 0;
+            session = NULL;
+        }
+
+        /**
+         * Destructor
+         */
+        virtual ~StompTransactionInfo(void) {}
+
+        /**
+         * Gets the Transction Id
+         * @return unsigned int Id
+         */
+        virtual unsigned int getTransactionId(void) const {
+            return transactionId;
+        }
+
+        /**
+         * Sets the Transction Id
+         * @param unsigned int Id
+         */
+        virtual void setTransactionId( const unsigned int id ) {
+            this->transactionId = id;
+        } 
+
+        /**
+         * Gets the Session Info that this Transction is attached too
+         * @return SessionnInfo pointer
+         */
+        virtual const SessionInfo* getSessionInfo(void) const {
+            return session;
+        }
+        
+        /**
+         * Gets the Session Info that this Transction is attached too
+         * @return SessionnInfo pointer
+         */
+        virtual void setSessionInfo( const SessionInfo* session ) {
+            this->session = session;
+        }
+
+    };
+
+}}}
+
+#endif /*ACTIVEMQ_CONNECTOR_STOMPTRANSACTIONINFO_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/AbortCommand.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/AbortCommand.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/AbortCommand.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/AbortCommand.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_ABORTCOMMAND_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_ABORTCOMMAND_H_
+
+#include <activemq/connector/stomp/commands/AbstractCommand.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+#include <activemq/transport/Command.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace commands{
+
+    /**
+     * Represents the Stomp Abort Command which rolls back a
+     * transaction in progress.
+     */
+    class AbortCommand : public AbstractCommand< transport::Command >
+    {
+    public:
+
+        AbortCommand(void) :
+            AbstractCommand<transport::Command>() {
+                initialize( getFrame() );
+        }
+        AbortCommand( StompFrame* frame ) : 
+            AbstractCommand<transport::Command>(frame) {
+                validate( getFrame() );
+        }
+        virtual ~AbortCommand(void) {}
+        
+    protected:
+    
+        /**
+         * Inheritors are required to override this method to init the
+         * frame with data appropriate for the command type.
+         * @param Frame to init
+         */
+        virtual void initialize( StompFrame& frame )
+        {
+            frame.setCommand( CommandConstants::toString(
+                CommandConstants::ABORT ) );
+        }
+
+        /**
+         * Inheritors are required to override this method to validate 
+         * the passed stomp frame before it is marshalled or unmarshaled
+         * @param Frame to validate
+         * @returns true if frame is valid
+         */
+        virtual bool validate( const StompFrame& frame ) const
+        {
+            if((frame.getCommand() == 
+                CommandConstants::toString( CommandConstants::ABORT ) ) &&
+               (frame.getProperties().hasProperty(
+                    CommandConstants::toString( 
+                        CommandConstants::HEADER_TRANSACTIONID ) ) ) )
+            {
+                return true;
+            }
+
+            return false;
+        }
+
+    };
+
+}}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_ABORTCOMMAND_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/AbstractCommand.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/AbstractCommand.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/AbstractCommand.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/AbstractCommand.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,280 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_ABSTRACTCOMMAND_H_
+#define ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_ABSTRACTCOMMAND_H_
+
+#include <activemq/connector/stomp/StompFrame.h>
+#include <activemq/connector/stomp/commands/StompCommand.h>
+#include <activemq/exceptions/NullPointerException.h>
+#include <activemq/util/Integer.h>
+#include <activemq/util/Long.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace commands{
+    
+    /**
+     * Interface for all Stomp commands.  Commands wrap
+     * around a stomp frame and provide their own marshalling
+     * to and from frames.  Stomp frame objects are dumb and have
+     * a generic interface that becomes cumbersome to use directly.
+     * Commands help to abstract the stomp frame by providing a
+     * more user-friendly interface to the frame content.  
+     */
+    
+    template<typename T>
+    class AbstractCommand
+    : 
+        public StompCommand,
+        public T
+    {
+    protected:
+    
+        // Frame that contains the actual message
+        StompFrame* frame;
+
+    protected:
+    
+        StompFrame& getFrame(void) {
+            if( frame == NULL ){
+                throw exceptions::NullPointerException(
+                    __FILE__, __LINE__,
+                    "AbstractCommand::getFrame - Frame not initialized");
+            }
+
+            return *frame;
+        }
+        
+        const StompFrame& getFrame(void) const {
+            if( frame == NULL ){
+                throw exceptions::NullPointerException(
+                    __FILE__, __LINE__,
+                    "AbstractCommand::getFrame - Frame not initialized");
+            }
+
+            return *frame;
+        }
+        
+        void destroyFrame(void)
+        {
+            if( frame != NULL ){
+                delete frame;
+                frame = NULL;
+            }
+        }
+    
+        const char* getPropertyValue( const std::string& name ) const{
+            return getFrame().getProperties().getProperty( name );
+        }
+
+        const std::string getPropertyValue( 
+            const std::string& name, 
+            const std::string& defReturn ) const {
+            return getFrame().getProperties().getProperty( 
+                name, defReturn );
+        }
+        
+        void setPropertyValue( const std::string& name, const std::string& value ){
+            getFrame().getProperties().setProperty( name, value );
+        }
+        
+        /**
+         * Inheritors are required to override this method to init the
+         * frame with data appropriate for the command type.
+         * @param Frame to init
+         */
+        virtual void initialize( StompFrame& frame ) = 0;
+
+        /**
+         * Inheritors are required to override this method to validate 
+         * the passed stomp frame before it is marshalled or unmarshaled
+         * @param Frame to validate
+         * @returns true if frame is valid
+         */
+        virtual bool validate( const StompFrame& frame ) const = 0;
+        
+    public:
+    
+        AbstractCommand(void){ 
+            frame = new StompFrame;
+        }
+        AbstractCommand(StompFrame* frame){ 
+            this->frame = frame;
+        }
+        virtual ~AbstractCommand(void){
+            destroyFrame();
+        }
+        
+        /**
+         * Sets the Command Id of this Message
+         * @param Command Id
+         */
+        virtual void setCommandId( const unsigned int id ){
+            setPropertyValue(
+                CommandConstants::toString( 
+                    CommandConstants::HEADER_REQUESTID),
+                 util::Integer::toString( id ) );
+        }
+
+        /**
+         * Gets the Command Id of this Message
+         * @return Command Id
+         */
+        virtual unsigned int getCommandId(void) const {
+            return util::Integer::parseInt(
+                getPropertyValue(
+                    CommandConstants::toString( 
+                        CommandConstants::HEADER_REQUESTID ),
+                    "0" ) );
+        }
+        
+        /**
+         * Set if this Message requires a Response
+         * @param true if response is required
+         */
+        virtual void setResponseRequired( const bool required ) {
+        }
+        
+        /**
+         * Is a Response required for this Command
+         * @return true if a response is required.
+         */
+        virtual bool isResponseRequired(void) const {
+            return frame->getProperties().hasProperty( 
+                CommandConstants::toString( 
+                    CommandConstants::HEADER_REQUESTID) );
+        }
+        
+        /**
+         * Gets the Correlation Id that is associated with this message
+         * @return the Correlation Id
+         */
+        virtual unsigned int getCorrelationId(void) const {
+            return util::Integer::parseInt(
+                getPropertyValue(
+                    CommandConstants::toString( 
+                        CommandConstants::HEADER_RESPONSEID ),
+                     "0" ) );
+        }
+
+        /**
+         * Sets the Correlation Id if this Command
+         * @param Id
+         */
+        virtual void setCorrelationId( const unsigned int corrId ) {
+            setPropertyValue(
+                CommandConstants::toString( 
+                    CommandConstants::HEADER_RESPONSEID),
+                 util::Integer::toString( corrId ) );
+        }
+        
+        /**
+         * Get the Transaction Id of this Command
+         * @return the Id of the Transaction
+         */      
+        virtual const char* getTransactionId(void) const{
+            return getPropertyValue( 
+                CommandConstants::toString( 
+                    CommandConstants::HEADER_TRANSACTIONID) );
+        }
+      
+        /**
+         * Set the Transaction Id of this Command
+         * @param the Id of the Transaction
+         */
+        virtual void setTransactionId( const std::string& id ){
+            setPropertyValue( 
+                CommandConstants::toString( 
+                    CommandConstants::HEADER_TRANSACTIONID),
+                id );
+        }  
+
+        /**
+         * Retrieve the Stomp Command Id for this message.
+         * @return Stomp CommandId enum
+         */
+        virtual CommandConstants::CommandId getStompCommandId(void) const {
+            return CommandConstants::toCommandId(
+                getFrame().getCommand() );
+        }
+        
+        /**
+         * Marshals the command to a stomp frame.
+         * @returns the stomp frame representation of this
+         * command.
+         * @throws MarshalException if the command is not
+         * in a state that can be marshaled.
+         */
+        virtual const StompFrame& marshal(void) const 
+            throw (marshal::MarshalException)
+        {
+            if( frame == NULL || !validate( *frame ) ){
+                throw marshal::MarshalException( 
+                    __FILE__, __LINE__,
+                    "AbstractCommand::marshal() - frame invalid" );
+            }
+            
+            return getFrame();
+        }
+
+    protected:
+
+        /**
+         * Fetch the number of bytes in the Stomp Frame Body
+         * @return number of bytes
+         */
+        virtual unsigned long getNumBytes(void) const{
+            return getFrame().getBodyLength();
+        }
+
+        /**
+         * Returns a char array of bytes that are contained in the message
+         * @param pointer to array of bytes.
+         */
+        virtual const char* getBytes(void) const{
+            return getFrame().getBody();
+        }
+    
+        /**
+         * Set the bytes that are to be sent in the body of this message
+         * the content length flag indicates if the Content Length header
+         * should be set.
+         * @param bytes to store
+         * @param number of bytes to pull from the bytes buffer
+         * @param true if the content length header should be set
+         */
+        virtual void setBytes( const char* bytes, 
+                               const unsigned long numBytes,
+                               const bool setContentLength = true )
+        {
+            char* copy = new char[numBytes];
+            memcpy( copy, bytes, numBytes );
+            getFrame().setBody( copy, numBytes );
+            if( setContentLength )
+            {
+                setPropertyValue( 
+                    CommandConstants::toString( 
+                        CommandConstants::HEADER_CONTENTLENGTH),
+                    util::Long::toString( numBytes ) );
+            }
+        }
+    };
+    
+}}}}
+
+#endif /*ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_ABSTRACTCOMMAND_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/AckCommand.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/AckCommand.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/AckCommand.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/AckCommand.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNCETOR_STOMP_COMMANDS_ACKCOMMAND_H_
+#define _ACTIVEMQ_CONNCETOR_STOMP_COMMANDS_ACKCOMMAND_H_
+
+#include <activemq/connector/stomp/commands/AbstractCommand.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+#include <activemq/transport/Command.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace commands{
+
+    /**
+     * Stomp Command that Represents Acknowledgement of a message
+     * receive.  The Ack Command has one required attribute, message
+     * Id.  For each message sent to the client from the broker, the
+     * message will not be considered consumed until an Ack is sent.  
+     * Optionally a Transaction Id can be set that indicates that the
+     * message acknowledgement should be part of a named transaction.
+     */
+    class AckCommand : public AbstractCommand< transport::Command >
+    {
+    public:
+
+        AckCommand(void) :
+            AbstractCommand<transport::Command>() {
+                initialize( getFrame() );
+        }
+        AckCommand( StompFrame* frame ) : 
+            AbstractCommand<transport::Command>(frame) {
+                validate( getFrame() );
+        }
+        virtual ~AckCommand(void) {}
+      
+        /**
+         * Get the Message Id of this Command
+         * @return the Id of the Message
+         */      
+        virtual const char* getMessageId(void) const{
+            return getPropertyValue( 
+                CommandConstants::toString( 
+                    CommandConstants::HEADER_MESSAGEID) );
+        }
+      
+        /**
+         * Set the Message Id that this Ack is associated with
+         * @param the Message Id
+         */
+        virtual void setMessageId(const std::string& messageId){
+            setPropertyValue( 
+                CommandConstants::toString( 
+                    CommandConstants::HEADER_MESSAGEID),
+                messageId );
+        }
+
+    protected:
+    
+        /**
+         * Inheritors are required to override this method to init the
+         * frame with data appropriate for the command type.
+         * @param Frame to init
+         */
+        virtual void initialize( StompFrame& frame )
+        {
+            frame.setCommand( CommandConstants::toString(
+                CommandConstants::ACK ) );
+        }
+
+        /**
+         * Inheritors are required to override this method to validate 
+         * the passed stomp frame before it is marshalled or unmarshaled
+         * @param Frame to validate
+         * @returns true if frame is valid
+         */
+        virtual bool validate( const StompFrame& frame ) const
+        {
+            if((frame.getCommand() == 
+                CommandConstants::toString( CommandConstants::ACK )) &&
+               (frame.getProperties().hasProperty(
+                   CommandConstants::toString( 
+                       CommandConstants::HEADER_TRANSACTIONID ) ) &&
+               (frame.getProperties().hasProperty(
+                   CommandConstants::toString( 
+                       CommandConstants::HEADER_MESSAGEID ) ) ) ) );
+            {
+                return true;
+            }
+
+            return false;
+        }
+
+    };
+
+}}}}
+
+#endif /*_ACTIVEMQ_CONNCETOR_STOMP_COMMANDS_ACKCOMMAND_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/BeginCommand.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/BeginCommand.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/BeginCommand.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/BeginCommand.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_BEGINCOMMAND_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_BEGINCOMMAND_H_
+
+#include <activemq/connector/stomp/commands/AbstractCommand.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+#include <activemq/transport/Command.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace commands{
+    
+    /**
+     * Begins a Transaction.  Transactions in this case apply to
+     * sending and acknowledging -- any messages sent or acknowledged
+     * during a transaction will be handled atomically based on the
+     * transaction.
+     * 
+     * A transaction Identifier is required and this id will be used
+     * for all sends, commits, aborts, or acks.
+     */
+    class BeginCommand : public AbstractCommand< transport::Command >
+    {
+    public:
+   
+        BeginCommand(void) :
+            AbstractCommand<transport::Command>() {
+                initialize( getFrame() );
+        }
+        BeginCommand( StompFrame* frame ) : 
+            AbstractCommand<transport::Command>(frame) {
+                validate( getFrame() );
+        }
+        virtual ~BeginCommand(void) {}
+
+    protected:
+    
+        /**
+         * Inheritors are required to override this method to init the
+         * frame with data appropriate for the command type.
+         * @param Frame to init
+         */
+        virtual void initialize( StompFrame& frame )
+        {
+            frame.setCommand( CommandConstants::toString(
+                CommandConstants::BEGIN ) );
+        }
+
+        /**
+         * Inheritors are required to override this method to validate 
+         * the passed stomp frame before it is marshalled or unmarshaled
+         * @param Frame to validate
+         * @returns true if frame is valid
+         */
+        virtual bool validate( const StompFrame& frame ) const
+        {
+            if((frame.getCommand() == 
+                CommandConstants::toString( CommandConstants::BEGIN )) &&
+               (frame.getProperties().hasProperty(
+                    CommandConstants::toString( 
+                        CommandConstants::HEADER_TRANSACTIONID ) ) ) )
+            {
+                return true;
+            }
+
+            return false;
+        }
+
+    };
+
+}}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_BEGINCOMMAND_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/BytesMessageCommand.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/BytesMessageCommand.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/BytesMessageCommand.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/BytesMessageCommand.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_BYTESMESSAGECOMMAND_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_BYTESMESSAGECOMMAND_H_
+
+#include <cms/BytesMessage.h>
+#include <activemq/connector/stomp/commands/StompMessage.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace commands{
+
+    /**
+     * Implements the interface for a cms::BytesMessage.  Uses the template
+     * class StompMessage to implement all cms::Message type functionality
+     * and implements the BytesMessage interface here.
+     */    
+    class BytesMessageCommand : public StompMessage< cms::BytesMessage >
+    {
+    public:
+
+        BytesMessageCommand(void) :
+            StompMessage< cms::BytesMessage >() {
+                initialize( getFrame() );
+        }
+        BytesMessageCommand( StompFrame* frame ) : 
+            StompMessage< cms::BytesMessage >(frame) {
+                validate( getFrame() );
+        }
+    	virtual ~BytesMessageCommand(void) {}
+
+        /**
+         * Clonse this message exactly, returns a new instance that the
+         * caller is required to delete.
+         * @return new copy of this message
+         */
+        virtual cms::Message* clone(void) const {
+            StompFrame* frame = getFrame().clone();
+            
+            return new BytesMessageCommand( frame );
+        }   
+
+        /**
+         * sets the bytes given to the message body.  
+         * @param Byte Buffer to copy
+         * @param Number of bytes in Buffer to copy
+         * @throws CMSException
+         */
+        virtual void setBodyBytes( const unsigned char* buffer, 
+                                   const unsigned long numBytes ) 
+            throw( cms::CMSException ) {
+            this->setBytes(
+                reinterpret_cast<const char*>( buffer ), numBytes );
+        }
+        
+        /**
+         * Gets the bytes that are contained in this message, user should
+         * copy this data into a user allocated buffer.  Call 
+         * <code>getBodyLength</code> to determine the number of bytes
+         * to expect.
+         * @return const pointer to a byte buffer
+         */
+        virtual const unsigned char* getBodyBytes(void) const {
+            return reinterpret_cast<const unsigned char*>( this->getBytes() );
+        }
+      
+        /**
+         * Returns the number of bytes contained in the body of this message.
+         * @return number of bytes.
+         */
+        virtual unsigned long getBodyLength(void) const {
+            return this->getNumBytes();
+        }
+
+    };
+
+}}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_BYTESMESSAGECOMMAND_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/CommandConstants.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/CommandConstants.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/CommandConstants.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/CommandConstants.cpp Mon Jul  3 04:51:36 2006
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#include "CommandConstants.h"
+#include <stdio.h>
+
+#include <activemq/connector/stomp/StompTopic.h>
+#include <activemq/connector/stomp/StompQueue.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::exceptions;
+using namespace activemq::connector::stomp;
+using namespace activemq::connector::stomp::commands;
+
+////////////////////////////////////////////////////////////////////////////////
+const char* CommandConstants::queuePrefix = "/queue/";
+const char* CommandConstants::topicPrefix = "/topic/";
+
+////////////////////////////////////////////////////////////////////////////////
+string CommandConstants::StaticInitializer::stompHeaders[NUM_STOMP_HEADERS];
+string CommandConstants::StaticInitializer::commands[NUM_COMMANDS];
+string CommandConstants::StaticInitializer::ackModes[NUM_ACK_MODES];
+string CommandConstants::StaticInitializer::msgTypes[NUM_MSG_TYPES];
+map<std::string, CommandConstants::StompHeader> CommandConstants::StaticInitializer::stompHeaderMap;
+map<std::string, CommandConstants::CommandId> CommandConstants::StaticInitializer::commandMap;
+map<std::string, CommandConstants::AckMode> CommandConstants::StaticInitializer::ackModeMap;
+map<std::string, CommandConstants::MessageType> CommandConstants::StaticInitializer::msgTypeMap;
+CommandConstants::StaticInitializer CommandConstants::staticInits;
+
+////////////////////////////////////////////////////////////////////////////////
+CommandConstants::StaticInitializer::StaticInitializer(){
+    
+    stompHeaders[HEADER_DESTINATION] = "destination";
+    stompHeaders[HEADER_TRANSACTIONID] = "transaction";
+    stompHeaders[HEADER_CONTENTLENGTH] = "content-length";
+    stompHeaders[HEADER_SESSIONID] = "session";
+    stompHeaders[HEADER_RECEIPTID] = "receipt-id";
+    stompHeaders[HEADER_RECEIPT_REQUIRED] = "receipt";
+    stompHeaders[HEADER_MESSAGEID] = "message-id";
+    stompHeaders[HEADER_ACK] = "ack";
+    stompHeaders[HEADER_LOGIN] = "login";
+    stompHeaders[HEADER_PASSWORD] = "passcode";
+    stompHeaders[HEADER_CLIENT_ID] = "client-id";
+    stompHeaders[HEADER_MESSAGE] = "message";
+    stompHeaders[HEADER_CORRELATIONID] = "correlation-id";
+    stompHeaders[HEADER_REQUESTID] = "request-id";
+    stompHeaders[HEADER_RESPONSEID] = "response-id";
+    stompHeaders[HEADER_EXPIRES] = "expires";
+    stompHeaders[HEADER_PERSISTANT] = "persistent";
+    stompHeaders[HEADER_PRIORITY] = "priority";
+    stompHeaders[HEADER_REPLYTO] = "reply-to";
+    stompHeaders[HEADER_TYPE] = "type";
+    stompHeaders[HEADER_AMQMSGTYPE] = "amq-msg-type";
+    stompHeaders[HEADER_JMSXGROUPID] = "JMSXGroupID";
+    stompHeaders[HEADER_JMSXGROUPSEQNO] = "JMSXGroupSeq";
+    stompHeaders[HEADER_SELECTOR] = "selector";
+    stompHeaders[HEADER_DISPATCH_ASYNC] = "activemq.dispatchAsync";
+    stompHeaders[HEADER_EXCLUSIVE] = "activemq.exclusive";
+    stompHeaders[HEADER_MAXPENDINGMSGLIMIT] = "activemq.maximumPendingMessageLimit";
+    stompHeaders[HEADER_NOLOCAL] = "activemq.noLocal";
+    stompHeaders[HEADER_PREFETCHSIZE] = "activemq.prefetchSize";
+    stompHeaders[HEADER_PRIORITY] = "activemq.priority";
+    stompHeaders[HEADER_RETROACTIVE] = "activemq.retroactive";
+    stompHeaders[HEADER_SUBSCRIPTIONNAME] = "activemq.subscriptionName";
+    stompHeaders[HEADER_TIMESTAMP] = "timestamp";
+    stompHeaders[HEADER_REDELIVERED] = "redelivered";
+    stompHeaders[HEADER_REDELIVERYCOUNT] = "redelivery_count";
+    stompHeaders[HEADER_SELECTOR] = "selector";
+    stompHeaders[HEADER_ID] = "id";
+    stompHeaders[HEADER_SUBSCRIPTION] = "subscription";
+    commands[CONNECT] = "CONNECT";
+    commands[CONNECTED] = "CONNECTED";
+    commands[DISCONNECT] = "DISCONNECT";
+    commands[SUBSCRIBE] = "SUBSCRIBE";
+    commands[UNSUBSCRIBE] = "UNSUBSCRIBE";
+    commands[MESSAGE] = "MESSAGE";
+    commands[SEND] = "SEND";
+    commands[BEGIN] = "BEGIN";
+    commands[COMMIT] = "COMMIT";
+    commands[ABORT] = "ABORT";
+    commands[ACK] = "ACK";
+    commands[ERROR_CMD] = "ERROR";
+    commands[RECEIPT] = "RECEIPT";
+    ackModes[ACK_CLIENT] = "client";
+    ackModes[ACK_AUTO] = "auto";
+    msgTypes[TEXT] = "text";
+    msgTypes[BYTES] = "bytes";
+
+    for( int ix=0; ix<NUM_STOMP_HEADERS; ++ix ){
+        stompHeaderMap[stompHeaders[ix]] = (StompHeader)ix;
+    }
+    
+    for( int ix=0; ix<NUM_COMMANDS; ++ix ){
+        commandMap[commands[ix]] = (CommandId)ix;
+    }
+    
+    for( int ix=0; ix<NUM_ACK_MODES; ++ix ){
+        ackModeMap[ackModes[ix]] = (AckMode)ix;
+    }
+
+    for( int ix=0; ix<NUM_MSG_TYPES; ++ix ){
+        msgTypeMap[msgTypes[ix]] = (MessageType)ix;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Destination* CommandConstants::toDestination( const std::string& dest )
+    throw ( exceptions::IllegalArgumentException )
+{
+    int qpos = dest.find(queuePrefix);
+    int tpos = dest.find(topicPrefix);
+    
+    if(tpos == 0)
+    {
+        return new StompTopic(dest.substr(strlen(topicPrefix)));
+    }
+    else if(qpos == 0)
+    {
+        return new StompQueue(dest.substr(strlen(queuePrefix)));
+    }
+    else
+    {
+        throw IllegalArgumentException(
+            __FILE__, __LINE__,
+            "CommandConstants::toDestionation - Not a valid Stomp Dest");
+    }
+}  
+
+

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/CommandConstants.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/CommandConstants.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/CommandConstants.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/CommandConstants.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,193 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_COMMANDCONSTANTS_H_
+#define ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_COMMANDCONSTANTS_H_
+
+#include <cms/Destination.h>
+#include <activemq/exceptions/IllegalArgumentException.h>
+
+#include <string>
+#include <map>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace commands{
+    
+    class CommandConstants{    
+    public:
+    
+        enum CommandId{
+            CONNECT,
+            CONNECTED,
+            DISCONNECT,
+            SUBSCRIBE,
+            UNSUBSCRIBE,
+            MESSAGE,
+            SEND,
+            BEGIN,
+            COMMIT,
+            ABORT,
+            ACK,
+            ERROR_CMD,
+            RECEIPT,
+            NUM_COMMANDS
+        };
+        
+        enum StompHeader{
+            HEADER_DESTINATION,
+            HEADER_TRANSACTIONID,
+            HEADER_CONTENTLENGTH,
+            HEADER_SESSIONID,
+            HEADER_RECEIPT_REQUIRED,
+            HEADER_RECEIPTID,
+            HEADER_MESSAGEID,
+            HEADER_ACK,
+            HEADER_LOGIN,
+            HEADER_PASSWORD,
+            HEADER_CLIENT_ID,
+            HEADER_MESSAGE,
+            HEADER_CORRELATIONID,
+            HEADER_REQUESTID,
+            HEADER_RESPONSEID,
+            HEADER_EXPIRES,
+            HEADER_PERSISTANT,
+            HEADER_REPLYTO,
+            HEADER_TYPE,
+            HEADER_AMQMSGTYPE,
+            HEADER_JMSXGROUPID,
+            HEADER_JMSXGROUPSEQNO,
+            HEADER_DISPATCH_ASYNC,
+            HEADER_EXCLUSIVE,
+            HEADER_MAXPENDINGMSGLIMIT,
+            HEADER_NOLOCAL,
+            HEADER_PREFETCHSIZE,
+            HEADER_PRIORITY,
+            HEADER_RETROACTIVE,
+            HEADER_SUBSCRIPTIONNAME,
+            HEADER_TIMESTAMP,
+            HEADER_REDELIVERED,
+            HEADER_REDELIVERYCOUNT,
+            HEADER_SELECTOR,
+            HEADER_ID,
+            HEADER_SUBSCRIPTION,
+            NUM_STOMP_HEADERS
+        }; 
+        
+        enum AckMode{
+            ACK_CLIENT,
+            ACK_AUTO,
+            NUM_ACK_MODES
+        };
+        
+        enum MessageType
+        {
+            TEXT,
+            BYTES,
+            NUM_MSG_TYPES
+        };
+        
+        static const char* queuePrefix;
+        static const char* topicPrefix;
+        
+        static const std::string& toString( const CommandId cmd ){
+            return StaticInitializer::commands[cmd];
+        }
+        
+        static CommandId toCommandId( const std::string& cmd ){     
+            std::map<std::string, CommandId>::iterator iter = 
+                StaticInitializer::commandMap.find(cmd);
+
+            if( iter == StaticInitializer::commandMap.end() ){
+                return NUM_COMMANDS;
+            }
+                    
+            return iter->second;
+        }             
+        
+        static std::string toString( const StompHeader header ){
+            return StaticInitializer::stompHeaders[header];
+        }
+        
+        static StompHeader toStompHeader( const std::string& header ){  
+            
+            std::map<std::string, StompHeader>::iterator iter = 
+                StaticInitializer::stompHeaderMap.find(header);
+
+            if( iter == StaticInitializer::stompHeaderMap.end() ){
+                return NUM_STOMP_HEADERS;
+            }
+                    
+            return iter->second;            
+        }        
+        
+        static std::string toString( const AckMode mode ){
+            return StaticInitializer::ackModes[mode];
+        }
+        
+        static AckMode toAckMode( const std::string& mode ){
+            std::map<std::string, AckMode>::iterator iter = 
+                StaticInitializer::ackModeMap.find(mode);
+
+            if( iter == StaticInitializer::ackModeMap.end() ){
+                return NUM_ACK_MODES;
+            }
+                    
+            return iter->second;
+        }  
+         
+        static std::string toString( const MessageType type ){
+            return StaticInitializer::msgTypes[type];
+        }
+        
+        static MessageType toMessageType( const std::string& type ){
+            std::map<std::string, MessageType>::iterator iter = 
+                StaticInitializer::msgTypeMap.find(type);
+
+            if( iter == StaticInitializer::msgTypeMap.end() ){
+                return NUM_MSG_TYPES;
+            }
+                    
+            return iter->second;
+        }  
+
+        static cms::Destination* toDestination( const std::string& dest )
+            throw ( exceptions::IllegalArgumentException );
+
+        class StaticInitializer{
+        public:
+            StaticInitializer();
+            virtual ~StaticInitializer(){}
+            
+            static std::string stompHeaders[NUM_STOMP_HEADERS];
+            static std::string commands[NUM_COMMANDS];
+            static std::string ackModes[NUM_ACK_MODES];
+            static std::string msgTypes[NUM_MSG_TYPES];
+            static std::map<std::string, StompHeader> stompHeaderMap;
+            static std::map<std::string, CommandId> commandMap;
+            static std::map<std::string, AckMode> ackModeMap;
+            static std::map<std::string, MessageType> msgTypeMap;
+        };
+        
+    private:
+    
+        static StaticInitializer staticInits;        
+    };
+    
+}}}}
+
+#endif /*ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_COMMANDCONSTANTS_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/CommitCommand.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/CommitCommand.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/CommitCommand.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/CommitCommand.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_COMMITCOMMAND_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_COMMITCOMMAND_H_
+
+#include <activemq/connector/stomp/commands/AbstractCommand.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+#include <activemq/transport/Command.h>
+#include <string>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+namespace commands{
+    
+    /**
+     * Commits a Transaction.
+     */
+    class CommitCommand  : public AbstractCommand< transport::Command >
+    {
+    public:
+   
+        CommitCommand(void) :
+            AbstractCommand<transport::Command>() {
+                initialize( getFrame() );
+        }
+        CommitCommand( StompFrame* frame ) : 
+            AbstractCommand<transport::Command>(frame) {
+                validate( getFrame() );
+        }
+        virtual ~CommitCommand(void) {}
+
+    protected:
+    
+        /**
+         * Inheritors are required to override this method to init the
+         * frame with data appropriate for the command type.
+         * @param Frame to init
+         */
+        virtual void initialize( StompFrame& frame )
+        {
+            frame.setCommand( CommandConstants::toString(
+                CommandConstants::COMMIT ) );
+        }
+
+        /**
+         * Inheritors are required to override this method to validate 
+         * the passed stomp frame before it is marshalled or unmarshaled
+         * @param Frame to validate
+         * @returns true if frame is valid
+         */
+        virtual bool validate( const StompFrame& frame ) const
+        {
+            if((frame.getCommand() == 
+                CommandConstants::toString( CommandConstants::COMMIT )) &&
+               (frame.getProperties().hasProperty(
+                    CommandConstants::toString( 
+                        CommandConstants::HEADER_TRANSACTIONID ) ) ) )
+            {
+                return true;
+            }
+
+            return false;
+        }
+
+    };
+
+}}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_COMMANDS_COMMITCOMMAND_H_*/



Mime
View raw message