activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1305601 [2/5] - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/activemq/cmsutil/ main/activemq/core/ main/activemq/core/kernels/ main/cms/ test/
Date Mon, 26 Mar 2012 21:11:14 GMT
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=1305601&r1=1305600&r2=1305601&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Mon Mar 26 21:11:12 2012
@@ -16,33 +16,11 @@
  */
 #include "ActiveMQConsumer.h"
 
-#include <decaf/lang/exceptions/NullPointerException.h>
-#include <decaf/lang/exceptions/InvalidStateException.h>
-#include <decaf/lang/exceptions/IllegalArgumentException.h>
-#include <decaf/lang/Math.h>
-#include <decaf/lang/System.h>
-#include <decaf/lang/Boolean.h>
-#include <decaf/lang/Integer.h>
-#include <decaf/lang/Long.h>
 #include <activemq/util/Config.h>
 #include <activemq/util/CMSExceptionSupport.h>
-#include <activemq/util/ActiveMQProperties.h>
 #include <activemq/exceptions/ActiveMQException.h>
-#include <activemq/commands/Message.h>
-#include <activemq/commands/MessageAck.h>
-#include <activemq/commands/MessagePull.h>
-#include <activemq/commands/RemoveInfo.h>
-#include <activemq/commands/TransactionInfo.h>
-#include <activemq/commands/TransactionId.h>
-#include <activemq/core/ActiveMQConnection.h>
-#include <activemq/core/ActiveMQConstants.h>
-#include <activemq/core/ActiveMQSession.h>
-#include <activemq/core/ActiveMQTransactionContext.h>
-#include <activemq/core/ActiveMQAckHandler.h>
-#include <activemq/core/FifoMessageDispatchChannel.h>
-#include <activemq/core/SimplePriorityMessageDispatchChannel.h>
+#include <activemq/core/kernels/ActiveMQConsumerKernel.h>
 #include <activemq/core/RedeliveryPolicy.h>
-#include <activemq/threads/Scheduler.h>
 #include <cms/ExceptionListener.h>
 #include <memory>
 
@@ -50,535 +28,98 @@ using namespace std;
 using namespace activemq;
 using namespace activemq::util;
 using namespace activemq::core;
-using namespace activemq::commands;
+using namespace activemq::core::kernels;
 using namespace activemq::exceptions;
-using namespace activemq::threads;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 using namespace decaf::util;
 using namespace decaf::util::concurrent;
 
 ////////////////////////////////////////////////////////////////////////////////
