activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1305601 [4/5] - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/activemq/cmsutil/ main/activemq/core/ main/activemq/core/kernels/ main/cms/ test/
Date Mon, 26 Mar 2012 21:11:14 GMT
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp?rev=1305601&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp Mon Mar 26 21:11:12 2012
@@ -0,0 +1,1366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ActiveMQConsumerKernel.h"
+
+#include <decaf/lang/exceptions/NullPointerException.h>
+#include <decaf/lang/exceptions/InvalidStateException.h>
+#include <decaf/lang/exceptions/IllegalArgumentException.h>
+#include <decaf/lang/Math.h>
+#include <decaf/lang/System.h>
+#include <decaf/lang/Boolean.h>
+#include <decaf/lang/Integer.h>
+#include <decaf/lang/Long.h>
+#include <activemq/util/Config.h>
+#include <activemq/util/CMSExceptionSupport.h>
+#include <activemq/util/ActiveMQProperties.h>
+#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/commands/Message.h>
+#include <activemq/commands/MessageAck.h>
+#include <activemq/commands/MessagePull.h>
+#include <activemq/commands/RemoveInfo.h>
+#include <activemq/commands/TransactionInfo.h>
+#include <activemq/commands/TransactionId.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConstants.h>
+#include <activemq/core/ActiveMQSession.h>
+#include <activemq/core/ActiveMQTransactionContext.h>
+#include <activemq/core/ActiveMQAckHandler.h>
+#include <activemq/core/FifoMessageDispatchChannel.h>
+#include <activemq/core/SimplePriorityMessageDispatchChannel.h>
+#include <activemq/core/RedeliveryPolicy.h>
+#include <activemq/threads/Scheduler.h>
+#include <cms/ExceptionListener.h>
+#include <memory>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::util;
+using namespace activemq::core;
+using namespace activemq::core::kernels;
+using namespace activemq::commands;
+using namespace activemq::exceptions;
+using namespace activemq::threads;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace activemq{
+namespace core {
+namespace kernels {
+
+    class ActiveMQConsumerKernelConfig {
+    private:
+
+        ActiveMQConsumerKernelConfig( const ActiveMQConsumerKernelConfig& );
+        ActiveMQConsumerKernelConfig& operator= ( const ActiveMQConsumerKernelConfig& );
+
+    public:
+
+        cms::MessageListener* listener;
+        decaf::util::concurrent::Mutex listenerMutex;
+        AtomicBoolean deliveringAcks;
+        AtomicBoolean started;
+        Pointer<MessageDispatchChannel> unconsumedMessages;
+        decaf::util::LinkedList< decaf::lang::Pointer<commands::MessageDispatch> > dispatchedMessages;
+        long long lastDeliveredSequenceId;
+        Pointer<commands::MessageAck> pendingAck;
+        int deliveredCounter;
+        int additionalWindowSize;
+        volatile bool synchronizationRegistered;
+        bool clearDispatchList;
+        bool inProgressClearRequiredFlag;
+        long long redeliveryDelay;
+        Pointer<RedeliveryPolicy> redeliveryPolicy;
+        Pointer<Exception> failureError;
+        Pointer<Scheduler> scheduler;
+
+        ActiveMQConsumerKernelConfig() : listener(NULL),
+                                         listenerMutex(),
+                                         deliveringAcks(),
+                                         started(),
+                                         unconsumedMessages(),
+                                         dispatchedMessages(),
+                                         lastDeliveredSequenceId(0),
+                                         pendingAck(),
+                                         deliveredCounter(0),
+                                         additionalWindowSize(0),
+                                         synchronizationRegistered(false),
+                                         clearDispatchList(false),
+                                         inProgressClearRequiredFlag(false),
+                                         redeliveryDelay(0),
+                                         redeliveryPolicy(),
+                                         failureError(),
+                                         scheduler() {
+        }
+    };
+
+    /**
+     * Class used to deal with consumers in an active transaction.  This
+     * class calls back into the consumer when the transaction is Committed or
+     * Rolled Back to process that event.
+     */
+    class TransactionSynhcronization : public Synchronization {
+    private:
+
+        ActiveMQConsumerKernel* consumer;
+
+    private:
+
+        TransactionSynhcronization(const TransactionSynhcronization&);
+        TransactionSynhcronization& operator=(const TransactionSynhcronization&);
+
+    public:
+
+        TransactionSynhcronization(ActiveMQConsumerKernel* consumer) : consumer(consumer) {
+            if (consumer == NULL) {
+                throw NullPointerException(__FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
+            }
+        }
+
+        virtual ~TransactionSynhcronization() {}
+
+        virtual void beforeEnd() {
+            consumer->acknowledge();
+            consumer->setSynchronizationRegistered(false);
+        }
+
+        virtual void afterCommit() {
+            consumer->commit();
+            consumer->setSynchronizationRegistered(false);
+        }
+
+        virtual void afterRollback() {
+            consumer->rollback();
+            consumer->setSynchronizationRegistered(false);
+        }
+    };
+
+    /**
+     * Class used to Hook a consumer that has been closed into the Transaction
+     * it is currently a part of.  Once the Transaction has been Committed or
+     * Rolled back this Synchronization can finish the Close of the consumer.
+     */
+    class CloseSynhcronization : public Synchronization {
+    private:
+
+        ActiveMQConsumerKernel* consumer;
+
+    private:
+
+        CloseSynhcronization(const CloseSynhcronization&);
+        CloseSynhcronization& operator=(const CloseSynhcronization&);
+
+    public:
+
+        CloseSynhcronization(ActiveMQConsumerKernel* consumer) : consumer(consumer) {
+            if (consumer == NULL) {
+                throw NullPointerException(__FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
+            }
+        }
+
+        virtual ~CloseSynhcronization() {}
+
+        virtual void beforeEnd() {
+        }
+
+        virtual void afterCommit() {
+            consumer->doClose();
+        }
+
+        virtual void afterRollback() {
+            consumer->doClose();
+        }
+    };
+
+    /**
+     * ActiveMQAckHandler used to support Client Acknowledge mode.
+     */
+    class ClientAckHandler : public ActiveMQAckHandler {
+    private:
+
+        ActiveMQSession* session;
+
+    private:
+
+        ClientAckHandler( const ClientAckHandler& );
+        ClientAckHandler& operator= ( const ClientAckHandler& );
+
+    public:
+
+        ClientAckHandler( ActiveMQSession* session ) : session(session) {
+            if( session == NULL ) {
+                throw NullPointerException(
+                    __FILE__, __LINE__, "Ack Handler Created with NULL Session.");
+            }
+        }
+
+        void acknowledgeMessage( const commands::Message* message AMQCPP_UNUSED ) {
+
+            try {
+                this->session->acknowledge();
+            }
+            AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+        }
+    };
+
+    /**
+     * ActiveMQAckHandler used to enable the Individual Acknowledge mode.
+     */
+    class IndividualAckHandler : public ActiveMQAckHandler {
+    private:
+
+        ActiveMQConsumerKernel* consumer;
+        Pointer<commands::MessageDispatch> dispatch;
+
+    private:
+
+        IndividualAckHandler( const IndividualAckHandler& );
+        IndividualAckHandler& operator= ( const IndividualAckHandler& );
+
+    public:
+
+        IndividualAckHandler( ActiveMQConsumerKernel* consumer, const Pointer<MessageDispatch>& dispatch ) :
+            consumer(consumer), dispatch(dispatch) {
+
+            if( consumer == NULL ) {
+                throw NullPointerException(
+                    __FILE__, __LINE__, "Ack Handler Created with NULL consumer.");
+            }
+        }
+
+        void acknowledgeMessage( const commands::Message* message AMQCPP_UNUSED ) {
+
+            try {
+
+                if( this->dispatch != NULL ) {
+                    this->consumer->acknowledge( this->dispatch );
+                    this->dispatch.reset( NULL );
+                }
+            }
+            AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+        }
+    };
+
+    /**
+     * Class used to Start a Consumer's dispatch queue asynchronously from the
+     * configured Scheduler.
+     */
+    class StartConsumerTask : public Runnable {
+    private:
+
+        ActiveMQConsumerKernel* consumer;
+
+    private:
+
+        StartConsumerTask( const StartConsumerTask& );
+        StartConsumerTask& operator= ( const StartConsumerTask& );
+
+    public:
+
+        StartConsumerTask( ActiveMQConsumerKernel* consumer ) : Runnable(), consumer(NULL) {
+
+            if( consumer == NULL ) {
+                throw NullPointerException(
+                    __FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
+            }
+
+            this->consumer = consumer;
+        }
+
+        virtual ~StartConsumerTask() {}
+
+        virtual void run() {
+            try{
+                if(!this->consumer->isClosed()) {
+                    this->consumer->start();
+                }
+            } catch(cms::CMSException& ex) {
+                // TODO - Need Connection onAsyncException method.
+            }
+        }
+    };
+
+}}}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQConsumerKernel::ActiveMQConsumerKernel(ActiveMQSession* session,
+                                               const Pointer<ConsumerId>& id,
+                                               const Pointer<ActiveMQDestination>& destination,
+                                               const std::string& name,
+                                               const std::string& selector,
+                                               int prefetch,
+                                               int maxPendingMessageCount,
+                                               bool noLocal,
+                                               bool browser,
+                                               bool dispatchAsync,
+                                               cms::MessageListener* listener ) : internal(NULL), session(NULL), consumerInfo() {
+
+    if (session == NULL) {
+        throw ActiveMQException(__FILE__, __LINE__, "ActiveMQConsumerKernel::ActiveMQConsumerKernel - Init with NULL Session");
+    }
+
+    if (destination == NULL) {
+        throw ActiveMQException(__FILE__, __LINE__, "ActiveMQConsumerKernel::ActiveMQConsumerKernel - Init with NULL Destination");
+    }
+
+    if (destination->getPhysicalName() == "") {
+        throw ActiveMQException(__FILE__, __LINE__, "ActiveMQConsumerKernel::ActiveMQConsumerKernel - Destination given has no Physical Name.");
+    }
+
+    this->internal = new ActiveMQConsumerKernelConfig();
+
+    Pointer<ConsumerInfo> consumerInfo(new ConsumerInfo());
+
+    consumerInfo->setConsumerId(id);
+    consumerInfo->setDestination(destination);
+    consumerInfo->setSubscriptionName(name);
+    consumerInfo->setSelector(selector);
+    consumerInfo->setPrefetchSize(prefetch);
+    consumerInfo->setMaximumPendingMessageLimit(maxPendingMessageCount);
+    consumerInfo->setBrowser(browser);
+    consumerInfo->setDispatchAsync(dispatchAsync);
+    consumerInfo->setNoLocal(noLocal);
+
+    // Initialize Consumer Data
+    this->session = session;
+    this->consumerInfo = consumerInfo;
+    this->internal->lastDeliveredSequenceId = -1;
+    this->internal->synchronizationRegistered = false;
+    this->internal->additionalWindowSize = 0;
+    this->internal->deliveredCounter = 0;
+    this->internal->clearDispatchList = false;
+    this->internal->inProgressClearRequiredFlag = false;
+    this->internal->listener = NULL;
+    this->internal->redeliveryDelay = 0;
+    this->internal->redeliveryPolicy.reset(this->session->getConnection()->getRedeliveryPolicy()->clone());
+    this->internal->scheduler = this->session->getScheduler();
+
+    if (this->session->getConnection()->isMessagePrioritySupported()) {
+        this->internal->unconsumedMessages.reset(new SimplePriorityMessageDispatchChannel());
+    } else {
+        this->internal->unconsumedMessages.reset(new FifoMessageDispatchChannel());
+    }
+
+    if (listener != NULL) {
+        this->setMessageListener(listener);
+    }
+
+    applyDestinationOptions(this->consumerInfo);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQConsumerKernel::~ActiveMQConsumerKernel() throw() {
+
+    try {
+
+        try{
+            this->close();
+        } catch(...) {}
+
+        delete this->internal;
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::start() {
+
+    if (this->internal->unconsumedMessages->isClosed()) {
+        return;
+    }
+
+    this->internal->started.set(true);
+    this->internal->unconsumedMessages->start();
+    this->session->wakeup();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::stop() {
+    this->internal->started.set( false );
+    this->internal->unconsumedMessages->stop();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConsumerKernel::isClosed() const {
+    return this->internal->unconsumedMessages->isClosed();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::close() {
+
+    try{
+        if (!this->isClosed()) {
+
+            if (this->session->getTransactionContext() != NULL &&
+                this->session->getTransactionContext()->isInTransaction()) {
+
+                Pointer<Synchronization> sync(new CloseSynhcronization(this));
+                this->session->getTransactionContext()->addSynchronization(sync);
+
+                doClose();
+            } else {
+                doClose();
+            }
+        }
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::doClose() {
+
+    try {
+
+        dispose();
+        // Remove at the Broker Side, consumer has been removed from the local
+        // Session and Connection objects so if the remote call to remove throws
+        // it is okay to propagate to the client.
+        Pointer<RemoveInfo> info(new RemoveInfo);
+        info->setObjectId(this->consumerInfo->getConsumerId());
+        info->setLastDeliveredSequenceId(this->internal->lastDeliveredSequenceId);
+        this->session->oneway(info);
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::dispose() {
+
+    try{
+        if (!this->isClosed()) {
+
+            if (!session->isTransacted()) {
+                deliverAcks();
+            }
+
+            this->internal->started.set(false);
+
+            // Identifies any errors encountered during shutdown.
+            bool haveException = false;
+            ActiveMQException error;
+
+            // Purge all the pending messages
+            try{
+                this->internal->unconsumedMessages->clear();
+            } catch ( ActiveMQException& ex ){
+                if( !haveException ){
+                    ex.setMark( __FILE__, __LINE__ );
+                    error = ex;
+                    haveException = true;
+                }
+            }
+
+            // Stop and Wakeup all sync consumers.
+            this->internal->unconsumedMessages->close();
+
+            if (this->session->isIndividualAcknowledge()) {
+                // For IndividualAck Mode we need to unlink the ack handler to remove a
+                // cyclic reference to the MessageDispatch that brought the message to us.
+                synchronized(&internal->dispatchedMessages) {
+                    std::auto_ptr<Iterator<Pointer<MessageDispatch> > > iter(this->internal->dispatchedMessages.iterator());
+                    while (iter->hasNext()) {
+                        iter->next()->getMessage()->setAckHandler(Pointer<ActiveMQAckHandler>());
+                    }
+
+                    this->internal->dispatchedMessages.clear();
+                }
+            }
+
+            // Remove this Consumer from the Connections set of Dispatchers
+            this->session->removeConsumer(this->consumerInfo->getConsumerId());
+
+            // If we encountered an error, propagate it.
+            if (haveException) {
+                error.setMark(__FILE__, __LINE__);
+                throw error;
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::string ActiveMQConsumerKernel::getMessageSelector() const {
+
+    try {
+        // Fetch the Selector
+        return this->consumerInfo->getSelector();
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+decaf::lang::Pointer<MessageDispatch> ActiveMQConsumerKernel::dequeue( long long timeout ) {
+
+    try {
+
+        this->checkClosed();
+
+        // Calculate the deadline
+        long long deadline = 0;
+        if (timeout > 0) {
+            deadline = System::currentTimeMillis() + timeout;
+        }
+
+        // Loop until the time is up or we get a non-expired message
+        while (true) {
+
+            Pointer<MessageDispatch> dispatch = this->internal->unconsumedMessages->dequeue( timeout );
+            if (dispatch == NULL) {
+
+                if( timeout > 0 && !this->internal->unconsumedMessages->isClosed() ) {
+                    timeout = Math::max( deadline - System::currentTimeMillis(), 0LL );
+                } else {
+                    if( this->internal->failureError != NULL ) {
+                        throw CMSExceptionSupport::create(*this->internal->failureError);
+                    } else {
+                        return Pointer<MessageDispatch>();
+                    }
+                }
+
+            } else if( dispatch->getMessage() == NULL ) {
+
+                return Pointer<MessageDispatch>();
+
+            } else if( dispatch->getMessage()->isExpired() ) {
+
+                beforeMessageIsConsumed( dispatch );
+                afterMessageIsConsumed( dispatch, true );
+                if( timeout > 0 ) {
+                    timeout = Math::max( deadline - System::currentTimeMillis(), 0LL );
+                }
+
+                continue;
+            }
+
+            // Return the message.
+            return dispatch;
+        }
+
+        return Pointer<MessageDispatch>();
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Message* ActiveMQConsumerKernel::receive() {
+
+    try{
+
+        this->checkClosed();
+
+        // Send a request for a new message if needed
+        this->sendPullRequest( 0 );
+
+        // Wait for the next message.
+        Pointer<MessageDispatch> message = dequeue( -1 );
+        if( message == NULL ) {
+            return NULL;
+        }
+
+        // Message pre-processing
+        beforeMessageIsConsumed( message );
+
+        // Need to clone the message because the user is responsible for freeing
+        // its copy of the message.
+        cms::Message* clonedMessage =
+            dynamic_cast<cms::Message*>( message->getMessage()->cloneDataStructure() );
+
+        // Post processing (may result in the message being deleted)
+        afterMessageIsConsumed( message, false );
+
+        // Return the cloned message.
+        return clonedMessage;
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Message* ActiveMQConsumerKernel::receive( int millisecs ) {
+
+    try {
+
+        this->checkClosed();
+
+        // Send a request for a new message if needed
+        this->sendPullRequest( millisecs );
+
+        // Wait for the next message.
+        Pointer<MessageDispatch> message = dequeue( millisecs );
+        if( message == NULL ) {
+            return NULL;
+        }
+
+        // Message preprocessing
+        beforeMessageIsConsumed( message );
+
+        // Need to clone the message because the user is responsible for freeing
+        // its copy of the message.
+        cms::Message* clonedMessage =
+            dynamic_cast<cms::Message*>( message->getMessage()->cloneDataStructure() );
+
+        // Post processing (may result in the message being deleted)
+        afterMessageIsConsumed( message, false );
+
+        // Return the cloned message.
+        return clonedMessage;
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Message* ActiveMQConsumerKernel::receiveNoWait() {
+
+    try {
+
+        this->checkClosed();
+
+        // Send a request for a new message if needed
+        this->sendPullRequest( -1 );
+
+        // Get the next available message, if there is one.
+        Pointer<MessageDispatch> message = dequeue( 0 );
+        if( message == NULL ) {
+            return NULL;
+        }
+
+        // Message preprocessing
+        beforeMessageIsConsumed( message );
+
+        // Need to clone the message because the user is responsible for freeing
+        // its copy of the message.
+        cms::Message* clonedMessage =
+            dynamic_cast<cms::Message*>( message->getMessage()->cloneDataStructure() );
+
+        // Post processing (may result in the message being deleted)
+        afterMessageIsConsumed( message, false );
+
+        // Return the cloned message.
+        return clonedMessage;
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::setMessageListener( cms::MessageListener* listener ) {
+
+    try{
+
+        this->checkClosed();
+
+        if( this->consumerInfo->getPrefetchSize() == 0 && listener != NULL ) {
+            throw ActiveMQException(
+                __FILE__, __LINE__,
+                "Cannot deliver async when Prefetch is Zero, set Prefecth to at least One.");
+        }
+
+        if( listener != NULL ) {
+
+            // Now that we have a valid message listener,
+            // redispatch all the messages that it missed.
+
+            bool wasStarted = session->isStarted();
+            if( wasStarted ) {
+                session->stop();
+            }
+
+            synchronized( &(this->internal->listenerMutex) ) {
+                this->internal->listener = listener;
+            }
+
+            this->session->redispatch( *(this->internal->unconsumedMessages) );
+
+            if( wasStarted ) {
+                this->session->start();
+            }
+        } else {
+            synchronized( &(this->internal->listenerMutex) ) {
+                this->internal->listener = NULL;
+            }
+        }
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::beforeMessageIsConsumed( const Pointer<MessageDispatch>& dispatch ) {
+
+    // If the Session is in ClientAcknowledge or IndividualAcknowledge mode, then
+    // we set the handler in the message to this object and send it out.
+    if( session->isClientAcknowledge() ) {
+        Pointer<ActiveMQAckHandler> ackHandler( new ClientAckHandler( this->session ) );
+        dispatch->getMessage()->setAckHandler( ackHandler );
+    } else if( session->isIndividualAcknowledge() ) {
+        Pointer<ActiveMQAckHandler> ackHandler( new IndividualAckHandler( this, dispatch ) );
+        dispatch->getMessage()->setAckHandler( ackHandler );
+    }
+
+    this->internal->lastDeliveredSequenceId =
+        dispatch->getMessage()->getMessageId()->getBrokerSequenceId();
+
+    if( !isAutoAcknowledgeBatch() ) {
+
+        // When not in an Auto
+        synchronized( &this->internal->dispatchedMessages ) {
+            this->internal->dispatchedMessages.addFirst( dispatch );
+        }
+
+        if( this->session->isTransacted() ) {
+            ackLater( dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED );
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::afterMessageIsConsumed( const Pointer<MessageDispatch>& message,
+                                               bool messageExpired ) {
+
+    try{
+
+        if( this->internal->unconsumedMessages->isClosed() ) {
+            return;
+        }
+
+        if( messageExpired == true ) {
+            ackLater( message, ActiveMQConstants::ACK_TYPE_DELIVERED );
+        }
+
+        if( session->isTransacted() ) {
+            return;
+        } else if( isAutoAcknowledgeEach() ) {
+
+            if( this->internal->deliveringAcks.compareAndSet( false, true ) ) {
+
+                synchronized( &this->internal->dispatchedMessages ) {
+                    if( !this->internal->dispatchedMessages.isEmpty() ) {
+                        Pointer<MessageAck> ack = makeAckForAllDeliveredMessages(
+                            ActiveMQConstants::ACK_TYPE_CONSUMED );
+
+                        if( ack != NULL ) {
+                            this->internal->dispatchedMessages.clear();
+                            session->oneway( ack );
+                        }
+                    }
+                }
+
+                this->internal->deliveringAcks.set( false );
+            }
+
+        } else if( isAutoAcknowledgeBatch() ) {
+            ackLater( message, ActiveMQConstants::ACK_TYPE_CONSUMED );
+        } else if( session->isClientAcknowledge() || session->isIndividualAcknowledge() ) {
+
+            bool messageUnackedByConsumer = false;
+
+            synchronized( &this->internal->dispatchedMessages ) {
+                messageUnackedByConsumer = this->internal->dispatchedMessages.contains(message);
+            }
+
+            if( messageUnackedByConsumer ) {
+                this->ackLater( message, ActiveMQConstants::ACK_TYPE_DELIVERED );
+            }
+
+        } else {
+            throw IllegalStateException( __FILE__, __LINE__, "Invalid Session State" );
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::deliverAcks() {
+
+    try{
+
+        Pointer<MessageAck> ack;
+
+        if( this->internal->deliveringAcks.compareAndSet( false, true ) ) {
+
+            if( isAutoAcknowledgeEach() ) {
+
+                synchronized( &this->internal->dispatchedMessages ) {
+
+                    ack = makeAckForAllDeliveredMessages( ActiveMQConstants::ACK_TYPE_CONSUMED );
+
+                    if( ack != NULL ) {
+                        this->internal->dispatchedMessages.clear();
+                    } else {
+                        ack.swap( internal->pendingAck );
+                    }
+                }
+
+            } else if( this->internal->pendingAck != NULL &&
+                       this->internal->pendingAck->getAckType() == ActiveMQConstants::ACK_TYPE_CONSUMED ) {
+
+                ack.swap( this->internal->pendingAck );
+            }
+
+            if( ack != NULL ) {
+
+                try{
+                    this->session->oneway( ack );
+                } catch(...) {}
+
+            } else {
+                this->internal->deliveringAcks.set( false );
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::ackLater( const Pointer<MessageDispatch>& dispatch, int ackType ) {
+
+    // Don't acknowledge now, but we may need to let the broker know the
+    // consumer got the message to expand the pre-fetch window
+    if( session->isTransacted() ) {
+        session->doStartTransaction();
+        if( !this->internal->synchronizationRegistered ) {
+            this->internal->synchronizationRegistered = true;
+
+            Pointer<Synchronization> sync( new TransactionSynhcronization( this ) );
+            this->session->getTransactionContext()->addSynchronization( sync );
+        }
+    }
+
+    // The delivered message list is only needed for the recover method
+    // which is only used with client ack.
+    this->internal->deliveredCounter++;
+
+    Pointer<MessageAck> oldPendingAck = this->internal->pendingAck;
+    this->internal->pendingAck.reset( new MessageAck() );
+    this->internal->pendingAck->setConsumerId( dispatch->getConsumerId() );
+    this->internal->pendingAck->setAckType( (unsigned char)ackType );
+    this->internal->pendingAck->setDestination( dispatch->getDestination() );
+    this->internal->pendingAck->setLastMessageId( dispatch->getMessage()->getMessageId() );
+    this->internal->pendingAck->setMessageCount( internal->deliveredCounter );
+
+    if( oldPendingAck == NULL ) {
+        this->internal->pendingAck->setFirstMessageId( this->internal->pendingAck->getLastMessageId() );
+    } else if ( oldPendingAck->getAckType() == this->internal->pendingAck->getAckType() ) {
+        this->internal->pendingAck->setFirstMessageId( oldPendingAck->getFirstMessageId() );
+    } else {
+        // old pending ack being superseded by ack of another type, if is is not a delivered
+        // ack and hence important, send it now so it is not lost.
+        if( oldPendingAck->getAckType() != ActiveMQConstants::ACK_TYPE_DELIVERED ) {
+            session->oneway( oldPendingAck );
+        }
+    }
+
+    if( session->isTransacted() ) {
+        this->internal->pendingAck->setTransactionId( this->session->getTransactionContext()->getTransactionId() );
+    }
+
+    if( ( 0.5 * this->consumerInfo->getPrefetchSize() ) <= ( internal->deliveredCounter - internal->additionalWindowSize ) ) {
+        session->oneway( this->internal->pendingAck );
+        this->internal->pendingAck.reset( NULL );
+        this->internal->deliveredCounter = 0;
+        this->internal->additionalWindowSize = 0;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<MessageAck> ActiveMQConsumerKernel::makeAckForAllDeliveredMessages( int type ) {
+
+    synchronized( &this->internal->dispatchedMessages ) {
+
+        if( !this->internal->dispatchedMessages.isEmpty() ) {
+
+            Pointer<MessageDispatch> dispatched = this->internal->dispatchedMessages.getFirst();
+
+            Pointer<MessageAck> ack( new MessageAck() );
+            ack->setAckType( (unsigned char)type );
+            ack->setConsumerId( dispatched->getConsumerId() );
+            ack->setDestination( dispatched->getDestination() );
+            ack->setMessageCount( (int)this->internal->dispatchedMessages.size() );
+            ack->setLastMessageId( dispatched->getMessage()->getMessageId() );
+            ack->setFirstMessageId( this->internal->dispatchedMessages.getLast()->getMessage()->getMessageId() );
+
+            return ack;
+        }
+    }
+
+    return Pointer<MessageAck>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::acknowledge( const Pointer<commands::MessageDispatch>& dispatch ) {
+
+    try{
+
+        this->checkClosed();
+
+        if( this->session->isIndividualAcknowledge() ) {
+
+            Pointer<MessageAck> ack( new MessageAck() );
+            ack->setAckType( ActiveMQConstants::ACK_TYPE_CONSUMED );
+            ack->setConsumerId( this->consumerInfo->getConsumerId() );
+            ack->setDestination( this->consumerInfo->getDestination() );
+            ack->setMessageCount( 1 );
+            ack->setLastMessageId( dispatch->getMessage()->getMessageId() );
+            ack->setFirstMessageId( dispatch->getMessage()->getMessageId() );
+
+            session->oneway( ack );
+
+            synchronized( &this->internal->dispatchedMessages ) {
+                std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter( this->internal->dispatchedMessages.iterator() );
+                while( iter->hasNext() ) {
+                    if( iter->next() == dispatch ) {
+                        iter->remove();
+                        break;
+                    }
+                }
+            }
+
+        } else {
+            throw IllegalStateException(
+                __FILE__, __LINE__,
+                "Session is not in IndividualAcknowledge mode." );
+        }
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::acknowledge() {
+
+    try{
+
+        synchronized( &this->internal->dispatchedMessages ) {
+
+            // Acknowledge all messages so far.
+            Pointer<MessageAck> ack =
+                makeAckForAllDeliveredMessages( ActiveMQConstants::ACK_TYPE_CONSUMED );
+
+            if( ack == NULL ) {
+                return;
+            }
+
+            if( session->isTransacted() ) {
+                session->doStartTransaction();
+                ack->setTransactionId( session->getTransactionContext()->getTransactionId() );
+            }
+
+            session->oneway( ack );
+            this->internal->pendingAck.reset( NULL );
+
+            // Adjust the counters
+            this->internal->deliveredCounter =
+                Math::max( 0, this->internal->deliveredCounter - (int)this->internal->dispatchedMessages.size());
+            this->internal->additionalWindowSize =
+                Math::max(0, this->internal->additionalWindowSize - (int)this->internal->dispatchedMessages.size());
+
+            if( !session->isTransacted() ) {
+                this->internal->dispatchedMessages.clear();
+            }
+        }
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::commit() {
+
+    synchronized( &(this->internal->dispatchedMessages) ) {
+        this->internal->dispatchedMessages.clear();
+    }
+    this->internal->redeliveryDelay = 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::rollback() {
+
+    synchronized( this->internal->unconsumedMessages.get() ) {
+
+        synchronized( &this->internal->dispatchedMessages ) {
+            if( this->internal->dispatchedMessages.isEmpty() ) {
+                return;
+            }
+
+            // Only increase the redelivery delay after the first redelivery..
+            Pointer<MessageDispatch> lastMsg = this->internal->dispatchedMessages.getFirst();
+            const int currentRedeliveryCount = lastMsg->getMessage()->getRedeliveryCounter();
+            if( currentRedeliveryCount > 0 ) {
+                this->internal->redeliveryDelay = this->internal->redeliveryPolicy->getNextRedeliveryDelay( internal->redeliveryDelay );
+            } else {
+                this->internal->redeliveryDelay = this->internal->redeliveryPolicy->getInitialRedeliveryDelay();
+            }
+
+            Pointer<MessageId> firstMsgId =
+                this->internal->dispatchedMessages.getLast()->getMessage()->getMessageId();
+
+            std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter( internal->dispatchedMessages.iterator() );
+
+            while( iter->hasNext() ) {
+                Pointer<Message> message = iter->next()->getMessage();
+                message->setRedeliveryCounter( message->getRedeliveryCounter() + 1 );
+            }
+
+            if( this->internal->redeliveryPolicy->getMaximumRedeliveries() != RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES &&
+                lastMsg->getMessage()->getRedeliveryCounter() > this->internal->redeliveryPolicy->getMaximumRedeliveries() ) {
+
+                // We need to NACK the messages so that they get sent to the DLQ.
+                // Acknowledge the last message.
+                Pointer<MessageAck> ack( new MessageAck() );
+                ack->setAckType( ActiveMQConstants::ACK_TYPE_POISON );
+                ack->setConsumerId( this->consumerInfo->getConsumerId() );
+                ack->setDestination( lastMsg->getDestination() );
+                ack->setMessageCount( (int)this->internal->dispatchedMessages.size() );
+                ack->setLastMessageId( lastMsg->getMessage()->getMessageId() );
+                ack->setFirstMessageId( firstMsgId );
+
+                session->oneway( ack );
+                // Adjust the window size.
+                this->internal->additionalWindowSize =
+                    Math::max( 0, this->internal->additionalWindowSize - (int)this->internal->dispatchedMessages.size() );
+                this->internal->redeliveryDelay = 0;
+
+            } else {
+
+                // only redelivery_ack after first delivery
+                if( currentRedeliveryCount > 0 ) {
+                    Pointer<MessageAck> ack( new MessageAck() );
+                    ack->setAckType( ActiveMQConstants::ACK_TYPE_REDELIVERED );
+                    ack->setConsumerId( this->consumerInfo->getConsumerId() );
+                    ack->setDestination( lastMsg->getDestination() );
+                    ack->setMessageCount( (int)this->internal->dispatchedMessages.size() );
+                    ack->setLastMessageId( lastMsg->getMessage()->getMessageId() );
+                    ack->setFirstMessageId( firstMsgId );
+
+                    session->oneway( ack );
+                }
+
+                // stop the delivery of messages.
+                this->internal->unconsumedMessages->stop();
+
+                std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter( this->internal->dispatchedMessages.iterator() );
+
+                while( iter->hasNext() ) {
+                    this->internal->unconsumedMessages->enqueueFirst( iter->next() );
+                }
+
+                if( internal->redeliveryDelay > 0 && !this->internal->unconsumedMessages->isClosed() ) {
+                    // TODO - Can't do this until we can control object lifetime.
+                    // Start up the delivery again a little later.
+                    // this->internal->scheduler->executeAfterDelay(
+                    //    new StartConsumerTask(this), internal->redeliveryDelay);
+                    start();
+                } else {
+                    start();
+                }
+
+            }
+            this->internal->deliveredCounter -= (int)internal->dispatchedMessages.size();
+            this->internal->dispatchedMessages.clear();
+        }
+    }
+
+    if( this->internal->listener != NULL ) {
+        session->redispatch( *this->internal->unconsumedMessages );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::dispatch( const Pointer<MessageDispatch>& dispatch ) {
+
+    try {
+
+        synchronized( this->internal->unconsumedMessages.get() ) {
+
+            clearMessagesInProgress();
+            if( this->internal->clearDispatchList ) {
+                // we are reconnecting so lets flush the in progress
+                // messages
+                this->internal->clearDispatchList = false;
+                this->internal->unconsumedMessages->clear();
+            }
+
+            if( !this->internal->unconsumedMessages->isClosed() ) {
+
+                // Don't dispatch expired messages, ack it and then destroy it
+                if( dispatch->getMessage() != NULL && dispatch->getMessage()->isExpired() ) {
+                    this->ackLater( dispatch, ActiveMQConstants::ACK_TYPE_CONSUMED );
+
+                    // stop now, don't queue
+                    return;
+                }
+
+                synchronized( &this->internal->listenerMutex ) {
+                    // If we have a listener, send the message.
+                    if( this->internal->listener != NULL && internal->unconsumedMessages->isRunning() ) {
+
+                        // Preprocessing.
+                        beforeMessageIsConsumed( dispatch );
+
+                        // Notify the listener
+                        this->internal->listener->onMessage(
+                            dynamic_cast<cms::Message*>( dispatch->getMessage().get() ) );
+
+                        // Postprocessing
+                        afterMessageIsConsumed( dispatch, false );
+
+                    } else {
+
+                        // No listener, add it to the unconsumed messages list
+                        this->internal->unconsumedMessages->enqueue( dispatch );
+                    }
+                }
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::sendPullRequest( long long timeout ) {
+
+    try {
+
+        this->checkClosed();
+
+        // There are still local message, consume them first.
+        if( !this->internal->unconsumedMessages->isEmpty() ) {
+            return;
+        }
+
+        if( this->consumerInfo->getPrefetchSize() == 0 ) {
+
+            Pointer<MessagePull> messagePull( new MessagePull() );
+            messagePull->setConsumerId( this->consumerInfo->getConsumerId() );
+            messagePull->setDestination( this->consumerInfo->getDestination() );
+            messagePull->setTimeout( timeout );
+
+            this->session->oneway( messagePull );
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::checkClosed() const {
+    if( this->isClosed() ) {
+        throw ActiveMQException(
+            __FILE__, __LINE__,
+            "ActiveMQConsumerKernel - Consumer Already Closed" );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConsumerKernel::iterate() {
+
+    synchronized( &this->internal->listenerMutex ) {
+
+        if( this->internal->listener != NULL ) {
+
+            Pointer<MessageDispatch> dispatch = internal->unconsumedMessages->dequeueNoWait();
+            if( dispatch != NULL ) {
+
+                try {
+                    beforeMessageIsConsumed( dispatch );
+                    this->internal->listener->onMessage(
+                        dynamic_cast<cms::Message*>( dispatch->getMessage().get() ) );
+                    afterMessageIsConsumed( dispatch, false );
+                } catch( ActiveMQException& ex ) {
+                    this->session->fire( ex );
+                }
+
+                return true;
+            }
+        }
+    }
+
+    return false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::inProgressClearRequired() {
+
+    this->internal->inProgressClearRequiredFlag = true;
+    // Clears dispatched messages async to avoid lock contention with inprogress acks.
+    this->internal->clearDispatchList = true;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::clearMessagesInProgress() {
+    if( this->internal->inProgressClearRequiredFlag ) {
+        synchronized( this->internal->unconsumedMessages.get() ) {
+            if( this->internal->inProgressClearRequiredFlag ) {
+
+                // TODO - Rollback duplicates.
+
+                // allow dispatch on this connection to resume
+                this->session->getConnection()->setTransportInterruptionProcessingComplete();
+                this->internal->inProgressClearRequiredFlag = false;
+            }
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConsumerKernel::isAutoAcknowledgeEach() const {
+    return this->session->isAutoAcknowledge() ||
+           (this->session->isDupsOkAcknowledge() && this->consumerInfo->getDestination()->isQueue());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConsumerKernel::isAutoAcknowledgeBatch() const {
+    return this->session->isDupsOkAcknowledge() && !this->consumerInfo->getDestination()->isQueue();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ActiveMQConsumerKernel::getMessageAvailableCount() const {
+    return this->internal->unconsumedMessages->size();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::applyDestinationOptions( const Pointer<ConsumerInfo>& info ) {
+
+    decaf::lang::Pointer<commands::ActiveMQDestination> amqDestination = info->getDestination();
+
+    // Get any options specified in the destination and apply them to the
+    // ConsumerInfo object.
+    const ActiveMQProperties& options = amqDestination->getOptions();
+
+    std::string noLocalStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_NOLOCAL);
+    if (options.hasProperty(noLocalStr)) {
+        info->setNoLocal(Boolean::parseBoolean(options.getProperty(noLocalStr)));
+    }
+
+    std::string selectorStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_SELECTOR);
+    if (options.hasProperty(selectorStr)) {
+        info->setSelector(options.getProperty(selectorStr));
+    }
+
+    std::string priorityStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_PRIORITY);
+    if (options.hasProperty(priorityStr)) {
+        info->setPriority((unsigned char) Integer::parseInt(options.getProperty(priorityStr)));
+    }
+
+    std::string dispatchAsyncStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_DISPATCHASYNC);
+    if (options.hasProperty(dispatchAsyncStr)) {
+        info->setDispatchAsync(Boolean::parseBoolean(options.getProperty(dispatchAsyncStr)));
+    }
+
+    std::string exclusiveStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_EXCLUSIVE);
+    if (options.hasProperty(exclusiveStr)) {
+        info->setExclusive(Boolean::parseBoolean(options.getProperty(exclusiveStr)));
+    }
+
+    std::string maxPendingMsgLimitStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CUNSUMER_MAXPENDINGMSGLIMIT);
+
+    if (options.hasProperty(maxPendingMsgLimitStr)) {
+        info->setMaximumPendingMessageLimit(Integer::parseInt(options.getProperty(maxPendingMsgLimitStr)));
+    }
+
+    std::string prefetchSizeStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_PREFECTCHSIZE);
+    if (info->getPrefetchSize() <= 0 || options.hasProperty(prefetchSizeStr)) {
+        info->setPrefetchSize(Integer::parseInt(options.getProperty(prefetchSizeStr, "1000")));
+    }
+
+    std::string retroactiveStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_RETROACTIVE);
+    if (options.hasProperty(retroactiveStr)) {
+        info->setRetroactive(Boolean::parseBoolean(options.getProperty(retroactiveStr)));
+    }
+
+    std::string networkSubscriptionStr = "consumer.networkSubscription";
+
+    if (options.hasProperty(networkSubscriptionStr)) {
+        info->setNetworkSubscription(Boolean::parseBoolean(options.getProperty(networkSubscriptionStr)));
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::setRedeliveryPolicy( RedeliveryPolicy* policy ) {
+    if( policy != NULL ) {
+        this->internal->redeliveryPolicy.reset(policy);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+RedeliveryPolicy* ActiveMQConsumerKernel::getRedeliveryPolicy() const {
+    return this->internal->redeliveryPolicy.get();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::MessageListener* ActiveMQConsumerKernel::getMessageListener() const {
+    return this->internal->listener;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+const Pointer<commands::ConsumerInfo>& ActiveMQConsumerKernel::getConsumerInfo() const {
+    this->checkClosed();
+    return this->consumerInfo;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+const Pointer<commands::ConsumerId>& ActiveMQConsumerKernel::getConsumerId() const {
+    this->checkClosed();
+    return this->consumerInfo->getConsumerId();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConsumerKernel::isSynchronizationRegistered() const {
+    return this->internal->synchronizationRegistered;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::setSynchronizationRegistered( bool value ) {
+    this->internal->synchronizationRegistered = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long ActiveMQConsumerKernel::getLastDeliveredSequenceId() const {
+    return this->internal->lastDeliveredSequenceId;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::setLastDeliveredSequenceId( long long value ) {
+    this->internal->lastDeliveredSequenceId = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::setFailureError( decaf::lang::Exception* error ) {
+    if (error != NULL) {
+        this->internal->failureError.reset( error->clone() );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+decaf::lang::Exception* ActiveMQConsumerKernel::getFailureError() const {
+    if (this->internal->failureError == NULL) {
+        return NULL;
+    }
+
+    return this->internal->failureError.get();
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h?rev=1305601&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h Mon Mar 26 21:11:12 2012
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CORE_KERNELS_ACTIVEMQCONSUMERKERNEL_H_
+#define _ACTIVEMQ_CORE_KERNELS_ACTIVEMQCONSUMERKERNEL_H_
+
+#include <cms/MessageConsumer.h>
+#include <cms/MessageListener.h>
+#include <cms/Message.h>
+#include <cms/CMSException.h>
+
+#include <activemq/util/Config.h>
+#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/commands/ConsumerInfo.h>
+#include <activemq/commands/MessageAck.h>
+#include <activemq/commands/MessageDispatch.h>
+#include <activemq/core/Dispatcher.h>
+#include <activemq/core/RedeliveryPolicy.h>
+#include <activemq/core/MessageDispatchChannel.h>
+
+#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
+#include <decaf/lang/Pointer.h>
+#include <decaf/util/concurrent/Mutex.h>
+
+namespace activemq {
+namespace core {
+    class ActiveMQSession;
+namespace kernels {
+
+    using decaf::lang::Pointer;
+    using decaf::util::concurrent::atomic::AtomicBoolean;
+
+    class ActiveMQConsumerKernelConfig;
+
+    class AMQCPP_API ActiveMQConsumerKernel : public cms::MessageConsumer, public Dispatcher {
+    private:
+
+        /**
+         * Internal Class that holds Members of this class, allows for changes without API breakage.
+         */
+        ActiveMQConsumerKernelConfig* internal;
+
+        /**
+         * The ActiveMQSession that owns this class instance.
+         */
+        ActiveMQSession* session;
+
+        /**
+         * The ConsumerInfo object for this class instance.
+         */
+        Pointer<commands::ConsumerInfo> consumerInfo;
+
+    private:
+
+        ActiveMQConsumerKernel( const ActiveMQConsumerKernel& );
+        ActiveMQConsumerKernel& operator= ( const ActiveMQConsumerKernel& );
+
+    public:
+
+        ActiveMQConsumerKernel(ActiveMQSession* session,
+                               const Pointer<commands::ConsumerId>& id,
+                               const Pointer<commands::ActiveMQDestination>& destination,
+                               const std::string& name,
+                               const std::string& selector,
+                               int prefetch,
+                               int maxPendingMessageCount,
+                               bool noLocal,
+                               bool browser,
+                               bool dispatchAsync,
+                               cms::MessageListener* listener);
+
+        virtual ~ActiveMQConsumerKernel() throw();
+
+    public:  // Interface Implementation
+
+        virtual void start();
+
+        virtual void stop();
+
+        virtual void close();
+
+        virtual cms::Message* receive();
+
+        virtual cms::Message* receive( int millisecs );
+
+        virtual cms::Message* receiveNoWait();
+
+        virtual void setMessageListener( cms::MessageListener* listener );
+
+        virtual cms::MessageListener* getMessageListener() const;
+
+        virtual std::string getMessageSelector() const;
+
+        virtual void acknowledge( const Pointer<commands::MessageDispatch>& dispatch );
+
+    public:  // Dispatcher Methods
+
+        virtual void dispatch( const Pointer<MessageDispatch>& message );
+
+    public:  // ActiveMQConsumer Methods
+
+        /**
+         * Method called to acknowledge all messages that have been received so far.
+         *
+         * @throw CMSException if an error occurs while ack'ing the message.
+         */
+        void acknowledge();
+
+        /**
+         * Called to Commit the current set of messages in this Transaction
+         *
+         * @throw ActiveMQException if an error occurs while performing the operation.
+         */
+        void commit();
+
+        /**
+         * Called to Roll back the current set of messages in this Transaction
+         *
+         * @throw ActiveMQException if an error occurs while performing the operation.
+         */
+        void rollback();
+
+        /**
+         * Performs the actual close operation on this consumer
+         *
+         * @throw ActiveMQException if an error occurs while performing the operation.
+         */
+        void doClose();
+
+        /**
+         * Cleans up this objects internal resources.
+         *
+         * @throw ActiveMQException if an error occurs while performing the operation.
+         */
+        void dispose();
+
+        /**
+         * Get the Consumer information for this consumer
+         * @return Reference to a Consumer Info Object
+         */
+        const Pointer<commands::ConsumerInfo>& getConsumerInfo() const;
+
+        /**
+         * Get the Consumer Id for this consumer
+         * @return Reference to a Consumer Id Object
+         */
+        const Pointer<commands::ConsumerId>& getConsumerId() const;
+
+        /**
+         * @returns if this Consumer has been closed.
+         */
+        bool isClosed() const;
+
+        /**
+         * Has this Consumer Transaction Synchronization been added to the transaction
+         * @return true if the synchronization has been added.
+         */
+        bool isSynchronizationRegistered() const ;
+
+        /**
+         * Sets the Synchronization Registered state of this consumer.
+         * @param value - true if registered false otherwise.
+         */
+        void setSynchronizationRegistered( bool value );
+
+        /**
+         * Deliver any pending messages to the registered MessageListener if there
+         * is one, return true if not all dispatched, or false if no listener or all
+         * pending messages have been dispatched.
+         */
+        bool iterate();
+
+        /**
+         * Forces this consumer to send all pending acks to the broker.
+         *
+         * @throw ActiveMQException if an error occurs while performing the operation.
+         */
+        void deliverAcks();
+
+        /**
+         * Called on a Failover to clear any pending messages.
+         */
+        void clearMessagesInProgress();
+
+        /**
+         * Signals that a Failure occurred and that anything in-progress in the
+         * consumer should be cleared.
+         */
+        void inProgressClearRequired();
+
+        /**
+         * Gets the currently set Last Delivered Sequence Id
+         *
+         * @returns long long containing the sequence id of the last delivered Message.
+         */
+        long long getLastDeliveredSequenceId() const;
+
+        /**
+         * Sets the value of the Last Delivered Sequence Id
+         *
+         * @param value
+         *      The new value to assign to the Last Delivered Sequence Id property.
+         */
+        void setLastDeliveredSequenceId( long long value );
+
+        /**
+         * @returns the number of Message's this consumer is waiting to Dispatch.
+         */
+        int getMessageAvailableCount() const;
+
+        /**
+         * Sets the RedeliveryPolicy this Consumer should use when a rollback is
+         * performed on a transacted Consumer.  The Consumer takes ownership of the
+         * passed pointer.  The Consumer's redelivery policy can never be null, a
+         * call to this method with a NULL pointer is ignored.
+         *
+         * @param policy
+         *      Pointer to a Redelivery Policy object that his Consumer will use.
+         */
+        void setRedeliveryPolicy( RedeliveryPolicy* policy );
+
+        /**
+         * Gets a pointer to this Consumer's Redelivery Policy object, the Consumer
+         * retains ownership of this pointer so the caller should not delete it.
+         *
+         * @returns a Pointer to a RedeliveryPolicy that is in use by this Consumer.
+         */
+        RedeliveryPolicy* getRedeliveryPolicy() const;
+
+        /**
+         * Sets the Exception that has caused this Consumer to be in a failed state.
+         *
+         * @param error
+         *      The error that is to be thrown when a Receive call is made.
+         */
+        void setFailureError( decaf::lang::Exception* error );
+
+        /**
+         * Gets the error that caused this Consumer to be in a Failed state, or NULL if
+         * there is no Error.
+         *
+         * @returns pointer to the error that faulted this Consumer or NULL.
+         */
+        decaf::lang::Exception* getFailureError() const;
+
+    protected:
+
+        /**
+         * Used by synchronous receive methods to wait for messages to come in.
+         * @param timeout - The maximum number of milliseconds to wait before
+         * returning.
+         *
+         * If -1, it will block until a messages is received or this consumer
+         * is closed.
+         * If 0, will not block at all.  If > 0, will wait at a maximum the
+         * specified number of milliseconds before returning.
+         * @return the message, if received within the allotted time.
+         * Otherwise NULL.
+         *
+         * @throws InvalidStateException if this consumer is closed upon
+         *         entering this method.
+         */
+        Pointer<MessageDispatch> dequeue( long long timeout );
+
+        /**
+         * Pre-consume processing
+         * @param dispatch - the message being consumed.
+         */
+        void beforeMessageIsConsumed(
+            const Pointer<commands::MessageDispatch>& dispatch );
+
+        /**
+         * Post-consume processing
+         * @param dispatch - the consumed message
+         * @param messageExpired - flag indicating if the message has expired.
+         */
+        void afterMessageIsConsumed(
+            const Pointer<commands::MessageDispatch>& dispatch, bool messageExpired );
+
+    private:
+
+        // Using options from the Destination URI override any settings that are
+        // defined for this consumer.
+        void applyDestinationOptions( const Pointer<commands::ConsumerInfo>& info );
+
+        // If supported sends a message pull request to the service provider asking
+        // for the delivery of a new message.  This is used in the case where the
+        // service provider has been configured with a zero prefetch or is only
+        // capable of delivering messages on a pull basis.  No request is made if
+        // there are already messages in the unconsumed queue since there's no need
+        // for a server round-trip in that instance.
+        void sendPullRequest( long long timeout );
+
+        // Checks for the closed state and throws if so.
+        void checkClosed() const;
+
+        // Sends an ack as needed in order to keep them coming in if the current
+        // ack mode allows the consumer to receive up to the prefetch limit before
+        // an real ack is sent.
+        void ackLater( const Pointer<commands::MessageDispatch>& message, int ackType );
+
+        // Create an Ack Message that acks all messages that have been delivered so far.
+        Pointer<commands::MessageAck> makeAckForAllDeliveredMessages( int type );
+
+        // Should Acks be sent on each dispatched message
+        bool isAutoAcknowledgeEach() const;
+
+        // Can Acks be batched for less network overhead.
+        bool isAutoAcknowledgeBatch() const;
+
+    };
+
+}}}
+
+#endif /* _ACTIVEMQ_CORE_KERNELS_ACTIVEMQCONSUMERKERNEL_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp?rev=1305601&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp Mon Mar 26 21:11:12 2012
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ActiveMQProducerKernel.h"
+
+#include <cms/Message.h>
+#include <activemq/core/ActiveMQSession.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/commands/RemoveInfo.h>
+#include <activemq/util/CMSExceptionSupport.h>
+#include <activemq/util/ActiveMQProperties.h>
+#include <decaf/lang/exceptions/NullPointerException.h>
+#include <decaf/lang/exceptions/InvalidStateException.h>
+#include <decaf/lang/exceptions/IllegalArgumentException.h>
+#include <decaf/lang/System.h>
+#include <decaf/lang/Boolean.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::util;
+using namespace activemq::core;
+using namespace activemq::core::kernels;
+using namespace activemq::commands;
+using namespace activemq::exceptions;
+using namespace decaf::util;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQProducerKernel::ActiveMQProducerKernel(ActiveMQSession* session,
+                                               const Pointer<commands::ProducerId>& producerId,
+                                               const Pointer<ActiveMQDestination>& destination,
+                                               long long sendTimeout) : disableTimestamps(false),
+                                                                        disableMessageId(false),
+                                                                        defaultDeliveryMode(cms::Message::DEFAULT_DELIVERY_MODE),
+                                                                        defaultPriority(cms::Message::DEFAULT_MSG_PRIORITY),
+                                                                        defaultTimeToLive(cms::Message::DEFAULT_TIME_TO_LIVE),
+                                                                        sendTimeout(sendTimeout),
+                                                                        session(session),
+                                                                        producerInfo(),
+                                                                        closed(false),
+                                                                        memoryUsage(),
+                                                                        destination() {
+
+    if (session == NULL || producerId == NULL) {
+        throw ActiveMQException(
+            __FILE__, __LINE__,
+            "ActiveMQProducerKernel::ActiveMQProducerKernel - Init with NULL Session" );
+    }
+
+    this->producerInfo.reset(new ProducerInfo());
+
+    this->producerInfo->setProducerId(producerId);
+    this->producerInfo->setDestination(destination);
+    this->producerInfo->setWindowSize(session->getConnection()->getProducerWindowSize());
+
+    // Get any options specified in the destination and apply them to the
+    // ProducerInfo object.
+    if (destination != NULL) {
+        const ActiveMQProperties& options = destination->getOptions();
+        this->producerInfo->setDispatchAsync(
+            Boolean::parseBoolean(options.getProperty("producer.dispatchAsync", "false")));
+
+        this->destination = destination.dynamicCast<cms::Destination> ();
+    }
+
+    // TODO - Check for need of MemoryUsage if there's a producer Windows size
+    //        and the Protocol version is greater than 3.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQProducerKernel::~ActiveMQProducerKernel() {
+    try {
+        close();
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQProducerKernel::close() {
+
+    try{
+
+        if (!this->isClosed()) {
+
+            dispose();
+
+            // Remove at the Broker Side, if this fails the producer has already
+            // been removed from the session and connection objects so its safe
+            // for an exception to be thrown.
+            Pointer<RemoveInfo> info(new RemoveInfo);
+            info->setObjectId(this->producerInfo->getProducerId());
+            this->session->oneway(info);
+        }
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQProducerKernel::dispose() {
+
+    if (!this->isClosed()) {
+        this->session->removeProducer(this->producerInfo->getProducerId());
+        this->closed = true;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQProducerKernel::send(cms::Message* message) {
+
+    try {
+        this->checkClosed();
+        this->send(this->destination.get(), message);
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQProducerKernel::send(cms::Message* message, int deliveryMode, int priority, long long timeToLive) {
+
+    try {
+        this->checkClosed();
+        this->send(this->destination.get(), message, deliveryMode, priority, timeToLive);
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQProducerKernel::send(const cms::Destination* destination, cms::Message* message) {
+
+    try {
+        this->checkClosed();
+        this->send(destination, message, defaultDeliveryMode, defaultPriority, defaultTimeToLive);
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQProducerKernel::send(const cms::Destination* destination, cms::Message* message,
+                                  int deliveryMode, int priority, long long timeToLive) {
+
+    try {
+
+        this->checkClosed();
+
+        if (destination == NULL) {
+
+            if (this->producerInfo->getDestination() == NULL) {
+                throw cms::UnsupportedOperationException("A destination must be specified.", NULL);
+            }
+
+            throw cms::InvalidDestinationException("Don't understand null destinations", NULL);
+        }
+
+        const cms::Destination* dest;
+        if (destination == dynamic_cast<cms::Destination*> (this->producerInfo->getDestination().get())) {
+            dest = destination;
+        } else if (this->producerInfo->getDestination() == NULL) {
+            // TODO - We should apply a Transform so ensure the user hasn't create some
+            //        external cms::Destination implementation.
+            dest = destination;
+        } else {
+            throw cms::UnsupportedOperationException(
+                string("This producer can only send messages to: ") +
+                this->producerInfo->getDestination()->getPhysicalName(), NULL);
+        }
+
+        if (dest == NULL) {
+            throw cms::CMSException("No destination specified", NULL);
+        }
+
+        // configure the message
+        message->setCMSDestination(dest);
+        message->setCMSDeliveryMode(deliveryMode);
+        message->setCMSPriority(priority);
+
+        long long expiration = 0LL;
+
+        if (!disableTimestamps) {
+
+            long long timeStamp = System::currentTimeMillis();
+            message->setCMSTimestamp(timeStamp);
+            if (timeToLive > 0LL) {
+                expiration = timeToLive + timeStamp;
+            }
+        }
+
+        message->setCMSExpiration(expiration);
+
+        // Delegate send to the session so that it can choose how to
+        // send the message.
+        this->session->send(message, this, this->memoryUsage.get());
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQProducerKernel::onProducerAck(const commands::ProducerAck& ack) {
+
+    try{
+
+        if (this->memoryUsage.get() != NULL) {
+            this->memoryUsage->decreaseUsage(ack.getSize());
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQProducerKernel::checkClosed() const {
+    if (closed) {
+        throw ActiveMQException(
+            __FILE__, __LINE__,
+            "ActiveMQProducerKernel - Producer Already Closed" );
+    }
+}
+

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.h?rev=1305601&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.h Mon Mar 26 21:11:12 2012
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_KERNELS_ACTIVEMQPRODUCERKERNEL_H_
+#define _ACTIVEMQ_CORE_KERNELS_ACTIVEMQPRODUCERKERNEL_H_
+
+#include <cms/MessageProducer.h>
+#include <cms/Message.h>
+#include <cms/Destination.h>
+#include <cms/DeliveryMode.h>
+
+#include <activemq/util/Config.h>
+#include <activemq/util/MemoryUsage.h>
+#include <activemq/commands/ProducerInfo.h>
+#include <activemq/commands/ProducerAck.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+#include <memory>
+
+namespace activemq {
+namespace core {
+    class ActiveMQSession;
+namespace kernels {
+
+    using decaf::lang::Pointer;
+
+    class AMQCPP_API ActiveMQProducerKernel : public cms::MessageProducer {
+    private:
+
+        // Disable sending timestamps
+        bool disableTimestamps;
+
+        // Disable adding a Message Id
+        bool disableMessageId;
+
+        // The default delivery Mode of this Producer
+        int defaultDeliveryMode;
+
+        // The default priority Level to send at
+        int defaultPriority;
+
+        // The default time to live value for messages in milliseconds
+        long long defaultTimeToLive;
+
+        // The default Send Timeout for this Producer.
+        long long sendTimeout;
+
+        // Session that this producer sends to.
+        ActiveMQSession* session;
+
+        // This Producers protocol specific info object
+        Pointer<commands::ProducerInfo> producerInfo;
+
+        // Boolean that indicates if the consumer has been closed
+        bool closed;
+
+        // Memory Usage Class, created only if the Producer is tracking its usage.
+        std::auto_ptr<util::MemoryUsage> memoryUsage;
+
+        // The Destination assigned at creation, NULL if not assigned.
+        Pointer<cms::Destination> destination;
+
+    private:
+
+        ActiveMQProducerKernel(const ActiveMQProducerKernel&);
+        ActiveMQProducerKernel& operator=(const ActiveMQProducerKernel&);
+
+    public:
+
+        /**
+         * Constructor, creates an instance of an ActiveMQProducerKernel
+         *
+         * @param session
+         *        The Session which is the parent of this Producer.
+         * @param producerId
+         *        Pointer to a ProducerId object which identifies this producer.
+         * @param destination
+         *        The assigned Destination this Producer sends to, or null if not set.
+         *        The Producer does not own the Pointer passed.
+         * @param sendTimeout
+         *        The configured send timeout for this Producer.
+         */
+        ActiveMQProducerKernel(ActiveMQSession* session,
+                               const Pointer<commands::ProducerId>& producerId,
+                               const Pointer<commands::ActiveMQDestination>& destination,
+                               long long sendTimeout);
+
+        virtual ~ActiveMQProducerKernel();
+
+    public:  // cms::MessageProducer methods.
+
+        virtual void close();
+
+        virtual void send(cms::Message* message);
+
+        virtual void send(cms::Message* message, int deliveryMode, int priority, long long timeToLive);
+
+        virtual void send(const cms::Destination* destination, cms::Message* message);
+
+        virtual void send(const cms::Destination* destination, cms::Message* message,
+                          int deliveryMode, int priority, long long timeToLive);
+
+        /**
+         * Sets the delivery mode for this Producer
+         * @param mode - The DeliveryMode to use for Message sends.
+         */
+        virtual void setDeliveryMode(int mode) {
+            this->defaultDeliveryMode = mode;
+        }
+
+        /**
+         * Gets the delivery mode for this Producer
+         * @return The DeliveryMode
+         */
+        virtual int getDeliveryMode() const {
+            return this->defaultDeliveryMode;
+        }
+
+        /**
+         * Sets if Message Ids are disabled for this Producer
+         * @param value - boolean indicating enable / disable (true / false)
+         */
+        virtual void setDisableMessageID(bool value) {
+            this->disableMessageId = value;
+        }
+
+        /**
+         * Gets if Message Ids are disabled for this Producer
+         * @return a boolean indicating state enable / disable (true / false) for MessageIds.
+         */
+        virtual bool getDisableMessageID() const {
+            return this->disableMessageId;
+        }
+
+        /**
+         * Sets if Message Time Stamps are disabled for this Producer
+         * @param value - boolean indicating enable / disable (true / false)
+         */
+        virtual void setDisableMessageTimeStamp(bool value) {
+            this->disableTimestamps = value;
+        }
+
+        /**
+         * Gets if Message Time Stamps are disabled for this Producer
+         * @returns boolean indicating state of enable / disable (true / false)
+         */
+        virtual bool getDisableMessageTimeStamp() const {
+            return this->disableTimestamps;
+        }
+
+        /**
+         * Sets the Priority that this Producers sends messages at
+         * @param priority int value for Priority level
+         */
+        virtual void setPriority(int priority) {
+            this->defaultPriority = priority;
+        }
+
+        /**
+         * Gets the Priority level that this producer sends messages at
+         * @return int based priority level
+         */
+        virtual int getPriority() const {
+            return this->defaultPriority;
+        }
+
+        /**
+         * Sets the Time to Live that this Producers sends messages with
+         * @param time The new default time to live value in milliseconds.
+         */
+        virtual void setTimeToLive(long long time) {
+            this->defaultTimeToLive = time;
+        }
+
+        /**
+         * Gets the Time to Live that this producer sends messages with
+         * @return The default time to live value in milliseconds.
+         */
+        virtual long long getTimeToLive() const {
+            return this->defaultTimeToLive;
+        }
+
+        /**
+         * Sets the Send Timeout that this Producers sends messages with
+         * @param time The new default send timeout value in milliseconds.
+         */
+        virtual void setSendTimeout(long long time) {
+            this->sendTimeout = time;
+        }
+
+        /**
+         * Gets the Send Timeout that this producer sends messages with
+         * @return The default send timeout value in milliseconds.
+         */
+        virtual long long getSendTimeout() const {
+            return this->sendTimeout;
+        }
+
+    public:
+
+        /**
+         * @returns true if this Producer has been closed.
+         */
+        bool isClosed() const {
+            return this->closed;
+        }
+
+        /**
+         * Retries this object ProducerInfo pointer
+         * @return ProducerInfo Reference
+         */
+        const Pointer<commands::ProducerInfo>& getProducerInfo() const {
+            this->checkClosed();
+            return this->producerInfo;
+        }
+
+        /**
+         * Retries this object ProducerId or NULL if closed.
+         * @return ProducerId Reference
+         */
+        const Pointer<commands::ProducerId>& getProducerId() const {
+            this->checkClosed();
+            return this->producerInfo->getProducerId();
+        }
+
+        /**
+         * Handles the work of Processing a ProducerAck Command from the Broker.
+         * @param ack - The ProducerAck message received from the Broker.
+         */
+        virtual void onProducerAck(const commands::ProducerAck& ack);
+
+        /**
+         * Performs Producer object cleanup but doesn't attempt to send the Remove command
+         * to the broker.  Called when the parent resource if closed first to avoid the message
+         * send and avoid any exceptions that might be thrown from an attempt to send a remove
+         * command to a failed transport.
+         */
+        void dispose();
+
+    private:
+
+       // Checks for the closed state and throws if so.
+       void checkClosed() const;
+
+    };
+
+}}}
+
+#endif /* _ACTIVEMQ_CORE_KERNELS_ACTIVEMQPRODUCERKERNEL_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Closeable.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Closeable.cpp?rev=1305601&r1=1305600&r2=1305601&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Closeable.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Closeable.cpp Mon Mar 26 21:11:12 2012
@@ -20,7 +20,7 @@
 using namespace cms;
 
 ////////////////////////////////////////////////////////////////////////////////
-Closeable::~Closeable() throw() {
+Closeable::~Closeable() {
 
 }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Closeable.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Closeable.h?rev=1305601&r1=1305600&r2=1305601&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Closeable.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Closeable.h Mon Mar 26 21:11:12 2012
@@ -36,7 +36,7 @@ namespace cms{
 
     public:
 
-        virtual ~Closeable() throw();
+        virtual ~Closeable();
 
         /**
          * Closes this object and deallocates the appropriate resources.

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.cpp?rev=1305601&r1=1305600&r2=1305601&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.cpp Mon Mar 26 21:11:12 2012
@@ -20,6 +20,6 @@
 using namespace cms;
 
 ////////////////////////////////////////////////////////////////////////////////
-Connection::~Connection() throw() {
+Connection::~Connection() {
 
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h?rev=1305601&r1=1305600&r2=1305601&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h Mon Mar 26 21:11:12 2012
@@ -72,7 +72,7 @@ namespace cms{
     {
     public:
 
-        virtual ~Connection() throw();
+        virtual ~Connection();
 
         /**
          * Closes this connection as well as any Sessions created from it

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageProducer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageProducer.cpp?rev=1305601&r1=1305600&r2=1305601&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageProducer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageProducer.cpp Mon Mar 26 21:11:12 2012
@@ -20,7 +20,7 @@
 using namespace cms;
 
 ////////////////////////////////////////////////////////////////////////////////
-MessageProducer::~MessageProducer() throw() {
+MessageProducer::~MessageProducer() {
 
 }
 



Mime
View raw message