activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r504746 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire: OpenWireConnector.cpp OpenWireConnector.h
Date Wed, 07 Feb 2007 23:57:27 GMT
Author: tabish
Date: Wed Feb  7 15:57:27 2007
New Revision: 504746

URL: http://svn.apache.org/viewvc?view=rev&rev=504746
Log:
http://issues.apache.org/activemq/browse/AMQCPP-30

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp?view=auto&rev=504746
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
(added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
Wed Feb  7 15:57:27 2007
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <activemq/connector/OpenWire/OpenWireConnector.h>
+
+#include <activemq/concurrent/Concurrent.h>
+#include <activemq/transport/BrokerError.h>
+#include <activemq/transport/Transport.h>
+#include <activemq/transport/ExceptionResponse.h>
+#include <activemq/exceptions/UnsupportedOperationException.h>
+#include <activemq/util/Integer.h>
+#include <activemq/connector/OpenWire/OpenWireConnectorException.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::connector;
+using namespace activemq::util;
+using namespace activemq::transport;
+using namespace activemq::exceptions;
+using namespace activemq::connector::openwire;
+
+////////////////////////////////////////////////////////////////////////////////
+OpenWireConnector::OpenWireConnector( Transport* transport,
+                                      const util::Properties& properties )
+    throw ( IllegalArgumentException )
+{
+    if( transport == NULL )
+    {
+        throw IllegalArgumentException(
+            __FILE__, __LINE__,
+            "OpenWireConnector::OpenWireConnector - Transport cannot be NULL");
+    }
+
+    this->transport = transport;
+    this->state = DISCONNECTED;
+    this->exceptionListener = NULL;
+    this->messageListener = NULL;
+    this->nextProducerId = 1;
+    this->nextTransactionId = 1;
+    this->properties.copy( &properties );
+
+    // Observe the transport for events.
+    this->transport->setCommandListener( this );
+    this->transport->setTransportExceptionListener( this );
+
+    // Setup the reader and writer in the transport.
+    this->transport->setCommandReader( &reader );
+    this->transport->setCommandWriter( &writer );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+OpenWireConnector::~OpenWireConnector()
+{
+    try
+    {
+        close();
+
+        // TODO - Add any cleanup code here
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned int OpenWireConnector::getNextProducerId()
+{
+    synchronized( &mutex )
+    {
+        return nextProducerId++;
+    }
+
+    return 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned int OpenWireConnector::getNextTransactionId()
+{
+    synchronized( &mutex )
+    {
+        return nextTransactionId++;
+    }
+
+    return 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireConnector::enforceConnected() throw ( ConnectorException )
+{
+    if( state != CONNECTED )
+    {
+        throw OpenWireConnectorException(
+            __FILE__, __LINE__,
+            "OpenWireConnector::enforceConnected - Not Connected!" );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireConnector::start() throw( cms::CMSException )
+{
+    try
+    {
+        synchronized( &mutex )
+        {
+            if( state == CONNECTED )
+            {
+                throw ActiveMQException(
+                    __FILE__, __LINE__,
+                    "OpenWireConnector::start - already started" );
+            }
+
+            // Start the transport - this establishes the socket.
+            transport->start();
+
+            // Send the connect message to the broker.
+            connect();
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireConnector::close() throw( cms::CMSException ){
+
+    try
+    {
+        synchronized( &mutex )
+        {
+            if( state == this->CONNECTED )
+            {
+                // Send the disconnect message to the broker.
+                disconnect();
+
+                // Close the transport.
+                transport->close();
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireConnector::connect()
+{
+    try
+    {
+        // Mark this connector as started.
+        /*state = this->CONNECTING;
+
+        // TODO - Create a Connect Command
+
+        // Encode User Name and Password and Client ID
+        string login = getUsername();
+        if( login.length() > 0 ){
+            cmd.setLogin( login );
+        }
+        string password = getPassword();
+        if( password.length() > 0 ){
+            cmd.setPassword( password );
+        }
+        string clientId = getClientId();
+        if( clientId.length() > 0 ){
+            cmd.setClientId( clientId );
+        }
+
+//        Response* response = transport->request( &cmd );
+//
+//        if( dynamic_cast< ExceptionResponse* >( response ) != NULL )
+//        {
+//            delete response;
+//
+//            throw OpenWireConnectorException(
+//                __FILE__, __LINE__,
+//                "OpenWireConnector::connect - Failed on Connect Request" );
+//        }
+
+//        ConnectedCommand* connected =
+//            dynamic_cast< ConnectedCommand* >( response );
+//
+//        if( connected == NULL )
+//        {
+//            delete response;
+//
+//            throw OpenWireConnectorException(
+//                __FILE__, __LINE__,
+//                "OpenWireConnector::connect - "
+//                "Response not a connected response" );
+//        }
+
+        // TODO
+
+        // Tag us in the Connected State now.
+        state = CONNECTED;
+
+        // Clean up
+        delete response;*/
+    }
+    AMQ_CATCH_RETHROW( BrokerError )
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireConnector::disconnect()
+{
+    try
+    {
+        // Mark state as no longer connected.
+        state = this->DISCONNECTED;
+
+        // TODO
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+SessionInfo* OpenWireConnector::createSession(
+    cms::Session::AcknowledgeMode ackMode )
+        throw( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+
+        // TODO
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ConsumerInfo* OpenWireConnector::createConsumer(
+    const cms::Destination* destination,
+    SessionInfo* session,
+    const std::string& selector,
+    bool noLocal )
+        throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+
+        // TODO
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ConsumerInfo* OpenWireConnector::createDurableConsumer(
+    const cms::Topic* topic,
+    SessionInfo* session,
+    const std::string& name,
+    const std::string& selector,
+    bool noLocal )
+        throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+
+        // TODO
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ProducerInfo* OpenWireConnector::createProducer(
+    const cms::Destination* destination,
+    SessionInfo* session )
+        throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+
+        // TODO
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Topic* OpenWireConnector::createTopic( const std::string& name,
+                                            SessionInfo* session )
+    throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+
+        // TODO
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Queue* OpenWireConnector::createQueue( const std::string& name,
+                                            SessionInfo* session )
+    throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+
+        // TODO
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::TemporaryTopic* OpenWireConnector::createTemporaryTopic(
+    SessionInfo* session )
+        throw ( ConnectorException )
+{
+    try
+    {
+        // TODO
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::TemporaryQueue* OpenWireConnector::createTemporaryQueue(
+    SessionInfo* session )
+        throw ( ConnectorException )
+{
+    try
+    {
+        // TODO
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireConnector::send( cms::Message* message,
+                              ProducerInfo* producerInfo )
+    throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+
+        // TODO
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireConnector::send( std::list<cms::Message*>& messages,
+                              ProducerInfo* producerInfo )
+    throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+
+        list< cms::Message* >::const_iterator itr = messages.begin();
+
+        for( ; itr != messages.end(); ++itr )
+        {
+            this->send( *itr, producerInfo );
+        }
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireConnector::acknowledge( const SessionInfo* session,
+                                     const cms::Message* message,
+                                     AckType ackType = ConsumedAck )
+    throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+
+        // TODO
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+TransactionInfo* OpenWireConnector::startTransaction(
+    SessionInfo* session )
+        throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+
+        // TODO
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireConnector::commit( TransactionInfo* transaction,
+                                SessionInfo* session )
+    throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+
+        // TODO
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireConnector::rollback( TransactionInfo* transaction,
+                                  SessionInfo* session )
+    throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+
+        // TODO
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Message* OpenWireConnector::createMessage(
+    SessionInfo* session,
+    TransactionInfo* transaction )
+        throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+
+        // TODO
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::BytesMessage* OpenWireConnector::createBytesMessage(
+    SessionInfo* session,
+    TransactionInfo* transaction )
+        throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+
+        // TODO
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::TextMessage* OpenWireConnector::createTextMessage(
+    SessionInfo* session,
+    TransactionInfo* transaction )
+        throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+
+        // TODO
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::MapMessage* OpenWireConnector::createMapMessage(
+    SessionInfo* session,
+    TransactionInfo* transaction )
+        throw ( ConnectorException )
+{
+    try
+    {
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireConnector::unsubscribe( const std::string& name )
+    throw ( ConnectorException )
+{
+    try
+    {
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireConnector::destroyResource( ConnectorResource* resource )
+    throw ( ConnectorException )
+{
+    try
+    {
+        ConsumerInfo* consumer =
+            dynamic_cast<ConsumerInfo*>(resource);
+        SessionInfo* session =
+            dynamic_cast<SessionInfo*>(resource);
+
+        // TODO
+
+        // No matter what we end it here.
+        delete resource;
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireConnector::onCommand( transport::Command* command )
+{
+    try
+    {
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireConnector::onTransportException(
+    transport::Transport* source,
+    const exceptions::ActiveMQException& ex )
+{
+    try
+    {
+        // Inform the user.
+        fire( ex );
+
+        // Close down.
+        close();
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h?view=auto&rev=504746
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
(added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
Wed Feb  7 15:57:27 2007
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_OPENWIRE_OPENWIRECONNECTOR_H_
+#define _ACTIVEMQ_CONNECTOR_OPENWIRE_OPENWIRECONNECTOR_H_
+
+#include <activemq/connector/Connector.h>
+
+#include <cms/Startable.h>
+#include <cms/Closeable.h>
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+#include <cms/Topic.h>
+#include <cms/Queue.h>
+#include <cms/TemporaryTopic.h>
+#include <cms/TemporaryQueue.h>
+#include <cms/Session.h>
+#include <cms/BytesMessage.h>
+#include <cms/TextMessage.h>
+#include <cms/MapMessage.h>
+
+#include <activemq/exceptions/InvalidStateException.h>
+#include <activemq/exceptions/IllegalArgumentException.h>
+
+#include <activemq/transport/Transport.h>
+#include <activemq/connector/SessionInfo.h>
+#include <activemq/connector/ConsumerInfo.h>
+#include <activemq/connector/ProducerInfo.h>
+#include <activemq/connector/TransactionInfo.h>
+#include <activemq/connector/ConsumerMessageListener.h>
+#include <activemq/connector/ConnectorException.h>
+#include <activemq/transport/CommandListener.h>
+#include <activemq/transport/TransportExceptionListener.h>
+#include <activemq/concurrent/Mutex.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/core/ActiveMQConstants.h>
+
+#include <activemq/connector/openwire/OpenWireCommandReader.h>
+#include <activemq/connector/openwire/OpenWireCommandWriter.h>
+
+namespace activemq{
+namespace connector{
+namespace openwire{
+
+    class OpenWireConnector :
+        public Connector,
+        public transport::CommandListener,
+        public transport::TransportExceptionListener
+    {
+    private:
+
+        // Flags the state we are in for connection to broker.
+        enum connectionState
+        {
+            DISCONNECTED,
+            CONNECTING,
+            CONNECTED
+        };
+
+    private:
+
+        /**
+         * The transport for sending/receiving commands on the wire.
+         */
+        transport::Transport* transport;
+
+        /**
+         * Flag to indicate the start state of the connector.
+         */
+        connectionState state;
+
+        /**
+         * Sync object.
+         */
+        concurrent::Mutex mutex;
+
+        /**
+         * Observer of messages directed at a particular
+         * consumer.
+         */
+        ConsumerMessageListener* messageListener;
+
+        /**
+         * Observer of connector exceptions.
+         */
+        cms::ExceptionListener* exceptionListener;
+
+        /**
+         * This Connector's Command Reader
+         */
+        OpenWireCommandReader reader;
+
+        /**
+         * This Connector's Command Writer
+         */
+        OpenWireCommandWriter writer;
+
+        /**
+         * Next avaliable Producer Id
+         */
+        unsigned int nextProducerId;
+
+        /**
+         * Next avaliable Transaction Id
+         */
+        unsigned int nextTransactionId;
+
+        /**
+         * Properties for the connector.
+         */
+        util::SimpleProperties properties;
+
+    private:
+
+        /**
+         * Sends the connect message to the broker and
+         * waits for the response.
+         */
+        void connect(void);
+
+        /**
+         * Sends a oneway disconnect message to the broker.
+         */
+        void disconnect(void);
+
+        /**
+         * Fires a consumer message to the observer.
+         * @param msg - the message to file
+         */
+        void fire( ConsumerInfo* consumer, core::ActiveMQMessage* msg ){
+            try{
+                if( messageListener != NULL ){
+                    messageListener->onConsumerMessage(
+                        consumer,
+                        msg );
+                }
+            }catch( ... ){/* do nothing*/}
+        }
+
+        /**
+         * Fires an exception event to the observing object.
+         * @param ex - the exception to fire.
+         */
+        void fire( const exceptions::ActiveMQException& ex ){
+            try{
+                if( exceptionListener != NULL ){
+                    exceptionListener->onException( ex );
+                }
+            }catch( ... ){/* do nothing*/}
+        }
+
+    public:
+
+        /**
+         * Constructor for the OpenWire connector.
+         * @param transport - the transport object for sending/receiving
+         *                    commands on the wire.
+         * @param properties - properties for configuring the connector.
+         */
+        OpenWireConnector( transport::Transport* transport,
+                           const util::Properties& properties )
+            throw ( exceptions::IllegalArgumentException );
+
+        virtual ~OpenWireConnector();
+
+        /**
+         * Starts the service.
+         * @throws CMSException
+         */
+        virtual void start(void) throw( cms::CMSException );
+
+        /**
+         * Closes this object and deallocates the appropriate resources.
+         * @throws CMSException
+         */
+        virtual void close(void) throw( cms::CMSException );
+
+        /**
+         * Gets the Client Id for this connection, if this
+         * connection has been closed, then this method returns ""
+         * @return Client Id String
+         */
+        virtual std::string getClientId(void) const {
+            return properties.getProperty(
+                core::ActiveMQConstants::toString(
+                    core::ActiveMQConstants::PARAM_CLIENTID ), "" );
+        }
+
+        /**
+         * Gets the Username for this connection, if this
+         * connection has been closed, then this method returns ""
+         * @return Username String
+         */
+        virtual std::string getUsername(void) const {
+            return properties.getProperty(
+                core::ActiveMQConstants::toString(
+                    core::ActiveMQConstants::PARAM_USERNAME ), "" );
+        }
+
+        /**
+         * Gets the Password for this connection, if this
+         * connection has been closed, then this method returns ""
+         * @return Password String
+         */
+        virtual std::string getPassword(void) const {
+            return properties.getProperty(
+                core::ActiveMQConstants::toString(
+                    core::ActiveMQConstants::PARAM_PASSWORD ), "" );
+        }
+
+        /**
+         * Gets a reference to the Transport that this connection
+         * is using.
+         * @param reference to a transport
+         * @throws InvalidStateException if the Transport is not set
+         */
+        virtual transport::Transport& getTransport(void) const
+            throw ( exceptions::InvalidStateException ) {
+
+            if( transport == NULL ) {
+                throw exceptions::InvalidStateException(
+                    __FILE__, __LINE__,
+                    "OpenWireConnector::getTransport - "
+                    "Invalid State, No Transport.");
+            }
+
+            return *transport;
+        }
+
+        /**
+         * Creates a Session Info object for this connector
+         * @param Acknowledgement Mode of the Session
+         * @returns Session Info Object
+         * @throws ConnectorException
+         */
+        virtual SessionInfo* createSession(
+            cms::Session::AcknowledgeMode ackMode )
+                throw( ConnectorException );
+
+        /**
+         * Create a Consumer for the given Session
+         * @param Destination to Subscribe to.
+         * @param Session Information.
+         * @param Message Selector
+         * @param No Local redelivery indicator
+         * @return Consumer Information
+         * @throws ConnectorException
+         */
+        virtual ConsumerInfo* createConsumer(
+            const cms::Destination* destination,
+            SessionInfo* session,
+            const std::string& selector = "",
+            bool noLocal = false )
+                throw ( ConnectorException );
+
+        /**
+         * Create a Durable Consumer for the given Session
+         * @param Topic to Subscribe to.
+         * @param Session Information.
+         * @param name of the Durable Topic
+         * @param Selector
+         * @param if set, inhibits the delivery of messages
+         *        published by its own connection
+         * @return Consumer Information
+         * @throws ConnectorException
+         */
+        virtual ConsumerInfo* createDurableConsumer(
+            const cms::Topic* topic,
+            SessionInfo* session,
+            const std::string& name,
+            const std::string& selector = "",
+            bool noLocal = false )
+                throw ( ConnectorException );
+
+        /**
+         * Create a Consumer for the given Session
+         * @param Destination to Subscribe to.
+         * @param Session Information.
+         * @return Producer Information
+         * @throws ConnectorException
+         */
+        virtual ProducerInfo* createProducer(
+            const cms::Destination* destination,
+            SessionInfo* session)
+                throw ( ConnectorException );
+
+        /**
+         * Creates a Topic given a name and session info
+         * @param Topic Name
+         * @param Session Information
+         * @return a newly created Topic Object
+         * @throws ConnectorException
+         */
+        virtual cms::Topic* createTopic( const std::string& name,
+                                         SessionInfo* session )
+            throw ( ConnectorException );
+
+        /**
+         * Creates a Queue given a name and session info
+         * @param Queue Name
+         * @param Session Information
+         * @return a newly created Queue Object
+         * @throws ConnectorException
+         */
+        virtual cms::Queue* createQueue( const std::string& name,
+                                         SessionInfo* session )
+            throw ( ConnectorException );
+
+        /**
+         * Creates a Temporary Topic given a name and session info
+         * @param Temporary Topic Name
+         * @param Session Information
+         * @return a newly created Temporary Topic Object
+         * @throws ConnectorException
+         */
+        virtual cms::TemporaryTopic* createTemporaryTopic(
+            SessionInfo* session )
+                throw ( ConnectorException );
+
+        /**
+         * Creates a Temporary Queue given a name and session info
+         * @param Temporary Queue Name
+         * @param Session Information
+         * @return a newly created Temporary Queue Object
+         * @throws ConnectorException
+         */
+        virtual cms::TemporaryQueue* createTemporaryQueue(
+            SessionInfo* session )
+                throw ( ConnectorException );
+
+        /**
+         * Sends a Message
+         * @param The Message to send.
+         * @param Producer Info for the sender of this message
+         * @throws ConnectorException
+         */
+        virtual void send( cms::Message* message, ProducerInfo* producerInfo )
+            throw ( ConnectorException );
+
+        /**
+         * Sends a set of Messages
+         * @param List of Messages to send.
+         * @param Producer Info for the sender of this message
+         * @throws ConnectorException
+         */
+        virtual void send( std::list<cms::Message*>& messages,
+                           ProducerInfo* producerInfo )
+            throw ( ConnectorException );
+
+        /**
+         * Acknowledges a Message
+         * @param An ActiveMQMessage to Ack.
+         * @throws ConnectorException
+         */
+        virtual void acknowledge( const SessionInfo* session,
+                                  const cms::Message* message,
+                                  AckType ackType )
+            throw ( ConnectorException );
+
+        /**
+         * Starts a new Transaction.
+         * @param Session Information
+         * @throws ConnectorException
+         */
+        virtual TransactionInfo* startTransaction(
+            SessionInfo* session )
+                throw ( ConnectorException );
+
+        /**
+         * Commits a Transaction.
+         * @param The Transaction information
+         * @param Session Information
+         * @throws ConnectorException
+         */
+        virtual void commit( TransactionInfo* transaction,
+                             SessionInfo* session )
+            throw ( ConnectorException );
+
+        /**
+         * Rolls back a Transaction.
+         * @param The Transaction information
+         * @param Session Information
+         * @throws ConnectorException
+         */
+        virtual void rollback( TransactionInfo* transaction,
+                               SessionInfo* session )
+            throw ( ConnectorException );
+
+        /**
+         * Creates a new Message.
+         * @param Session Information
+         * @param Transaction Info for this Message
+         * @throws ConnectorException
+         */
+        virtual cms::Message* createMessage(
+            SessionInfo* session,
+            TransactionInfo* transaction )
+                throw ( ConnectorException );
+
+        /**
+         * Creates a new BytesMessage.
+         * @param Session Information
+         * @param Transaction Info for this Message
+         * @throws ConnectorException
+         */
+        virtual cms::BytesMessage* createBytesMessage(
+            SessionInfo* session,
+            TransactionInfo* transaction )
+                throw ( ConnectorException );
+
+        /**
+         * Creates a new TextMessage.
+         * @param Session Information
+         * @param Transaction Info for this Message
+         * @throws ConnectorException
+         */
+        virtual cms::TextMessage* createTextMessage(
+            SessionInfo* session,
+            TransactionInfo* transaction )
+                throw ( ConnectorException );
+
+        /**
+         * Creates a new MapMessage.
+         * @param Session Information
+         * @param Transaction Info for this Message
+         * @throws ConnectorException
+         */
+        virtual cms::MapMessage* createMapMessage(
+            SessionInfo* session,
+            TransactionInfo* transaction )
+                throw ( ConnectorException );
+
+        /**
+         * Unsubscribe from a givenDurable Subscription
+         * @param name of the Subscription
+         * @throws ConnectorException
+         */
+        virtual void unsubscribe( const std::string& name )
+            throw ( ConnectorException );
+
+        /**
+         * Destroys the given connector resource.
+         * @param resource the resource to be destroyed.
+         * @throws ConnectorException
+         */
+        virtual void destroyResource( ConnectorResource* resource )
+            throw ( ConnectorException );
+
+        /**
+         * Sets the listener of consumer messages.
+         * @param listener the observer.
+         */
+        virtual void setConsumerMessageListener(
+            ConsumerMessageListener* listener )
+        {
+            this->messageListener = listener;
+        }
+
+        /**
+         * Sets the Listner of exceptions for this connector
+         * @param ExceptionListener the observer.
+         */
+        virtual void setExceptionListener(
+            cms::ExceptionListener* listener )
+        {
+            this->exceptionListener = listener;
+        }
+
+    public: // transport::CommandListener
+
+        /**
+         * Event handler for the receipt of a non-response command from the
+         * transport.
+         * @param command the received command object.
+         */
+        virtual void onCommand( transport::Command* command );
+
+    public: // TransportExceptionListener
+
+        /**
+         * Event handler for an exception from a command transport.
+         * @param source The source of the exception
+         * @param ex The exception.
+         */
+        virtual void onTransportException(
+            transport::Transport* source,
+            const exceptions::ActiveMQException& ex );
+
+    private:
+
+        unsigned int getNextProducerId(void);
+        unsigned int getNextTransactionId(void);
+
+        // Check for Connected State and Throw an exception if not.
+        void enforceConnected( void ) throw ( ConnectorException );
+
+    };
+
+}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_OPENWIRE_OPENWIRECONNECTOR_H_*/



Mime
View raw message