-namespace activemq{
+namespace activemq {
 namespace core {
 
-    class ActiveMQConsumerMembers {
+    class ActiveMQConsumerData {
     private:
 
-        ActiveMQConsumerMembers( const ActiveMQConsumerMembers& );
-        ActiveMQConsumerMembers& operator= ( const ActiveMQConsumerMembers& );
+        ActiveMQConsumerData(const ActiveMQConsumerData&);
+        ActiveMQConsumerData& operator=(const ActiveMQConsumerData&);
 
     public:
 
-        cms::MessageListener* listener;
-        decaf::util::concurrent::Mutex listenerMutex;
-        AtomicBoolean deliveringAcks;
-        AtomicBoolean started;
-        Pointer<MessageDispatchChannel> unconsumedMessages;
-        decaf::util::LinkedList< decaf::lang::Pointer<commands::MessageDispatch> > dispatchedMessages;
-        long long lastDeliveredSequenceId;
-        Pointer<commands::MessageAck> pendingAck;
-        int deliveredCounter;
-        int additionalWindowSize;
-        volatile bool synchronizationRegistered;
-        bool clearDispatchList;
-        bool inProgressClearRequiredFlag;
-        long long redeliveryDelay;
-        Pointer<RedeliveryPolicy> redeliveryPolicy;
-        Pointer<Exception> failureError;
-        Pointer<Scheduler> scheduler;
-
-        ActiveMQConsumerMembers() : listener(NULL),
-                                    listenerMutex(),
-                                    deliveringAcks(),
-                                    started(),
-                                    unconsumedMessages(),
-                                    dispatchedMessages(),
-                                    lastDeliveredSequenceId(0),
-                                    pendingAck(),
-                                    deliveredCounter(0),
-                                    additionalWindowSize(0),
-                                    synchronizationRegistered(false),
-                                    clearDispatchList(false),
-                                    inProgressClearRequiredFlag(false),
-                                    redeliveryDelay(0),
-                                    redeliveryPolicy(),
-                                    failureError(),
-                                    scheduler() {
-        }
-
-    };
-
-    /**
-     * Class used to deal with consumers in an active transaction.  This
-     * class calls back into the consumer when the transaction is Committed or
-     * Rolled Back to process that event.
-     */
-    class TransactionSynhcronization : public Synchronization {
-    private:
-
-        ActiveMQConsumer* consumer;
-
-    private:
-
-        TransactionSynhcronization( const TransactionSynhcronization& );
-        TransactionSynhcronization& operator= ( const TransactionSynhcronization& );
-
-    public:
-
-        TransactionSynhcronization( ActiveMQConsumer* consumer ) : consumer(consumer) {
-
-            if( consumer == NULL ) {
-                throw NullPointerException(
-                    __FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
-            }
-        }
-
-        virtual ~TransactionSynhcronization() {}
-
-        virtual void beforeEnd() {
-            consumer->acknowledge();
-            consumer->setSynchronizationRegistered( false );
-        }
-
-        virtual void afterCommit() {
-            consumer->commit();
-            consumer->setSynchronizationRegistered( false );
-        }
-
-        virtual void afterRollback() {
-            consumer->rollback();
-            consumer->setSynchronizationRegistered( false );
-        }
-
-    };
-
-    /**
-     * Class used to Hook a consumer that has been closed into the Transaction
-     * it is currently a part of.  Once the Transaction has been Committed or
-     * Rolled back this Synchronization can finish the Close of the consumer.
-     */
-    class CloseSynhcronization : public Synchronization {
-    private:
-
-        ActiveMQConsumer* consumer;
-
-    private:
-
-        CloseSynhcronization( const CloseSynhcronization& );
-        CloseSynhcronization& operator= ( const CloseSynhcronization& );
-
-    public:
-
-        CloseSynhcronization( ActiveMQConsumer* consumer ) : consumer(consumer) {
-
-            if( consumer == NULL ) {
-                throw NullPointerException(
-                    __FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
-            }
-        }
-
-        virtual ~CloseSynhcronization() {}
-
-        virtual void beforeEnd() {
-        }
-
-        virtual void afterCommit() {
-            consumer->doClose();
-        }
-
-        virtual void afterRollback() {
-            consumer->doClose();
-        }
-
-    };
-
-    /**
-     * ActiveMQAckHandler used to support Client Acknowledge mode.
-     */
-    class ClientAckHandler : public ActiveMQAckHandler {
-    private:
-
-        ActiveMQSession* session;
-
-    private:
+        Pointer<ActiveMQConsumerKernel> kernel;
 
-        ClientAckHandler( const ClientAckHandler& );
-        ClientAckHandler& operator= ( const ClientAckHandler& );
-
-    public:
-
-        ClientAckHandler( ActiveMQSession* session ) : session(session) {
-            if( session == NULL ) {
-                throw NullPointerException(
-                    __FILE__, __LINE__, "Ack Handler Created with NULL Session.");
-            }
-        }
-
-        void acknowledgeMessage( const commands::Message* message AMQCPP_UNUSED ) {
-
-            try {
-                this->session->acknowledge();
-            }
-            AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
-        }
-    };
-
-    /**
-     * ActiveMQAckHandler used to enable the Individual Acknowledge mode.
-     */
-    class IndividualAckHandler : public ActiveMQAckHandler {
-    private:
-
-        ActiveMQConsumer* consumer;
-        Pointer<commands::MessageDispatch> dispatch;
-
-    private:
-
-        IndividualAckHandler( const IndividualAckHandler& );
-        IndividualAckHandler& operator= ( const IndividualAckHandler& );
-
-    public:
-
-        IndividualAckHandler( ActiveMQConsumer* consumer, const Pointer<MessageDispatch>& dispatch ) :
-            consumer(consumer), dispatch(dispatch) {
-
-            if( consumer == NULL ) {
-                throw NullPointerException(
-                    __FILE__, __LINE__, "Ack Handler Created with NULL consumer.");
-            }
-        }
-
-        void acknowledgeMessage( const commands::Message* message AMQCPP_UNUSED ) {
-
-            try {
-
-                if( this->dispatch != NULL ) {
-                    this->consumer->acknowledge( this->dispatch );
-                    this->dispatch.reset( NULL );
-                }
-            }
-            AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
-        }
-    };
-
-    /**
-     * Class used to Start a Consumer's dispatch queue asynchronously from the
-     * configured Scheduler.
-     */
-    class StartConsumerTask : public Runnable {
-    private:
-
-        ActiveMQConsumer* consumer;
-
-    private:
-
-        StartConsumerTask( const StartConsumerTask& );
-        StartConsumerTask& operator= ( const StartConsumerTask& );
-
-    public:
-
-        StartConsumerTask( ActiveMQConsumer* consumer ) : Runnable(), consumer(NULL) {
-
-            if( consumer == NULL ) {
-                throw NullPointerException(
-                    __FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
-            }
-
-            this->consumer = consumer;
-        }
-
-        virtual ~StartConsumerTask() {}
-
-        virtual void run() {
-            try{
-                if(!this->consumer->isClosed()) {
-                    this->consumer->start();
-                }
-            } catch(cms::CMSException& ex) {
-                // TODO - Need Connection onAsyncException method.
-            }
+        ActiveMQConsumerData(const Pointer<ActiveMQConsumerKernel>& kernel) : kernel(kernel) {
         }
     };
 
 }}
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQConsumer::ActiveMQConsumer( ActiveMQSession* session,
-                                    const Pointer<ConsumerId>& id,
-                                    const Pointer<ActiveMQDestination>& destination,
-                                    const std::string& name,
-                                    const std::string& selector,
-                                    int prefetch,
-                                    int maxPendingMessageCount,
-                                    bool noLocal,
-                                    bool browser,
-                                    bool dispatchAsync,
-                                    cms::MessageListener* listener ) : internal(NULL), session(NULL), consumerInfo() {
-
-    if( session == NULL ) {
-        throw ActiveMQException(
-            __FILE__, __LINE__,
-            "ActiveMQConsumer::ActiveMQConsumer - Init with NULL Session" );
-    }
-
-    if( destination == NULL ) {
-        throw ActiveMQException(
-            __FILE__, __LINE__,
-            "ActiveMQConsumer::ActiveMQConsumer - Init with NULL Destination" );
-    }
-
-    if( destination->getPhysicalName() == "" ) {
-        throw ActiveMQException(
-            __FILE__, __LINE__,
-            "ActiveMQConsumer::ActiveMQConsumer - Destination given has no Physical Name." );
-    }
-
-    this->internal = new ActiveMQConsumerMembers();
-
-    Pointer<ConsumerInfo> consumerInfo( new ConsumerInfo() );
+ActiveMQConsumer::ActiveMQConsumer(const Pointer<ActiveMQConsumerKernel>& kernel) : MessageConsumer(), config(NULL) {
 
-    consumerInfo->setConsumerId( id );
-    consumerInfo->setDestination( destination );
-    consumerInfo->setSubscriptionName( name );
-    consumerInfo->setSelector( selector );
-    consumerInfo->setPrefetchSize( prefetch );
-    consumerInfo->setMaximumPendingMessageLimit( maxPendingMessageCount );
-    consumerInfo->setBrowser( browser );
-    consumerInfo->setDispatchAsync( dispatchAsync );
-    consumerInfo->setNoLocal( noLocal );
-
-    // Initialize Consumer Data
-    this->session = session;
-    this->consumerInfo = consumerInfo;
-    this->internal->lastDeliveredSequenceId = -1;
-    this->internal->synchronizationRegistered = false;
-    this->internal->additionalWindowSize = 0;
-    this->internal->deliveredCounter = 0;
-    this->internal->clearDispatchList = false;
-    this->internal->inProgressClearRequiredFlag = false;
-    this->internal->listener = NULL;
-    this->internal->redeliveryDelay = 0;
-    this->internal->redeliveryPolicy.reset( this->session->getConnection()->getRedeliveryPolicy()->clone() );
-    this->internal->scheduler = this->session->getScheduler();
-
-    if( this->session->getConnection()->isMessagePrioritySupported() ) {
-        this->internal->unconsumedMessages.reset( new SimplePriorityMessageDispatchChannel() );
-    } else {
-        this->internal->unconsumedMessages.reset( new FifoMessageDispatchChannel() );
+    if (kernel == NULL) {
+        throw ActiveMQException(__FILE__, __LINE__,
+            "ActiveMQConsumer::ActiveMQConsumer - Constructor called with NULL Kernel");
     }
 
-    if( listener != NULL ) {
-        this->setMessageListener( listener );
-    }
-
-    applyDestinationOptions(this->consumerInfo);
+    this->config = new ActiveMQConsumerData(kernel);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQConsumer::~ActiveMQConsumer() throw() {
+ActiveMQConsumer::~ActiveMQConsumer() throw () {
 
     try {
 
-        try{
-            this->close();
-        } catch(...) {}
+        try {
+            this->config->kernel->close();
+        } catch (...) {
+        }
 
-        delete this->internal;
+        delete this->config;
     }
-    AMQ_CATCH_NOTHROW( ActiveMQException )
-    AMQ_CATCHALL_NOTHROW( )
+    AMQ_CATCH_NOTHROW(ActiveMQException)
+    AMQ_CATCHALL_NOTHROW()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumer::start() {
 
-    if( this->internal->unconsumedMessages->isClosed() ) {
-        return;
-    }
-
-    this->internal->started.set( true );
-    this->internal->unconsumedMessages->start();
-    this->session->wakeup();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::stop() {
-    this->internal->started.set( false );
-    this->internal->unconsumedMessages->stop();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-bool ActiveMQConsumer::isClosed() const {
-    return this->internal->unconsumedMessages->isClosed();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::close() {
+    try {
 
-    try{
-        if( !this->isClosed() ) {
-            if( this->session->getTransactionContext() != NULL &&
-                this->session->getTransactionContext()->isInTransaction() ) {
-
-                // TODO - Currently we can do this since the consumer could be
-                // deleted right after the close call so it won't stick around
-                // long enough to clean up the transaction data.  For now we
-                // just have to close badly.
-                //
-                //Pointer<Synchronization> sync( new CloseSynhcronization( this ) );
-                //this->transaction->addSynchronization( sync );
-
-                doClose();
-
-                throw UnsupportedOperationException(
-                    __FILE__, __LINE__,
-                    "The Consumer is still in an Active Transaction, commit it first." );
-
-            } else {
-                doClose();
-            }
+        if (this->config->kernel->isClosed()) {
+            return;
         }
+
+        this->config->kernel->start();
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::doClose() {
-
-    try {
-
-        dispose();
-        // Remove at the Broker Side, consumer has been removed from the local
-        // Session and Connection objects so if the remote call to remove throws
-        // it is okay to propagate to the client.
-        Pointer<RemoveInfo> info( new RemoveInfo );
-        info->setObjectId( this->consumerInfo->getConsumerId() );
-        info->setLastDeliveredSequenceId( this->internal->lastDeliveredSequenceId );
-        this->session->oneway( info );
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
+void ActiveMQConsumer::stop() {
+    this->config->kernel->stop();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::dispose() {
-
-    try{
-        if( !this->isClosed() ) {
-
-            if( !session->isTransacted() ) {
-                deliverAcks();
-            }
-
-            this->internal->started.set( false );
-
-            // Identifies any errors encountered during shutdown.
-            bool haveException = false;
-            ActiveMQException error;
-
-            // Purge all the pending messages
-            try{
-                this->internal->unconsumedMessages->clear();
-            } catch ( ActiveMQException& ex ){
-                if( !haveException ){
-                    ex.setMark( __FILE__, __LINE__ );
-                    error = ex;
-                    haveException = true;
-                }
-            }
-
-            // Stop and Wakeup all sync consumers.
-            this->internal->unconsumedMessages->close();
-
-            if( this->session->isIndividualAcknowledge() ) {
-                // For IndividualAck Mode we need to unlink the ack handler to remove a
-                // cyclic reference to the MessageDispatch that brought the message to us.
-                synchronized( &internal->dispatchedMessages ) {
-                    std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter( this->internal->dispatchedMessages.iterator() );
-                    while( iter->hasNext() ) {
-                        iter->next()->getMessage()->setAckHandler( Pointer<ActiveMQAckHandler>() );
-                    }
-
-                    this->internal->dispatchedMessages.clear();
-                }
-            }
-
-            // Remove this Consumer from the Connections set of Dispatchers
-            this->session->removeConsumer( this->consumerInfo->getConsumerId() );
-
-            // If we encountered an error, propagate it.
-            if( haveException ){
-                error.setMark( __FILE__, __LINE__ );
-                throw error;
-            }
-        }
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
+bool ActiveMQConsumer::isClosed() const {
+    return this->config->kernel->isClosed();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-std::string ActiveMQConsumer::getMessageSelector() const {
+void ActiveMQConsumer::close() {
 
     try {
-        // Fetch the Selector
-        return this->consumerInfo->getSelector();
+        this->config->kernel->close();
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-decaf::lang::Pointer<MessageDispatch> ActiveMQConsumer::dequeue( long long timeout ) {
+std::string ActiveMQConsumer::getMessageSelector() const {
 
     try {
-
-        this->checkClosed();
-
-        // Calculate the deadline
-        long long deadline = 0;
-        if( timeout > 0 ) {
-            deadline = System::currentTimeMillis() + timeout;
-        }
-
-        // Loop until the time is up or we get a non-expired message
-        while( true ) {
-
-            Pointer<MessageDispatch> dispatch = this->internal->unconsumedMessages->dequeue( timeout );
-            if( dispatch == NULL ) {
-
-                if( timeout > 0 && !this->internal->unconsumedMessages->isClosed() ) {
-                    timeout = Math::max( deadline - System::currentTimeMillis(), 0LL );
-                } else {
-                    if( this->internal->failureError != NULL ) {
-                        throw CMSExceptionSupport::create(*this->internal->failureError);
-                    } else {
-                        return Pointer<MessageDispatch>();
-                    }
-                }
-
-            } else if( dispatch->getMessage() == NULL ) {
-
-                return Pointer<MessageDispatch>();
-
-            } else if( dispatch->getMessage()->isExpired() ) {
-
-                beforeMessageIsConsumed( dispatch );
-                afterMessageIsConsumed( dispatch, true );
-                if( timeout > 0 ) {
-                    timeout = Math::max( deadline - System::currentTimeMillis(), 0LL );
-                }
-
-                continue;
-            }
-
-            // Return the message.
-            return dispatch;
-        }
-
-        return Pointer<MessageDispatch>();
+        return this->config->kernel->getMessageSelector();
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -586,65 +127,17 @@ decaf::lang::Pointer<MessageDispatch> Ac
 ////////////////////////////////////////////////////////////////////////////////
 cms::Message* ActiveMQConsumer::receive() {
 
-    try{
-
-        this->checkClosed();
-
-        // Send a request for a new message if needed
-        this->sendPullRequest( 0 );
-
-        // Wait for the next message.
-        Pointer<MessageDispatch> message = dequeue( -1 );
-        if( message == NULL ) {
-            return NULL;
-        }
-
-        // Message pre-processing
-        beforeMessageIsConsumed( message );
-
-        // Need to clone the message because the user is responsible for freeing
-        // its copy of the message.
-        cms::Message* clonedMessage =
-            dynamic_cast<cms::Message*>( message->getMessage()->cloneDataStructure() );
-
-        // Post processing (may result in the message being deleted)
-        afterMessageIsConsumed( message, false );
-
-        // Return the cloned message.
-        return clonedMessage;
+    try {
+        return this->config->kernel->receive();
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::Message* ActiveMQConsumer::receive( int millisecs ) {
+cms::Message* ActiveMQConsumer::receive(int millisecs) {
 
     try {
-
-        this->checkClosed();
-
-        // Send a request for a new message if needed
-        this->sendPullRequest( millisecs );
-
-        // Wait for the next message.
-        Pointer<MessageDispatch> message = dequeue( millisecs );
-        if( message == NULL ) {
-            return NULL;
-        }
-
-        // Message preprocessing
-        beforeMessageIsConsumed( message );
-
-        // Need to clone the message because the user is responsible for freeing
-        // its copy of the message.
-        cms::Message* clonedMessage =
-            dynamic_cast<cms::Message*>( message->getMessage()->cloneDataStructure() );
-
-        // Post processing (may result in the message being deleted)
-        afterMessageIsConsumed( message, false );
-
-        // Return the cloned message.
-        return clonedMessage;
+        return this->config->kernel->receive(millisecs);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -653,751 +146,46 @@ cms::Message* ActiveMQConsumer::receive(
 cms::Message* ActiveMQConsumer::receiveNoWait() {
 
     try {
-
-        this->checkClosed();
-
-        // Send a request for a new message if needed
-        this->sendPullRequest( -1 );
-
-        // Get the next available message, if there is one.
-        Pointer<MessageDispatch> message = dequeue( 0 );
-        if( message == NULL ) {
-            return NULL;
-        }
-
-        // Message preprocessing
-        beforeMessageIsConsumed( message );
-
-        // Need to clone the message because the user is responsible for freeing
-        // its copy of the message.
-        cms::Message* clonedMessage =
-            dynamic_cast<cms::Message*>( message->getMessage()->cloneDataStructure() );
-
-        // Post processing (may result in the message being deleted)
-        afterMessageIsConsumed( message, false );
-
-        // Return the cloned message.
-        return clonedMessage;
+        return this->config->kernel->receiveNoWait();
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::setMessageListener( cms::MessageListener* listener ) {
-
-    try{
-
-        this->checkClosed();
-
-        if( this->consumerInfo->getPrefetchSize() == 0 && listener != NULL ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "Cannot deliver async when Prefetch is Zero, set Prefecth to at least One.");
-        }
-
-        if( listener != NULL ) {
-
-            // Now that we have a valid message listener,
-            // redispatch all the messages that it missed.
-
-            bool wasStarted = session->isStarted();
-            if( wasStarted ) {
-                session->stop();
-            }
-
-            synchronized( &(this->internal->listenerMutex) ) {
-                this->internal->listener = listener;
-            }
-
-            this->session->redispatch( *(this->internal->unconsumedMessages) );
-
-            if( wasStarted ) {
-                this->session->start();
-            }
-        } else {
-            synchronized( &(this->internal->listenerMutex) ) {
-                this->internal->listener = NULL;
-            }
-        }
-    }
-    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::beforeMessageIsConsumed( const Pointer<MessageDispatch>& dispatch ) {
-
-    // If the Session is in ClientAcknowledge or IndividualAcknowledge mode, then
-    // we set the handler in the message to this object and send it out.
-    if( session->isClientAcknowledge() ) {
-        Pointer<ActiveMQAckHandler> ackHandler( new ClientAckHandler( this->session ) );
-        dispatch->getMessage()->setAckHandler( ackHandler );
-    } else if( session->isIndividualAcknowledge() ) {
-        Pointer<ActiveMQAckHandler> ackHandler( new IndividualAckHandler( this, dispatch ) );
-        dispatch->getMessage()->setAckHandler( ackHandler );
-    }
-
-    this->internal->lastDeliveredSequenceId =
-        dispatch->getMessage()->getMessageId()->getBrokerSequenceId();
-
-    if( !isAutoAcknowledgeBatch() ) {
-
-        // When not in an Auto
-        synchronized( &this->internal->dispatchedMessages ) {
-            this->internal->dispatchedMessages.addFirst( dispatch );
-        }
-
-        if( this->session->isTransacted() ) {
-            ackLater( dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED );
-        }
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::afterMessageIsConsumed( const Pointer<MessageDispatch>& message,
-                                               bool messageExpired ) {
-
-    try{
-
-        if( this->internal->unconsumedMessages->isClosed() ) {
-            return;
-        }
-
-        if( messageExpired == true ) {
-            ackLater( message, ActiveMQConstants::ACK_TYPE_DELIVERED );
-        }
-
-        if( session->isTransacted() ) {
-            return;
-        } else if( isAutoAcknowledgeEach() ) {
-
-            if( this->internal->deliveringAcks.compareAndSet( false, true ) ) {
-
-                synchronized( &this->internal->dispatchedMessages ) {
-                    if( !this->internal->dispatchedMessages.isEmpty() ) {
-                        Pointer<MessageAck> ack = makeAckForAllDeliveredMessages(
-                            ActiveMQConstants::ACK_TYPE_CONSUMED );
-
-                        if( ack != NULL ) {
-                            this->internal->dispatchedMessages.clear();
-                            session->oneway( ack );
-                        }
-                    }
-                }
-
-                this->internal->deliveringAcks.set( false );
-            }
-
-        } else if( isAutoAcknowledgeBatch() ) {
-            ackLater( message, ActiveMQConstants::ACK_TYPE_CONSUMED );
-        } else if( session->isClientAcknowledge() || session->isIndividualAcknowledge() ) {
-
-            bool messageUnackedByConsumer = false;
-
-            synchronized( &this->internal->dispatchedMessages ) {
-                messageUnackedByConsumer = this->internal->dispatchedMessages.contains(message);
-            }
-
-            if( messageUnackedByConsumer ) {
-                this->ackLater( message, ActiveMQConstants::ACK_TYPE_DELIVERED );
-            }
-
-        } else {
-            throw IllegalStateException( __FILE__, __LINE__, "Invalid Session State" );
-        }
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::deliverAcks() {
-
-    try{
-
-        Pointer<MessageAck> ack;
-
-        if( this->internal->deliveringAcks.compareAndSet( false, true ) ) {
-
-            if( isAutoAcknowledgeEach() ) {
-
-                synchronized( &this->internal->dispatchedMessages ) {
-
-                    ack = makeAckForAllDeliveredMessages( ActiveMQConstants::ACK_TYPE_CONSUMED );
-
-                    if( ack != NULL ) {
-                        this->internal->dispatchedMessages.clear();
-                    } else {
-                        ack.swap( internal->pendingAck );
-                    }
-                }
-
-            } else if( this->internal->pendingAck != NULL &&
-                       this->internal->pendingAck->getAckType() == ActiveMQConstants::ACK_TYPE_CONSUMED ) {
-
-                ack.swap( this->internal->pendingAck );
-            }
-
-            if( ack != NULL ) {
-
-                try{
-                    this->session->oneway( ack );
-                } catch(...) {}
-
-            } else {
-                this->internal->deliveringAcks.set( false );
-            }
-        }
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::ackLater( const Pointer<MessageDispatch>& dispatch, int ackType ) {
-
-    // Don't acknowledge now, but we may need to let the broker know the
-    // consumer got the message to expand the pre-fetch window
-    if( session->isTransacted() ) {
-        session->doStartTransaction();
-        if( !this->internal->synchronizationRegistered ) {
-            this->internal->synchronizationRegistered = true;
-
-            Pointer<Synchronization> sync( new TransactionSynhcronization( this ) );
-            this->session->getTransactionContext()->addSynchronization( sync );
-        }
-    }
-
-    // The delivered message list is only needed for the recover method
-    // which is only used with client ack.
-    this->internal->deliveredCounter++;
-
-    Pointer<MessageAck> oldPendingAck = this->internal->pendingAck;
-    this->internal->pendingAck.reset( new MessageAck() );
-    this->internal->pendingAck->setConsumerId( dispatch->getConsumerId() );
-    this->internal->pendingAck->setAckType( (unsigned char)ackType );
-    this->internal->pendingAck->setDestination( dispatch->getDestination() );
-    this->internal->pendingAck->setLastMessageId( dispatch->getMessage()->getMessageId() );
-    this->internal->pendingAck->setMessageCount( internal->deliveredCounter );
-
-    if( oldPendingAck == NULL ) {
-        this->internal->pendingAck->setFirstMessageId( this->internal->pendingAck->getLastMessageId() );
-    } else if ( oldPendingAck->getAckType() == this->internal->pendingAck->getAckType() ) {
-        this->internal->pendingAck->setFirstMessageId( oldPendingAck->getFirstMessageId() );
-    } else {
-        // old pending ack being superseded by ack of another type, if is is not a delivered
-        // ack and hence important, send it now so it is not lost.
-        if( oldPendingAck->getAckType() != ActiveMQConstants::ACK_TYPE_DELIVERED ) {
-            session->oneway( oldPendingAck );
-        }
-    }
-
-    if( session->isTransacted() ) {
-        this->internal->pendingAck->setTransactionId( this->session->getTransactionContext()->getTransactionId() );
-    }
-
-    if( ( 0.5 * this->consumerInfo->getPrefetchSize() ) <= ( internal->deliveredCounter - internal->additionalWindowSize ) ) {
-        session->oneway( this->internal->pendingAck );
-        this->internal->pendingAck.reset( NULL );
-        this->internal->deliveredCounter = 0;
-        this->internal->additionalWindowSize = 0;
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-Pointer<MessageAck> ActiveMQConsumer::makeAckForAllDeliveredMessages( int type ) {
-
-    synchronized( &this->internal->dispatchedMessages ) {
-
-        if( !this->internal->dispatchedMessages.isEmpty() ) {
-
-            Pointer<MessageDispatch> dispatched = this->internal->dispatchedMessages.getFirst();
-
-            Pointer<MessageAck> ack( new MessageAck() );
-            ack->setAckType( (unsigned char)type );
-            ack->setConsumerId( dispatched->getConsumerId() );
-            ack->setDestination( dispatched->getDestination() );
-            ack->setMessageCount( (int)this->internal->dispatchedMessages.size() );
-            ack->setLastMessageId( dispatched->getMessage()->getMessageId() );
-            ack->setFirstMessageId( this->internal->dispatchedMessages.getLast()->getMessage()->getMessageId() );
-
-            return ack;
-        }
-    }
-
-    return Pointer<MessageAck>();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::acknowledge( const Pointer<commands::MessageDispatch>& dispatch ) {
-
-    try{
-
-        this->checkClosed();
-
-        if( this->session->isIndividualAcknowledge() ) {
-
-            Pointer<MessageAck> ack( new MessageAck() );
-            ack->setAckType( ActiveMQConstants::ACK_TYPE_CONSUMED );
-            ack->setConsumerId( this->consumerInfo->getConsumerId() );
-            ack->setDestination( this->consumerInfo->getDestination() );
-            ack->setMessageCount( 1 );
-            ack->setLastMessageId( dispatch->getMessage()->getMessageId() );
-            ack->setFirstMessageId( dispatch->getMessage()->getMessageId() );
-
-            session->oneway( ack );
-
-            synchronized( &this->internal->dispatchedMessages ) {
-                std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter( this->internal->dispatchedMessages.iterator() );
-                while( iter->hasNext() ) {
-                    if( iter->next() == dispatch ) {
-                        iter->remove();
-                        break;
-                    }
-                }
-            }
-
-        } else {
-            throw IllegalStateException(
-                __FILE__, __LINE__,
-                "Session is not in IndividualAcknowledge mode." );
-        }
-    }
-    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::acknowledge() {
-
-    try{
-
-        synchronized( &this->internal->dispatchedMessages ) {
-
-            // Acknowledge all messages so far.
-            Pointer<MessageAck> ack =
-                makeAckForAllDeliveredMessages( ActiveMQConstants::ACK_TYPE_CONSUMED );
-
-            if( ack == NULL ) {
-                return;
-            }
-
-            if( session->isTransacted() ) {
-                session->doStartTransaction();
-                ack->setTransactionId( session->getTransactionContext()->getTransactionId() );
-            }
-
-            session->oneway( ack );
-            this->internal->pendingAck.reset( NULL );
-
-            // Adjust the counters
-            this->internal->deliveredCounter =
-                Math::max( 0, this->internal->deliveredCounter - (int)this->internal->dispatchedMessages.size());
-            this->internal->additionalWindowSize =
-                Math::max(0, this->internal->additionalWindowSize - (int)this->internal->dispatchedMessages.size());
-
-            if( !session->isTransacted() ) {
-                this->internal->dispatchedMessages.clear();
-            }
-        }
-    }
-    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::commit() {
-
-    synchronized( &(this->internal->dispatchedMessages) ) {
-        this->internal->dispatchedMessages.clear();
-    }
-    this->internal->redeliveryDelay = 0;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::rollback() {
-
-    synchronized( this->internal->unconsumedMessages.get() ) {
-
-        synchronized( &this->internal->dispatchedMessages ) {
-            if( this->internal->dispatchedMessages.isEmpty() ) {
-                return;
-            }
-
-            // Only increase the redelivery delay after the first redelivery..
-            Pointer<MessageDispatch> lastMsg = this->internal->dispatchedMessages.getFirst();
-            const int currentRedeliveryCount = lastMsg->getMessage()->getRedeliveryCounter();
-            if( currentRedeliveryCount > 0 ) {
-                this->internal->redeliveryDelay = this->internal->redeliveryPolicy->getNextRedeliveryDelay( internal->redeliveryDelay );
-            } else {
-                this->internal->redeliveryDelay = this->internal->redeliveryPolicy->getInitialRedeliveryDelay();
-            }
-
-            Pointer<MessageId> firstMsgId =
-                this->internal->dispatchedMessages.getLast()->getMessage()->getMessageId();
-
-            std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter( internal->dispatchedMessages.iterator() );
-
-            while( iter->hasNext() ) {
-                Pointer<Message> message = iter->next()->getMessage();
-                message->setRedeliveryCounter( message->getRedeliveryCounter() + 1 );
-            }
-
-            if( this->internal->redeliveryPolicy->getMaximumRedeliveries() != RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES &&
-                lastMsg->getMessage()->getRedeliveryCounter() > this->internal->redeliveryPolicy->getMaximumRedeliveries() ) {
-
-                // We need to NACK the messages so that they get sent to the DLQ.
-                // Acknowledge the last message.
-                Pointer<MessageAck> ack( new MessageAck() );
-                ack->setAckType( ActiveMQConstants::ACK_TYPE_POISON );
-                ack->setConsumerId( this->consumerInfo->getConsumerId() );
-                ack->setDestination( lastMsg->getDestination() );
-                ack->setMessageCount( (int)this->internal->dispatchedMessages.size() );
-                ack->setLastMessageId( lastMsg->getMessage()->getMessageId() );
-                ack->setFirstMessageId( firstMsgId );
-
-                session->oneway( ack );
-                // Adjust the window size.
-                this->internal->additionalWindowSize =
-                    Math::max( 0, this->internal->additionalWindowSize - (int)this->internal->dispatchedMessages.size() );
-                this->internal->redeliveryDelay = 0;
-
-            } else {
-
-                // only redelivery_ack after first delivery
-                if( currentRedeliveryCount > 0 ) {
-                    Pointer<MessageAck> ack( new MessageAck() );
-                    ack->setAckType( ActiveMQConstants::ACK_TYPE_REDELIVERED );
-                    ack->setConsumerId( this->consumerInfo->getConsumerId() );
-                    ack->setDestination( lastMsg->getDestination() );
-                    ack->setMessageCount( (int)this->internal->dispatchedMessages.size() );
-                    ack->setLastMessageId( lastMsg->getMessage()->getMessageId() );
-                    ack->setFirstMessageId( firstMsgId );
-
-                    session->oneway( ack );
-                }
-
-                // stop the delivery of messages.
-                this->internal->unconsumedMessages->stop();
-
-                std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter( this->internal->dispatchedMessages.iterator() );
-
-                while( iter->hasNext() ) {
-                    this->internal->unconsumedMessages->enqueueFirst( iter->next() );
-                }
-
-                if( internal->redeliveryDelay > 0 && !this->internal->unconsumedMessages->isClosed() ) {
-                    // TODO - Can't do this until we can control object lifetime.
-                    // Start up the delivery again a little later.
-                    // this->internal->scheduler->executeAfterDelay(
-                    //    new StartConsumerTask(this), internal->redeliveryDelay);
-                    start();
-                } else {
-                    start();
-                }
-
-            }
-            this->internal->deliveredCounter -= (int)internal->dispatchedMessages.size();
-            this->internal->dispatchedMessages.clear();
-        }
-    }
-
-    if( this->internal->listener != NULL ) {
-        session->redispatch( *this->internal->unconsumedMessages );
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::dispatch( const Pointer<MessageDispatch>& dispatch ) {
-
-    try {
-
-        synchronized( this->internal->unconsumedMessages.get() ) {
-
-            clearMessagesInProgress();
-            if( this->internal->clearDispatchList ) {
-                // we are reconnecting so lets flush the in progress
-                // messages
-                this->internal->clearDispatchList = false;
-                this->internal->unconsumedMessages->clear();
-            }
-
-            if( !this->internal->unconsumedMessages->isClosed() ) {
-
-                // Don't dispatch expired messages, ack it and then destroy it
-                if( dispatch->getMessage() != NULL && dispatch->getMessage()->isExpired() ) {
-                    this->ackLater( dispatch, ActiveMQConstants::ACK_TYPE_CONSUMED );
-
-                    // stop now, don't queue
-                    return;
-                }
-
-                synchronized( &this->internal->listenerMutex ) {
-                    // If we have a listener, send the message.
-                    if( this->internal->listener != NULL && internal->unconsumedMessages->isRunning() ) {
-
-                        // Preprocessing.
-                        beforeMessageIsConsumed( dispatch );
-
-                        // Notify the listener
-                        this->internal->listener->onMessage(
-                            dynamic_cast<cms::Message*>( dispatch->getMessage().get() ) );
-
-                        // Postprocessing
-                        afterMessageIsConsumed( dispatch, false );
-
-                    } else {
-
-                        // No listener, add it to the unconsumed messages list
-                        this->internal->unconsumedMessages->enqueue( dispatch );
-                    }
-                }
-            }
-        }
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::sendPullRequest( long long timeout ) {
+void ActiveMQConsumer::setMessageListener(cms::MessageListener* listener) {
 
     try {
-
-        this->checkClosed();
-
-        // There are still local message, consume them first.
-        if( !this->internal->unconsumedMessages->isEmpty() ) {
-            return;
-        }
-
-        if( this->consumerInfo->getPrefetchSize() == 0 ) {
-
-            Pointer<MessagePull> messagePull( new MessagePull() );
-            messagePull->setConsumerId( this->consumerInfo->getConsumerId() );
-            messagePull->setDestination( this->consumerInfo->getDestination() );
-            messagePull->setTimeout( timeout );
-
-            this->session->oneway( messagePull );
-        }
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::checkClosed() const {
-    if( this->isClosed() ) {
-        throw ActiveMQException(
-            __FILE__, __LINE__,
-            "ActiveMQConsumer - Consumer Already Closed" );
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-bool ActiveMQConsumer::iterate() {
-
-    synchronized( &this->internal->listenerMutex ) {
-
-        if( this->internal->listener != NULL ) {
-
-            Pointer<MessageDispatch> dispatch = internal->unconsumedMessages->dequeueNoWait();
-            if( dispatch != NULL ) {
-
-                try {
-                    beforeMessageIsConsumed( dispatch );
-                    this->internal->listener->onMessage(
-                        dynamic_cast<cms::Message*>( dispatch->getMessage().get() ) );
-                    afterMessageIsConsumed( dispatch, false );
-                } catch( ActiveMQException& ex ) {
-                    this->session->fire( ex );
-                }
-
-                return true;
-            }
-        }
-    }
-
-    return false;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::inProgressClearRequired() {
-
-    this->internal->inProgressClearRequiredFlag = true;
-    // Clears dispatched messages async to avoid lock contention with inprogress acks.
-    this->internal->clearDispatchList = true;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::clearMessagesInProgress() {
-    if( this->internal->inProgressClearRequiredFlag ) {
-        synchronized( this->internal->unconsumedMessages.get() ) {
-            if( this->internal->inProgressClearRequiredFlag ) {
-
-                // TODO - Rollback duplicates.
-
-                // allow dispatch on this connection to resume
-                this->session->getConnection()->setTransportInterruptionProcessingComplete();
-                this->internal->inProgressClearRequiredFlag = false;
-            }
-        }
+        this->config->kernel->setMessageListener(listener);
     }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-bool ActiveMQConsumer::isAutoAcknowledgeEach() const {
-    return this->session->isAutoAcknowledge() ||
-           ( this->session->isDupsOkAcknowledge() && this->consumerInfo->getDestination()->isQueue() );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-bool ActiveMQConsumer::isAutoAcknowledgeBatch() const {
-    return this->session->isDupsOkAcknowledge() && !this->consumerInfo->getDestination()->isQueue();
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 int ActiveMQConsumer::getMessageAvailableCount() const {
-    return this->internal->unconsumedMessages->size();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::applyDestinationOptions( const Pointer<ConsumerInfo>& info ) {
-
-    decaf::lang::Pointer<commands::ActiveMQDestination> amqDestination = info->getDestination();
-
-    // Get any options specified in the destination and apply them to the
-    // ConsumerInfo object.
-    const ActiveMQProperties& options = amqDestination->getOptions();
-
-    std::string noLocalStr =
-        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_NOLOCAL );
-    if( options.hasProperty( noLocalStr ) ) {
-        info->setNoLocal( Boolean::parseBoolean(
-            options.getProperty( noLocalStr ) ) );
-    }
-
-    std::string selectorStr =
-        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_SELECTOR );
-    if( options.hasProperty( selectorStr ) ) {
-        info->setSelector( options.getProperty( selectorStr ) );
-    }
-
-    std::string priorityStr =
-        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_PRIORITY );
-    if( options.hasProperty( priorityStr ) ) {
-        info->setPriority( (unsigned char)Integer::parseInt( options.getProperty( priorityStr ) ) );
-    }
-
-    std::string dispatchAsyncStr =
-        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_DISPATCHASYNC );
-    if( options.hasProperty( dispatchAsyncStr ) ) {
-        info->setDispatchAsync(
-            Boolean::parseBoolean( options.getProperty( dispatchAsyncStr ) ) );
-    }
-
-    std::string exclusiveStr =
-        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_EXCLUSIVE );
-    if( options.hasProperty( exclusiveStr ) ) {
-        info->setExclusive(
-            Boolean::parseBoolean( options.getProperty( exclusiveStr ) ) );
-    }
-
-    std::string maxPendingMsgLimitStr =
-        core::ActiveMQConstants::toString(
-            core::ActiveMQConstants::CUNSUMER_MAXPENDINGMSGLIMIT );
-
-    if( options.hasProperty( maxPendingMsgLimitStr ) ) {
-        info->setMaximumPendingMessageLimit(
-            Integer::parseInt(
-                options.getProperty( maxPendingMsgLimitStr ) ) );
-    }
-
-    std::string prefetchSizeStr =
-        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_PREFECTCHSIZE );
-    if( info->getPrefetchSize() <= 0 || options.hasProperty( prefetchSizeStr )  ) {
-        info->setPrefetchSize(
-            Integer::parseInt( options.getProperty( prefetchSizeStr, "1000" ) ) );
-    }
-
-    std::string retroactiveStr =
-        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_RETROACTIVE );
-    if( options.hasProperty( retroactiveStr ) ) {
-        info->setRetroactive(
-            Boolean::parseBoolean( options.getProperty( retroactiveStr ) ) );
-    }
-
-    std::string networkSubscriptionStr = "consumer.networkSubscription";
-
-    if( options.hasProperty( networkSubscriptionStr ) ) {
-        info->setNetworkSubscription(
-            Boolean::parseBoolean(
-                options.getProperty( networkSubscriptionStr ) ) );
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::setRedeliveryPolicy( RedeliveryPolicy* policy ) {
-    if( policy != NULL ) {
-        this->internal->redeliveryPolicy.reset( policy );
-    }
+    return this->config->kernel->getMessageAvailableCount();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 RedeliveryPolicy* ActiveMQConsumer::getRedeliveryPolicy() const {
-    return this->internal->redeliveryPolicy.get();
+    return this->config->kernel->getRedeliveryPolicy();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 cms::MessageListener* ActiveMQConsumer::getMessageListener() const {
-    return this->internal->listener;
+    return this->config->kernel->getMessageListener();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 const Pointer<commands::ConsumerInfo>& ActiveMQConsumer::getConsumerInfo() const {
-    this->checkClosed();
-    return this->consumerInfo;
+    return this->config->kernel->getConsumerInfo();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 const Pointer<commands::ConsumerId>& ActiveMQConsumer::getConsumerId() const {
-    this->checkClosed();
-    return this->consumerInfo->getConsumerId();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-bool ActiveMQConsumer::isSynchronizationRegistered() const {
-    return this->internal->synchronizationRegistered;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::setSynchronizationRegistered( bool value ) {
-    this->internal->synchronizationRegistered = value;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-long long ActiveMQConsumer::getLastDeliveredSequenceId() const {
-    return this->internal->lastDeliveredSequenceId;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::setLastDeliveredSequenceId( long long value ) {
-    this->internal->lastDeliveredSequenceId = value;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::setFailureError( decaf::lang::Exception* error ) {
-    if( error != NULL ) {
-        this->internal->failureError.reset( error->clone() );
-    }
+    return this->config->kernel->getConsumerId();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 decaf::lang::Exception* ActiveMQConsumer::getFailureError() const {
-    if( this->internal->failureError == NULL ) {
-        return NULL;
-    }
-
-    return this->internal->failureError.get();
+    return this->config->kernel->getFailureError();
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=1305601&r1=1305600&r2=1305601&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Mon Mar 26 21:11:12 2012
@@ -23,46 +23,25 @@
 #include <cms/CMSException.h>
 
 #include <activemq/util/Config.h>
+#include <activemq/core/kernels/ActiveMQConsumerKernel.h>
 #include <activemq/exceptions/ActiveMQException.h>
 #include <activemq/commands/ConsumerInfo.h>
-#include <activemq/commands/MessageAck.h>
-#include <activemq/commands/MessageDispatch.h>
-#include <activemq/core/Dispatcher.h>
 #include <activemq/core/RedeliveryPolicy.h>
-#include <activemq/core/MessageDispatchChannel.h>
 
-#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/lang/Pointer.h>
-#include <decaf/util/concurrent/Mutex.h>
 
 namespace activemq{
 namespace core{
 
     using decaf::lang::Pointer;
-    using decaf::util::concurrent::atomic::AtomicBoolean;
 
     class ActiveMQSession;
-    class ActiveMQConsumerMembers;
+    class ActiveMQConsumerData;
 
-    class AMQCPP_API ActiveMQConsumer : public cms::MessageConsumer,
-                                        public Dispatcher
-    {
+    class AMQCPP_API ActiveMQConsumer : public cms::MessageConsumer {
     private:
 
-        /**
-         * Internal Class that holds Members of this class, allows for changes without API breakage.
-         */
-        ActiveMQConsumerMembers* internal;
-
-        /**
-         * The ActiveMQSession that owns this class instance.
-         */
-        ActiveMQSession* session;
-
-        /**
-         * The ConsumerInfo object for this class instance.
-         */
-        Pointer<commands::ConsumerInfo> consumerInfo;
+        ActiveMQConsumerData* config;
 
     private:
 
@@ -72,23 +51,17 @@ namespace core{
     public:
 
         /**
-         * Constructor
+         * Create a new ActiveMQConsumer that contains the pointer to the Kernel
+         * that implements the real MessageConsumer functionality.
+         *
+         * @param kernel
+         *      This Consumer's functionality kernel.
          */
-        ActiveMQConsumer( ActiveMQSession* session,
-                          const Pointer<commands::ConsumerId>& id,
-                          const Pointer<commands::ActiveMQDestination>& destination,
-                          const std::string& name,
-                          const std::string& selector,
-                          int prefetch,
-                          int maxPendingMessageCount,
-                          bool noLocal,
-                          bool browser,
-                          bool dispatchAsync,
-                          cms::MessageListener* listener );
+        ActiveMQConsumer(const Pointer<activemq::core::kernels::ActiveMQConsumerKernel>& kernel);
 
         virtual ~ActiveMQConsumer() throw();
 
-    public:  // Interface Implementation
+    public:  // Interface Implementation for cms::MessageConsumer
 
         virtual void start();
 
@@ -98,58 +71,17 @@ namespace core{
 
         virtual cms::Message* receive();
 
-        virtual cms::Message* receive( int millisecs );
+        virtual cms::Message* receive(int millisecs);
 
         virtual cms::Message* receiveNoWait();
 
-        virtual void setMessageListener( cms::MessageListener* listener );
+        virtual void setMessageListener(cms::MessageListener* listener);
 
         virtual cms::MessageListener* getMessageListener() const;
 
         virtual std::string getMessageSelector() const;
 
-        virtual void acknowledge( const Pointer<commands::MessageDispatch>& dispatch );
-
-    public:  // Dispatcher Methods
-
-        virtual void dispatch( const Pointer<MessageDispatch>& message );
-
-    public:  // ActiveMQConsumer Methods
-
-        /**
-         * Method called to acknowledge all messages that have been received so far.
-         *
-         * @throw CMSException if an error occurs while ack'ing the message.
-         */
-        void acknowledge();
-
-        /**
-         * Called to Commit the current set of messages in this Transaction
-         *
-         * @throw ActiveMQException if an error occurs while performing the operation.
-         */
-        void commit();
-
-        /**
-         * Called to Roll back the current set of messages in this Transaction
-         *
-         * @throw ActiveMQException if an error occurs while performing the operation.
-         */
-        void rollback();
-
-        /**
-         * Performs the actual close operation on this consumer
-         *
-         * @throw ActiveMQException if an error occurs while performing the operation.
-         */
-        void doClose();
-
-        /**
-         * Cleans up this objects internal resources.
-         *
-         * @throw ActiveMQException if an error occurs while performing the operation.
-         */
-        void dispose();
+    public:
 
         /**
          * Get the Consumer information for this consumer
@@ -169,58 +101,6 @@ namespace core{
         bool isClosed() const;
 
         /**
-         * Has this Consumer Transaction Synchronization been added to the transaction
-         * @return true if the synchronization has been added.
-         */
-        bool isSynchronizationRegistered() const ;
-
-        /**
-         * Sets the Synchronization Registered state of this consumer.
-         * @param value - true if registered false otherwise.
-         */
-        void setSynchronizationRegistered( bool value );
-
-        /**
-         * Deliver any pending messages to the registered MessageListener if there
-         * is one, return true if not all dispatched, or false if no listener or all
-         * pending messages have been dispatched.
-         */
-        bool iterate();
-
-        /**
-         * Forces this consumer to send all pending acks to the broker.
-         *
-         * @throw ActiveMQException if an error occurs while performing the operation.
-         */
-        void deliverAcks();
-
-        /**
-         * Called on a Failover to clear any pending messages.
-         */
-        void clearMessagesInProgress();
-
-        /**
-         * Signals that a Failure occurred and that anything in-progress in the
-         * consumer should be cleared.
-         */
-        void inProgressClearRequired();
-
-        /**
-         * Gets the currently set Last Delivered Sequence Id
-         *
-         * @returns long long containing the sequence id of the last delivered Message.
-         */
-        long long getLastDeliveredSequenceId() const;
-
-        /**
-         * Sets the value of the Last Delivered Sequence Id
-         *
-         * @param value
-         *      The new value to assign to the Last Delivered Sequence Id property.
-         */
-        void setLastDeliveredSequenceId( long long value );
-
-        /**
          * @returns the number of Message's this consumer is waiting to Dispatch.
          */
         int getMessageAvailableCount() const;
@@ -234,7 +114,7 @@ namespace core{
          * @param policy
          *      Pointer to a Redelivery Policy object that his Consumer will use.
          */
-        void setRedeliveryPolicy( RedeliveryPolicy* policy );
+        void setRedeliveryPolicy(RedeliveryPolicy* policy);
 
         /**
          * Gets a pointer to this Consumer's Redelivery Policy object, the Consumer
@@ -245,14 +125,6 @@ namespace core{
         RedeliveryPolicy* getRedeliveryPolicy() const;
 
         /**
-         * Sets the Exception that has caused this Consumer to be in a failed state.
-         *
-         * @param error
-         *      The error that is to be thrown when a Receive call is made.
-         */
-        void setFailureError( decaf::lang::Exception* error );
-
-        /**
          * Gets the error that caused this Consumer to be in a Failed state, or NULL if
          * there is no Error.
          *
@@ -260,71 +132,6 @@ namespace core{
          */
         decaf::lang::Exception* getFailureError() const;
 
-    protected:
-
-        /**
-         * Used by synchronous receive methods to wait for messages to come in.
-         * @param timeout - The maximum number of milliseconds to wait before
-         * returning.
-         *
-         * If -1, it will block until a messages is received or this consumer
-         * is closed.
-         * If 0, will not block at all.  If > 0, will wait at a maximum the
-         * specified number of milliseconds before returning.
-         * @return the message, if received within the allotted time.
-         * Otherwise NULL.
-         *
-         * @throws InvalidStateException if this consumer is closed upon
-         *         entering this method.
-         */
-        Pointer<MessageDispatch> dequeue( long long timeout );
-
-        /**
-         * Pre-consume processing
-         * @param dispatch - the message being consumed.
-         */
-        void beforeMessageIsConsumed(
-            const Pointer<commands::MessageDispatch>& dispatch );
-
-        /**
-         * Post-consume processing
-         * @param dispatch - the consumed message
-         * @param messageExpired - flag indicating if the message has expired.
-         */
-        void afterMessageIsConsumed(
-            const Pointer<commands::MessageDispatch>& dispatch, bool messageExpired );
-
-    private:
-
-        // Using options from the Destination URI override any settings that are
-        // defined for this consumer.
-        void applyDestinationOptions( const Pointer<commands::ConsumerInfo>& info );
-
-        // If supported sends a message pull request to the service provider asking
-        // for the delivery of a new message.  This is used in the case where the
-        // service provider has been configured with a zero prefetch or is only
-        // capable of delivering messages on a pull basis.  No request is made if
-        // there are already messages in the unconsumed queue since there's no need
-        // for a server round-trip in that instance.
-        void sendPullRequest( long long timeout );
-
-        // Checks for the closed state and throws if so.
-        void checkClosed() const;
-
-        // Sends an ack as needed in order to keep them coming in if the current
-        // ack mode allows the consumer to receive up to the prefetch limit before
-        // an real ack is sent.
-        void ackLater( const Pointer<commands::MessageDispatch>& message, int ackType );
-
-        // Create an Ack Message that acks all messages that have been delivered so far.
-        Pointer<commands::MessageAck> makeAckForAllDeliveredMessages( int type );
-
-        // Should Acks be sent on each dispatched message
-        bool isAutoAcknowledgeEach() const;
-
-        // Can Acks be batched for less network overhead.
-        bool isAutoAcknowledgeBatch() const;
-
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp?rev=1305601&r1=1305600&r2=1305601&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp Mon Mar 26 21:11:12 2012
@@ -17,73 +17,32 @@
 #include "ActiveMQProducer.h"
 
 #include <cms/Message.h>
-#include <activemq/core/ActiveMQSession.h>
-#include <activemq/core/ActiveMQConnection.h>
-#include <activemq/commands/RemoveInfo.h>
 #include <activemq/util/CMSExceptionSupport.h>
-#include <activemq/util/ActiveMQProperties.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
-#include <decaf/lang/exceptions/InvalidStateException.h>
-#include <decaf/lang/exceptions/IllegalArgumentException.h>
-#include <decaf/lang/System.h>
-#include <decaf/lang/Boolean.h>
 
 using namespace std;
 using namespace activemq;
 using namespace activemq::util;
 using namespace activemq::core;
-using namespace activemq::commands;
+using namespace activemq::core::kernels;
 using namespace activemq::exceptions;
-using namespace decaf::util;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQProducer::ActiveMQProducer( ActiveMQSession* session,
-                                    const Pointer<commands::ProducerId>& producerId,
-                                    const Pointer<ActiveMQDestination>& destination,
-                                    long long sendTimeout ) : disableTimestamps(false),
-                                                              disableMessageId(false),
-                                                              defaultDeliveryMode(cms::Message::DEFAULT_DELIVERY_MODE),
-                                                              defaultPriority(cms::Message::DEFAULT_MSG_PRIORITY),
-                                                              defaultTimeToLive(cms::Message::DEFAULT_TIME_TO_LIVE),
-                                                              sendTimeout(sendTimeout),
-                                                              session(session),
-                                                              producerInfo(),
-                                                              closed(false),
-                                                              memoryUsage(),
-                                                              destination() {
+ActiveMQProducer::ActiveMQProducer(Pointer<ActiveMQProducerKernel> kernel) : kernel(kernel) {
 
-    if( session == NULL || producerId == NULL ) {
+    if (kernel == NULL) {
         throw ActiveMQException(
             __FILE__, __LINE__,
-            "ActiveMQProducer::ActiveMQProducer - Init with NULL Session" );
+            "ActiveMQProducer::ActiveMQProducer - Initialized with NULL Kernel");
     }
-
-    this->producerInfo.reset( new ProducerInfo() );
-
-    this->producerInfo->setProducerId( producerId );
-    this->producerInfo->setDestination( destination );
-    this->producerInfo->setWindowSize( session->getConnection()->getProducerWindowSize() );
-
-    // Get any options specified in the destination and apply them to the
-    // ProducerInfo object.
-    if( destination != NULL ) {
-        const ActiveMQProperties& options = destination->getOptions();
-        this->producerInfo->setDispatchAsync( Boolean::parseBoolean(
-            options.getProperty( "producer.dispatchAsync", "false" )) );
-
-        this->destination = destination.dynamicCast<cms::Destination>();
-    }
-
-    // TODO - Check for need of MemoryUsage if there's a producer Windows size
-    //        and the Protocol version is greater than 3.
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQProducer::~ActiveMQProducer() throw() {
+ActiveMQProducer::~ActiveMQProducer() {
     try {
-        close();
+        this->kernel->close();
     }
     AMQ_CATCH_NOTHROW( ActiveMQException )
     AMQ_CATCHALL_NOTHROW( )
@@ -93,145 +52,44 @@ ActiveMQProducer::~ActiveMQProducer() th
 void ActiveMQProducer::close() {
 
     try{
-
-        if( !this->isClosed() ) {
-
-            dispose();
-
-            // Remove at the Broker Side, if this fails the producer has already
-            // been removed from the session and connection objects so its safe
-            // for an exception to be thrown.
-            Pointer<RemoveInfo> info( new RemoveInfo );
-            info->setObjectId( this->producerInfo->getProducerId() );
-            this->session->oneway( info );
-        }
+        this->kernel->close();
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQProducer::dispose() {
-
-    if( !this->isClosed() ) {
-        this->session->removeProducer( this );
-        this->closed = true;
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQProducer::send( cms::Message* message ) {
+void ActiveMQProducer::send(cms::Message* message) {
 
     try {
-
-        this->checkClosed();
-
-        this->send( this->destination.get(), message );
+        this->kernel->send(message);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQProducer::send( cms::Message* message, int deliveryMode, int priority, long long timeToLive ) {
+void ActiveMQProducer::send(cms::Message* message, int deliveryMode, int priority, long long timeToLive) {
 
     try {
-
-        this->checkClosed();
-
-        this->send( this->destination.get(), message, deliveryMode, priority, timeToLive );
+        this->kernel->send(message, deliveryMode, priority, timeToLive);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQProducer::send( const cms::Destination* destination, cms::Message* message ) {
+void ActiveMQProducer::send(const cms::Destination* destination, cms::Message* message) {
 
     try {
-
-        this->checkClosed();
-
-        this->send( destination, message, defaultDeliveryMode, defaultPriority, defaultTimeToLive );
+        this->kernel->send(destination, message);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQProducer::send( const cms::Destination* destination, cms::Message* message,
-                             int deliveryMode, int priority, long long timeToLive ) {
+void ActiveMQProducer::send(const cms::Destination* destination, cms::Message* message,
+                            int deliveryMode, int priority, long long timeToLive) {
 
     try {
-
-        this->checkClosed();
-
-        if( destination == NULL ) {
-
-            if( this->producerInfo->getDestination() == NULL ) {
-                throw cms::UnsupportedOperationException( "A destination must be specified.", NULL );
-            }
-
-            throw cms::InvalidDestinationException( "Don't understand null destinations", NULL );
-        }
-
-        const cms::Destination* dest;
-        if( destination == dynamic_cast<cms::Destination*>( this->producerInfo->getDestination().get() ) ) {
-            dest = destination;
-        } else if( this->producerInfo->getDestination() == NULL ) {
-
-            // TODO - We should apply a Transform so ensure the user hasn't create some
-            //        external cms::Destination implementation.
-            dest = destination;
-        } else {
-            throw cms::UnsupportedOperationException( string( "This producer can only send messages to: " ) +
-                                                              this->producerInfo->getDestination()->getPhysicalName(), NULL );
-        }
-
-        if( dest == NULL ) {
-            throw cms::CMSException( "No destination specified", NULL );
-        }
-
-        // configure the message
-        message->setCMSDestination( dest );
-        message->setCMSDeliveryMode( deliveryMode );
-        message->setCMSPriority( priority );
-
-        long long expiration = 0LL;
-
-        if( !disableTimestamps ) {
-
-            long long timeStamp = System::currentTimeMillis();
-            message->setCMSTimestamp( timeStamp );
-            if( timeToLive > 0LL ) {
-                expiration = timeToLive + timeStamp;
-            }
-        }
-
-        message->setCMSExpiration( expiration );
-
-        // Delegate send to the session so that it can choose how to
-        // send the message.
-        this->session->send( message, this, this->memoryUsage.get() );
+        this->kernel->send(destination, message, deliveryMode, priority, timeToLive);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQProducer::onProducerAck( const commands::ProducerAck& ack ) {
-
-    try{
-
-        if( this->memoryUsage.get() != NULL ) {
-            this->memoryUsage->decreaseUsage( ack.getSize() );
-        }
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQProducer::checkClosed() const {
-    if( closed ) {
-        throw ActiveMQException(
-            __FILE__, __LINE__,
-            "ActiveMQProducer - Producer Already Closed" );
-    }
-}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h?rev=1305601&r1=1305600&r2=1305601&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h Mon Mar 26 21:11:12 2012
@@ -23,12 +23,8 @@
 #include <cms/DeliveryMode.h>
 
 #include <activemq/util/Config.h>
-#include <activemq/util/MemoryUsage.h>
 #include <activemq/commands/ProducerInfo.h>
-#include <activemq/commands/ProducerAck.h>
-#include <activemq/exceptions/ActiveMQException.h>
-
-#include <memory>
+#include <activemq/core/kernels/ActiveMQProducerKernel.h>
 
 namespace activemq{
 namespace core{
@@ -40,85 +36,45 @@ namespace core{
     class AMQCPP_API ActiveMQProducer : public cms::MessageProducer {
     private:
 
-        // Disable sending timestamps
-        bool disableTimestamps;
-
-        // Disable adding a Message Id
-        bool disableMessageId;
-
-        // The default delivery Mode of this Producer
-        int defaultDeliveryMode;
-
-        // The default priority Level to send at
-        int defaultPriority;
-
-        // The default time to live value for messages in milliseconds
-        long long defaultTimeToLive;
-
-        // The default Send Timeout for this Producer.
-        long long sendTimeout;
-
-        // Session that this producer sends to.
-        ActiveMQSession* session;
-
-        // This Producers protocol specific info object
-        Pointer<commands::ProducerInfo> producerInfo;
-
-        // Boolean that indicates if the consumer has been closed
-        bool closed;
-
-        // Memory Usage Class, created only if the Producer is tracking its usage.
-        std::auto_ptr<util::MemoryUsage> memoryUsage;
-
-        // The Destination assigned at creation, NULL if not assigned.
-        Pointer<cms::Destination> destination;
+        Pointer<activemq::core::kernels::ActiveMQProducerKernel> kernel;
 
     private:
 
-        ActiveMQProducer( const ActiveMQProducer& );
-        ActiveMQProducer& operator= ( const ActiveMQProducer& );
+        ActiveMQProducer(const ActiveMQProducer&);
+        ActiveMQProducer& operator=(const ActiveMQProducer&);
 
     public:
 
         /**
-         * Constructor, creates an instance of an ActiveMQProducer
+         * Constructor, creates an instance of an ActiveMQProducer to wrap the
+         * provided ActiveMQProducerKernel.
          *
-         * @param session
-         *        The Session which is the parent of this Producer.
-         * @param producerId
-         *        Pointer to a ProducerId object which identifies this producer.
-         * @param destination
-         *        The assigned Destination this Producer sends to, or null if not set.
-         *        The Producer does not own the Pointer passed.
-         * @param sendTimeout
-         *        The configured send timeout for this Producer.
-         */
-        ActiveMQProducer( ActiveMQSession* session,
-                          const Pointer<commands::ProducerId>& producerId,
-                          const Pointer<commands::ActiveMQDestination>& destination,
-                          long long sendTimeout );
+         * @param kernel
+         *        The Producer kernel pointer that implements the producer's functionality.
+         */
+        ActiveMQProducer(Pointer<activemq::core::kernels::ActiveMQProducerKernel> kernel);
 
-        virtual ~ActiveMQProducer() throw();
+        virtual ~ActiveMQProducer();
 
     public:  // cms::MessageProducer methods.
 
         virtual void close();
 
-        virtual void send( cms::Message* message );
+        virtual void send(cms::Message* message);
 
-        virtual void send( cms::Message* message, int deliveryMode, int priority, long long timeToLive );
+        virtual void send(cms::Message* message, int deliveryMode, int priority, long long timeToLive);
 
-        virtual void send( const cms::Destination* destination, cms::Message* message );
+        virtual void send(const cms::Destination* destination, cms::Message* message);
 
-        virtual void send( const cms::Destination* destination, cms::Message* message,
-                           int deliveryMode, int priority, long long timeToLive );
+        virtual void send(const cms::Destination* destination, cms::Message* message,
+                          int deliveryMode, int priority, long long timeToLive);
 
         /**
          * Sets the delivery mode for this Producer
          * @param mode - The DeliveryMode to use for Message sends.
          */
-        virtual void setDeliveryMode( int mode ) {
-            this->defaultDeliveryMode = mode;
+        virtual void setDeliveryMode(int mode) {
+            this->kernel->setDeliveryMode(mode);
         }
 
         /**
@@ -126,15 +82,15 @@ namespace core{
          * @return The DeliveryMode
          */
         virtual int getDeliveryMode() const {
-            return this->defaultDeliveryMode;
+            return this->kernel->getDeliveryMode();
         }
 
         /**
          * Sets if Message Ids are disabled for this Producer
          * @param value - boolean indicating enable / disable (true / false)
          */
-        virtual void setDisableMessageID( bool value ) {
-            this->disableMessageId = value;
+        virtual void setDisableMessageID(bool value) {
+            this->kernel->setDisableMessageID(value);
         }
 
         /**
@@ -142,15 +98,15 @@ namespace core{
          * @return a boolean indicating state enable / disable (true / false) for MessageIds.
          */
         virtual bool getDisableMessageID() const {
-            return this->disableMessageId;
+            return this->kernel->getDisableMessageID();
         }
 
         /**
          * Sets if Message Time Stamps are disabled for this Producer
          * @param value - boolean indicating enable / disable (true / false)
          */
-        virtual void setDisableMessageTimeStamp( bool value ) {
-            this->disableTimestamps = value;
+        virtual void setDisableMessageTimeStamp(bool value) {
+            this->kernel->setDisableMessageTimeStamp(value);
         }
 
         /**
@@ -158,15 +114,15 @@ namespace core{
          * @returns boolean indicating state of enable / disable (true / false)
          */
         virtual bool getDisableMessageTimeStamp() const {
-            return this->disableTimestamps;
+            return this->kernel->getDisableMessageTimeStamp();
         }
 
         /**
          * Sets the Priority that this Producer sends messages at
          * @param priority int value for Priority level
          */
-        virtual void setPriority( int priority ) {
-            this->defaultPriority = priority;
+        virtual void setPriority(int priority) {
+            this->kernel->setPriority(priority);
         }
 
         /**
@@ -174,15 +130,15 @@ namespace core{
          * @return int based priority level
          */
         virtual int getPriority() const {
-            return this->defaultPriority;
+            return this->kernel->getPriority();
         }
 
         /**
          * Sets the Time to Live that this Producer sends messages with
          * @param time The new default time to live value in milliseconds.
          */
-        virtual void setTimeToLive( long long time ) {
-            this->defaultTimeToLive = time;
+        virtual void setTimeToLive(long long time) {
+            this->kernel->setTimeToLive(time);
         }
 
         /**
@@ -190,15 +146,15 @@ namespace core{
          * @return The default time to live value in milliseconds.
          */
         virtual long long getTimeToLive() const {
-            return this->defaultTimeToLive;
+            return this->kernel->getTimeToLive();
         }
 
         /**
          * Sets the Send Timeout that this Producer sends messages with
          * @param time The new default send timeout value in milliseconds.
          */
-        virtual void setSendTimeout( long long time ) {
-            this->sendTimeout = time;
+        virtual void setSendTimeout(long long time) {
+            this->kernel->setSendTimeout(time);
         }
 
         /**
@@ -206,7 +162,7 @@ namespace core{
          * @return The default send timeout value in milliseconds.
          */
         virtual long long getSendTimeout() const {
-            return this->sendTimeout;
+            return this->kernel->getSendTimeout();
         }
 
     public:
@@ -215,7 +171,7 @@ namespace core{
          * @returns true if this Producer has been closed.
          */
         bool isClosed() const {
-            return this->closed;
+            return this->kernel->isClosed();
         }
 
         /**
@@ -223,8 +179,7 @@ namespace core{
          * @return ProducerInfo Reference
          */
         const Pointer<commands::ProducerInfo>& getProducerInfo() const {
-            this->checkClosed();
-            return this->producerInfo;
+            return this->kernel->getProducerInfo();
         }
 
         /**
@@ -232,29 +187,8 @@ namespace core{
          * @return ProducerId Reference
          */
         const Pointer<commands::ProducerId>& getProducerId() const {
-            this->checkClosed();
-            return this->producerInfo->getProducerId();
+            return this->kernel->getProducerId();
         }
-
-        /**
-         * Handles the work of Processing a ProducerAck Command from the Broker.
-         * @param ack - The ProducerAck message received from the Broker.
-         */
-        virtual void onProducerAck( const commands::ProducerAck& ack );
-
-        /**
-         * Performs Producer object cleanup but doesn't attempt to send the Remove command
-         * to the broker.  Called when the parent resource if closed first to avoid the message
-         * send and avoid any exceptions that might be thrown from an attempt to send a remove
-         * command to a failed transport.
-         */
-        void dispose();
-
-   private:
-
-       // Checks for the closed state and throws if so.
-       void checkClosed() const;
-
    };
 
 }}



Mime
View raw message