activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r736842 [2/5] - in /activemq/activemq-cpp/trunk/src: main/ main/activemq/commands/ main/activemq/connector/ main/activemq/core/ main/activemq/exceptions/ main/activemq/library/ main/activemq/transport/ main/activemq/transport/mock/ main/act...
Date Thu, 22 Jan 2009 22:55:28 GMT
Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.cpp?rev=736842&r1=736841&r2=736842&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.cpp Thu Jan 22 14:55:27 2009
@@ -17,6 +17,7 @@
 #include "ActiveMQProducer.h"
 
 #include <activemq/core/ActiveMQSession.h>
+#include <activemq/core/ActiveMQConnection.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
 #include <decaf/lang/exceptions/InvalidStateException.h>
 #include <decaf/lang/exceptions/IllegalArgumentException.h>
@@ -25,14 +26,14 @@
 using namespace std;
 using namespace activemq;
 using namespace activemq::core;
-using namespace activemq::connector;
 using namespace activemq::exceptions;
 using namespace decaf::util;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQProducer::ActiveMQProducer( connector::ProducerInfo* producerInfo,
+ActiveMQProducer::ActiveMQProducer( commands::ProducerInfo* producerInfo,
+                                    const cms::Destination* destination,
                                     ActiveMQSession* session ) {
 
     if( session == NULL || producerInfo == NULL ) {
@@ -43,24 +44,26 @@
 
     // Init Producer Data
     this->session = session;
-    this->producerInfo = producerInfo;
+    this->producerInfo.reset( producerInfo );
+    this->destination.reset( destination != NULL ? destination->clone() : NULL );
     this->closed = false;
 
     // Default the Delivery options
     this->defaultDeliveryMode = cms::DeliveryMode::PERSISTENT;
     this->disableTimestamps = false;
+    this->disableMessageId = false;
     this->defaultPriority = 4;
     this->defaultTimeToLive = 0;
 
+    // TODO - How to manage resources
     // Listen for our resource to close
-    this->producerInfo->addListener( this );
+    //this->producerInfo->addListener( this );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 ActiveMQProducer::~ActiveMQProducer() {
     try {
         close();
-        delete producerInfo;
     }
     AMQ_CATCH_NOTHROW( ActiveMQException )
     AMQ_CATCHALL_NOTHROW( )
@@ -73,13 +76,7 @@
 
         if( !closed ) {
 
-            // Close the ProducerInfo
-            if( !producerInfo->isClosed() ) {
-                // We don't want a callback now
-                this->producerInfo->removeListener( this );
-                this->producerInfo->close();
-            }
-
+            this->session->getConnection()->disposeOf( this->producerInfo->getProducerId() );
             closed = true;
         }
     }
@@ -94,13 +91,16 @@
 
     try {
 
-        if( closed ) {
+        this->checkClosed();
+
+        if( this->destination.get() == NULL ) {
             throw ActiveMQException(
                 __FILE__, __LINE__,
-                "ActiveMQProducer::send - This Producer is closed" );
+                "ActiveMQProducer::send - "
+                "Producer has no Destination, must call send( dest, msg )" );
         }
 
-        send( producerInfo->getDestination(), message );
+        this->send( this->destination.get(), message );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -113,14 +113,17 @@
                                 throw ( cms::CMSException ) {
     try {
 
-        if( closed ) {
+        this->checkClosed();
+
+        if( this->destination.get() == NULL ) {
             throw ActiveMQException(
                 __FILE__, __LINE__,
-                "ActiveMQProducer::send - This Producer is closed" );
+                "ActiveMQProducer::send - "
+                "Producer has no Destination, must call send( dest, msg )" );
         }
 
-        send( producerInfo->getDestination(), message, deliveryMode,
-              priority, timeToLive );
+        this->send( this->destination.get(), message, deliveryMode,
+                    priority, timeToLive );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -133,14 +136,10 @@
 
     try {
 
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQProducer::send - This Producer is closed" );
-        }
+        this->checkClosed();
 
-        send( destination, message, defaultDeliveryMode,
-              defaultPriority, defaultTimeToLive );
+        this->send( destination, message, defaultDeliveryMode,
+                    defaultPriority, defaultTimeToLive );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -155,11 +154,7 @@
 
     try {
 
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQProducer::send - This Producer is closed" );
-        }
+        checkClosed();
 
         if( destination == NULL ) {
 
@@ -185,37 +180,58 @@
 
         message->setCMSExpiration( expiration );
 
-        session->send( message, this );
+        // Delegate send to the session so that it can choose how to
+        // send the message.
+        this->session->send( message, this, this->memoryUsage.get() );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
 }
 
+// TODO
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQProducer::onConnectorResourceClosed(
-    const ConnectorResource* resource ) throw ( cms::CMSException ) {
+//void ActiveMQProducer::onConnectorResourceClosed(
+//    const ConnectorResource* resource ) throw ( cms::CMSException ) {
+//
+//    try{
+//
+//        checkClosed();
+//
+//        if( resource != producerInfo ) {
+//            throw ActiveMQException(
+//                __FILE__, __LINE__,
+//                "ActiveMQProducer::onConnectorResourceClosed - "
+//                "Unknown object passed to this callback");
+//        }
+//
+//        // If our producer isn't closed already, then lets close
+//        this->close();
+//    }
+//    AMQ_CATCH_RETHROW( ActiveMQException )
+//    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+//    AMQ_CATCHALL_THROW( ActiveMQException )
+//}
 
-    try{
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQProducer::onProducerAck( const commands::ProducerAck& ack ) {
 
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQProducer::onConnectorResourceClosed - "
-                "Producer Already Closed");
-        }
+    try{
 
-        if( resource != producerInfo ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQProducer::onConnectorResourceClosed - "
-                "Unknown object passed to this callback");
+        if( this->memoryUsage.get() != NULL ) {
+            this->memoryUsage->decreaseUsage( ack.getSize() );
         }
-
-        // If our producer isn't closed already, then lets close
-        this->close();
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQProducer::checkClosed() throw( exceptions::ActiveMQException ) {
+    if( closed ) {
+        throw ActiveMQException(
+            __FILE__, __LINE__,
+            "ActiveMQProducer - Producer Already Closed" );
+    }
+}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.h?rev=736842&r1=736841&r2=736842&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.h Thu Jan 22 14:55:27 2009
@@ -23,22 +23,27 @@
 #include <cms/DeliveryMode.h>
 
 #include <activemq/util/Config.h>
-#include <activemq/connector/ConnectorResourceListener.h>
-#include <activemq/connector/ProducerInfo.h>
+#include <activemq/util/MemoryUsage.h>
+#include <activemq/commands/ProducerInfo.h>
+#include <activemq/commands/ProducerAck.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+#include <memory>
 
 namespace activemq{
 namespace core{
 
     class ActiveMQSession;
 
-    class AMQCPP_API ActiveMQProducer : public cms::MessageProducer,
-                                        public connector::ConnectorResourceListener
-    {
+    class AMQCPP_API ActiveMQProducer : public cms::MessageProducer {
     private:
 
         // Disable sending timestamps
         bool disableTimestamps;
 
+        // Disable adding a Message Id
+        bool disableMessageId;
+
         // The default delivery Mode of this Producer
         int defaultDeliveryMode;
 
@@ -48,21 +53,39 @@
         // The default time to live value for messages in milliseconds
         long long defaultTimeToLive;
 
+        // The default Send Timeout for this Producer.
+        long long sendTimeout;
+
         // Session that this producer sends to.
         ActiveMQSession* session;
 
-        // This Producers protocal specific info object
-        connector::ProducerInfo* producerInfo;
+        // This Producers protocol specific info object
+        std::auto_ptr<commands::ProducerInfo> producerInfo;
 
         // Boolean that indicates if the consumer has been closed
         bool closed;
 
+        // Memory Usage Class, created only if the Producer is tracking its usage.
+        std::auto_ptr<util::MemoryUsage> memoryUsage;
+
+        // The Destination assigned at creation, NULL if not assigned.
+        std::auto_ptr<cms::Destination> destination;
+
     public:
 
         /**
-         * Constructor
+         * Constructor, creates an instance of an ActiveMQProducer
+         *
+         * @param producerInfo
+         *        Pointer to a ProducerInfo command which identifies this producer.
+         * @param destination
+         *        The assigned Destination this Producer sends to, or null if not set.
+         *        The Producer does not own the Pointer passed.
+         * @param session
+         *        The Session which is the parent of this Producer.
          */
-        ActiveMQProducer( connector::ProducerInfo* producerInfo,
+        ActiveMQProducer( commands::ProducerInfo* producerInfo,
+                          const cms::Destination* destination,
                           ActiveMQSession* session );
 
         virtual ~ActiveMQProducer();
@@ -126,7 +149,7 @@
          * @param The DeliveryMode
          */
         virtual void setDeliveryMode( int mode ) {
-            defaultDeliveryMode = mode;
+            this->defaultDeliveryMode = mode;
         }
 
         /**
@@ -134,45 +157,39 @@
          * @return The DeliveryMode
          */
         virtual int getDeliveryMode() const {
-            return defaultDeliveryMode;
+            return this->defaultDeliveryMode;
         }
 
         /**
-         * Sets if Message Ids are disbled for this Producer
+         * Sets if Message Ids are disabled for this Producer
          * @param boolean indicating enable / disable (true / false)
          */
         virtual void setDisableMessageID( bool value ) {
-            if( producerInfo != NULL ){
-                producerInfo->setDisableMessageId( value );
-            }
+            this->disableMessageId = value;
         }
 
         /**
-         * Sets if Message Ids are disbled for this Producer
+         * Sets if Message Ids are disabled for this Producer
          * @param boolean indicating enable / disable (true / false)
          */
         virtual bool getDisableMessageID() const {
-            if( this->producerInfo != NULL ) {
-                return this->producerInfo->isDisableMessageId();
-            }
-
-            return false;
+            return this->disableMessageId;
         }
 
         /**
-         * Sets if Message Time Stamps are disbled for this Producer
+         * Sets if Message Time Stamps are disabled for this Producer
          * @param boolean indicating enable / disable (true / false)
          */
         virtual void setDisableMessageTimeStamp( bool value ) {
-            disableTimestamps = value;
+            this->disableTimestamps = value;
         }
 
         /**
-         * Sets if Message Time Stamps are disbled for this Producer
+         * Sets if Message Time Stamps are disabled for this Producer
          * @param boolean indicating enable / disable (true / false)
          */
         virtual bool getDisableMessageTimeStamp() const {
-            return disableTimestamps;
+            return this->disableTimestamps;
         }
 
         /**
@@ -180,7 +197,7 @@
          * @param int value for Priority level
          */
         virtual void setPriority( int priority ) {
-            defaultPriority = priority;
+            this->defaultPriority = priority;
         }
 
         /**
@@ -188,7 +205,7 @@
          * @return int based priority level
          */
         virtual int getPriority() const {
-            return defaultPriority;
+            return this->defaultPriority;
         }
 
         /**
@@ -196,7 +213,7 @@
          * @param time The new default time to live value in milliseconds.
          */
         virtual void setTimeToLive( long long time ) {
-            defaultTimeToLive = time;
+            this->defaultTimeToLive = time;
         }
 
         /**
@@ -204,29 +221,45 @@
          * @return The default time to live value in milliseconds.
          */
         virtual long long getTimeToLive() const {
-            return defaultTimeToLive;
+            return this->defaultTimeToLive;
+        }
+
+        /**
+         * Sets the Send Timeout that this Producers sends messages with
+         * @param time The new default send timeout value in milliseconds.
+         */
+        virtual void setSendTimeout( long long time ) {
+            this->sendTimeout = time;
+        }
+
+        /**
+         * Gets the Send Timeout that this producer sends messages with
+         * @return The default send timeout value in milliseconds.
+         */
+        virtual long long getSendTimeout() const {
+            return this->sendTimeout;
         }
 
     public:
 
         /**
-         * Retrives this object ProducerInfo pointer
+         * Retries this object ProducerInfo pointer
          * @return ProducerInfo pointer
          */
-        virtual connector::ProducerInfo* getProducerInfo(){
-            return producerInfo;
+        virtual commands::ProducerInfo* getProducerInfo(){
+            return this->producerInfo.get();
         }
 
-    protected:   // ConnectorResourceListener
-
         /**
-         * When a Connector Resouce is closed it will notify any registered
-         * Listeners of its close so that they can take the appropriate
-         * action.
-         * @param resource - The ConnectorResource that was closed.
+         * Handles the work of Processing a ProducerAck Command from the Broker.
+         * @param ack - The ProducerAck message received from the Broker.
          */
-        virtual void onConnectorResourceClosed(
-            const connector::ConnectorResource* resource ) throw ( cms::CMSException );
+        virtual void onProducerAck( const commands::ProducerAck& ack );
+
+   private:
+
+       // Checks for the closed state and throws if so.
+       void checkClosed() throw( exceptions::ActiveMQException );
 
    };
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp?rev=736842&r1=736841&r2=736842&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp Thu Jan 22 14:55:27 2009
@@ -16,33 +16,54 @@
 */
 #include "ActiveMQSession.h"
 
-#include <decaf/lang/exceptions/InvalidStateException.h>
-#include <decaf/lang/exceptions/NullPointerException.h>
-
 #include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/core/ActiveMQConstants.h>
 #include <activemq/core/ActiveMQConnection.h>
-#include <activemq/core/ActiveMQTransaction.h>
+#include <activemq/core/ActiveMQTransactionContext.h>
 #include <activemq/core/ActiveMQConsumer.h>
 #include <activemq/core/ActiveMQMessage.h>
 #include <activemq/core/ActiveMQProducer.h>
 #include <activemq/core/ActiveMQSessionExecutor.h>
+#include <activemq/util/ActiveMQProperties.h>
+
+#include <activemq/commands/ConsumerInfo.h>
+#include <activemq/commands/DestinationInfo.h>
+#include <activemq/commands/ExceptionResponse.h>
+#include <activemq/commands/ActiveMQDestination.h>
+#include <activemq/commands/ActiveMQTopic.h>
+#include <activemq/commands/ActiveMQQueue.h>
+#include <activemq/commands/ActiveMQTempDestination.h>
+#include <activemq/commands/ActiveMQMessage.h>
+#include <activemq/commands/ActiveMQBytesMessage.h>
+#include <activemq/commands/ActiveMQTextMessage.h>
+#include <activemq/commands/ActiveMQMapMessage.h>
+#include <activemq/commands/ActiveMQTempTopic.h>
+#include <activemq/commands/ActiveMQTempQueue.h>
+#include <activemq/commands/MessagePull.h>
+#include <activemq/commands/ProducerInfo.h>
+#include <activemq/commands/TransactionInfo.h>
+#include <activemq/commands/RemoveSubscriptionInfo.h>
+
 #include <decaf/lang/Boolean.h>
+#include <decaf/lang/Integer.h>
+#include <decaf/lang/Long.h>
 #include <decaf/util/Queue.h>
-
-#include <activemq/connector/TransactionInfo.h>
+#include <decaf/lang/exceptions/InvalidStateException.h>
+#include <decaf/lang/exceptions/NullPointerException.h>
 
 using namespace std;
 using namespace cms;
 using namespace activemq;
+using namespace activemq::util;
 using namespace activemq::core;
-using namespace activemq::connector;
 using namespace activemq::exceptions;
 using namespace decaf::util;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQSession::ActiveMQSession( SessionInfo* sessionInfo,
+ActiveMQSession::ActiveMQSession( commands::SessionInfo* sessionInfo,
+                                  cms::Session::AcknowledgeMode ackMode,
                                   const Properties& properties,
                                   ActiveMQConnection* connection ) {
 
@@ -52,19 +73,18 @@
             "ActiveMQSession::ActiveMQSession - Init with NULL data");
     }
 
-    this->sessionInfo = sessionInfo;
-    this->transaction = NULL;
+    this->sessionInfo.reset( sessionInfo );
     this->connection = connection;
     this->closed = false;
+    this->ackMode = ackMode;
 
     // Create a Transaction object only if the session is transacted
-    if( isTransacted() ) {
-        transaction =
-            new ActiveMQTransaction(connection, this, properties );
+    if( this->isTransacted() ) {
+        this->transaction.reset( new ActiveMQTransactionContext( this, properties ) );
     }
 
     // Create the session executor object.
-    executor = new ActiveMQSessionExecutor( this );
+    this->executor.reset( new ActiveMQSessionExecutor( this ) );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -78,7 +98,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::fire( activemq::exceptions::ActiveMQException& ex ) {
+void ActiveMQSession::fire( const activemq::exceptions::ActiveMQException& ex ) {
     if( connection != NULL ) {
         connection->fire( ex );
     }
@@ -97,38 +117,38 @@
         // Stop the dispatch executor.
         stop();
 
+        // TODO
         // Get the complete list of closeable session resources.
         // Get the complete list of closeable session resources.
-        synchronized( &closableSessionResources ) {
-
-            Iterator<cms::Closeable*>* iter = closableSessionResources.iterator();
-            while( iter->hasNext() ) {
-                cms::Closeable* resource = iter->next();
-                try{
-                    resource->close();
-                } catch( cms::CMSException& ex ){
-                    /* Absorb */
-                }
-            }
-            delete iter;
-        }
+//        synchronized( &closableSessionResources ) {
+//
+//            Iterator<cms::Closeable*>* iter = closableSessionResources.iterator();
+//            while( iter->hasNext() ) {
+//                cms::Closeable* resource = iter->next();
+//                try{
+//                    resource->close();
+//                } catch( cms::CMSException& ex ){
+//                    /* Absorb */
+//                }
+//            }
+//            delete iter;
+//        }
 
+        // TODO = Commit it first.
         // Destroy the Transaction
-        if( transaction != NULL ){
-            delete transaction;
-            transaction = NULL;
+        if( this->transaction.get() != NULL ){
+            this->transaction->commit();
+            this->transaction.release();
         }
 
+        // Remove this session from the Broker.
+        this->connection->disposeOf( this->sessionInfo->getSessionId() );
+
         // Remove this sessions from the connector
-        connection->removeSession( this );
-        delete sessionInfo;
-        sessionInfo = NULL;
+        this->connection->removeSession( this );
 
         // Now indicate that this session is closed.
         closed = true;
-
-        delete executor;
-        executor = NULL;
     }
     AMQ_CATCH_NOTHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -140,14 +160,16 @@
 
     try {
 
-        if( closed || !isTransacted() ) {
+        this->checkClosed();
+
+        if( !this->isTransacted() ) {
             throw ActiveMQException(
                 __FILE__, __LINE__,
-                "ActiveMQSession::commit - This Session Can't Commit");
+                "ActiveMQSession::commit - This Session is not Transacted");
         }
 
         // Commit the Transaction
-        transaction->commit();
+        this->transaction->commit();
     }
     AMQ_CATCH_NOTHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -159,14 +181,16 @@
 
     try{
 
-        if( closed || !isTransacted() ) {
+        this->checkClosed();
+
+        if( !this->isTransacted() ) {
             throw ActiveMQException(
                 __FILE__, __LINE__,
-                "ActiveMQSession::rollback - This Session Can't Rollback" );
+                "ActiveMQSession::rollback - This Session is not Transacted" );
         }
 
         // Rollback the Transaction
-        transaction->rollback();
+        this->transaction->rollback();
     }
     AMQ_CATCH_NOTHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -180,13 +204,9 @@
 
     try{
 
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createConsumer - Session Already Closed" );
-        }
+        this->checkClosed();
 
-        return createConsumer( destination, "", false );
+        return this->createConsumer( destination, "", false );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -200,13 +220,10 @@
         throw ( cms::CMSException ) {
 
     try{
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createConsumer - Session Already Closed" );
-        }
 
-        return createConsumer( destination, selector, false );
+        this->checkClosed();
+
+        return this->createConsumer( destination, selector, false );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -222,49 +239,35 @@
 
     try{
 
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createConsumer - Session Already Closed" );
-        }
+        this->checkClosed();
 
-        ConsumerInfo* consumerInfo =
-            connection->getConnectionData()->getConnector()->
-                createConsumer( destination,
-                                sessionInfo,
-                                selector,
-                                noLocal );
-
-        // Add to Session Closeables and Monitor for close, if needed.
-        checkConnectorResource(
-            dynamic_cast<ConnectorResource*>( consumerInfo ) );
+        std::auto_ptr<commands::ConsumerInfo> consumerInfo( createConsumerInfo( destination ) );
 
-        // Create the consumer instance.
-        ActiveMQConsumer* consumer = new ActiveMQConsumer(
-            consumerInfo, this, this->transaction );
+        consumerInfo->setSelector( selector );
+        consumerInfo->setNoLocal( noLocal );
 
-        // Add the consumer to the map.
-        synchronized( &consumers ) {
-            consumers.setValue( consumerInfo->getConsumerId(), consumer );
-        }
+        // Override default options with uri-encoded parameters.
+        this->applyDestinationOptions( consumerInfo.get() );
 
-        // Register this as a message dispatcher for the consumer.
-        connection->addDispatcher( consumerInfo, this );
+        // Register this as a message dispatcher for the consumer since we
+        // could start receiving messages from the broker right away once we
+        // send the ConsumerInfo command.
+        this->connection->addDispatcher( consumerInfo.get(), this );
 
-        // Start the Consumer, we are now ready to receive messages
-        try{
-            connection->getConnectionData()->getConnector()->startConsumer(
-                consumerInfo );
-        } catch( ActiveMQException& ex ) {
-            synchronized( &consumers ) {
-                consumers.remove( consumerInfo->getConsumerId() );
-            }
-            delete consumer;
-            ex.setMark( __FILE__, __LINE__ );
-            throw ex;
+        // Create the consumer instance.
+        std::auto_ptr<ActiveMQConsumer> consumer(
+            new ActiveMQConsumer( consumerInfo.release(), this, this->transaction.get() ) );
+
+        // Send the message to the broker.
+        this->connection->syncRequest( consumer->getConsumerInfo() );
+
+        // Add the consumer to the map.
+        synchronized( &this->consumers ) {
+            this->consumers.setValue(
+                consumer->getConsumerInfo()->getConsumerId()->getValue(), consumer.get() );
         }
 
-        return consumer;
+        return consumer.release();
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -281,47 +284,36 @@
 
     try{
 
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createDurableConsumer - Session Already Closed" );
-        }
+        this->checkClosed();
 
-        ConsumerInfo* consumerInfo =
-            connection->getConnectionData()->getConnector()->
-                createDurableConsumer(
-                    destination, sessionInfo, name, selector, noLocal );
+        std::auto_ptr<commands::ConsumerInfo> consumerInfo( createConsumerInfo( destination ) );
 
-        // Add to Session Closeables and Monitor for close, if needed.
-        checkConnectorResource(
-            dynamic_cast<ConnectorResource*>( consumerInfo ) );
+        consumerInfo->setSelector( selector );
+        consumerInfo->setNoLocal( noLocal );
+        consumerInfo->setSubscriptionName( name );
 
-        // Create the consumer instance.
-        ActiveMQConsumer* consumer = new ActiveMQConsumer(
-            consumerInfo, this, this->transaction );
+        // Override default options with uri-encoded parameters.
+        this->applyDestinationOptions( consumerInfo.get() );
 
-        // Add the consumer to the map.
-        synchronized( &consumers ) {
-            consumers.setValue( consumerInfo->getConsumerId(), consumer );
-        }
+        // Register this as a message dispatcher for the consumer since we
+        // could start receiving messages from the broker right away once we
+        // send the ConsumerInfo command.
+        this->connection->addDispatcher( consumerInfo.get(), this );
+
+        // Create the consumer instance.
+        std::auto_ptr<ActiveMQConsumer> consumer(
+            new ActiveMQConsumer( consumerInfo.release(), this, this->transaction.get() ) );
 
-        // Register this as a message dispatcher for the consumer.
-        connection->addDispatcher( consumerInfo, this );
+        // Send the message to the broker.
+        this->connection->syncRequest( consumer->getConsumerInfo() );
 
-        // Start the Consumer, we are now ready to receive messages
-        try{
-            connection->getConnectionData()->getConnector()->startConsumer(
-                consumerInfo );
-        } catch( ActiveMQException& ex ) {
-            synchronized( &consumers ) {
-                consumers.remove( consumerInfo->getConsumerId() );
-            }
-            delete consumer;
-            ex.setMark( __FILE__, __LINE__ );
-            throw ex;
+        // Add the consumer to the map.
+        synchronized( &this->consumers ) {
+            this->consumers.setValue(
+                consumer->getConsumerInfo()->getConsumerId()->getValue(), consumer.get() );
         }
 
-        return consumer;
+        return consumer.release();
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -335,25 +327,54 @@
 
     try{
 
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createProducer - Session Already Closed" );
-        }
+        this->checkClosed();
 
-        ProducerInfo* producerInfo =
-            connection->getConnectionData()->getConnector()->
-                createProducer( destination, sessionInfo );
+        std::auto_ptr<commands::ProducerId> producerId( new commands::ProducerId() );
+        producerId->setConnectionId( this->getSessionInfo()->getSessionId()->getConnectionId() );
+        producerId->setSessionId( this->getSessionInfo()->getSessionId()->getValue() );
+        producerId->setValue( this->connection->getNextProducerId() );
+
+        std::auto_ptr<commands::ProducerInfo> producerInfo( new commands::ProducerInfo() );
+        producerInfo->setProducerId( producerId.release() );
+        producerInfo->setWindowSize( this->connection->getProducerWindowSize() );
+
+        // Producers are allowed to have NULL destinations.  In this case, the
+        // destination is specified by the messages as they are sent.
+        if( destination != NULL ) {
+
+            // Cast the destination to an OpenWire destination, so we can
+            // get all the goodies.
+            const commands::ActiveMQDestination* amqDestination =
+                dynamic_cast<const commands::ActiveMQDestination*>( destination );
+            if( amqDestination == NULL ) {
+                throw ActiveMQException( __FILE__, __LINE__,
+                    "Destination is not a valid Type: commands::ActiveMQDestination" );
+            }
 
-        // Add to Session Closeables and Monitor for close, if needed.
-        checkConnectorResource(
-            dynamic_cast<ConnectorResource*>( producerInfo ) );
+            // Get any options specified in the destination and apply them to the
+            // ProducerInfo object.
+            producerInfo->setDestination( amqDestination->cloneDataStructure() );
+            const ActiveMQProperties& options = amqDestination->getOptions();
+            producerInfo->setDispatchAsync( Boolean::parseBoolean(
+                options.getProperty( "producer.dispatchAsync", "false" )) );
+        }
 
         // Create the producer instance.
-        ActiveMQProducer* producer = new ActiveMQProducer(
-            producerInfo, this );
+        std::auto_ptr<ActiveMQProducer> producer(
+            new ActiveMQProducer( producerInfo.release(), destination, this ) );
 
-        return producer;
+        producer->setSendTimeout( this->connection->getSendTimeout() );
+
+        // Send the message to the broker.
+        this->connection->syncRequest( producer->getProducerInfo() );
+
+        synchronized( &this->producers ) {
+            // Place the Producer into the Map.
+            this->producers.setValue(
+                producer->getProducerInfo()->getProducerId()->getValue(), producer.get() );
+        }
+
+        return producer.release();
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -365,20 +386,9 @@
     throw ( cms::CMSException ) {
 
     try{
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createQueue - Session Already Closed" );
-        }
-
-        cms::Queue* queue = connection->getConnectionData()->
-            getConnector()->createQueue( queueName, sessionInfo );
 
-        // Add to Session Closeables and Monitor for close, if needed.
-        checkConnectorResource(
-            dynamic_cast<ConnectorResource*>( queue ) );
-
-        return queue;
+        this->checkClosed();
+        return new commands::ActiveMQQueue( queueName );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -391,20 +401,8 @@
 
     try{
 
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createTopic - Session Already Closed");
-        }
-
-        cms::Topic* topic = connection->getConnectionData()->
-            getConnector()->createTopic( topicName, sessionInfo );
-
-        // Add to Session Closeables and Monitor for close, if needed.
-        checkConnectorResource(
-            dynamic_cast<ConnectorResource*>( topic ) );
-
-        return topic;
+        this->checkClosed();
+        return new commands::ActiveMQTopic( topicName );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -417,23 +415,15 @@
 
     try{
 
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createTemporaryQueue - "
-                "Session Already Closed" );
-        }
+        this->checkClosed();
 
-        // Create the consumer instance.
-        cms::TemporaryQueue* queue =
-            connection->getConnectionData()->
-                getConnector()->createTemporaryQueue( sessionInfo );
-
-        // Add to Session Closeables and Monitor for close, if needed.
-        checkConnectorResource(
-            dynamic_cast<ConnectorResource*>( queue ) );
+        std::auto_ptr<commands::ActiveMQTempQueue> queue( new
+            commands::ActiveMQTempQueue( this->createTemporaryDestinationName() ) );
+
+        // Register it with the Broker
+        this->createTemporaryDestination( queue.get() );
 
-        return queue;
+        return queue.release();
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -443,25 +433,18 @@
 ////////////////////////////////////////////////////////////////////////////////
 cms::TemporaryTopic* ActiveMQSession::createTemporaryTopic()
     throw ( cms::CMSException ) {
+
     try{
 
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createTemporaryTopic - "
-                "Session Already Closed" );
-        }
+        this->checkClosed();
 
-        // Create the consumer instance.
-        cms::TemporaryTopic* topic =
-            connection->getConnectionData()->
-                getConnector()->createTemporaryTopic( sessionInfo );
-
-        // Add to Session Closeables and Monitor for close, if needed.
-        checkConnectorResource(
-            dynamic_cast<ConnectorResource*>( topic ) );
+        std::auto_ptr<commands::ActiveMQTempTopic> topic( new
+            commands::ActiveMQTempTopic( createTemporaryDestinationName() ) );
+
+        // Register it with the Broker
+        this->createTemporaryDestination( topic.get() );
 
-        return topic;
+        return topic.release();
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -473,20 +456,9 @@
     throw ( cms::CMSException ) {
 
     try{
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createMessage - Session Already Closed" );
-        }
-
-        cms::Message* message = connection->getConnectionData()->
-            getConnector()->createMessage( sessionInfo, transaction );
 
-        // Add to Session Closeables and Monitor for close, if needed.
-        checkConnectorResource(
-            dynamic_cast<ConnectorResource*>( message ) );
-
-        return message;
+        this->checkClosed();
+        return new commands::ActiveMQMessage();
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -498,20 +470,9 @@
     throw ( cms::CMSException ) {
 
     try{
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createBytesMessage - Session Already Closed" );
-        }
-
-        cms::BytesMessage* message = connection->getConnectionData()->
-            getConnector()->createBytesMessage( sessionInfo, transaction );
-
-        // Add to Session Closeables and Monitor for close, if needed.
-        checkConnectorResource(
-            dynamic_cast<ConnectorResource*>( message ) );
 
-        return message;
+        this->checkClosed();
+        return new commands::ActiveMQBytesMessage();
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -525,10 +486,10 @@
         throw ( cms::CMSException ) {
 
     try{
-        BytesMessage* msg = createBytesMessage();
 
+        this->checkClosed();
+        BytesMessage* msg = createBytesMessage();
         msg->setBodyBytes( bytes, bytesSize );
-
         return msg;
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -542,20 +503,8 @@
 
     try{
 
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createTextMessage - Session Already Closed" );
-        }
-
-        cms::TextMessage* message = connection->getConnectionData()->
-            getConnector()->createTextMessage( sessionInfo, transaction );
-
-        // Add to Session Closeables and Monitor for close, if needed.
-        checkConnectorResource(
-            dynamic_cast<ConnectorResource*>( message ) );
-
-        return message;
+        this->checkClosed();
+        return new commands::ActiveMQTextMessage();
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -568,10 +517,9 @@
 
     try {
 
+        this->checkClosed();
         TextMessage* msg = createTextMessage();
-
         msg->setText( text.c_str() );
-
         return msg;
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -585,20 +533,8 @@
 
     try{
 
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createMapMessage - Session Already Closed" );
-        }
-
-        cms::MapMessage* message = connection->getConnectionData()->
-                getConnector()->createMapMessage( sessionInfo, transaction );
-
-        // Add to Session Closeables and Monitor for close, if needed.
-        checkConnectorResource(
-            dynamic_cast<ConnectorResource*>( message ) );
-
-        return message;
+        this->checkClosed();
+        return new commands::ActiveMQMapMessage();
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -606,113 +542,80 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::Session::AcknowledgeMode ActiveMQSession::getAcknowledgeMode() const
-{
-    return sessionInfo != NULL ?
-        sessionInfo->getAckMode() : Session::AUTO_ACKNOWLEDGE;
+cms::Session::AcknowledgeMode ActiveMQSession::getAcknowledgeMode() const {
+    return this->ackMode;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-bool ActiveMQSession::isTransacted() const
-{
-    return sessionInfo != NULL ?
-        sessionInfo->getAckMode() == Session::SESSION_TRANSACTED : false;
+bool ActiveMQSession::isTransacted() const {
+    return this->ackMode == Session::SESSION_TRANSACTED;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::send( cms::Message* message, ActiveMQProducer* producer )
-    throw ( cms::CMSException ) {
+void ActiveMQSession::send(
+    cms::Message* message, ActiveMQProducer* producer, util::Usage* usage )
+        throw ( cms::CMSException ) {
 
     try {
 
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::onProducerClose - Session Already Closed" );
-        }
-
-        // Send via the connection synchronously.
-        connection->getConnectionData()->
-            getConnector()->send( message, producer->getProducerInfo() );
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::onConnectorResourceClosed(
-    const ConnectorResource* resource ) throw ( cms::CMSException ) {
+        this->checkClosed();
 
-    try{
+        commands::Message* amqMessage =
+            dynamic_cast< commands::Message* >( message );
 
-        if( closed ) {
+        if( amqMessage == NULL ) {
             throw ActiveMQException(
                 __FILE__, __LINE__,
-                "ActiveMQSession::onProducerClose - Session Already Closed");
+                "ActiveMQSession::send - "
+                "Message is not a valid Open Wire type.");
         }
 
-        const ConsumerInfo* consumer =
-            dynamic_cast<const ConsumerInfo*>( resource );
+        // Clear any old data that might be in the message object
+        delete amqMessage->getMessageId();
+        delete amqMessage->getProducerId();
+        delete amqMessage->getTransactionId();
 
-        if( consumer != NULL ) {
+        // Always assign the message ID, regardless of the disable
+        // flag.  Not adding a message ID will cause an NPE at the broker.
+        commands::MessageId* id = new commands::MessageId();
+        id->setProducerId(
+            producer->getProducerInfo()->getProducerId()->cloneDataStructure() );
+        id->setProducerSequenceId( this->connection->getNextProducerSequenceId() );
 
-            // If the executor thread is currently running, stop it.
-            bool wasStarted = isStarted();
-            if( wasStarted ) {
-                stop();
-            }
+        amqMessage->setMessageId( id );
+        amqMessage->setProducerId(
+            producer->getProducerInfo()->getProducerId()->cloneDataStructure() );
 
-            // Remove the dispatcher for the Connection
-            connection->removeDispatcher( consumer );
+        if( this->getAcknowledgeMode() == cms::Session::SESSION_TRANSACTED ) {
 
-            // Remove this consumer from the Transaction if we are transacted
-            if( transaction != NULL ) {
-                transaction->removeFromTransaction( consumer->getConsumerId() );
+            if( this->transaction.get() == NULL ) {
+                throw ActiveMQException(
+                    __FILE__, __LINE__,
+                    "ActiveMQException::send - "
+                    "Transacted Session, has no Transaction Info.");
             }
 
-            ActiveMQConsumer* obj = NULL;
-            synchronized( &consumers ) {
-
-                if( consumers.containsKey( consumer->getConsumerId() ) ) {
+            amqMessage->setTransactionId(
+                this->transaction->getTransactionId()->cloneDataStructure() );
+        }
 
-                    // Get the consumer reference
-                    obj = consumers.getValue( consumer->getConsumerId() );
+        if( this->connection->getSendTimeout() <= 0 &&
+            !amqMessage->isResponseRequired() &&
+            !this->connection->isAlwaysSyncSend() &&
+            ( !amqMessage->isPersistent() || this->connection->isUseAsyncSend() ||
+              amqMessage->getTransactionId() != NULL ) ) {
 
-                    // Remove this consumer from the map.
-                    consumers.remove( consumer->getConsumerId() );
-                }
+            if( usage != NULL ) {
+                usage->enqueueUsage( amqMessage->getSize() );
             }
 
-            // Clean up any resources in the executor for this consumer
-            if( obj != NULL && executor != NULL ) {
+            // No Response Required.
+            this->connection->oneway( amqMessage );
 
-                // Purge any pending messages for this consumer.
-                vector<ActiveMQMessage*> messages =
-                    executor->purgeConsumerMessages(obj);
-
-                // Destroy the messages.
-                for( unsigned int ix=0; ix<messages.size(); ++ix ) {
-                    delete messages[ix];
-                }
-            }
-
-            // If the executor thread was previously running, start it back
-            // up.
-            if( wasStarted ) {
-                start();
-            }
-        }
+        } else {
 
-        // Remove the entry from the session resource map if it's there
-        const cms::Closeable* closeable =
-            dynamic_cast<const cms::Closeable*>( resource );
-
-        if( closeable != NULL ){
-            synchronized( &closableSessionResources ) {
-                closableSessionResources.remove(
-                    const_cast<cms::Closeable*>( closeable ) );
-            }
+            // Send the message to the broker.
+            this->connection->syncRequest( amqMessage, this->connection->getSendTimeout() );
         }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -721,6 +624,86 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+//void ActiveMQSession::onConnectorResourceClosed(
+//    const ConnectorResource* resource ) throw ( cms::CMSException ) {
+//
+//    try{
+//
+//        if( closed ) {
+//            throw ActiveMQException(
+//                __FILE__, __LINE__,
+//                "ActiveMQSession::onProducerClose - Session Already Closed");
+//        }
+//
+//        const ConsumerInfo* consumer =
+//            dynamic_cast<const ConsumerInfo*>( resource );
+//
+//        if( consumer != NULL ) {
+//
+//            // If the executor thread is currently running, stop it.
+//            bool wasStarted = isStarted();
+//            if( wasStarted ) {
+//                stop();
+//            }
+//
+//            // Remove the dispatcher for the Connection
+//            connection->removeDispatcher( consumer );
+//
+//            // Remove this consumer from the Transaction if we are transacted
+//            if( transaction != NULL ) {
+//                transaction->removeFromTransaction( consumer->getConsumerId() );
+//            }
+//
+//            ActiveMQConsumer* obj = NULL;
+//            synchronized( &consumers ) {
+//
+//                if( consumers.containsKey( consumer->getConsumerId() ) ) {
+//
+//                    // Get the consumer reference
+//                    obj = consumers.getValue( consumer->getConsumerId() );
+//
+//                    // Remove this consumer from the map.
+//                    consumers.remove( consumer->getConsumerId() );
+//                }
+//            }
+//
+//            // Clean up any resources in the executor for this consumer
+//            if( obj != NULL && executor != NULL ) {
+//
+//                // Purge any pending messages for this consumer.
+//                vector<ActiveMQMessage*> messages =
+//                    executor->purgeConsumerMessages(obj);
+//
+//                // Destroy the messages.
+//                for( unsigned int ix=0; ix<messages.size(); ++ix ) {
+//                    delete messages[ix];
+//                }
+//            }
+//
+//            // If the executor thread was previously running, start it back
+//            // up.
+//            if( wasStarted ) {
+//                start();
+//            }
+//        }
+//
+//        // Remove the entry from the session resource map if it's there
+//        const cms::Closeable* closeable =
+//            dynamic_cast<const cms::Closeable*>( resource );
+//
+//        if( closeable != NULL ){
+//            synchronized( &closableSessionResources ) {
+//                closableSessionResources.remove(
+//                    const_cast<cms::Closeable*>( closeable ) );
+//            }
+//        }
+//    }
+//    AMQ_CATCH_RETHROW( ActiveMQException )
+//    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+//    AMQ_CATCHALL_THROW( ActiveMQException )
+//}
+
+////////////////////////////////////////////////////////////////////////////////
 cms::ExceptionListener* ActiveMQSession::getExceptionListener()
 {
     if( connection != NULL ) {
@@ -736,33 +719,18 @@
 
     try{
 
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createConsumer - Session Already Closed" );
-        }
-
-        // Delegate to the connector.
-        connection->getConnectionData()->getConnector()->unsubscribe( name );
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
+        this->checkClosed();
 
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::sendPullRequest( const connector::ConsumerInfo* consumer, long long timeout )
-    throw ( activemq::exceptions::ActiveMQException ) {
-
-    try {
+        std::auto_ptr<commands::RemoveSubscriptionInfo> rsi(
+            new commands::RemoveSubscriptionInfo() );
 
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::createConsumer - Session Already Closed" );
-        }
+        rsi->setConnectionId(
+            this->connection->getConnectionInfo()->getConnectionId()->cloneDataStructure() );
+        rsi->setSubcriptionName( name );
+        rsi->setClientId( this->connection->getConnectionInfo()->getClientId() );
 
-        this->connection->sendPullRequest( consumer, timeout );
+        // Send the message to the broker.
+        this->connection->syncRequest( rsi.get() );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -770,27 +738,10 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::checkConnectorResource(
-    connector::ConnectorResource* resource ) {
-
-    if( resource == NULL ) {
-        return;
-    }
-
-    // Add the consumer to the map of closeable session resources.
-    synchronized( &closableSessionResources ) {
-        closableSessionResources.add( resource );
-    }
-
-    // Register as a Listener
-    resource->addListener( this );
-}
-
-////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::dispatch( DispatchData& message ) {
 
-    if( executor != NULL ) {
-        executor->execute( message );
+    if( this->executor.get() != NULL ) {
+        this->executor->execute( message );
     }
 }
 
@@ -815,25 +766,271 @@
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::start() {
 
-    if( executor != NULL ) {
-        executor->start();
+    if( this->executor.get() != NULL ) {
+        this->executor->start();
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::stop() {
 
-    if( executor != NULL ) {
-        executor->stop();
+    if( this->executor.get() != NULL ) {
+        this->executor->stop();
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool ActiveMQSession::isStarted() const {
 
-    if( executor == NULL ) {
+    if( this->executor.get() != NULL ) {
         return false;
     }
 
-    return executor->isStarted();
+    return this->executor->isStarted();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+commands::ConsumerInfo* ActiveMQSession::createConsumerInfo(
+    const cms::Destination* destination )throw ( activemq::exceptions::ActiveMQException ) {
+
+    try{
+
+        this->checkClosed();
+
+        std::auto_ptr<commands::ConsumerInfo> consumerInfo( new commands::ConsumerInfo() );
+        std::auto_ptr<commands::ConsumerId> consumerId( new commands::ConsumerId() );
+
+        consumerId->setConnectionId(
+            this->connection->getConnectionId()->getValue() );
+        consumerId->setSessionId( this->sessionInfo->getSessionId()->getValue() );
+        consumerId->setValue( this->connection->getNextSessionId() );
+
+        consumerInfo->setConsumerId( consumerId.release() );
+
+        // Cast the destination to an OpenWire destination, so we can
+        // get all the goodies.
+        const commands::ActiveMQDestination* amqDestination =
+            dynamic_cast<const commands::ActiveMQDestination*>( destination );
+
+        if( amqDestination == NULL ) {
+            throw activemq::exceptions::ActiveMQException( __FILE__, __LINE__,
+                "Destination was either NULL or not created by this OpenWireConnector" );
+        }
+
+        consumerInfo->setDestination( amqDestination->cloneDataStructure() );
+
+        return consumerInfo.release();
+    }
+    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::applyDestinationOptions( commands::ConsumerInfo* info ) {
+
+    const commands::ActiveMQDestination* amqDestination = info->getDestination();
+
+    // Get any options specified in the destination and apply them to the
+    // ConsumerInfo object.
+    const ActiveMQProperties& options = amqDestination->getOptions();
+
+    std::string noLocalStr =
+        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_NOLOCAL );
+    if( options.hasProperty( noLocalStr ) ) {
+        info->setNoLocal( Boolean::parseBoolean(
+            options.getProperty( noLocalStr ) ) );
+    }
+
+    std::string selectorStr =
+        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_SELECTOR );
+    if( options.hasProperty( selectorStr ) ) {
+        info->setSelector( options.getProperty( selectorStr ) );
+    }
+
+    std::string priorityStr =
+        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_PRIORITY );
+    if( options.hasProperty( priorityStr ) ) {
+        info->setPriority( Integer::parseInt( options.getProperty( priorityStr ) ) );
+    }
+
+    std::string dispatchAsyncStr =
+        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_DISPATCHASYNC );
+    if( options.hasProperty( dispatchAsyncStr ) ) {
+        info->setDispatchAsync(
+            Boolean::parseBoolean( options.getProperty( dispatchAsyncStr ) ) );
+    }
+
+    std::string exclusiveStr =
+        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_EXCLUSIVE );
+    if( options.hasProperty( exclusiveStr ) ) {
+        info->setExclusive(
+            Boolean::parseBoolean( options.getProperty( exclusiveStr ) ) );
+    }
+
+    std::string maxPendingMsgLimitStr =
+        core::ActiveMQConstants::toString(
+            core::ActiveMQConstants::CUNSUMER_MAXPENDINGMSGLIMIT );
+
+    if( options.hasProperty( maxPendingMsgLimitStr ) ) {
+        info->setMaximumPendingMessageLimit(
+            Integer::parseInt(
+                options.getProperty( maxPendingMsgLimitStr ) ) );
+    }
+
+    std::string prefetchSizeStr =
+        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_PREFECTCHSIZE );
+    if( info->getPrefetchSize() <= 0 || options.hasProperty( prefetchSizeStr )  ) {
+        info->setPrefetchSize(
+            Integer::parseInt( options.getProperty( prefetchSizeStr, "1000" ) ) );
+    }
+
+    std::string retroactiveStr =
+        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_RETROACTIVE );
+    if( options.hasProperty( retroactiveStr ) ) {
+        info->setRetroactive(
+            Boolean::parseBoolean( options.getProperty( retroactiveStr ) ) );
+    }
+
+    std::string browserStr = "consumer.browser";
+
+    if( options.hasProperty( browserStr ) ) {
+        info->setBrowser(
+            Boolean::parseBoolean(
+                options.getProperty( browserStr ) ) );
+    }
+
+    std::string networkSubscriptionStr = "consumer.networkSubscription";
+
+    if( options.hasProperty( networkSubscriptionStr ) ) {
+        info->setNetworkSubscription(
+            Boolean::parseBoolean(
+                options.getProperty( networkSubscriptionStr ) ) );
+    }
+
+    std::string optimizedAcknowledgeStr = "consumer.optimizedAcknowledge";
+
+    if( options.hasProperty( optimizedAcknowledgeStr ) ) {
+        info->setOptimizedAcknowledge(
+            Boolean::parseBoolean(
+                options.getProperty( optimizedAcknowledgeStr ) ) );
+    }
+
+    std::string noRangeAcksStr = "consumer.noRangeAcks";
+
+    if( options.hasProperty( noRangeAcksStr ) ) {
+        info->setNoRangeAcks(
+            Boolean::parseBoolean(
+                options.getProperty( noRangeAcksStr ) ) );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::createTemporaryDestination(
+    commands::ActiveMQTempDestination* tempDestination )
+        throw ( activemq::exceptions::ActiveMQException ) {
+
+    try {
+
+        commands::DestinationInfo command;
+        command.setConnectionId( this->connection->getConnectionId()->cloneDataStructure() );
+        command.setOperationType( ActiveMQConstants::DESTINATION_ADD_OPERATION );
+        command.setDestination( tempDestination->cloneDataStructure() );
+
+        // Send the message to the broker.
+        this->connection->syncRequest( &command );
+
+        // TODO - Manage Resources
+        // Now that its setup, link it to this Connector
+        // tempDestination->setConnector( this );
+    }
+    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::destroyTemporaryDestination(
+    commands::ActiveMQTempDestination* tempDestination )
+        throw ( activemq::exceptions::ActiveMQException ) {
+
+    try {
+
+        commands::DestinationInfo command;
+
+        command.setConnectionId( this->connection->getConnectionId()->cloneDataStructure() );
+        command.setOperationType( ActiveMQConstants::DESTINATION_REMOVE_OPERATION );
+        command.setDestination( tempDestination->cloneDataStructure() );
+
+        // Send the message to the broker.
+        this->connection->syncRequest( &command );
+    }
+    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::string ActiveMQSession::createTemporaryDestinationName()
+    throw ( activemq::exceptions::ActiveMQException )
+{
+    try {
+        return this->connection->getConnectionId()->getValue() + ":" +
+               Long::toString( this->connection->getNextTempDestinationId() );
+    }
+    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+commands::TransactionId* ActiveMQSession::createLocalTransactionId()
+    throw ( activemq::exceptions::ActiveMQException ) {
+
+    try{
+        std::auto_ptr<commands::LocalTransactionId> id( new commands::LocalTransactionId() );
+
+        id->setConnectionId( this->connection->getConnectionId()->cloneDataStructure() );
+        id->setValue( this->connection->getNextTransactionId() );
+
+        return id.release();
+    }
+    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::oneway( transport::Command* command )
+    throw ( activemq::exceptions::ActiveMQException ) {
+
+    try{
+        this->checkClosed();
+        this->connection->oneway( command );
+    }
+    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::syncRequest( transport::Command* command, unsigned int timeout )
+    throw ( activemq::exceptions::ActiveMQException ) {
+
+    try{
+        this->checkClosed();
+        this->connection->syncRequest( command, timeout );
+    }
+    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::checkClosed() throw( exceptions::ActiveMQException ) {
+    if( closed ) {
+        throw ActiveMQException(
+            __FILE__, __LINE__,
+            "ActiveMQSession - Session Already Closed" );
+    }
 }

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h?rev=736842&r1=736841&r2=736842&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h Thu Jan 22 14:55:27 2009
@@ -19,20 +19,28 @@
 
 #include <cms/Session.h>
 #include <cms/ExceptionListener.h>
+
 #include <activemq/util/Config.h>
+#include <activemq/util/Usage.h>
 #include <activemq/exceptions/ActiveMQException.h>
-#include <activemq/connector/SessionInfo.h>
-#include <activemq/connector/ConnectorResourceListener.h>
+#include <activemq/commands/ActiveMQTempDestination.h>
+#include <activemq/commands/SessionInfo.h>
+#include <activemq/commands/ConsumerInfo.h>
+#include <activemq/commands/TransactionId.h>
 #include <activemq/core/Dispatcher.h>
+
 #include <decaf/util/Map.h>
 #include <decaf/util/Set.h>
 #include <decaf/util/Queue.h>
 #include <decaf/util/Properties.h>
 
+#include <string>
+#include <memory>
+
 namespace activemq{
 namespace core{
 
-    class ActiveMQTransaction;
+    class ActiveMQTransactionContext;
     class ActiveMQConnection;
     class ActiveMQConsumer;
     class ActiveMQMessage;
@@ -40,22 +48,19 @@
     class ActiveMQConsumer;
     class ActiveMQSessionExecutor;
 
-    class AMQCPP_API ActiveMQSession :
-        public cms::Session,
-        public Dispatcher,
-        public connector::ConnectorResourceListener
-    {
+    class AMQCPP_API ActiveMQSession : public cms::Session,
+                                       public Dispatcher {
     private:
 
         /**
          * SessionInfo for this Session
          */
-        connector::SessionInfo* sessionInfo;
+        std::auto_ptr<commands::SessionInfo> sessionInfo;
 
         /**
          * Transaction Management object
          */
-        ActiveMQTransaction* transaction;
+        std::auto_ptr<ActiveMQTransactionContext> transaction;
 
         /**
          * Connection
@@ -68,25 +73,34 @@
         bool closed;
 
         /**
-         * The set of closable session resources;
-         * This can consist of consumers and producers and sometimes
-         * destination.
+         * Map of consumers.
          */
-        decaf::util::Set<cms::Closeable*> closableSessionResources;
+        decaf::util::Map<long long, ActiveMQConsumer*> consumers;
 
         /**
          * Map of consumers.
          */
-        decaf::util::Map<long long, ActiveMQConsumer*> consumers;
+        decaf::util::Map<long long, ActiveMQProducer*> producers;
+
+        /**
+         * Map of consumers.
+         */
+        decaf::util::Map<long long, commands::ActiveMQTempDestination*> tempDestinations;
 
         /**
          * Sends incoming messages to the registered consumers.
          */
-        ActiveMQSessionExecutor* executor;
+        std::auto_ptr<ActiveMQSessionExecutor> executor;
+
+        /**
+         * This Sessions Acknowledgment mode.
+         */
+        cms::Session::AcknowledgeMode ackMode;
 
     public:
 
-        ActiveMQSession( connector::SessionInfo* sessionInfo,
+        ActiveMQSession( commands::SessionInfo* sessionInfo,
+                         cms::Session::AcknowledgeMode ackMode,
                          const decaf::util::Properties& properties,
                          ActiveMQConnection* connection );
 
@@ -119,19 +133,19 @@
         bool isStarted() const;
 
         bool isAutoAcknowledge() const {
-            return sessionInfo->getAckMode() == cms::Session::AUTO_ACKNOWLEDGE;
+            return this->ackMode == cms::Session::AUTO_ACKNOWLEDGE;
         }
         bool isDupsOkAcknowledge() const {
-            return sessionInfo->getAckMode() == cms::Session::DUPS_OK_ACKNOWLEDGE;
+            return this->ackMode == cms::Session::DUPS_OK_ACKNOWLEDGE;
         }
         bool isClientAcknowledge() const {
-            return sessionInfo->getAckMode() == cms::Session::CLIENT_ACKNOWLEDGE;
+            return this->ackMode == cms::Session::CLIENT_ACKNOWLEDGE;
         }
 
         /**
          * Fires the given exception to the exception listener of the connection
          */
-        void fire( exceptions::ActiveMQException& ex );
+        void fire( const exceptions::ActiveMQException& ex );
 
     public:  // Methods from ActiveMQMessageDispatcher
 
@@ -271,7 +285,7 @@
             throw ( cms::CMSException );
 
         /**
-         * Creates a BytesMessage and sets the paylod to the passed value
+         * Creates a BytesMessage and sets the payload to the passed value
          * @param an array of bytes to set in the message
          * @param the size of the bytes array, or number of bytes to use
          * @throws CMSException
@@ -304,7 +318,7 @@
             throw ( cms::CMSException );
 
         /**
-         * Returns the acknowledgement mode of the session.
+         * Returns the acknowledgment mode of the session.
          * @return the Sessions Acknowledge Mode
          */
         virtual cms::Session::AcknowledgeMode getAcknowledgeMode() const;
@@ -334,12 +348,23 @@
    public:   // ActiveMQSession specific Methods
 
         /**
-         * Sends a message from the Producer specified
-         * @param cms::Message pointer
-         * @param Producer Information
+         * Sends a message from the Producer specified using this session's connection
+         * the message will be sent using the best available means depending on the
+         * configuration of the connection.
+         * <p>
+         * Asynchronous sends will be chosen if at all possible.
+         *
+         * @param message
+         *        The message to send to the broker.
+         * @param prducer
+         *        The sending Producer
+         * @param usage
+         *        Pointer to a Usage tracker which if set will be increased by the size
+         *        of the given message.
+         *
          * @throws CMSException
          */
-        virtual void send( cms::Message* message, ActiveMQProducer* producer )
+        void send( cms::Message* message, ActiveMQProducer* producer, util::Usage* usage )
             throw ( cms::CMSException );
 
         /**
@@ -349,15 +374,15 @@
          * exceptions that occur in the context of another thread.
          * @returns cms::ExceptionListener pointer or NULL
          */
-        virtual cms::ExceptionListener* getExceptionListener();
+        cms::ExceptionListener* getExceptionListener();
 
         /**
          * Gets the Session Information object for this session, if the
          * session is closed than this returns null
          * @return SessionInfo Pointer
          */
-        virtual connector::SessionInfo* getSessionInfo() {
-            return this->sessionInfo;
+        commands::SessionInfo* getSessionInfo() {
+            return this->sessionInfo.get();
         }
 
         /**
@@ -368,40 +393,66 @@
         }
 
         /**
-         * If supported sends a message pull request to the service provider asking
-         * for the delivery of a new message.  This is used in the case where the
-         * service provider has been configured with a zero prefectch or is only
-         * capable of delivering messages on a pull basis.
-         * @param consumer - the ConsumerInfo for the requesting Consumer.
-         * @param timeout - the time that the client is willing to wait.
-         */
-        virtual void sendPullRequest( const connector::ConsumerInfo* consumer, long long timeout )
-            throw ( exceptions::ActiveMQException );
-
-    protected:   // ConnectorResourceListener
-
-        /**
-         * When a Connector Resouce is closed it will notify any registered
-         * Listeners of its close so that they can take the appropriate
-         * action.
-         * @param resource - The ConnectorResource that was closed.
-         */
-        virtual void onConnectorResourceClosed(
-            const connector::ConnectorResource* resource ) throw ( cms::CMSException );
-
-    protected:
-
-        /**
-         * Given a ConnectorResource pointer, this method will add it to the map
-         * of closeable resources that this connection must close on shutdown
-         * and register itself as a ConnectorResourceListener so that it
-         * can be told when the resouce has been closed by someone else
-         * and remove it from its map of closeable resources.
-         * @param resource - ConnectorResouce to monitor, if NULL no action
-         *                   is taken and no exception is thrown.
-         */
-        virtual void checkConnectorResource(
-            connector::ConnectorResource* resource );
+         * Sends a oneway message.
+         * @param command The message to send.
+         * @throws ConnectorException if not currently connected, or
+         * if the operation fails for any reason.
+         */
+        void oneway( transport::Command* command )
+            throw ( activemq::exceptions::ActiveMQException );
+
+        /**
+         * Sends a synchronous request and returns the response from the broker.
+         * Converts any error responses into an exception.
+         * @param command The request command.
+         * @param timeout The time to wait for a response, default is zero or infinite.
+         * @throws ConnectorException thrown if an error response was received
+         * from the broker, or if any other error occurred.
+         */
+        void syncRequest( transport::Command* command, unsigned int timeout = 0 )
+            throw ( activemq::exceptions::ActiveMQException );
+
+   private:
+
+       // Checks for the closed state and throws if so.
+       void checkClosed() throw( exceptions::ActiveMQException );
+
+       // Performs the work of creating and configuring a valid Consumer Info, this
+       // can be used both by the normal createConsumer call and by a createDurableConsumer
+       // call as well.  Caller owns the returned ConsumerInfo object.
+       commands::ConsumerInfo* createConsumerInfo(
+           const cms::Destination* destination )
+               throw ( activemq::exceptions::ActiveMQException );
+
+       // Using options from the Destination URI override any settings that are
+       // defined for this consumer.
+       void applyDestinationOptions( commands::ConsumerInfo* info );
+
+       // Send the Destination Creation Request to the Broker, alerting it
+       // that we've created a new Temporary Destination.
+       // @param tempDestination - The new Temporary Destination
+       void createTemporaryDestination(
+           commands::ActiveMQTempDestination* tempDestination )
+               throw ( activemq::exceptions::ActiveMQException );
+
+       // Send the Destination Destruction Request to the Broker, alerting
+       // it that we've removed an existing Temporary Destination.
+       // @param tempDestination - The Temporary Destination to remove
+       void destroyTemporaryDestination(
+           commands::ActiveMQTempDestination* tempDestination )
+               throw ( activemq::exceptions::ActiveMQException );
+
+       // Creates a new Temporary Destination name using the connection id
+       // and a rolling count.
+       // @returns a unique Temporary Destination name
+       std::string createTemporaryDestinationName()
+           throw ( activemq::exceptions::ActiveMQException );
+
+       // Create a Transaction Id using the local context to create
+       // the LocalTransactionId Command.
+       // @returns a new TransactionId pointer, caller owns.
+       commands::TransactionId* createLocalTransactionId()
+           throw ( activemq::exceptions::ActiveMQException );
 
     };
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.cpp?rev=736842&r1=736841&r2=736842&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.cpp Thu Jan 22 14:55:27 2009
@@ -19,7 +19,7 @@
 #include "ActiveMQSession.h"
 #include "ActiveMQMessage.h"
 #include "ActiveMQConsumer.h"
-#include <activemq/connector/ConsumerInfo.h>
+#include <activemq/commands/ConsumerInfo.h>
 
 using namespace std;
 using namespace activemq;
@@ -94,7 +94,7 @@
 {
     vector<ActiveMQMessage*> retVal;
 
-    const connector::ConsumerInfo* consumerInfo = consumer->getConsumerInfo();
+    const commands::ConsumerInfo* consumerInfo = consumer->getConsumerInfo();
 
     synchronized( &mutex ) {
 
@@ -102,9 +102,7 @@
         while( iter != messageQueue.end() ) {
             list<DispatchData>::iterator currentIter = iter;
             DispatchData& dispatchData = *iter++;
-            if( consumerInfo == dispatchData.getConsumer() ||
-                consumerInfo->getConsumerId() == dispatchData.getConsumer()->getConsumerId() )
-            {
+            if( consumerInfo->getConsumerId() == dispatchData.getConsumerId() ) {
                 retVal.push_back( dispatchData.getMessage() );
                 messageQueue.erase(currentIter);
             }
@@ -174,7 +172,7 @@
         Map<long long, ActiveMQConsumer*>& consumers = session->getConsumers();
 
         synchronized( &consumers ) {
-            consumer = consumers.getValue( data.getConsumer()->getConsumerId() );
+            consumer = consumers.getValue( data.getConsumerId()->getValue() );
         }
 
         // If the consumer is not available, just delete the message.



Mime
View raw message