activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r419365 [5/25] - in /incubator/activemq/trunk: activemq-core/src/main/java/org/apache/activemq/thread/ activemq-core/src/test/java/org/apache/activemq/openwire/v1/ activemq-cpp/src/main/activemq/concurrent/ activemq-cpp/src/main/activemq/co...
Date Wed, 05 Jul 2006 22:27:47 GMT
Modified: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp?rev=419365&r1=419364&r2=419365&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp (original)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp Wed Jul  5 15:27:34 2006
@@ -1,795 +1,795 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- 
-#include <activemq/connector/stomp/StompConnector.h>
-#include <activemq/concurrent/Concurrent.h>
-#include <activemq/transport/BrokerError.h>
-#include <activemq/transport/Transport.h>
-#include <activemq/transport/ExceptionResponse.h>
-#include <activemq/connector/stomp/StompTopic.h>
-#include <activemq/connector/stomp/StompQueue.h>
-#include <activemq/connector/stomp/commands/ConnectCommand.h>
-#include <activemq/connector/stomp/commands/ErrorCommand.h>
-#include <activemq/connector/stomp/commands/BeginCommand.h>
-#include <activemq/connector/stomp/commands/AbortCommand.h>
-#include <activemq/connector/stomp/commands/AckCommand.h>
-#include <activemq/connector/stomp/commands/CommitCommand.h>
-#include <activemq/connector/stomp/commands/MessageCommand.h>
-#include <activemq/connector/stomp/commands/BytesMessageCommand.h>
-#include <activemq/connector/stomp/commands/TextMessageCommand.h>
-#include <activemq/connector/stomp/commands/ConnectedCommand.h>
-#include <activemq/connector/stomp/commands/DisconnectCommand.h>
-#include <activemq/exceptions/UnsupportedOperationException.h>
-#include <activemq/connector/stomp/StompProducerInfo.h>
-#include <activemq/connector/stomp/StompTransactionInfo.h>
-#include <activemq/util/Integer.h>
-
-using namespace std;
-using namespace activemq;
-using namespace activemq::connector;
-using namespace activemq::util;
-using namespace activemq::transport;
-using namespace activemq::exceptions;
-using namespace activemq::connector::stomp;
-using namespace activemq::connector::stomp::commands;
-
-////////////////////////////////////////////////////////////////////////////////
-StompConnector::StompConnector( Transport* transport, 
-                                const util::Properties& properties )
-    throw ( IllegalArgumentException )
-{
-    if(transport == NULL)
-    {
-        throw IllegalArgumentException(
-            __FILE__, __LINE__,
-            "StompConnector::StompConnector - Transport cannot be NULL");
-    }
-    
-    this->transport = transport;
-    this->state = DISCONNECTED;
-    this->exceptionListener = NULL;
-    this->messageListener = NULL;
-    this->sessionManager = NULL;
-    this->nextProducerId = 0;
-    this->nextTransactionId = 0;
-    this->properties.copy( &properties );
-    
-    // Observe the transport for events.
-    this->transport->setCommandListener( this );
-    this->transport->setTransportExceptionListener( this );
-
-    // Setup the reader and writer in the transport.
-    this->transport->setCommandReader( &reader );
-    this->transport->setCommandWriter( &writer );
-
-    // Register ourself for those commands that we process    
-    addCmdListener( CommandConstants::ERROR_CMD, this );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-StompConnector::~StompConnector(void)
-{
-    try
-    {
-        close();
-        
-        delete sessionManager;
-    }
-    AMQ_CATCH_NOTHROW( ActiveMQException )
-    AMQ_CATCHALL_NOTHROW( )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-unsigned int StompConnector::getNextProducerId(void)
-{
-    synchronized(&mutex)
-    {
-        return nextProducerId++;
-    }
-    
-    return 0;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-unsigned int StompConnector::getNextTransactionId(void)
-{
-    synchronized(&mutex)
-    {
-        return nextTransactionId++;
-    }
-    
-    return 0;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void StompConnector::enforceConnected( void ) throw ( ConnectorException )
-{
-    if( state != CONNECTED )
-    {
-        throw StompConnectorException(
-            __FILE__, __LINE__,
-            "StompConnector::enforceConnected - Not Connected!" );
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void StompConnector::addCmdListener( 
-    commands::CommandConstants::CommandId commandId,
-    StompCommandListener* listener )
-{
-    cmdListenerMap.insert( make_pair( commandId, listener ) );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void StompConnector::removeCmdListener( 
-    commands::CommandConstants::CommandId commandId )
-{
-    cmdListenerMap.erase(commandId);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void StompConnector::start(void) throw( cms::CMSException )
-{
-    try
-    {
-        synchronized( &mutex )
-        {
-            if( state == CONNECTED )
-            {
-                throw ActiveMQException( 
-                    __FILE__, __LINE__, 
-                    "StompConnector::start - already started" );
-            }
-                
-            // Start the transport - this establishes the socket.
-            transport->start();
-
-            // Send the connect message to the broker.
-            connect();         
-        }        
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void StompConnector::close(void) throw( cms::CMSException ){
-    
-    try
-    {
-        synchronized( &mutex )
-        {  
-            if( state == this->CONNECTED )
-            {
-                // Send the disconnect message to the broker.
-                disconnect();
-
-                // Close the transport.
-                transport->close();
-            }
-        }
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void StompConnector::connect(void)
-{
-    try
-    {
-        // Mark this connector as started.
-        state = this->CONNECTING;
-
-        // Send the connect command to the broker 
-        ConnectCommand cmd;
-
-        // Encode User Name and Password and Client ID
-        string login = getLogin();
-        if( login.length() > 0 ){
-            cmd.setLogin( login );
-        }        
-        string password = getPassword();
-        if( password.length() > 0 ){
-            cmd.setPassword( password );
-        }        
-        string clientId = getClientId();
-        if( clientId.length() > 0 ){
-            cmd.setClientId( clientId );
-        }
-
-        Response* response = transport->request( &cmd );
-        
-        if( dynamic_cast< ExceptionResponse* >( response ) != NULL )
-        {
-            throw StompConnectorException(
-                __FILE__, __LINE__,
-                "StompConnector::connect - Failed on Connect Request" );
-        }
-
-        ConnectedCommand* connected = 
-            dynamic_cast< ConnectedCommand* >( response );
-
-        if( connected == NULL )
-        {
-            throw StompConnectorException(
-                __FILE__, __LINE__,
-                "StompConnector::connect - "
-                "Response not a connected response" );            
-        }
-
-        // Connected so we now create the SessionManager
-        sessionManager = new StompSessionManager(
-            connected->getSessionId(), transport );
-
-        // Give our message listener to the session manager it will
-        // notify all the interested clients                
-        sessionManager->setConsumerMessageListener( messageListener );
-
-        // Add the Session Manager as the Command Listener for 
-        // Message commands so that it can route them to the 
-        // correct consumers.
-        addCmdListener( CommandConstants::MESSAGE, sessionManager );
-        
-        // In Stomp, the client Id is the same as the session id that is
-        // returned in the Connected response
-        properties.setProperty( 
-            commands::CommandConstants::toString( 
-                commands::CommandConstants::HEADER_CLIENT_ID ),
-            connected->getSessionId() );        
-
-        // Tag us in the Connected State now.
-        state = CONNECTED;
-        
-        // Clean up
-        delete response;
-    }
-    AMQ_CATCH_RETHROW( BrokerError )
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void StompConnector::disconnect(void)
-{
-    try
-    {
-        // Mark state as no longer connected.
-        state = this->DISCONNECTED;
-
-        // Send the disconnect command to the broker.
-        DisconnectCommand cmd;
-        transport->oneway( &cmd );
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-SessionInfo* StompConnector::createSession(
-    cms::Session::AcknowledgeMode ackMode) 
-        throw( ConnectorException )
-{
-    try
-    {
-        enforceConnected();
-        
-        return sessionManager->createSession( ackMode );
-    }
-    AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-ConsumerInfo* StompConnector::createConsumer(
-    cms::Destination* destination, 
-    SessionInfo* session,
-    const std::string& selector)
-        throw ( ConnectorException )
-{
-    try
-    {
-        enforceConnected();
-        
-        return sessionManager->createConsumer( 
-            destination, session, selector );
-    }
-    AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-ConsumerInfo* StompConnector::createDurableConsumer(
-    cms::Topic* topic, 
-    SessionInfo* session,
-    const std::string& name,
-    const std::string& selector,
-    bool noLocal)
-        throw ( ConnectorException )
-{
-    try
-    {
-        enforceConnected();
-        
-        return sessionManager->createDurableConsumer( 
-            topic, session, name, selector, noLocal );
-    }
-    AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-ProducerInfo* StompConnector::createProducer(
-    cms::Destination* destination, 
-    SessionInfo* session)
-        throw ( ConnectorException )
-{
-    try
-    {
-        enforceConnected();
-        
-        ProducerInfo* producer = new StompProducerInfo();
-        
-        producer->setDestination( *destination );
-        producer->setProducerId( getNextProducerId() );
-        producer->setSessionInfo( session );
-        
-        return producer;
-    }
-    AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::Topic* StompConnector::createTopic(const std::string& name, 
-                                        SessionInfo* session)
-    throw ( ConnectorException )
-{
-    try
-    {
-        enforceConnected();
-        
-        return new StompTopic(name);
-    }
-    AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::Queue* StompConnector::createQueue(const std::string& name, 
-                                        SessionInfo* session)
-    throw ( ConnectorException )
-{
-    try
-    {
-        enforceConnected();
-        
-        return new StompQueue(name);
-    }
-    AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::TemporaryTopic* StompConnector::createTemporaryTopic(
-    SessionInfo* session)
-        throw ( ConnectorException )
-{
-    try
-    {
-        throw UnsupportedOperationException(
-            __FILE__, __LINE__, 
-            "StompConnector::createTemporaryTopic - No Stomp Support");
-    }
-    AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::TemporaryQueue* StompConnector::createTemporaryQueue(
-    SessionInfo* session)
-        throw ( ConnectorException )
-{
-    try
-    {
-        throw UnsupportedOperationException(
-            __FILE__, __LINE__, 
-            "StompConnector::createTemporaryQueue - No Stomp Support");
-    }
-    AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void StompConnector::send(cms::Message* message, 
-                          ProducerInfo* producerInfo) 
-    throw ( ConnectorException )
-{
-    try
-    {
-        enforceConnected();
-        
-        const SessionInfo* session = producerInfo->getSessionInfo();
-        Command* command = dynamic_cast< transport::Command* >( message );
-        
-        if( command == NULL )
-        {
-            throw StompConnectorException(
-                __FILE__, __LINE__,
-                "StompConnector::send - "
-                "Message is not a valid stomp type.");
-        }
-
-        if( session->getAckMode() == cms::Session::Transactional )
-        {
-            StompCommand* stompCommand = 
-                dynamic_cast< StompCommand* >( message );
-
-            if( stompCommand == NULL )
-            {
-                throw StompConnectorException(
-                    __FILE__, __LINE__,
-                    "StompConnector::send - "
-                    "Message is not a valid stomp type.");
-            }
-    
-            stompCommand->setTransactionId(
-                Integer::toString( 
-                    session->getTransactionInfo()->getTransactionId() ) );
-        }
-        
-        // Send it
-        transport->oneway( command );
-    }
-    AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void StompConnector::send(std::list<cms::Message*>& messages,
-                          ProducerInfo* producerInfo) 
-    throw ( ConnectorException )
-{
-    try
-    {
-        enforceConnected();
-        
-        list<cms::Message*>::const_iterator itr = messages.begin();
-        
-        for(; itr != messages.end(); ++itr)
-        {
-            this->send(*itr, producerInfo);
-        }
-    }
-    AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void StompConnector::acknowledge( const SessionInfo* session,
-                                  const cms::Message* message,
-                                  AckType ackType = ConsumedAck )
-    throw ( ConnectorException )
-{
-    try
-    {
-        enforceConnected();
-        
-        // Auto to Stomp means don't do anything, so we drop it here
-        // for client acknowledge we have to send and ack.  
-        if( session->getAckMode() == cms::Session::ClientAcknowledge )
-        {
-            AckCommand cmd;
-
-            if( message->getCMSMessageId() == NULL )
-            {
-                throw StompConnectorException(
-                    __FILE__, __LINE__,
-                    "StompConnector::send - "
-                    "Message has no Message Id, cannot ack.");
-            }
-
-            cmd.setMessageId( message->getCMSMessageId() );
-
-            if( session->getAckMode() == cms::Session::Transactional )
-            {
-                cmd.setTransactionId( 
-                    Integer::toString( 
-                        session->getTransactionInfo()->getTransactionId() ) );
-            }
-            
-            transport->oneway( &cmd );
-        }
-    }
-    AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-TransactionInfo* StompConnector::startTransaction(
-    SessionInfo* session) 
-        throw ( ConnectorException )
-{
-    try
-    {
-        enforceConnected();
-        
-        TransactionInfo* transaction = new StompTransactionInfo();
-        
-        transaction->setTransactionId( getNextTransactionId() );
-        
-        session->setTransactionInfo( transaction );
-
-        BeginCommand cmd;
-
-        cmd.setTransactionId( 
-                Integer::toString( transaction->getTransactionId() ) );
-        
-        transport->oneway( &cmd );
-        
-        return transaction;
-    }
-    AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void StompConnector::commit(TransactionInfo* transaction, 
-                            SessionInfo* session)
-    throw ( ConnectorException )
-{
-    try
-    {
-        enforceConnected();
-        
-        CommitCommand cmd;
-        
-        cmd.setTransactionId( 
-                Integer::toString( transaction->getTransactionId() ) );
-        
-        transport->oneway( &cmd );
-    }
-    AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void StompConnector::rollback(TransactionInfo* transaction, 
-                              SessionInfo* session)
-    throw ( ConnectorException )
-{
-    try
-    {
-        enforceConnected();
-        
-        AbortCommand cmd;
-        
-        cmd.setTransactionId( 
-                Integer::toString( transaction->getTransactionId() ) );
-        
-        transport->oneway( &cmd );
-    }
-    AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::Message* StompConnector::createMessage(
-    SessionInfo* session,
-    TransactionInfo* transaction)
-        throw ( ConnectorException )
-{
-    try
-    {
-        enforceConnected();
-        
-        MessageCommand* cmd = new MessageCommand();
-        
-        if( transaction != NULL )
-        {
-            cmd->setTransactionId( 
-                Integer::toString( transaction->getTransactionId() ) );
-        }
-        
-        return cmd;
-    }
-    AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::BytesMessage* StompConnector::createBytesMessage(
-    SessionInfo* session,
-    TransactionInfo* transaction)
-        throw ( ConnectorException )
-{
-    try
-    {
-        enforceConnected();
-        
-        BytesMessageCommand* cmd = new BytesMessageCommand();
-        
-        if( transaction != NULL )
-        {
-            cmd->setTransactionId( 
-                Integer::toString( transaction->getTransactionId() ) );
-        }
-        
-        return cmd;
-    }
-    AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::TextMessage* StompConnector::createTextMessage(
-    SessionInfo* session,
-    TransactionInfo* transaction)
-        throw ( ConnectorException )
-{
-    try
-    {
-        enforceConnected();
-        
-        TextMessageCommand* cmd = new TextMessageCommand;
-        
-        if( transaction != NULL )
-        {
-            cmd->setTransactionId( 
-                Integer::toString( transaction->getTransactionId() ) );
-        }
-        
-        return cmd;
-    }
-    AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-cms::MapMessage* StompConnector::createMapMessage(
-    SessionInfo* session,
-    TransactionInfo* transaction)
-        throw ( ConnectorException )
-{
-    try
-    {
-        throw UnsupportedOperationException(
-            __FILE__, __LINE__, 
-            "StompConnector::createTemporaryQueue - No Stomp Support");
-    }
-    AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void StompConnector::unsubscribe(const std::string& name)
-    throw ( ConnectorException )
-{
-    try
-    {
-        throw UnsupportedOperationException(
-            __FILE__, __LINE__, 
-            "StompConnector::createTemporaryQueue - No Stomp Support");
-    }
-    AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void StompConnector::destroyResource( ConnectorResource* resource )
-    throw ( ConnectorException )
-{
-    try
-    {
-        ConsumerInfo* consumer = 
-            dynamic_cast<ConsumerInfo*>(resource);
-        SessionInfo* session = 
-            dynamic_cast<SessionInfo*>(resource);
-
-        if( consumer != NULL)
-        {
-            sessionManager->removeConsumer( consumer );
-        }
-        else if( session != NULL)
-        {
-            sessionManager->removeSession( session );
-        }
-
-        // No matter what we end it here.
-        delete resource;
-    }
-    AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void StompConnector::onCommand( transport::Command* command )
-{
-    try
-    {
-        StompCommand* stompCommand = dynamic_cast< StompCommand* >(command);
-
-        if(stompCommand == NULL)
-        {
-            fire( ConnectorException(
-                __FILE__, __LINE__,
-                "StompConnector::onCommand - Recieved an unknown Command") );
-        }
-
-        CmdListenerMap::iterator itr = 
-            cmdListenerMap.find( stompCommand->getStompCommandId() );
-            
-        if( itr == cmdListenerMap.end() )
-        {
-            fire( ConnectorException(
-                __FILE__, __LINE__,
-                "StompConnector::onCommand - "
-                "Recieved command with no listener") );
-
-            // This isn't going an farther, so delete it.
-            delete command;
-
-            return;   // we are done
-        }
-        
-        // Hand off
-        itr->second->onStompCommand( stompCommand );         
-    }
-    AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void StompConnector::onTransportException( 
-    transport::Transport* source, 
-    const exceptions::ActiveMQException& ex )
-{
-    try
-    {
-        // Inform the user.
-        fire( ex );
-        
-        // Close down.
-        close();
-    }
-    AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( ConnectorException );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void StompConnector::onStompCommand( commands::StompCommand* command ) 
-    throw ( StompConnectorException )
-{
-    try
-    {        
-        ErrorCommand* error = 
-            dynamic_cast<ErrorCommand*>(command);
-        
-        if(error != NULL)
-        {
-            fire( StompConnectorException(
-                __FILE__, __LINE__,
-                (string( "StompConnector::onStompCommand - " ) + 
-                error->getErrorMessage() ).c_str() ) );
-                
-            // Shutdown
-            close();
-        }
-    }
-    AMQ_CATCH_RETHROW( StompConnectorException )
-    AMQ_CATCHALL_THROW( StompConnectorException );
-}
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#include <activemq/connector/stomp/StompConnector.h>
+#include <activemq/concurrent/Concurrent.h>
+#include <activemq/transport/BrokerError.h>
+#include <activemq/transport/Transport.h>
+#include <activemq/transport/ExceptionResponse.h>
+#include <activemq/connector/stomp/StompTopic.h>
+#include <activemq/connector/stomp/StompQueue.h>
+#include <activemq/connector/stomp/commands/ConnectCommand.h>
+#include <activemq/connector/stomp/commands/ErrorCommand.h>
+#include <activemq/connector/stomp/commands/BeginCommand.h>
+#include <activemq/connector/stomp/commands/AbortCommand.h>
+#include <activemq/connector/stomp/commands/AckCommand.h>
+#include <activemq/connector/stomp/commands/CommitCommand.h>
+#include <activemq/connector/stomp/commands/MessageCommand.h>
+#include <activemq/connector/stomp/commands/BytesMessageCommand.h>
+#include <activemq/connector/stomp/commands/TextMessageCommand.h>
+#include <activemq/connector/stomp/commands/ConnectedCommand.h>
+#include <activemq/connector/stomp/commands/DisconnectCommand.h>
+#include <activemq/exceptions/UnsupportedOperationException.h>
+#include <activemq/connector/stomp/StompProducerInfo.h>
+#include <activemq/connector/stomp/StompTransactionInfo.h>
+#include <activemq/util/Integer.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::connector;
+using namespace activemq::util;
+using namespace activemq::transport;
+using namespace activemq::exceptions;
+using namespace activemq::connector::stomp;
+using namespace activemq::connector::stomp::commands;
+
+////////////////////////////////////////////////////////////////////////////////
+StompConnector::StompConnector( Transport* transport, 
+                                const util::Properties& properties )
+    throw ( IllegalArgumentException )
+{
+    if(transport == NULL)
+    {
+        throw IllegalArgumentException(
+            __FILE__, __LINE__,
+            "StompConnector::StompConnector - Transport cannot be NULL");
+    }
+    
+    this->transport = transport;
+    this->state = DISCONNECTED;
+    this->exceptionListener = NULL;
+    this->messageListener = NULL;
+    this->sessionManager = NULL;
+    this->nextProducerId = 0;
+    this->nextTransactionId = 0;
+    this->properties.copy( &properties );
+    
+    // Observe the transport for events.
+    this->transport->setCommandListener( this );
+    this->transport->setTransportExceptionListener( this );
+
+    // Setup the reader and writer in the transport.
+    this->transport->setCommandReader( &reader );
+    this->transport->setCommandWriter( &writer );
+
+    // Register ourself for those commands that we process    
+    addCmdListener( CommandConstants::ERROR_CMD, this );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+StompConnector::~StompConnector(void)
+{
+    try
+    {
+        close();
+        
+        delete sessionManager;
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned int StompConnector::getNextProducerId(void)
+{
+    synchronized(&mutex)
+    {
+        return nextProducerId++;
+    }
+    
+    return 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned int StompConnector::getNextTransactionId(void)
+{
+    synchronized(&mutex)
+    {
+        return nextTransactionId++;
+    }
+    
+    return 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::enforceConnected( void ) throw ( ConnectorException )
+{
+    if( state != CONNECTED )
+    {
+        throw StompConnectorException(
+            __FILE__, __LINE__,
+            "StompConnector::enforceConnected - Not Connected!" );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::addCmdListener( 
+    commands::CommandConstants::CommandId commandId,
+    StompCommandListener* listener )
+{
+    cmdListenerMap.insert( make_pair( commandId, listener ) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::removeCmdListener( 
+    commands::CommandConstants::CommandId commandId )
+{
+    cmdListenerMap.erase(commandId);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::start(void) throw( cms::CMSException )
+{
+    try
+    {
+        synchronized( &mutex )
+        {
+            if( state == CONNECTED )
+            {
+                throw ActiveMQException( 
+                    __FILE__, __LINE__, 
+                    "StompConnector::start - already started" );
+            }
+                
+            // Start the transport - this establishes the socket.
+            transport->start();
+
+            // Send the connect message to the broker.
+            connect();         
+        }        
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::close(void) throw( cms::CMSException ){
+    
+    try
+    {
+        synchronized( &mutex )
+        {  
+            if( state == this->CONNECTED )
+            {
+                // Send the disconnect message to the broker.
+                disconnect();
+
+                // Close the transport.
+                transport->close();
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::connect(void)
+{
+    try
+    {
+        // Mark this connector as started.
+        state = this->CONNECTING;
+
+        // Send the connect command to the broker 
+        ConnectCommand cmd;
+
+        // Encode User Name and Password and Client ID
+        string login = getLogin();
+        if( login.length() > 0 ){
+            cmd.setLogin( login );
+        }        
+        string password = getPassword();
+        if( password.length() > 0 ){
+            cmd.setPassword( password );
+        }        
+        string clientId = getClientId();
+        if( clientId.length() > 0 ){
+            cmd.setClientId( clientId );
+        }
+
+        Response* response = transport->request( &cmd );
+        
+        if( dynamic_cast< ExceptionResponse* >( response ) != NULL )
+        {
+            throw StompConnectorException(
+                __FILE__, __LINE__,
+                "StompConnector::connect - Failed on Connect Request" );
+        }
+
+        ConnectedCommand* connected = 
+            dynamic_cast< ConnectedCommand* >( response );
+
+        if( connected == NULL )
+        {
+            throw StompConnectorException(
+                __FILE__, __LINE__,
+                "StompConnector::connect - "
+                "Response not a connected response" );            
+        }
+
+        // Connected so we now create the SessionManager
+        sessionManager = new StompSessionManager(
+            connected->getSessionId(), transport );
+
+        // Give our message listener to the session manager it will
+        // notify all the interested clients                
+        sessionManager->setConsumerMessageListener( messageListener );
+
+        // Add the Session Manager as the Command Listener for 
+        // Message commands so that it can route them to the 
+        // correct consumers.
+        addCmdListener( CommandConstants::MESSAGE, sessionManager );
+        
+        // In Stomp, the client Id is the same as the session id that is
+        // returned in the Connected response
+        properties.setProperty( 
+            commands::CommandConstants::toString( 
+                commands::CommandConstants::HEADER_CLIENT_ID ),
+            connected->getSessionId() );        
+
+        // Tag us in the Connected State now.
+        state = CONNECTED;
+        
+        // Clean up
+        delete response;
+    }
+    AMQ_CATCH_RETHROW( BrokerError )
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::disconnect(void)
+{
+    try
+    {
+        // Mark state as no longer connected.
+        state = this->DISCONNECTED;
+
+        // Send the disconnect command to the broker.
+        DisconnectCommand cmd;
+        transport->oneway( &cmd );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+SessionInfo* StompConnector::createSession(
+    cms::Session::AcknowledgeMode ackMode) 
+        throw( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        return sessionManager->createSession( ackMode );
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ConsumerInfo* StompConnector::createConsumer(
+    cms::Destination* destination, 
+    SessionInfo* session,
+    const std::string& selector)
+        throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        return sessionManager->createConsumer( 
+            destination, session, selector );
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ConsumerInfo* StompConnector::createDurableConsumer(
+    cms::Topic* topic, 
+    SessionInfo* session,
+    const std::string& name,
+    const std::string& selector,
+    bool noLocal)
+        throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        return sessionManager->createDurableConsumer( 
+            topic, session, name, selector, noLocal );
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ProducerInfo* StompConnector::createProducer(
+    cms::Destination* destination, 
+    SessionInfo* session)
+        throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        ProducerInfo* producer = new StompProducerInfo();
+        
+        producer->setDestination( *destination );
+        producer->setProducerId( getNextProducerId() );
+        producer->setSessionInfo( session );
+        
+        return producer;
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Topic* StompConnector::createTopic(const std::string& name, 
+                                        SessionInfo* session)
+    throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        return new StompTopic(name);
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Queue* StompConnector::createQueue(const std::string& name, 
+                                        SessionInfo* session)
+    throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        return new StompQueue(name);
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::TemporaryTopic* StompConnector::createTemporaryTopic(
+    SessionInfo* session)
+        throw ( ConnectorException )
+{
+    try
+    {
+        throw UnsupportedOperationException(
+            __FILE__, __LINE__, 
+            "StompConnector::createTemporaryTopic - No Stomp Support");
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::TemporaryQueue* StompConnector::createTemporaryQueue(
+    SessionInfo* session)
+        throw ( ConnectorException )
+{
+    try
+    {
+        throw UnsupportedOperationException(
+            __FILE__, __LINE__, 
+            "StompConnector::createTemporaryQueue - No Stomp Support");
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::send(cms::Message* message, 
+                          ProducerInfo* producerInfo) 
+    throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        const SessionInfo* session = producerInfo->getSessionInfo();
+        Command* command = dynamic_cast< transport::Command* >( message );
+        
+        if( command == NULL )
+        {
+            throw StompConnectorException(
+                __FILE__, __LINE__,
+                "StompConnector::send - "
+                "Message is not a valid stomp type.");
+        }
+
+        if( session->getAckMode() == cms::Session::Transactional )
+        {
+            StompCommand* stompCommand = 
+                dynamic_cast< StompCommand* >( message );
+
+            if( stompCommand == NULL )
+            {
+                throw StompConnectorException(
+                    __FILE__, __LINE__,
+                    "StompConnector::send - "
+                    "Message is not a valid stomp type.");
+            }
+    
+            stompCommand->setTransactionId(
+                Integer::toString( 
+                    session->getTransactionInfo()->getTransactionId() ) );
+        }
+        
+        // Send it
+        transport->oneway( command );
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::send(std::list<cms::Message*>& messages,
+                          ProducerInfo* producerInfo) 
+    throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        list<cms::Message*>::const_iterator itr = messages.begin();
+        
+        for(; itr != messages.end(); ++itr)
+        {
+            this->send(*itr, producerInfo);
+        }
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::acknowledge( const SessionInfo* session,
+                                  const cms::Message* message,
+                                  AckType ackType = ConsumedAck )
+    throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        // Auto to Stomp means don't do anything, so we drop it here
+        // for client acknowledge we have to send and ack.  
+        if( session->getAckMode() == cms::Session::ClientAcknowledge )
+        {
+            AckCommand cmd;
+
+            if( message->getCMSMessageId() == NULL )
+            {
+                throw StompConnectorException(
+                    __FILE__, __LINE__,
+                    "StompConnector::send - "
+                    "Message has no Message Id, cannot ack.");
+            }
+
+            cmd.setMessageId( message->getCMSMessageId() );
+
+            if( session->getAckMode() == cms::Session::Transactional )
+            {
+                cmd.setTransactionId( 
+                    Integer::toString( 
+                        session->getTransactionInfo()->getTransactionId() ) );
+            }
+            
+            transport->oneway( &cmd );
+        }
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+TransactionInfo* StompConnector::startTransaction(
+    SessionInfo* session) 
+        throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        TransactionInfo* transaction = new StompTransactionInfo();
+        
+        transaction->setTransactionId( getNextTransactionId() );
+        
+        session->setTransactionInfo( transaction );
+
+        BeginCommand cmd;
+
+        cmd.setTransactionId( 
+                Integer::toString( transaction->getTransactionId() ) );
+        
+        transport->oneway( &cmd );
+        
+        return transaction;
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::commit(TransactionInfo* transaction, 
+                            SessionInfo* session)
+    throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        CommitCommand cmd;
+        
+        cmd.setTransactionId( 
+                Integer::toString( transaction->getTransactionId() ) );
+        
+        transport->oneway( &cmd );
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::rollback(TransactionInfo* transaction, 
+                              SessionInfo* session)
+    throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        AbortCommand cmd;
+        
+        cmd.setTransactionId( 
+                Integer::toString( transaction->getTransactionId() ) );
+        
+        transport->oneway( &cmd );
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Message* StompConnector::createMessage(
+    SessionInfo* session,
+    TransactionInfo* transaction)
+        throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        MessageCommand* cmd = new MessageCommand();
+        
+        if( transaction != NULL )
+        {
+            cmd->setTransactionId( 
+                Integer::toString( transaction->getTransactionId() ) );
+        }
+        
+        return cmd;
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::BytesMessage* StompConnector::createBytesMessage(
+    SessionInfo* session,
+    TransactionInfo* transaction)
+        throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        BytesMessageCommand* cmd = new BytesMessageCommand();
+        
+        if( transaction != NULL )
+        {
+            cmd->setTransactionId( 
+                Integer::toString( transaction->getTransactionId() ) );
+        }
+        
+        return cmd;
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::TextMessage* StompConnector::createTextMessage(
+    SessionInfo* session,
+    TransactionInfo* transaction)
+        throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        TextMessageCommand* cmd = new TextMessageCommand;
+        
+        if( transaction != NULL )
+        {
+            cmd->setTransactionId( 
+                Integer::toString( transaction->getTransactionId() ) );
+        }
+        
+        return cmd;
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::MapMessage* StompConnector::createMapMessage(
+    SessionInfo* session,
+    TransactionInfo* transaction)
+        throw ( ConnectorException )
+{
+    try
+    {
+        throw UnsupportedOperationException(
+            __FILE__, __LINE__, 
+            "StompConnector::createTemporaryQueue - No Stomp Support");
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::unsubscribe(const std::string& name)
+    throw ( ConnectorException )
+{
+    try
+    {
+        throw UnsupportedOperationException(
+            __FILE__, __LINE__, 
+            "StompConnector::createTemporaryQueue - No Stomp Support");
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::destroyResource( ConnectorResource* resource )
+    throw ( ConnectorException )
+{
+    try
+    {
+        ConsumerInfo* consumer = 
+            dynamic_cast<ConsumerInfo*>(resource);
+        SessionInfo* session = 
+            dynamic_cast<SessionInfo*>(resource);
+
+        if( consumer != NULL)
+        {
+            sessionManager->removeConsumer( consumer );
+        }
+        else if( session != NULL)
+        {
+            sessionManager->removeSession( session );
+        }
+
+        // No matter what we end it here.
+        delete resource;
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::onCommand( transport::Command* command )
+{
+    try
+    {
+        StompCommand* stompCommand = dynamic_cast< StompCommand* >(command);
+
+        if(stompCommand == NULL)
+        {
+            fire( ConnectorException(
+                __FILE__, __LINE__,
+                "StompConnector::onCommand - Recieved an unknown Command") );
+        }
+
+        CmdListenerMap::iterator itr = 
+            cmdListenerMap.find( stompCommand->getStompCommandId() );
+            
+        if( itr == cmdListenerMap.end() )
+        {
+            fire( ConnectorException(
+                __FILE__, __LINE__,
+                "StompConnector::onCommand - "
+                "Recieved command with no listener") );
+
+            // This isn't going an farther, so delete it.
+            delete command;
+
+            return;   // we are done
+        }
+        
+        // Hand off
+        itr->second->onStompCommand( stompCommand );         
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::onTransportException( 
+    transport::Transport* source, 
+    const exceptions::ActiveMQException& ex )
+{
+    try
+    {
+        // Inform the user.
+        fire( ex );
+        
+        // Close down.
+        close();
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::onStompCommand( commands::StompCommand* command ) 
+    throw ( StompConnectorException )
+{
+    try
+    {        
+        ErrorCommand* error = 
+            dynamic_cast<ErrorCommand*>(command);
+        
+        if(error != NULL)
+        {
+            fire( StompConnectorException(
+                __FILE__, __LINE__,
+                (string( "StompConnector::onStompCommand - " ) + 
+                error->getErrorMessage() ).c_str() ) );
+                
+            // Shutdown
+            close();
+        }
+    }
+    AMQ_CATCH_RETHROW( StompConnectorException )
+    AMQ_CATCHALL_THROW( StompConnectorException );
+}

Propchange: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h?rev=419365&r1=419364&r2=419365&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h (original)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h Wed Jul  5 15:27:34 2006
@@ -1,533 +1,533 @@
-/*
- * Copyright 2006 The Apache Software Foundation or its licensors, as
- * applicable.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- 
-#ifndef ACTIVEMQ_CONNECTOR_STOMP_STOMPCONNECTOR_H_
-#define ACTIVEMQ_CONNECTOR_STOMP_STOMPCONNECTOR_H_
-
-#include <activemq/connector/Connector.h>
-#include <activemq/transport/Transport.h>
-#include <activemq/transport/CommandListener.h>
-#include <activemq/transport/TransportExceptionListener.h>
-#include <activemq/concurrent/Mutex.h>
-#include <activemq/util/Properties.h>
-#include <activemq/connector/stomp/StompCommandReader.h>
-#include <activemq/connector/stomp/StompCommandWriter.h>
-#include <activemq/connector/stomp/StompCommandListener.h>
-#include <activemq/connector/stomp/StompSessionManager.h>
-#include <activemq/connector/stomp/commands/CommandConstants.h>
-#include <activemq/exceptions/IllegalArgumentException.h>
-
-namespace activemq{
-namespace connector{
-namespace stomp{
-   
-    /**
-     * The connector implementation for the STOMP protocol.
-     */
-    class StompConnector
-    :
-        public Connector,
-        public transport::CommandListener,
-        public transport::TransportExceptionListener,
-        public StompCommandListener
-    {
-    private:
-    
-        // Flags the state we are in for connection to broker.
-        enum connectionState
-        {
-            DISCONNECTED,
-            CONNECTING,
-            CONNECTED
-        };
-
-        // Maps Command Ids to listener that are interested        
-        typedef std::map< commands::CommandConstants::CommandId, 
-                          StompCommandListener*> CmdListenerMap;
-        
-    private:
-    
-        /**
-         * The transport for sending/receiving commands on the wire.
-         */
-        transport::Transport* transport;
-        
-        /**
-         * Flag to indicate the start state of the connector.
-         */
-        connectionState state;
-        
-        /**
-         * Sync object.
-         */
-        concurrent::Mutex mutex;
-        
-        /**
-         * Observer of messages directed at a particular
-         * consumer.
-         */
-        ConsumerMessageListener* messageListener;
-        
-        /**
-         * Observer of connector exceptions.
-         */
-        cms::ExceptionListener* exceptionListener;
-        
-        /**
-         * This Connector's Command Reader
-         */
-        StompCommandReader reader;
-        
-        /**
-         * This Connector's Command Writer
-         */
-        StompCommandWriter writer;
-        
-        /**
-         * Map to hold StompCommandListeners
-         */
-        CmdListenerMap cmdListenerMap;
-        
-        /**
-         * Session Manager object that will  be allocated when we connect
-         */
-        StompSessionManager* sessionManager;
-        
-        /**
-         * Next avaliable Producer Id
-         */
-        unsigned int nextProducerId;
-        
-        /**
-         * Next avaliable Transaction Id
-         */
-        unsigned int nextTransactionId;
-        
-        /**
-         * Properties for the connector.
-         */
-        util::SimpleProperties properties;
-
-    private:
-    
-        /**
-         * Sends the connect message to the broker and
-         * waits for the response.
-         */
-        void connect(void);
-        
-        /**
-         * Sends a oneway disconnect message to the broker.
-         */
-        void disconnect(void);
-        
-        /**
-         * Fires a consumer message to the observer.
-         */
-        void fire( ConsumerInfo* consumer, core::ActiveMQMessage* msg ){
-            try{
-                if( messageListener != NULL ){
-                    messageListener->onConsumerMessage( 
-                        consumer,
-                        msg );
-                }
-            }catch( ... ){/* do nothing*/}
-        }
-        
-        /**
-         * Fires an exception event to the observing object.
-         */
-        void fire( const exceptions::ActiveMQException& ex ){
-            try{
-                if( exceptionListener != NULL ){
-                    exceptionListener->onException( ex );
-                }
-            }catch( ... ){/* do nothing*/}
-        }
-        
-    public:
-    
-        /**
-         * Constructor for the stomp connector.
-         * @param transport the transport object for sending/receiving
-         * commands on the wire.
-         * @param props properties for configuring the connector.
-         */
-        StompConnector( transport::Transport* transport, 
-                        const util::Properties& properties )
-            throw ( exceptions::IllegalArgumentException );
-
-        virtual ~StompConnector(void);
-        
-        /**
-         * Starts the service.
-         * @throws CMSException
-         */
-        virtual void start(void) throw( cms::CMSException );
-        
-        /**
-         * Closes this object and deallocates the appropriate resources.
-         * @throws CMSException
-         */
-        virtual void close(void) throw( cms::CMSException );
-
-        /**
-         * Gets the Client Id for this connection, if this
-         * connection has been closed, then this method returns ""
-         * @return Client Id String
-         */
-        virtual std::string getClientId(void) const {
-            return properties.getProperty( 
-                commands::CommandConstants::toString( 
-                    commands::CommandConstants::HEADER_CLIENT_ID ), "" );
-        }
-        
-        virtual std::string getLogin(void) const {
-            return properties.getProperty( 
-                commands::CommandConstants::toString( 
-                    commands::CommandConstants::HEADER_LOGIN ), "" );
-        }
-        
-        virtual std::string getPassword(void) const {
-            return properties.getProperty( 
-                commands::CommandConstants::toString( 
-                    commands::CommandConstants::HEADER_PASSWORD ), "" );
-        }
-
-        /**
-         * Gets a reference to the Transport that this connection
-         * is using.
-         * @param reference to a transport
-         * @throws InvalidStateException if the Transport is not set
-         */
-        virtual transport::Transport& getTransport(void) const 
-            throw (exceptions::InvalidStateException ) {
-
-            if( transport == NULL ) {
-                throw exceptions::InvalidStateException(
-                    __FILE__, __LINE__,
-                    "StompConnector::getTransport - "
-                    "Invalid State, No Transport.");
-            }
-            
-            return *transport;
-        }
-
-        /**
-         * Creates a Session Info object for this connector
-         * @param Acknowledgement Mode of the Session
-         * @returns Session Info Object
-         * @throws ConnectorException
-         */
-        virtual SessionInfo* createSession(
-            cms::Session::AcknowledgeMode ackMode) 
-                throw( ConnectorException );
-      
-        /** 
-         * Create a Consumer for the given Session
-         * @param Destination to Subscribe to.
-         * @param Session Information.
-         * @return Consumer Information
-         * @throws ConnectorException
-         */
-        virtual ConsumerInfo* createConsumer(
-            cms::Destination* destination, 
-            SessionInfo* session,
-            const std::string& selector = "")
-                throw ( ConnectorException );
-         
-        /** 
-         * Create a Durable Consumer for the given Session
-         * @param Topic to Subscribe to.
-         * @param Session Information.
-         * @param name of the Durable Topic
-         * @param Selector
-         * @param if set, inhibits the delivery of messages 
-         *        published by its own connection 
-         * @return Consumer Information
-         * @throws ConnectorException
-         */
-        virtual ConsumerInfo* createDurableConsumer(
-            cms::Topic* topic, 
-            SessionInfo* session,
-            const std::string& name,
-            const std::string& selector = "",
-            bool noLocal = false)
-                throw ( ConnectorException );
-
-        /** 
-         * Create a Consumer for the given Session
-         * @param Destination to Subscribe to.
-         * @param Session Information.
-         * @return Producer Information
-         * @throws ConnectorException
-         */
-        virtual ProducerInfo* createProducer(
-            cms::Destination* destination, 
-            SessionInfo* session)
-                throw ( ConnectorException );
-
-        /**
-         * Creates a Topic given a name and session info
-         * @param Topic Name
-         * @param Session Information
-         * @return a newly created Topic Object
-         * @throws ConnectorException
-         */
-        virtual cms::Topic* createTopic( const std::string& name, 
-                                         SessionInfo* session )
-            throw ( ConnectorException );
-          
-        /**
-         * Creates a Queue given a name and session info
-         * @param Queue Name
-         * @param Session Information
-         * @return a newly created Queue Object
-         * @throws ConnectorException
-         */
-        virtual cms::Queue* createQueue( const std::string& name, 
-                                         SessionInfo* session )
-            throw ( ConnectorException );
-
-        /**
-         * Creates a Temporary Topic given a name and session info
-         * @param Temporary Topic Name
-         * @param Session Information
-         * @return a newly created Temporary Topic Object
-         * @throws ConnectorException
-         */
-        virtual cms::TemporaryTopic* createTemporaryTopic(
-            SessionInfo* session)
-                throw ( ConnectorException );
-          
-        /**
-         * Creates a Temporary Queue given a name and session info
-         * @param Temporary Queue Name
-         * @param Session Information
-         * @return a newly created Temporary Queue Object
-         * @throws ConnectorException
-         */
-        virtual cms::TemporaryQueue* createTemporaryQueue(
-            SessionInfo* session)
-                throw ( ConnectorException );
-
-        /**
-         * Sends a Message
-         * @param The Message to send.
-         * @param Producer Info for the sender of this message
-         * @throws ConnectorException
-         */
-        virtual void send( cms::Message* message, ProducerInfo* producerInfo ) 
-            throw ( ConnectorException );
-      
-        /**
-         * Sends a set of Messages
-         * @param List of Messages to send.
-         * @param Producer Info for the sender of this message
-         * @throws ConnectorException
-         */
-        virtual void send( std::list<cms::Message*>& messages,
-                           ProducerInfo* producerInfo ) 
-            throw ( ConnectorException );
-         
-        /**
-         * Acknowledges a Message
-         * @param An ActiveMQMessage to Ack.
-         * @throws ConnectorException
-         */
-        virtual void acknowledge( const SessionInfo* session,
-                                  const cms::Message* message,
-                                  AckType ackType)
-            throw ( ConnectorException );
-
-        /**
-         * Starts a new Transaction.
-         * @param Session Information
-         * @throws ConnectorException
-         */
-        virtual TransactionInfo* startTransaction(
-            SessionInfo* session) 
-                throw ( ConnectorException );
-         
-        /**
-         * Commits a Transaction.
-         * @param The Transaction information
-         * @param Session Information
-         * @throws ConnectorException
-         */
-        virtual void commit(TransactionInfo* transaction, 
-                            SessionInfo* session)
-            throw ( ConnectorException );
-
-        /**
-         * Rolls back a Transaction.
-         * @param The Transaction information
-         * @param Session Information
-         * @throws ConnectorException
-         */
-        virtual void rollback(TransactionInfo* transaction, 
-                              SessionInfo* session)
-            throw ( ConnectorException );
-
-        /**
-         * Creates a new Message.
-         * @param Session Information
-         * @param Transaction Info for this Message
-         * @throws ConnectorException
-         */
-        virtual cms::Message* createMessage(
-            SessionInfo* session,
-            TransactionInfo* transaction)
-                throw ( ConnectorException );
-
-        /**
-         * Creates a new BytesMessage.
-         * @param Session Information
-         * @param Transaction Info for this Message
-         * @throws ConnectorException
-         */
-        virtual cms::BytesMessage* createBytesMessage(
-            SessionInfo* session,
-            TransactionInfo* transaction)
-                throw ( ConnectorException );
-
-        /**
-         * Creates a new TextMessage.
-         * @param Session Information
-         * @param Transaction Info for this Message
-         * @throws ConnectorException
-         */
-        virtual cms::TextMessage* createTextMessage(
-            SessionInfo* session,
-            TransactionInfo* transaction)
-                throw ( ConnectorException );
-
-        /**
-         * Creates a new MapMessage.
-         * @param Session Information
-         * @param Transaction Info for this Message
-         * @throws ConnectorException
-         */
-        virtual cms::MapMessage* createMapMessage(
-            SessionInfo* session,
-            TransactionInfo* transaction)
-                throw ( ConnectorException );
-
-        /** 
-         * Unsubscribe from a givenDurable Subscription
-         * @param name of the Subscription
-         * @throws ConnectorException
-         */
-        virtual void unsubscribe( const std::string& name )
-            throw ( ConnectorException );
-
-        /**
-         * Destroys the given connector resource.
-         * @param resource the resource to be destroyed.
-         * @throws ConnectorException
-         */
-        virtual void destroyResource( ConnectorResource* resource )
-            throw ( ConnectorException );
-            
-        /** 
-         * Sets the listener of consumer messages.
-         * @param listener the observer.
-         */
-        virtual void setConsumerMessageListener(
-            ConsumerMessageListener* listener)
-        {
-            this->messageListener = listener;
-            
-            if(sessionManager != NULL)
-            {
-                sessionManager->setConsumerMessageListener( listener );
-            }
-        }
-
-        /** 
-         * Sets the Listner of exceptions for this connector
-         * @param ExceptionListener the observer.
-         */
-        virtual void setExceptionListener(
-            cms::ExceptionListener* listener)
-        {
-            this->exceptionListener = listener;
-        }
-        
-    public: // transport::CommandListener
-    
-        /**
-         * Event handler for the receipt of a non-response command from the 
-         * transport.
-         * @param command the received command object.
-         */
-        virtual void onCommand( transport::Command* command );
-        
-    public: // TransportExceptionListener
-
-        /**
-         * Event handler for an exception from a command transport.
-         * @param source The source of the exception
-         * @param ex The exception.
-         */
-        virtual void onTransportException( 
-            transport::Transport* source, 
-            const exceptions::ActiveMQException& ex );
-
-    public: // StompCommandListener
-
-        /**
-         * Process the Stomp Command
-         * @param command to process
-         * @throw ConnterException
-         */
-        virtual void onStompCommand( commands::StompCommand* command ) 
-            throw ( StompConnectorException );    
-
-    public:
-    
-        /**
-         * Registers a Command Listener using the CommandId specified
-         * if there is already a listener for that command it will be
-         * removed.
-         * @param CommandId to process
-         * @param pointer to the listener to call
-         */
-        virtual void addCmdListener( 
-            commands::CommandConstants::CommandId commandId,
-            StompCommandListener* listener );
-        
-        /**
-         * UnRegisters a Command Listener using the CommandId specified
-         * @param CommandId of the listener to remove.
-         */
-        virtual void removeCmdListener( 
-            commands::CommandConstants::CommandId commandId );
-        
-    private:
-    
-        unsigned int getNextProducerId( void );
-        unsigned int getNextTransactionId( void );
-
-        // Check for Connected State and Throw an exception if not.
-        void enforceConnected( void ) throw ( ConnectorException );
-        
-    };
-
-}}}
-
-#endif /*ACTIVEMQ_CONNECTOR_STOMP_STOMPCONNECTOR_H_*/
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#ifndef ACTIVEMQ_CONNECTOR_STOMP_STOMPCONNECTOR_H_
+#define ACTIVEMQ_CONNECTOR_STOMP_STOMPCONNECTOR_H_
+
+#include <activemq/connector/Connector.h>
+#include <activemq/transport/Transport.h>
+#include <activemq/transport/CommandListener.h>
+#include <activemq/transport/TransportExceptionListener.h>
+#include <activemq/concurrent/Mutex.h>
+#include <activemq/util/Properties.h>
+#include <activemq/connector/stomp/StompCommandReader.h>
+#include <activemq/connector/stomp/StompCommandWriter.h>
+#include <activemq/connector/stomp/StompCommandListener.h>
+#include <activemq/connector/stomp/StompSessionManager.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+#include <activemq/exceptions/IllegalArgumentException.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+   
+    /**
+     * The connector implementation for the STOMP protocol.
+     */
+    class StompConnector
+    :
+        public Connector,
+        public transport::CommandListener,
+        public transport::TransportExceptionListener,
+        public StompCommandListener
+    {
+    private:
+    
+        // Flags the state we are in for connection to broker.
+        enum connectionState
+        {
+            DISCONNECTED,
+            CONNECTING,
+            CONNECTED
+        };
+
+        // Maps Command Ids to listener that are interested        
+        typedef std::map< commands::CommandConstants::CommandId, 
+                          StompCommandListener*> CmdListenerMap;
+        
+    private:
+    
+        /**
+         * The transport for sending/receiving commands on the wire.
+         */
+        transport::Transport* transport;
+        
+        /**
+         * Flag to indicate the start state of the connector.
+         */
+        connectionState state;
+        
+        /**
+         * Sync object.
+         */
+        concurrent::Mutex mutex;
+        
+        /**
+         * Observer of messages directed at a particular
+         * consumer.
+         */
+        ConsumerMessageListener* messageListener;
+        
+        /**
+         * Observer of connector exceptions.
+         */
+        cms::ExceptionListener* exceptionListener;
+        
+        /**
+         * This Connector's Command Reader
+         */
+        StompCommandReader reader;
+        
+        /**
+         * This Connector's Command Writer
+         */
+        StompCommandWriter writer;
+        
+        /**
+         * Map to hold StompCommandListeners
+         */
+        CmdListenerMap cmdListenerMap;
+        
+        /**
+         * Session Manager object that will  be allocated when we connect
+         */
+        StompSessionManager* sessionManager;
+        
+        /**
+         * Next avaliable Producer Id
+         */
+        unsigned int nextProducerId;
+        
+        /**
+         * Next avaliable Transaction Id
+         */
+        unsigned int nextTransactionId;
+        
+        /**
+         * Properties for the connector.
+         */
+        util::SimpleProperties properties;
+
+    private:
+    
+        /**
+         * Sends the connect message to the broker and
+         * waits for the response.
+         */
+        void connect(void);
+        
+        /**
+         * Sends a oneway disconnect message to the broker.
+         */
+        void disconnect(void);
+        
+        /**
+         * Fires a consumer message to the observer.
+         */
+        void fire( ConsumerInfo* consumer, core::ActiveMQMessage* msg ){
+            try{
+                if( messageListener != NULL ){
+                    messageListener->onConsumerMessage( 
+                        consumer,
+                        msg );
+                }
+            }catch( ... ){/* do nothing*/}
+        }
+        
+        /**
+         * Fires an exception event to the observing object.
+         */
+        void fire( const exceptions::ActiveMQException& ex ){
+            try{
+                if( exceptionListener != NULL ){
+                    exceptionListener->onException( ex );
+                }
+            }catch( ... ){/* do nothing*/}
+        }
+        
+    public:
+    
+        /**
+         * Constructor for the stomp connector.
+         * @param transport the transport object for sending/receiving
+         * commands on the wire.
+         * @param props properties for configuring the connector.
+         */
+        StompConnector( transport::Transport* transport, 
+                        const util::Properties& properties )
+            throw ( exceptions::IllegalArgumentException );
+
+        virtual ~StompConnector(void);
+        
+        /**
+         * Starts the service.
+         * @throws CMSException
+         */
+        virtual void start(void) throw( cms::CMSException );
+        
+        /**
+         * Closes this object and deallocates the appropriate resources.
+         * @throws CMSException
+         */
+        virtual void close(void) throw( cms::CMSException );
+
+        /**
+         * Gets the Client Id for this connection, if this
+         * connection has been closed, then this method returns ""
+         * @return Client Id String
+         */
+        virtual std::string getClientId(void) const {
+            return properties.getProperty( 
+                commands::CommandConstants::toString( 
+                    commands::CommandConstants::HEADER_CLIENT_ID ), "" );
+        }
+        
+        virtual std::string getLogin(void) const {
+            return properties.getProperty( 
+                commands::CommandConstants::toString( 
+                    commands::CommandConstants::HEADER_LOGIN ), "" );
+        }
+        
+        virtual std::string getPassword(void) const {
+            return properties.getProperty( 
+                commands::CommandConstants::toString( 
+                    commands::CommandConstants::HEADER_PASSWORD ), "" );
+        }
+
+        /**
+         * Gets a reference to the Transport that this connection
+         * is using.
+         * @param reference to a transport
+         * @throws InvalidStateException if the Transport is not set
+         */
+        virtual transport::Transport& getTransport(void) const 
+            throw (exceptions::InvalidStateException ) {
+
+            if( transport == NULL ) {
+                throw exceptions::InvalidStateException(
+                    __FILE__, __LINE__,
+                    "StompConnector::getTransport - "
+                    "Invalid State, No Transport.");
+            }
+            
+            return *transport;
+        }
+
+        /**
+         * Creates a Session Info object for this connector
+         * @param Acknowledgement Mode of the Session
+         * @returns Session Info Object
+         * @throws ConnectorException
+         */
+        virtual SessionInfo* createSession(
+            cms::Session::AcknowledgeMode ackMode) 
+                throw( ConnectorException );
+      
+        /** 
+         * Create a Consumer for the given Session
+         * @param Destination to Subscribe to.
+         * @param Session Information.
+         * @return Consumer Information
+         * @throws ConnectorException
+         */
+        virtual ConsumerInfo* createConsumer(
+            cms::Destination* destination, 
+            SessionInfo* session,
+            const std::string& selector = "")
+                throw ( ConnectorException );
+         
+        /** 
+         * Create a Durable Consumer for the given Session
+         * @param Topic to Subscribe to.
+         * @param Session Information.
+         * @param name of the Durable Topic
+         * @param Selector
+         * @param if set, inhibits the delivery of messages 
+         *        published by its own connection 
+         * @return Consumer Information
+         * @throws ConnectorException
+         */
+        virtual ConsumerInfo* createDurableConsumer(
+            cms::Topic* topic, 
+            SessionInfo* session,
+            const std::string& name,
+            const std::string& selector = "",
+            bool noLocal = false)
+                throw ( ConnectorException );
+
+        /** 
+         * Create a Consumer for the given Session
+         * @param Destination to Subscribe to.
+         * @param Session Information.
+         * @return Producer Information
+         * @throws ConnectorException
+         */
+        virtual ProducerInfo* createProducer(
+            cms::Destination* destination, 
+            SessionInfo* session)
+                throw ( ConnectorException );
+
+        /**
+         * Creates a Topic given a name and session info
+         * @param Topic Name
+         * @param Session Information
+         * @return a newly created Topic Object
+         * @throws ConnectorException
+         */
+        virtual cms::Topic* createTopic( const std::string& name, 
+                                         SessionInfo* session )
+            throw ( ConnectorException );
+          
+        /**
+         * Creates a Queue given a name and session info
+         * @param Queue Name
+         * @param Session Information
+         * @return a newly created Queue Object
+         * @throws ConnectorException
+         */
+        virtual cms::Queue* createQueue( const std::string& name, 
+                                         SessionInfo* session )
+            throw ( ConnectorException );
+
+        /**
+         * Creates a Temporary Topic given a name and session info
+         * @param Temporary Topic Name
+         * @param Session Information
+         * @return a newly created Temporary Topic Object
+         * @throws ConnectorException
+         */
+        virtual cms::TemporaryTopic* createTemporaryTopic(
+            SessionInfo* session)
+                throw ( ConnectorException );
+          
+        /**
+         * Creates a Temporary Queue given a name and session info
+         * @param Temporary Queue Name
+         * @param Session Information
+         * @return a newly created Temporary Queue Object
+         * @throws ConnectorException
+         */
+        virtual cms::TemporaryQueue* createTemporaryQueue(
+            SessionInfo* session)
+                throw ( ConnectorException );
+
+        /**
+         * Sends a Message
+         * @param The Message to send.
+         * @param Producer Info for the sender of this message
+         * @throws ConnectorException
+         */
+        virtual void send( cms::Message* message, ProducerInfo* producerInfo ) 
+            throw ( ConnectorException );
+      
+        /**
+         * Sends a set of Messages
+         * @param List of Messages to send.
+         * @param Producer Info for the sender of this message
+         * @throws ConnectorException
+         */
+        virtual void send( std::list<cms::Message*>& messages,
+                           ProducerInfo* producerInfo ) 
+            throw ( ConnectorException );
+         
+        /**
+         * Acknowledges a Message
+         * @param An ActiveMQMessage to Ack.
+         * @throws ConnectorException
+         */
+        virtual void acknowledge( const SessionInfo* session,
+                                  const cms::Message* message,
+                                  AckType ackType)
+            throw ( ConnectorException );
+
+        /**
+         * Starts a new Transaction.
+         * @param Session Information
+         * @throws ConnectorException
+         */
+        virtual TransactionInfo* startTransaction(
+            SessionInfo* session) 
+                throw ( ConnectorException );
+         
+        /**
+         * Commits a Transaction.
+         * @param The Transaction information
+         * @param Session Information
+         * @throws ConnectorException
+         */
+        virtual void commit(TransactionInfo* transaction, 
+                            SessionInfo* session)
+            throw ( ConnectorException );
+
+        /**
+         * Rolls back a Transaction.
+         * @param The Transaction information
+         * @param Session Information
+         * @throws ConnectorException
+         */
+        virtual void rollback(TransactionInfo* transaction, 
+                              SessionInfo* session)
+            throw ( ConnectorException );
+
+        /**
+         * Creates a new Message.
+         * @param Session Information
+         * @param Transaction Info for this Message
+         * @throws ConnectorException
+         */
+        virtual cms::Message* createMessage(
+            SessionInfo* session,
+            TransactionInfo* transaction)
+                throw ( ConnectorException );
+
+        /**
+         * Creates a new BytesMessage.
+         * @param Session Information
+         * @param Transaction Info for this Message
+         * @throws ConnectorException
+         */
+        virtual cms::BytesMessage* createBytesMessage(
+            SessionInfo* session,
+            TransactionInfo* transaction)
+                throw ( ConnectorException );
+
+        /**
+         * Creates a new TextMessage.
+         * @param Session Information
+         * @param Transaction Info for this Message
+         * @throws ConnectorException
+         */
+        virtual cms::TextMessage* createTextMessage(
+            SessionInfo* session,
+            TransactionInfo* transaction)
+                throw ( ConnectorException );
+
+        /**
+         * Creates a new MapMessage.
+         * @param Session Information
+         * @param Transaction Info for this Message
+         * @throws ConnectorException
+         */
+        virtual cms::MapMessage* createMapMessage(
+            SessionInfo* session,
+            TransactionInfo* transaction)
+                throw ( ConnectorException );
+
+        /** 
+         * Unsubscribe from a givenDurable Subscription
+         * @param name of the Subscription
+         * @throws ConnectorException
+         */
+        virtual void unsubscribe( const std::string& name )
+            throw ( ConnectorException );
+
+        /**
+         * Destroys the given connector resource.
+         * @param resource the resource to be destroyed.
+         * @throws ConnectorException
+         */
+        virtual void destroyResource( ConnectorResource* resource )
+            throw ( ConnectorException );
+            
+        /** 
+         * Sets the listener of consumer messages.
+         * @param listener the observer.
+         */
+        virtual void setConsumerMessageListener(
+            ConsumerMessageListener* listener)
+        {
+            this->messageListener = listener;
+            
+            if(sessionManager != NULL)
+            {
+                sessionManager->setConsumerMessageListener( listener );
+            }
+        }
+
+        /** 
+         * Sets the Listner of exceptions for this connector
+         * @param ExceptionListener the observer.
+         */
+        virtual void setExceptionListener(
+            cms::ExceptionListener* listener)
+        {
+            this->exceptionListener = listener;
+        }
+        
+    public: // transport::CommandListener
+    
+        /**
+         * Event handler for the receipt of a non-response command from the 
+         * transport.
+         * @param command the received command object.
+         */
+        virtual void onCommand( transport::Command* command );
+        
+    public: // TransportExceptionListener
+
+        /**
+         * Event handler for an exception from a command transport.
+         * @param source The source of the exception
+         * @param ex The exception.
+         */
+        virtual void onTransportException( 
+            transport::Transport* source, 
+            const exceptions::ActiveMQException& ex );
+
+    public: // StompCommandListener
+
+        /**
+         * Process the Stomp Command
+         * @param command to process
+         * @throw ConnterException
+         */
+        virtual void onStompCommand( commands::StompCommand* command ) 
+            throw ( StompConnectorException );    
+
+    public:
+    
+        /**
+         * Registers a Command Listener using the CommandId specified
+         * if there is already a listener for that command it will be
+         * removed.
+         * @param CommandId to process
+         * @param pointer to the listener to call
+         */
+        virtual void addCmdListener( 
+            commands::CommandConstants::CommandId commandId,
+            StompCommandListener* listener );
+        
+        /**
+         * UnRegisters a Command Listener using the CommandId specified
+         * @param CommandId of the listener to remove.
+         */
+        virtual void removeCmdListener( 
+            commands::CommandConstants::CommandId commandId );
+        
+    private:
+    
+        unsigned int getNextProducerId( void );
+        unsigned int getNextTransactionId( void );
+
+        // Check for Connected State and Throw an exception if not.
+        void enforceConnected( void ) throw ( ConnectorException );
+        
+    };
+
+}}}
+
+#endif /*ACTIVEMQ_CONNECTOR_STOMP_STOMPCONNECTOR_H_*/

Propchange: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message