activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r506105 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main: ./ activemq/connector/openwire/ activemq/support/
Date Sun, 11 Feb 2007 19:38:48 GMT
Author: tabish
Date: Sun Feb 11 11:38:47 2007
New Revision: 506105

URL: http://svn.apache.org/viewvc?view=rev&rev=506105
Log:
http://issues.apache.org/activemq/browse/AMQCPP-30

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnectorFactory.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireSessionInfo.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/support/InitDirector.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?view=diff&rev=506105&r1=506104&r2=506105
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Sun Feb 11 11:38:47 2007
@@ -51,9 +51,12 @@
     activemq/connector/stomp/StompConnector.cpp \
     activemq/connector/stomp/StompSelector.cpp \
     activemq/connector/openwire/OpenWireFormat.cpp \
+    activemq/connector/openwire/OpenWireFormatFactory.cpp \
     activemq/connector/openwire/OpenWireCommandReader.cpp \
     activemq/connector/openwire/OpenWireCommandWriter.cpp \
     activemq/connector/openwire/OpenWireFormatNegotiator.cpp \
+    activemq/connector/openwire/OpenWireConnector.cpp \
+    activemq/connector/openwire/OpenWireConnectorFactory.cpp \
     activemq/connector/openwire/marshal/BaseDataStreamMarshaller.cpp \
     activemq/connector/openwire/utils/HexTable.cpp \
     activemq/connector/openwire/utils/BooleanStream.cpp \

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp?view=diff&rev=506105&r1=506104&r2=506105
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp Sun Feb 11 11:38:47 2007
@@ -23,9 +23,18 @@
 #include <activemq/transport/ExceptionResponse.h>
 #include <activemq/exceptions/UnsupportedOperationException.h>
 #include <activemq/util/Integer.h>
+#include <activemq/util/Guid.h>
 #include <activemq/connector/openwire/OpenWireConnectorException.h>
 #include <activemq/connector/openwire/OpenWireFormatFactory.h>
 
+#include <activemq/connector/openwire/commands/ConnectionId.h>
+#include <activemq/connector/openwire/commands/RemoveInfo.h>
+#include <activemq/connector/openwire/commands/ShutdownInfo.h>
+#include <activemq/connector/openwire/commands/SessionInfo.h>
+#include <activemq/connector/openwire/commands/MessageDispatch.h>
+#include <activemq/connector/openwire/commands/WireFormatInfo.h>
+#include <activemq/connector/openwire/commands/BrokerInfo.h>
+
 using namespace std;
 using namespace activemq;
 using namespace activemq::connector;
@@ -33,6 +42,7 @@
 using namespace activemq::transport;
 using namespace activemq::exceptions;
 using namespace activemq::connector::openwire;
+using namespace activemq::connector::openwire::commands;
 
 ////////////////////////////////////////////////////////////////////////////////
 OpenWireConnector::OpenWireConnector( Transport* transport,
@@ -52,6 +62,8 @@
     this->state = DISCONNECTED;
     this->exceptionListener = NULL;
     this->messageListener = NULL;
+    this->brokerInfo = NULL;
+    this->brokerWireFormatInfo = NULL;
     this->nextProducerId = 1;
     this->nextTransactionId = 1;
     this->properties.copy( &properties );
@@ -81,6 +93,8 @@
 
         delete transport;
         delete wireFormat;
+        delete brokerInfo;
+        delete brokerWireFormatInfo;
     }
     AMQ_CATCH_NOTHROW( ActiveMQException )
     AMQ_CATCHALL_NOTHROW( )
@@ -171,55 +185,43 @@
     try
     {
         // Mark this connector as started.
-        /*state = this->CONNECTING;
+        state = this->CONNECTING;
 
-        // TODO - Create a Connect Command
+        // Fill in our connection info.
+        connectionInfo.setUserName( getUsername() );
+        connectionInfo.setPassword( getPassword() );
 
-        // Encode User Name and Password and Client ID
-        string login = getUsername();
-        if( login.length() > 0 ){
-            cmd.setLogin( login );
-        }
-        string password = getPassword();
-        if( password.length() > 0 ){
-            cmd.setPassword( password );
-        }
+        // Get or Create a Client Id
         string clientId = getClientId();
         if( clientId.length() > 0 ){
-            cmd.setClientId( clientId );
+            connectionInfo.setClientId( clientId );
+        } else {
+            connectionInfo.setClientId( Guid::createGUIDString() );
         }
 
-//        Response* response = transport->request( &cmd );
-//
-//        if( dynamic_cast< ExceptionResponse* >( response ) != NULL )
-//        {
-//            delete response;
-//
-//            throw OpenWireConnectorException(
-//                __FILE__, __LINE__,
-//                "OpenWireConnector::connect - Failed on Connect Request" );
-//        }
+        // Generate a connectionId
+        ConnectionId* connectionId = new ConnectionId();
+        connectionId->setValue( Guid::createGUIDString() );
+        connectionInfo.setConnectionId( connectionId );
 
-//        ConnectedCommand* connected =
-//            dynamic_cast< ConnectedCommand* >( response );
-//
-//        if( connected == NULL )
-//        {
-//            delete response;
-//
-//            throw OpenWireConnectorException(
-//                __FILE__, __LINE__,
-//                "OpenWireConnector::connect - "
-//                "Response not a connected response" );
-//        }
+        // Now we ping the broker and see if we get an ack / nack
+        Response* response = transport->request( &connectionInfo );
 
-        // TODO
+        if( dynamic_cast<ExceptionResponse*>( response ) != NULL )
+        {
+            delete response;
+
+            throw OpenWireConnectorException(
+                __FILE__,
+                __LINE__,
+                "OpenWireConnector::connect - Failed on Connect Request" );
+        }
 
         // Tag us in the Connected State now.
         state = CONNECTED;
 
-        // Clean up
-        delete response;*/
+        // Clean up the ack
+        delete response;
     }
     AMQ_CATCH_RETHROW( BrokerError )
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -232,16 +234,27 @@
     try
     {
         // Mark state as no longer connected.
-        state = this->DISCONNECTED;
+        state = DISCONNECTED;
 
-        // TODO
+        // Remove our ConnectionId from the Broker
+        RemoveInfo remove;
+        remove.setObjectId( connectionInfo.getConnectionId() );
+        transport->oneway( &remove );
+
+        // Send the disconnect command to the broker.
+        ShutdownInfo shutdown;
+        transport->oneway( &shutdown );
+
+    } catch( CommandIOException& ex ){
+        transport->close();
+        throw ex;
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-SessionInfo* OpenWireConnector::createSession(
+connector::SessionInfo* OpenWireConnector::createSession(
     cms::Session::AcknowledgeMode ackMode )
         throw( ConnectorException )
 {
@@ -374,6 +387,43 @@
         enforceConnected();
 
         // TODO
+
+//        const SessionInfo* session = producerInfo->getSessionInfo();
+//        Command* command = dynamic_cast< transport::Command* >( message );
+//
+//        if( command == NULL )
+//        {
+//            throw StompConnectorException(
+//                __FILE__, __LINE__,
+//                "StompConnector::send - "
+//                "Message is not a valid stomp type.");
+//        }
+//
+//        if( session->getAckMode() == cms::Session::SESSION_TRANSACTED )
+//        {
+//            StompCommand* stompCommand =
+//                dynamic_cast< StompCommand* >( message );
+//
+//            if( stompCommand == NULL )
+//            {
+//                throw StompConnectorException(
+//                    __FILE__, __LINE__,
+//                    "StompConnector::send - "
+//                    "Message is not a valid stomp type.");
+//            }
+//
+//            stompCommand->setTransactionId(
+//                Integer::toString(
+//                    session->getTransactionInfo()->getTransactionId() ) );
+//        }
+//
+//        // Send it
+//        transport->oneway( command );
+    }
+    catch( CommandIOException& ex ){
+        transport->close();
+        throw ConnectorException( __FILE__, __LINE__,
+            ex.what() );
     }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCHALL_THROW( ConnectorException );
@@ -405,11 +455,39 @@
                                      AckType ackType = ConsumedAck )
     throw ( ConnectorException )
 {
-    try
-    {
-        enforceConnected();
+    try {
 
-        // TODO
+//        // Auto to Stomp means don't do anything, so we drop it here
+//        // for client acknowledge we have to send and ack.
+//        if( session->getAckMode() == cms::Session::CLIENT_ACKNOWLEDGE ||
+//            session->getAckMode() == cms::Session::SESSION_TRANSACTED )
+//        {
+//            AckCommand cmd;
+//
+//            if( message->getCMSMessageId() == "" )
+//            {
+//                throw StompConnectorException(
+//                    __FILE__, __LINE__,
+//                    "StompConnector::send - "
+//                    "Message has no Message Id, cannot ack.");
+//            }
+//
+//            cmd.setMessageId( message->getCMSMessageId() );
+//
+//            if( session->getAckMode() == cms::Session::SESSION_TRANSACTED )
+//            {
+//                cmd.setTransactionId(
+//                    Integer::toString(
+//                        session->getTransactionInfo()->getTransactionId() ) );
+//            }
+//
+//            transport->oneway( &cmd );
+//        }
+    }
+    catch( CommandIOException& ex ){
+        transport->close();
+        throw ConnectorException( __FILE__, __LINE__,
+            ex.what() );
     }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCHALL_THROW( ConnectorException );
@@ -420,14 +498,33 @@
     SessionInfo* session )
         throw ( ConnectorException )
 {
-    try
-    {
+    try {
+
         enforceConnected();
 
-        // TODO
+//        TransactionInfo* transaction = new StompTransactionInfo();
+//
+//        transaction->setTransactionId( getNextTransactionId() );
+//
+//        session->setTransactionInfo( transaction );
+//
+//        BeginCommand cmd;
+//
+//        cmd.setTransactionId(
+//                Integer::toString( transaction->getTransactionId() ) );
+//
+//        transport->oneway( &cmd );
+//
+//        return transaction;
+    }
+    catch( CommandIOException& ex ){
+        transport->close();
+        throw ConnectorException( __FILE__, __LINE__,
+            ex.what() );
     }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCHALL_THROW( ConnectorException );
+    return NULL;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -439,7 +536,17 @@
     {
         enforceConnected();
 
-        // TODO
+//        CommitCommand cmd;
+//
+//        cmd.setTransactionId(
+//                Integer::toString( transaction->getTransactionId() ) );
+//
+//        transport->oneway( &cmd );
+    }
+    catch( CommandIOException& ex ){
+        transport->close();
+        throw ConnectorException( __FILE__, __LINE__,
+            ex.what() );
     }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCHALL_THROW( ConnectorException );
@@ -454,7 +561,17 @@
     {
         enforceConnected();
 
-        // TODO
+//        AbortCommand cmd;
+//
+//        cmd.setTransactionId(
+//                Integer::toString( transaction->getTransactionId() ) );
+//
+//        transport->oneway( &cmd );
+    }
+    catch( CommandIOException& ex ){
+        transport->close();
+        throw ConnectorException( __FILE__, __LINE__,
+            ex.what() );
     }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCHALL_THROW( ConnectorException );
@@ -557,6 +674,49 @@
 {
     try
     {
+        MessageDispatch* dispatch =
+            dynamic_cast<MessageDispatch*>( command );
+        WireFormatInfo* brokerWireFormatInfo =
+            dynamic_cast<WireFormatInfo*>( command );
+        BrokerInfo* brokerInfo =
+            dynamic_cast<BrokerInfo*>( command );
+        ShutdownInfo* shutdownInfo =
+            dynamic_cast<ShutdownInfo*>( command );
+
+        if( dispatch != NULL ) {
+
+//            ConsumerId* consumerId = dispatch->getConsumerId();
+//            MessageConsumer consumer = (MessageConsumer) consumers[consumerId];
+//            if (consumer == null)
+//            {
+//                Tracer.Error("No such consumer active: " + consumerId);
+//            }
+//            else
+//            {
+//                ActiveMQMessage message = (ActiveMQMessage) dispatch.Message;
+//                consumer.Dispatch(message);
+//            }
+
+        } else if( brokerWireFormatInfo != NULL ) {
+            this->brokerWireFormatInfo = brokerWireFormatInfo;
+        } else if( brokerInfo != NULL ) {
+            this->brokerInfo = brokerInfo;
+        } else if( shutdownInfo != NULL ) {
+
+            if( state != DISCONNECTED ) {
+                fire( CommandIOException(
+                    __FILE__,
+                    __LINE__,
+                    "OpenWireConnector::onCommand - "
+                    "Broker closed this connection."));
+            }
+        }
+        else
+        {
+            //LOGCMS_WARN( logger, "Received an unknown command" );
+        }
+
+        delete command;
     }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCHALL_THROW( ConnectorException );
@@ -569,11 +729,16 @@
 {
     try
     {
-        // Inform the user.
-        fire( ex );
+        // We're disconnected - the asynchronous error is expected.
+        if( state == DISCONNECTED ){
+            return;
+        }
 
-        // Close down.
-        close();
+        // We were not closing - log the stack trace.
+        //LOGCMS_WARN( logger, ex.getStackTraceString() );
+
+        // Inform the user of the error.
+        fire( ex );
     }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCHALL_THROW( ConnectorException );

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h?view=diff&rev=506105&r1=506104&r2=506105
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h Sun Feb 11 11:38:47 2007
@@ -54,6 +54,10 @@
 #include <activemq/connector/openwire/OpenWireFormat.h>
 #include <activemq/connector/openwire/OpenWireFormatNegotiator.h>
 
+#include <activemq/connector/openwire/commands/ConnectionInfo.h>
+#include <activemq/connector/openwire/commands/BrokerInfo.h>
+#include <activemq/connector/openwire/commands/WireFormatInfo.h>
+
 namespace activemq{
 namespace connector{
 namespace openwire{
@@ -85,6 +89,21 @@
          * marshalling details.
          */
         OpenWireFormat* wireFormat;
+
+        /**
+         * Connection Information for this connection to the Broker
+         */
+        commands::ConnectionInfo connectionInfo;
+
+        /**
+         * Command sent from the Broker with its BrokerInfo
+         */
+        commands::BrokerInfo* brokerInfo;
+
+        /**
+         * Command sent from the Broker with its WireFormatInfo
+         */
+        commands::WireFormatInfo* brokerWireFormatInfo;
 
         /**
          * Flag to indicate the start state of the connector.

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnectorFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnectorFactory.h?view=diff&rev=506105&r1=506104&r2=506105
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnectorFactory.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnectorFactory.h Sun Feb 11 11:38:47 2007
@@ -29,7 +29,7 @@
     {
     public:
 
-        virtual ~OpenWireConnectorFactory();
+        virtual ~OpenWireConnectorFactory() {}
 
         /**
          * Creates a StompConnector

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireSessionInfo.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireSessionInfo.h?view=diff&rev=506105&r1=506104&r2=506105
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireSessionInfo.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireSessionInfo.h Sun Feb 11 11:38:47 2007
@@ -19,6 +19,7 @@
 #define _ACTIVEMQ_CONNECTOR_OPENWIRE_OPENWIRESESSIONINFO_H_
 
 #include <activemq/connector/SessionInfo.h>
+#include <activemq/connector/openwire/commands/SessionInfo.h>
 #include <cms/Session.h>
 #include <string>
 
@@ -30,6 +31,9 @@
 
     private:
 
+        // The OpenWire Session Info DataStructure for this Session
+        commands::SessionInfo* sessionInfo;
+
         // Acknowledge Mode of this Session
         cms::Session::AcknowledgeMode ackMode;
 
@@ -49,6 +53,7 @@
             sessionId = 0;
             ackMode = cms::Session::AUTO_ACKNOWLEDGE;
             transaction = NULL;
+            sessionInfo = NULL;
         }
         virtual ~OpenWireSessionInfo() {}
 
@@ -119,6 +124,27 @@
          */
         virtual void setTransactionInfo( const TransactionInfo* transaction ) {
             this->transaction = transaction;
+        }
+
+        /**
+         * Gets the OpenWire Session Info object that was used to create
+         * this session.
+         * @returns The SessionInfo for this Session or NULL if not set.
+         */
+        virtual const commands::SessionInfo* getSessionInfo() const {
+            return this->sessionInfo;
+        }
+        virtual commands::SessionInfo* getSessionInfo() {
+            return this->sessionInfo;
+        }
+
+        /**
+         * Sets the SessionInfo from OpenWire that was used to create this
+         * Session
+         * @param sessionInfo - the SessionInfo for this Session.
+         */
+        virtual void setSessionInfo( SessionInfo* sessionInfo ) {
+            this->sessionInfo = sessionInfo;
         }
 
     };

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/support/InitDirector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/support/InitDirector.cpp?view=diff&rev=506105&r1=506104&r2=506105
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/support/InitDirector.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/support/InitDirector.cpp Sun Feb 11 11:38:47 2007
@@ -20,6 +20,7 @@
 #include <activemq/transport/IOTransportFactory.h>
 #include <activemq/transport/TcpTransportFactory.h>
 #include <activemq/connector/stomp/StompConnectorFactory.h>
+#include <activemq/connector/openwire/OpenWireConnectorFactory.h>
 
 using namespace activemq;
 using namespace activemq::support;
@@ -33,10 +34,11 @@
     {
         logger::LogWriter::getInstance();
         connector::stomp::StompConnectorFactory::getInstance();
+        connector::openwire::OpenWireConnectorFactory::getInstance();
         transport::TcpTransportFactory::getInstance();
         transport::IOTransportFactory::getInstance();
     }
-    
+
     refCount++;
 }
 
@@ -44,7 +46,7 @@
 InitDirector::~InitDirector(void)
 {
     refCount--;
-    
+
     if( refCount == 0 )
     {
     }



Mime
View raw message