Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 43CD49879 for ; Mon, 26 Mar 2012 21:11:50 +0000 (UTC) Received: (qmail 5748 invoked by uid 500); 26 Mar 2012 21:11:50 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 5718 invoked by uid 500); 26 Mar 2012 21:11:50 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 5711 invoked by uid 99); 26 Mar 2012 21:11:50 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 26 Mar 2012 21:11:50 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,FILL_THIS_FORM X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 26 Mar 2012 21:11:38 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 773382388BA6 for ; Mon, 26 Mar 2012 21:11:15 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1305601 [2/5] - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/activemq/cmsutil/ main/activemq/core/ main/activemq/core/kernels/ main/cms/ test/ Date: Mon, 26 Mar 2012 21:11:14 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120326211115.773382388BA6@eris.apache.org> Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=1305601&r1=1305600&r2=1305601&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Mon Mar 26 21:11:12 2012 @@ -16,33 +16,11 @@ */ #include "ActiveMQConsumer.h" -#include -#include -#include -#include -#include -#include -#include -#include #include #include -#include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include #include -#include #include #include @@ -50,535 +28,98 @@ using namespace std; using namespace activemq; using namespace activemq::util; using namespace activemq::core; -using namespace activemq::commands; +using namespace activemq::core::kernels; using namespace activemq::exceptions; -using namespace activemq::threads; using namespace decaf::lang; using namespace decaf::lang::exceptions; using namespace decaf::util; using namespace decaf::util::concurrent; //////////////////////////////////////////////////////////////////////////////// -namespace activemq{ +namespace activemq { namespace core { - class ActiveMQConsumerMembers { + class ActiveMQConsumerData { private: - ActiveMQConsumerMembers( const ActiveMQConsumerMembers& ); - ActiveMQConsumerMembers& operator= ( const ActiveMQConsumerMembers& ); + ActiveMQConsumerData(const ActiveMQConsumerData&); + ActiveMQConsumerData& operator=(const ActiveMQConsumerData&); public: - cms::MessageListener* listener; - decaf::util::concurrent::Mutex listenerMutex; - AtomicBoolean deliveringAcks; - AtomicBoolean started; - Pointer unconsumedMessages; - decaf::util::LinkedList< decaf::lang::Pointer > dispatchedMessages; - long long lastDeliveredSequenceId; - Pointer pendingAck; - int deliveredCounter; - int additionalWindowSize; - volatile bool synchronizationRegistered; - bool clearDispatchList; - bool inProgressClearRequiredFlag; - long long redeliveryDelay; - Pointer redeliveryPolicy; - Pointer failureError; - Pointer scheduler; - - ActiveMQConsumerMembers() : listener(NULL), - listenerMutex(), - deliveringAcks(), - started(), - unconsumedMessages(), - dispatchedMessages(), - lastDeliveredSequenceId(0), - pendingAck(), - deliveredCounter(0), - additionalWindowSize(0), - synchronizationRegistered(false), - clearDispatchList(false), - inProgressClearRequiredFlag(false), - redeliveryDelay(0), - redeliveryPolicy(), - failureError(), - scheduler() { - } - - }; - - /** - * Class used to deal with consumers in an active transaction. This - * class calls back into the consumer when the transaction is Committed or - * Rolled Back to process that event. - */ - class TransactionSynhcronization : public Synchronization { - private: - - ActiveMQConsumer* consumer; - - private: - - TransactionSynhcronization( const TransactionSynhcronization& ); - TransactionSynhcronization& operator= ( const TransactionSynhcronization& ); - - public: - - TransactionSynhcronization( ActiveMQConsumer* consumer ) : consumer(consumer) { - - if( consumer == NULL ) { - throw NullPointerException( - __FILE__, __LINE__, "Synchronization Created with NULL Consumer."); - } - } - - virtual ~TransactionSynhcronization() {} - - virtual void beforeEnd() { - consumer->acknowledge(); - consumer->setSynchronizationRegistered( false ); - } - - virtual void afterCommit() { - consumer->commit(); - consumer->setSynchronizationRegistered( false ); - } - - virtual void afterRollback() { - consumer->rollback(); - consumer->setSynchronizationRegistered( false ); - } - - }; - - /** - * Class used to Hook a consumer that has been closed into the Transaction - * it is currently a part of. Once the Transaction has been Committed or - * Rolled back this Synchronization can finish the Close of the consumer. - */ - class CloseSynhcronization : public Synchronization { - private: - - ActiveMQConsumer* consumer; - - private: - - CloseSynhcronization( const CloseSynhcronization& ); - CloseSynhcronization& operator= ( const CloseSynhcronization& ); - - public: - - CloseSynhcronization( ActiveMQConsumer* consumer ) : consumer(consumer) { - - if( consumer == NULL ) { - throw NullPointerException( - __FILE__, __LINE__, "Synchronization Created with NULL Consumer."); - } - } - - virtual ~CloseSynhcronization() {} - - virtual void beforeEnd() { - } - - virtual void afterCommit() { - consumer->doClose(); - } - - virtual void afterRollback() { - consumer->doClose(); - } - - }; - - /** - * ActiveMQAckHandler used to support Client Acknowledge mode. - */ - class ClientAckHandler : public ActiveMQAckHandler { - private: - - ActiveMQSession* session; - - private: + Pointer kernel; - ClientAckHandler( const ClientAckHandler& ); - ClientAckHandler& operator= ( const ClientAckHandler& ); - - public: - - ClientAckHandler( ActiveMQSession* session ) : session(session) { - if( session == NULL ) { - throw NullPointerException( - __FILE__, __LINE__, "Ack Handler Created with NULL Session."); - } - } - - void acknowledgeMessage( const commands::Message* message AMQCPP_UNUSED ) { - - try { - this->session->acknowledge(); - } - AMQ_CATCH_ALL_THROW_CMSEXCEPTION() - } - }; - - /** - * ActiveMQAckHandler used to enable the Individual Acknowledge mode. - */ - class IndividualAckHandler : public ActiveMQAckHandler { - private: - - ActiveMQConsumer* consumer; - Pointer dispatch; - - private: - - IndividualAckHandler( const IndividualAckHandler& ); - IndividualAckHandler& operator= ( const IndividualAckHandler& ); - - public: - - IndividualAckHandler( ActiveMQConsumer* consumer, const Pointer& dispatch ) : - consumer(consumer), dispatch(dispatch) { - - if( consumer == NULL ) { - throw NullPointerException( - __FILE__, __LINE__, "Ack Handler Created with NULL consumer."); - } - } - - void acknowledgeMessage( const commands::Message* message AMQCPP_UNUSED ) { - - try { - - if( this->dispatch != NULL ) { - this->consumer->acknowledge( this->dispatch ); - this->dispatch.reset( NULL ); - } - } - AMQ_CATCH_ALL_THROW_CMSEXCEPTION() - } - }; - - /** - * Class used to Start a Consumer's dispatch queue asynchronously from the - * configured Scheduler. - */ - class StartConsumerTask : public Runnable { - private: - - ActiveMQConsumer* consumer; - - private: - - StartConsumerTask( const StartConsumerTask& ); - StartConsumerTask& operator= ( const StartConsumerTask& ); - - public: - - StartConsumerTask( ActiveMQConsumer* consumer ) : Runnable(), consumer(NULL) { - - if( consumer == NULL ) { - throw NullPointerException( - __FILE__, __LINE__, "Synchronization Created with NULL Consumer."); - } - - this->consumer = consumer; - } - - virtual ~StartConsumerTask() {} - - virtual void run() { - try{ - if(!this->consumer->isClosed()) { - this->consumer->start(); - } - } catch(cms::CMSException& ex) { - // TODO - Need Connection onAsyncException method. - } + ActiveMQConsumerData(const Pointer& kernel) : kernel(kernel) { } }; }} //////////////////////////////////////////////////////////////////////////////// -ActiveMQConsumer::ActiveMQConsumer( ActiveMQSession* session, - const Pointer& id, - const Pointer& destination, - const std::string& name, - const std::string& selector, - int prefetch, - int maxPendingMessageCount, - bool noLocal, - bool browser, - bool dispatchAsync, - cms::MessageListener* listener ) : internal(NULL), session(NULL), consumerInfo() { - - if( session == NULL ) { - throw ActiveMQException( - __FILE__, __LINE__, - "ActiveMQConsumer::ActiveMQConsumer - Init with NULL Session" ); - } - - if( destination == NULL ) { - throw ActiveMQException( - __FILE__, __LINE__, - "ActiveMQConsumer::ActiveMQConsumer - Init with NULL Destination" ); - } - - if( destination->getPhysicalName() == "" ) { - throw ActiveMQException( - __FILE__, __LINE__, - "ActiveMQConsumer::ActiveMQConsumer - Destination given has no Physical Name." ); - } - - this->internal = new ActiveMQConsumerMembers(); - - Pointer consumerInfo( new ConsumerInfo() ); +ActiveMQConsumer::ActiveMQConsumer(const Pointer& kernel) : MessageConsumer(), config(NULL) { - consumerInfo->setConsumerId( id ); - consumerInfo->setDestination( destination ); - consumerInfo->setSubscriptionName( name ); - consumerInfo->setSelector( selector ); - consumerInfo->setPrefetchSize( prefetch ); - consumerInfo->setMaximumPendingMessageLimit( maxPendingMessageCount ); - consumerInfo->setBrowser( browser ); - consumerInfo->setDispatchAsync( dispatchAsync ); - consumerInfo->setNoLocal( noLocal ); - - // Initialize Consumer Data - this->session = session; - this->consumerInfo = consumerInfo; - this->internal->lastDeliveredSequenceId = -1; - this->internal->synchronizationRegistered = false; - this->internal->additionalWindowSize = 0; - this->internal->deliveredCounter = 0; - this->internal->clearDispatchList = false; - this->internal->inProgressClearRequiredFlag = false; - this->internal->listener = NULL; - this->internal->redeliveryDelay = 0; - this->internal->redeliveryPolicy.reset( this->session->getConnection()->getRedeliveryPolicy()->clone() ); - this->internal->scheduler = this->session->getScheduler(); - - if( this->session->getConnection()->isMessagePrioritySupported() ) { - this->internal->unconsumedMessages.reset( new SimplePriorityMessageDispatchChannel() ); - } else { - this->internal->unconsumedMessages.reset( new FifoMessageDispatchChannel() ); + if (kernel == NULL) { + throw ActiveMQException(__FILE__, __LINE__, + "ActiveMQConsumer::ActiveMQConsumer - Constructor called with NULL Kernel"); } - if( listener != NULL ) { - this->setMessageListener( listener ); - } - - applyDestinationOptions(this->consumerInfo); + this->config = new ActiveMQConsumerData(kernel); } //////////////////////////////////////////////////////////////////////////////// -ActiveMQConsumer::~ActiveMQConsumer() throw() { +ActiveMQConsumer::~ActiveMQConsumer() throw () { try { - try{ - this->close(); - } catch(...) {} + try { + this->config->kernel->close(); + } catch (...) { + } - delete this->internal; + delete this->config; } - AMQ_CATCH_NOTHROW( ActiveMQException ) - AMQ_CATCHALL_NOTHROW( ) + AMQ_CATCH_NOTHROW(ActiveMQException) + AMQ_CATCHALL_NOTHROW() } //////////////////////////////////////////////////////////////////////////////// void ActiveMQConsumer::start() { - if( this->internal->unconsumedMessages->isClosed() ) { - return; - } - - this->internal->started.set( true ); - this->internal->unconsumedMessages->start(); - this->session->wakeup(); -} - -//////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumer::stop() { - this->internal->started.set( false ); - this->internal->unconsumedMessages->stop(); -} - -//////////////////////////////////////////////////////////////////////////////// -bool ActiveMQConsumer::isClosed() const { - return this->internal->unconsumedMessages->isClosed(); -} - -//////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumer::close() { + try { - try{ - if( !this->isClosed() ) { - if( this->session->getTransactionContext() != NULL && - this->session->getTransactionContext()->isInTransaction() ) { - - // TODO - Currently we can do this since the consumer could be - // deleted right after the close call so it won't stick around - // long enough to clean up the transaction data. For now we - // just have to close badly. - // - //Pointer sync( new CloseSynhcronization( this ) ); - //this->transaction->addSynchronization( sync ); - - doClose(); - - throw UnsupportedOperationException( - __FILE__, __LINE__, - "The Consumer is still in an Active Transaction, commit it first." ); - - } else { - doClose(); - } + if (this->config->kernel->isClosed()) { + return; } + + this->config->kernel->start(); } AMQ_CATCH_ALL_THROW_CMSEXCEPTION() } //////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumer::doClose() { - - try { - - dispose(); - // Remove at the Broker Side, consumer has been removed from the local - // Session and Connection objects so if the remote call to remove throws - // it is okay to propagate to the client. - Pointer info( new RemoveInfo ); - info->setObjectId( this->consumerInfo->getConsumerId() ); - info->setLastDeliveredSequenceId( this->internal->lastDeliveredSequenceId ); - this->session->oneway( info ); - } - AMQ_CATCH_RETHROW( ActiveMQException ) - AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) - AMQ_CATCHALL_THROW( ActiveMQException ) +void ActiveMQConsumer::stop() { + this->config->kernel->stop(); } //////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumer::dispose() { - - try{ - if( !this->isClosed() ) { - - if( !session->isTransacted() ) { - deliverAcks(); - } - - this->internal->started.set( false ); - - // Identifies any errors encountered during shutdown. - bool haveException = false; - ActiveMQException error; - - // Purge all the pending messages - try{ - this->internal->unconsumedMessages->clear(); - } catch ( ActiveMQException& ex ){ - if( !haveException ){ - ex.setMark( __FILE__, __LINE__ ); - error = ex; - haveException = true; - } - } - - // Stop and Wakeup all sync consumers. - this->internal->unconsumedMessages->close(); - - if( this->session->isIndividualAcknowledge() ) { - // For IndividualAck Mode we need to unlink the ack handler to remove a - // cyclic reference to the MessageDispatch that brought the message to us. - synchronized( &internal->dispatchedMessages ) { - std::auto_ptr< Iterator< Pointer > > iter( this->internal->dispatchedMessages.iterator() ); - while( iter->hasNext() ) { - iter->next()->getMessage()->setAckHandler( Pointer() ); - } - - this->internal->dispatchedMessages.clear(); - } - } - - // Remove this Consumer from the Connections set of Dispatchers - this->session->removeConsumer( this->consumerInfo->getConsumerId() ); - - // If we encountered an error, propagate it. - if( haveException ){ - error.setMark( __FILE__, __LINE__ ); - throw error; - } - } - } - AMQ_CATCH_RETHROW( ActiveMQException ) - AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) - AMQ_CATCHALL_THROW( ActiveMQException ) +bool ActiveMQConsumer::isClosed() const { + return this->config->kernel->isClosed(); } //////////////////////////////////////////////////////////////////////////////// -std::string ActiveMQConsumer::getMessageSelector() const { +void ActiveMQConsumer::close() { try { - // Fetch the Selector - return this->consumerInfo->getSelector(); + this->config->kernel->close(); } AMQ_CATCH_ALL_THROW_CMSEXCEPTION() } //////////////////////////////////////////////////////////////////////////////// -decaf::lang::Pointer ActiveMQConsumer::dequeue( long long timeout ) { +std::string ActiveMQConsumer::getMessageSelector() const { try { - - this->checkClosed(); - - // Calculate the deadline - long long deadline = 0; - if( timeout > 0 ) { - deadline = System::currentTimeMillis() + timeout; - } - - // Loop until the time is up or we get a non-expired message - while( true ) { - - Pointer dispatch = this->internal->unconsumedMessages->dequeue( timeout ); - if( dispatch == NULL ) { - - if( timeout > 0 && !this->internal->unconsumedMessages->isClosed() ) { - timeout = Math::max( deadline - System::currentTimeMillis(), 0LL ); - } else { - if( this->internal->failureError != NULL ) { - throw CMSExceptionSupport::create(*this->internal->failureError); - } else { - return Pointer(); - } - } - - } else if( dispatch->getMessage() == NULL ) { - - return Pointer(); - - } else if( dispatch->getMessage()->isExpired() ) { - - beforeMessageIsConsumed( dispatch ); - afterMessageIsConsumed( dispatch, true ); - if( timeout > 0 ) { - timeout = Math::max( deadline - System::currentTimeMillis(), 0LL ); - } - - continue; - } - - // Return the message. - return dispatch; - } - - return Pointer(); + return this->config->kernel->getMessageSelector(); } AMQ_CATCH_ALL_THROW_CMSEXCEPTION() } @@ -586,65 +127,17 @@ decaf::lang::Pointer Ac //////////////////////////////////////////////////////////////////////////////// cms::Message* ActiveMQConsumer::receive() { - try{ - - this->checkClosed(); - - // Send a request for a new message if needed - this->sendPullRequest( 0 ); - - // Wait for the next message. - Pointer message = dequeue( -1 ); - if( message == NULL ) { - return NULL; - } - - // Message pre-processing - beforeMessageIsConsumed( message ); - - // Need to clone the message because the user is responsible for freeing - // its copy of the message. - cms::Message* clonedMessage = - dynamic_cast( message->getMessage()->cloneDataStructure() ); - - // Post processing (may result in the message being deleted) - afterMessageIsConsumed( message, false ); - - // Return the cloned message. - return clonedMessage; + try { + return this->config->kernel->receive(); } AMQ_CATCH_ALL_THROW_CMSEXCEPTION() } //////////////////////////////////////////////////////////////////////////////// -cms::Message* ActiveMQConsumer::receive( int millisecs ) { +cms::Message* ActiveMQConsumer::receive(int millisecs) { try { - - this->checkClosed(); - - // Send a request for a new message if needed - this->sendPullRequest( millisecs ); - - // Wait for the next message. - Pointer message = dequeue( millisecs ); - if( message == NULL ) { - return NULL; - } - - // Message preprocessing - beforeMessageIsConsumed( message ); - - // Need to clone the message because the user is responsible for freeing - // its copy of the message. - cms::Message* clonedMessage = - dynamic_cast( message->getMessage()->cloneDataStructure() ); - - // Post processing (may result in the message being deleted) - afterMessageIsConsumed( message, false ); - - // Return the cloned message. - return clonedMessage; + return this->config->kernel->receive(millisecs); } AMQ_CATCH_ALL_THROW_CMSEXCEPTION() } @@ -653,751 +146,46 @@ cms::Message* ActiveMQConsumer::receive( cms::Message* ActiveMQConsumer::receiveNoWait() { try { - - this->checkClosed(); - - // Send a request for a new message if needed - this->sendPullRequest( -1 ); - - // Get the next available message, if there is one. - Pointer message = dequeue( 0 ); - if( message == NULL ) { - return NULL; - } - - // Message preprocessing - beforeMessageIsConsumed( message ); - - // Need to clone the message because the user is responsible for freeing - // its copy of the message. - cms::Message* clonedMessage = - dynamic_cast( message->getMessage()->cloneDataStructure() ); - - // Post processing (may result in the message being deleted) - afterMessageIsConsumed( message, false ); - - // Return the cloned message. - return clonedMessage; + return this->config->kernel->receiveNoWait(); } AMQ_CATCH_ALL_THROW_CMSEXCEPTION() } //////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumer::setMessageListener( cms::MessageListener* listener ) { - - try{ - - this->checkClosed(); - - if( this->consumerInfo->getPrefetchSize() == 0 && listener != NULL ) { - throw ActiveMQException( - __FILE__, __LINE__, - "Cannot deliver async when Prefetch is Zero, set Prefecth to at least One."); - } - - if( listener != NULL ) { - - // Now that we have a valid message listener, - // redispatch all the messages that it missed. - - bool wasStarted = session->isStarted(); - if( wasStarted ) { - session->stop(); - } - - synchronized( &(this->internal->listenerMutex) ) { - this->internal->listener = listener; - } - - this->session->redispatch( *(this->internal->unconsumedMessages) ); - - if( wasStarted ) { - this->session->start(); - } - } else { - synchronized( &(this->internal->listenerMutex) ) { - this->internal->listener = NULL; - } - } - } - AMQ_CATCH_ALL_THROW_CMSEXCEPTION() -} - -//////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumer::beforeMessageIsConsumed( const Pointer& dispatch ) { - - // If the Session is in ClientAcknowledge or IndividualAcknowledge mode, then - // we set the handler in the message to this object and send it out. - if( session->isClientAcknowledge() ) { - Pointer ackHandler( new ClientAckHandler( this->session ) ); - dispatch->getMessage()->setAckHandler( ackHandler ); - } else if( session->isIndividualAcknowledge() ) { - Pointer ackHandler( new IndividualAckHandler( this, dispatch ) ); - dispatch->getMessage()->setAckHandler( ackHandler ); - } - - this->internal->lastDeliveredSequenceId = - dispatch->getMessage()->getMessageId()->getBrokerSequenceId(); - - if( !isAutoAcknowledgeBatch() ) { - - // When not in an Auto - synchronized( &this->internal->dispatchedMessages ) { - this->internal->dispatchedMessages.addFirst( dispatch ); - } - - if( this->session->isTransacted() ) { - ackLater( dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED ); - } - } -} - -//////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumer::afterMessageIsConsumed( const Pointer& message, - bool messageExpired ) { - - try{ - - if( this->internal->unconsumedMessages->isClosed() ) { - return; - } - - if( messageExpired == true ) { - ackLater( message, ActiveMQConstants::ACK_TYPE_DELIVERED ); - } - - if( session->isTransacted() ) { - return; - } else if( isAutoAcknowledgeEach() ) { - - if( this->internal->deliveringAcks.compareAndSet( false, true ) ) { - - synchronized( &this->internal->dispatchedMessages ) { - if( !this->internal->dispatchedMessages.isEmpty() ) { - Pointer ack = makeAckForAllDeliveredMessages( - ActiveMQConstants::ACK_TYPE_CONSUMED ); - - if( ack != NULL ) { - this->internal->dispatchedMessages.clear(); - session->oneway( ack ); - } - } - } - - this->internal->deliveringAcks.set( false ); - } - - } else if( isAutoAcknowledgeBatch() ) { - ackLater( message, ActiveMQConstants::ACK_TYPE_CONSUMED ); - } else if( session->isClientAcknowledge() || session->isIndividualAcknowledge() ) { - - bool messageUnackedByConsumer = false; - - synchronized( &this->internal->dispatchedMessages ) { - messageUnackedByConsumer = this->internal->dispatchedMessages.contains(message); - } - - if( messageUnackedByConsumer ) { - this->ackLater( message, ActiveMQConstants::ACK_TYPE_DELIVERED ); - } - - } else { - throw IllegalStateException( __FILE__, __LINE__, "Invalid Session State" ); - } - } - AMQ_CATCH_RETHROW( ActiveMQException ) - AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) - AMQ_CATCHALL_THROW( ActiveMQException ) -} - -//////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumer::deliverAcks() { - - try{ - - Pointer ack; - - if( this->internal->deliveringAcks.compareAndSet( false, true ) ) { - - if( isAutoAcknowledgeEach() ) { - - synchronized( &this->internal->dispatchedMessages ) { - - ack = makeAckForAllDeliveredMessages( ActiveMQConstants::ACK_TYPE_CONSUMED ); - - if( ack != NULL ) { - this->internal->dispatchedMessages.clear(); - } else { - ack.swap( internal->pendingAck ); - } - } - - } else if( this->internal->pendingAck != NULL && - this->internal->pendingAck->getAckType() == ActiveMQConstants::ACK_TYPE_CONSUMED ) { - - ack.swap( this->internal->pendingAck ); - } - - if( ack != NULL ) { - - try{ - this->session->oneway( ack ); - } catch(...) {} - - } else { - this->internal->deliveringAcks.set( false ); - } - } - } - AMQ_CATCH_RETHROW( ActiveMQException ) - AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) - AMQ_CATCHALL_THROW( ActiveMQException ) -} - -//////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumer::ackLater( const Pointer& dispatch, int ackType ) { - - // Don't acknowledge now, but we may need to let the broker know the - // consumer got the message to expand the pre-fetch window - if( session->isTransacted() ) { - session->doStartTransaction(); - if( !this->internal->synchronizationRegistered ) { - this->internal->synchronizationRegistered = true; - - Pointer sync( new TransactionSynhcronization( this ) ); - this->session->getTransactionContext()->addSynchronization( sync ); - } - } - - // The delivered message list is only needed for the recover method - // which is only used with client ack. - this->internal->deliveredCounter++; - - Pointer oldPendingAck = this->internal->pendingAck; - this->internal->pendingAck.reset( new MessageAck() ); - this->internal->pendingAck->setConsumerId( dispatch->getConsumerId() ); - this->internal->pendingAck->setAckType( (unsigned char)ackType ); - this->internal->pendingAck->setDestination( dispatch->getDestination() ); - this->internal->pendingAck->setLastMessageId( dispatch->getMessage()->getMessageId() ); - this->internal->pendingAck->setMessageCount( internal->deliveredCounter ); - - if( oldPendingAck == NULL ) { - this->internal->pendingAck->setFirstMessageId( this->internal->pendingAck->getLastMessageId() ); - } else if ( oldPendingAck->getAckType() == this->internal->pendingAck->getAckType() ) { - this->internal->pendingAck->setFirstMessageId( oldPendingAck->getFirstMessageId() ); - } else { - // old pending ack being superseded by ack of another type, if is is not a delivered - // ack and hence important, send it now so it is not lost. - if( oldPendingAck->getAckType() != ActiveMQConstants::ACK_TYPE_DELIVERED ) { - session->oneway( oldPendingAck ); - } - } - - if( session->isTransacted() ) { - this->internal->pendingAck->setTransactionId( this->session->getTransactionContext()->getTransactionId() ); - } - - if( ( 0.5 * this->consumerInfo->getPrefetchSize() ) <= ( internal->deliveredCounter - internal->additionalWindowSize ) ) { - session->oneway( this->internal->pendingAck ); - this->internal->pendingAck.reset( NULL ); - this->internal->deliveredCounter = 0; - this->internal->additionalWindowSize = 0; - } -} - -//////////////////////////////////////////////////////////////////////////////// -Pointer ActiveMQConsumer::makeAckForAllDeliveredMessages( int type ) { - - synchronized( &this->internal->dispatchedMessages ) { - - if( !this->internal->dispatchedMessages.isEmpty() ) { - - Pointer dispatched = this->internal->dispatchedMessages.getFirst(); - - Pointer ack( new MessageAck() ); - ack->setAckType( (unsigned char)type ); - ack->setConsumerId( dispatched->getConsumerId() ); - ack->setDestination( dispatched->getDestination() ); - ack->setMessageCount( (int)this->internal->dispatchedMessages.size() ); - ack->setLastMessageId( dispatched->getMessage()->getMessageId() ); - ack->setFirstMessageId( this->internal->dispatchedMessages.getLast()->getMessage()->getMessageId() ); - - return ack; - } - } - - return Pointer(); -} - -//////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumer::acknowledge( const Pointer& dispatch ) { - - try{ - - this->checkClosed(); - - if( this->session->isIndividualAcknowledge() ) { - - Pointer ack( new MessageAck() ); - ack->setAckType( ActiveMQConstants::ACK_TYPE_CONSUMED ); - ack->setConsumerId( this->consumerInfo->getConsumerId() ); - ack->setDestination( this->consumerInfo->getDestination() ); - ack->setMessageCount( 1 ); - ack->setLastMessageId( dispatch->getMessage()->getMessageId() ); - ack->setFirstMessageId( dispatch->getMessage()->getMessageId() ); - - session->oneway( ack ); - - synchronized( &this->internal->dispatchedMessages ) { - std::auto_ptr< Iterator< Pointer > > iter( this->internal->dispatchedMessages.iterator() ); - while( iter->hasNext() ) { - if( iter->next() == dispatch ) { - iter->remove(); - break; - } - } - } - - } else { - throw IllegalStateException( - __FILE__, __LINE__, - "Session is not in IndividualAcknowledge mode." ); - } - } - AMQ_CATCH_ALL_THROW_CMSEXCEPTION() -} - -//////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumer::acknowledge() { - - try{ - - synchronized( &this->internal->dispatchedMessages ) { - - // Acknowledge all messages so far. - Pointer ack = - makeAckForAllDeliveredMessages( ActiveMQConstants::ACK_TYPE_CONSUMED ); - - if( ack == NULL ) { - return; - } - - if( session->isTransacted() ) { - session->doStartTransaction(); - ack->setTransactionId( session->getTransactionContext()->getTransactionId() ); - } - - session->oneway( ack ); - this->internal->pendingAck.reset( NULL ); - - // Adjust the counters - this->internal->deliveredCounter = - Math::max( 0, this->internal->deliveredCounter - (int)this->internal->dispatchedMessages.size()); - this->internal->additionalWindowSize = - Math::max(0, this->internal->additionalWindowSize - (int)this->internal->dispatchedMessages.size()); - - if( !session->isTransacted() ) { - this->internal->dispatchedMessages.clear(); - } - } - } - AMQ_CATCH_ALL_THROW_CMSEXCEPTION() -} - -//////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumer::commit() { - - synchronized( &(this->internal->dispatchedMessages) ) { - this->internal->dispatchedMessages.clear(); - } - this->internal->redeliveryDelay = 0; -} - -//////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumer::rollback() { - - synchronized( this->internal->unconsumedMessages.get() ) { - - synchronized( &this->internal->dispatchedMessages ) { - if( this->internal->dispatchedMessages.isEmpty() ) { - return; - } - - // Only increase the redelivery delay after the first redelivery.. - Pointer lastMsg = this->internal->dispatchedMessages.getFirst(); - const int currentRedeliveryCount = lastMsg->getMessage()->getRedeliveryCounter(); - if( currentRedeliveryCount > 0 ) { - this->internal->redeliveryDelay = this->internal->redeliveryPolicy->getNextRedeliveryDelay( internal->redeliveryDelay ); - } else { - this->internal->redeliveryDelay = this->internal->redeliveryPolicy->getInitialRedeliveryDelay(); - } - - Pointer firstMsgId = - this->internal->dispatchedMessages.getLast()->getMessage()->getMessageId(); - - std::auto_ptr< Iterator< Pointer > > iter( internal->dispatchedMessages.iterator() ); - - while( iter->hasNext() ) { - Pointer message = iter->next()->getMessage(); - message->setRedeliveryCounter( message->getRedeliveryCounter() + 1 ); - } - - if( this->internal->redeliveryPolicy->getMaximumRedeliveries() != RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES && - lastMsg->getMessage()->getRedeliveryCounter() > this->internal->redeliveryPolicy->getMaximumRedeliveries() ) { - - // We need to NACK the messages so that they get sent to the DLQ. - // Acknowledge the last message. - Pointer ack( new MessageAck() ); - ack->setAckType( ActiveMQConstants::ACK_TYPE_POISON ); - ack->setConsumerId( this->consumerInfo->getConsumerId() ); - ack->setDestination( lastMsg->getDestination() ); - ack->setMessageCount( (int)this->internal->dispatchedMessages.size() ); - ack->setLastMessageId( lastMsg->getMessage()->getMessageId() ); - ack->setFirstMessageId( firstMsgId ); - - session->oneway( ack ); - // Adjust the window size. - this->internal->additionalWindowSize = - Math::max( 0, this->internal->additionalWindowSize - (int)this->internal->dispatchedMessages.size() ); - this->internal->redeliveryDelay = 0; - - } else { - - // only redelivery_ack after first delivery - if( currentRedeliveryCount > 0 ) { - Pointer ack( new MessageAck() ); - ack->setAckType( ActiveMQConstants::ACK_TYPE_REDELIVERED ); - ack->setConsumerId( this->consumerInfo->getConsumerId() ); - ack->setDestination( lastMsg->getDestination() ); - ack->setMessageCount( (int)this->internal->dispatchedMessages.size() ); - ack->setLastMessageId( lastMsg->getMessage()->getMessageId() ); - ack->setFirstMessageId( firstMsgId ); - - session->oneway( ack ); - } - - // stop the delivery of messages. - this->internal->unconsumedMessages->stop(); - - std::auto_ptr< Iterator< Pointer > > iter( this->internal->dispatchedMessages.iterator() ); - - while( iter->hasNext() ) { - this->internal->unconsumedMessages->enqueueFirst( iter->next() ); - } - - if( internal->redeliveryDelay > 0 && !this->internal->unconsumedMessages->isClosed() ) { - // TODO - Can't do this until we can control object lifetime. - // Start up the delivery again a little later. - // this->internal->scheduler->executeAfterDelay( - // new StartConsumerTask(this), internal->redeliveryDelay); - start(); - } else { - start(); - } - - } - this->internal->deliveredCounter -= (int)internal->dispatchedMessages.size(); - this->internal->dispatchedMessages.clear(); - } - } - - if( this->internal->listener != NULL ) { - session->redispatch( *this->internal->unconsumedMessages ); - } -} - -//////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumer::dispatch( const Pointer& dispatch ) { - - try { - - synchronized( this->internal->unconsumedMessages.get() ) { - - clearMessagesInProgress(); - if( this->internal->clearDispatchList ) { - // we are reconnecting so lets flush the in progress - // messages - this->internal->clearDispatchList = false; - this->internal->unconsumedMessages->clear(); - } - - if( !this->internal->unconsumedMessages->isClosed() ) { - - // Don't dispatch expired messages, ack it and then destroy it - if( dispatch->getMessage() != NULL && dispatch->getMessage()->isExpired() ) { - this->ackLater( dispatch, ActiveMQConstants::ACK_TYPE_CONSUMED ); - - // stop now, don't queue - return; - } - - synchronized( &this->internal->listenerMutex ) { - // If we have a listener, send the message. - if( this->internal->listener != NULL && internal->unconsumedMessages->isRunning() ) { - - // Preprocessing. - beforeMessageIsConsumed( dispatch ); - - // Notify the listener - this->internal->listener->onMessage( - dynamic_cast( dispatch->getMessage().get() ) ); - - // Postprocessing - afterMessageIsConsumed( dispatch, false ); - - } else { - - // No listener, add it to the unconsumed messages list - this->internal->unconsumedMessages->enqueue( dispatch ); - } - } - } - } - } - AMQ_CATCH_RETHROW( ActiveMQException ) - AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) - AMQ_CATCHALL_THROW( ActiveMQException ) -} - -//////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumer::sendPullRequest( long long timeout ) { +void ActiveMQConsumer::setMessageListener(cms::MessageListener* listener) { try { - - this->checkClosed(); - - // There are still local message, consume them first. - if( !this->internal->unconsumedMessages->isEmpty() ) { - return; - } - - if( this->consumerInfo->getPrefetchSize() == 0 ) { - - Pointer messagePull( new MessagePull() ); - messagePull->setConsumerId( this->consumerInfo->getConsumerId() ); - messagePull->setDestination( this->consumerInfo->getDestination() ); - messagePull->setTimeout( timeout ); - - this->session->oneway( messagePull ); - } - } - AMQ_CATCH_RETHROW( ActiveMQException ) - AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) - AMQ_CATCHALL_THROW( ActiveMQException ) -} - -//////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumer::checkClosed() const { - if( this->isClosed() ) { - throw ActiveMQException( - __FILE__, __LINE__, - "ActiveMQConsumer - Consumer Already Closed" ); - } -} - -//////////////////////////////////////////////////////////////////////////////// -bool ActiveMQConsumer::iterate() { - - synchronized( &this->internal->listenerMutex ) { - - if( this->internal->listener != NULL ) { - - Pointer dispatch = internal->unconsumedMessages->dequeueNoWait(); - if( dispatch != NULL ) { - - try { - beforeMessageIsConsumed( dispatch ); - this->internal->listener->onMessage( - dynamic_cast( dispatch->getMessage().get() ) ); - afterMessageIsConsumed( dispatch, false ); - } catch( ActiveMQException& ex ) { - this->session->fire( ex ); - } - - return true; - } - } - } - - return false; -} - -//////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumer::inProgressClearRequired() { - - this->internal->inProgressClearRequiredFlag = true; - // Clears dispatched messages async to avoid lock contention with inprogress acks. - this->internal->clearDispatchList = true; -} - -//////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumer::clearMessagesInProgress() { - if( this->internal->inProgressClearRequiredFlag ) { - synchronized( this->internal->unconsumedMessages.get() ) { - if( this->internal->inProgressClearRequiredFlag ) { - - // TODO - Rollback duplicates. - - // allow dispatch on this connection to resume - this->session->getConnection()->setTransportInterruptionProcessingComplete(); - this->internal->inProgressClearRequiredFlag = false; - } - } + this->config->kernel->setMessageListener(listener); } -} - -//////////////////////////////////////////////////////////////////////////////// -bool ActiveMQConsumer::isAutoAcknowledgeEach() const { - return this->session->isAutoAcknowledge() || - ( this->session->isDupsOkAcknowledge() && this->consumerInfo->getDestination()->isQueue() ); -} - -//////////////////////////////////////////////////////////////////////////////// -bool ActiveMQConsumer::isAutoAcknowledgeBatch() const { - return this->session->isDupsOkAcknowledge() && !this->consumerInfo->getDestination()->isQueue(); + AMQ_CATCH_ALL_THROW_CMSEXCEPTION() } //////////////////////////////////////////////////////////////////////////////// int ActiveMQConsumer::getMessageAvailableCount() const { - return this->internal->unconsumedMessages->size(); -} - -//////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumer::applyDestinationOptions( const Pointer& info ) { - - decaf::lang::Pointer amqDestination = info->getDestination(); - - // Get any options specified in the destination and apply them to the - // ConsumerInfo object. - const ActiveMQProperties& options = amqDestination->getOptions(); - - std::string noLocalStr = - core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_NOLOCAL ); - if( options.hasProperty( noLocalStr ) ) { - info->setNoLocal( Boolean::parseBoolean( - options.getProperty( noLocalStr ) ) ); - } - - std::string selectorStr = - core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_SELECTOR ); - if( options.hasProperty( selectorStr ) ) { - info->setSelector( options.getProperty( selectorStr ) ); - } - - std::string priorityStr = - core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_PRIORITY ); - if( options.hasProperty( priorityStr ) ) { - info->setPriority( (unsigned char)Integer::parseInt( options.getProperty( priorityStr ) ) ); - } - - std::string dispatchAsyncStr = - core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_DISPATCHASYNC ); - if( options.hasProperty( dispatchAsyncStr ) ) { - info->setDispatchAsync( - Boolean::parseBoolean( options.getProperty( dispatchAsyncStr ) ) ); - } - - std::string exclusiveStr = - core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_EXCLUSIVE ); - if( options.hasProperty( exclusiveStr ) ) { - info->setExclusive( - Boolean::parseBoolean( options.getProperty( exclusiveStr ) ) ); - } - - std::string maxPendingMsgLimitStr = - core::ActiveMQConstants::toString( - core::ActiveMQConstants::CUNSUMER_MAXPENDINGMSGLIMIT ); - - if( options.hasProperty( maxPendingMsgLimitStr ) ) { - info->setMaximumPendingMessageLimit( - Integer::parseInt( - options.getProperty( maxPendingMsgLimitStr ) ) ); - } - - std::string prefetchSizeStr = - core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_PREFECTCHSIZE ); - if( info->getPrefetchSize() <= 0 || options.hasProperty( prefetchSizeStr ) ) { - info->setPrefetchSize( - Integer::parseInt( options.getProperty( prefetchSizeStr, "1000" ) ) ); - } - - std::string retroactiveStr = - core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_RETROACTIVE ); - if( options.hasProperty( retroactiveStr ) ) { - info->setRetroactive( - Boolean::parseBoolean( options.getProperty( retroactiveStr ) ) ); - } - - std::string networkSubscriptionStr = "consumer.networkSubscription"; - - if( options.hasProperty( networkSubscriptionStr ) ) { - info->setNetworkSubscription( - Boolean::parseBoolean( - options.getProperty( networkSubscriptionStr ) ) ); - } -} - -//////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumer::setRedeliveryPolicy( RedeliveryPolicy* policy ) { - if( policy != NULL ) { - this->internal->redeliveryPolicy.reset( policy ); - } + return this->config->kernel->getMessageAvailableCount(); } //////////////////////////////////////////////////////////////////////////////// RedeliveryPolicy* ActiveMQConsumer::getRedeliveryPolicy() const { - return this->internal->redeliveryPolicy.get(); + return this->config->kernel->getRedeliveryPolicy(); } //////////////////////////////////////////////////////////////////////////////// cms::MessageListener* ActiveMQConsumer::getMessageListener() const { - return this->internal->listener; + return this->config->kernel->getMessageListener(); } //////////////////////////////////////////////////////////////////////////////// const Pointer& ActiveMQConsumer::getConsumerInfo() const { - this->checkClosed(); - return this->consumerInfo; + return this->config->kernel->getConsumerInfo(); } //////////////////////////////////////////////////////////////////////////////// const Pointer& ActiveMQConsumer::getConsumerId() const { - this->checkClosed(); - return this->consumerInfo->getConsumerId(); -} - -//////////////////////////////////////////////////////////////////////////////// -bool ActiveMQConsumer::isSynchronizationRegistered() const { - return this->internal->synchronizationRegistered; -} - -//////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumer::setSynchronizationRegistered( bool value ) { - this->internal->synchronizationRegistered = value; -} - -//////////////////////////////////////////////////////////////////////////////// -long long ActiveMQConsumer::getLastDeliveredSequenceId() const { - return this->internal->lastDeliveredSequenceId; -} - -//////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumer::setLastDeliveredSequenceId( long long value ) { - this->internal->lastDeliveredSequenceId = value; -} - -//////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumer::setFailureError( decaf::lang::Exception* error ) { - if( error != NULL ) { - this->internal->failureError.reset( error->clone() ); - } + return this->config->kernel->getConsumerId(); } //////////////////////////////////////////////////////////////////////////////// decaf::lang::Exception* ActiveMQConsumer::getFailureError() const { - if( this->internal->failureError == NULL ) { - return NULL; - } - - return this->internal->failureError.get(); + return this->config->kernel->getFailureError(); } Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=1305601&r1=1305600&r2=1305601&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Mon Mar 26 21:11:12 2012 @@ -23,46 +23,25 @@ #include #include +#include #include #include -#include -#include -#include #include -#include -#include #include -#include namespace activemq{ namespace core{ using decaf::lang::Pointer; - using decaf::util::concurrent::atomic::AtomicBoolean; class ActiveMQSession; - class ActiveMQConsumerMembers; + class ActiveMQConsumerData; - class AMQCPP_API ActiveMQConsumer : public cms::MessageConsumer, - public Dispatcher - { + class AMQCPP_API ActiveMQConsumer : public cms::MessageConsumer { private: - /** - * Internal Class that holds Members of this class, allows for changes without API breakage. - */ - ActiveMQConsumerMembers* internal; - - /** - * The ActiveMQSession that owns this class instance. - */ - ActiveMQSession* session; - - /** - * The ConsumerInfo object for this class instance. - */ - Pointer consumerInfo; + ActiveMQConsumerData* config; private: @@ -72,23 +51,17 @@ namespace core{ public: /** - * Constructor + * Create a new ActiveMQConsumer that contains the pointer to the Kernel + * that implement the real MessageConsumer functionality. + * + * @param ActiveMQConsumerKernel + * This Consumer's functionality kernel. */ - ActiveMQConsumer( ActiveMQSession* session, - const Pointer& id, - const Pointer& destination, - const std::string& name, - const std::string& selector, - int prefetch, - int maxPendingMessageCount, - bool noLocal, - bool browser, - bool dispatchAsync, - cms::MessageListener* listener ); + ActiveMQConsumer(const Pointer& kernel); virtual ~ActiveMQConsumer() throw(); - public: // Interface Implementation + public: // Interface Implementation for cms::MessageConsumer virtual void start(); @@ -98,58 +71,17 @@ namespace core{ virtual cms::Message* receive(); - virtual cms::Message* receive( int millisecs ); + virtual cms::Message* receive(int millisecs); virtual cms::Message* receiveNoWait(); - virtual void setMessageListener( cms::MessageListener* listener ); + virtual void setMessageListener(cms::MessageListener* listener); virtual cms::MessageListener* getMessageListener() const; virtual std::string getMessageSelector() const; - virtual void acknowledge( const Pointer& dispatch ); - - public: // Dispatcher Methods - - virtual void dispatch( const Pointer& message ); - - public: // ActiveMQConsumer Methods - - /** - * Method called to acknowledge all messages that have been received so far. - * - * @throw CMSException if an error occurs while ack'ing the message. - */ - void acknowledge(); - - /** - * Called to Commit the current set of messages in this Transaction - * - * @throw ActiveMQException if an error occurs while performing the operation. - */ - void commit(); - - /** - * Called to Roll back the current set of messages in this Transaction - * - * @throw ActiveMQException if an error occurs while performing the operation. - */ - void rollback(); - - /** - * Performs the actual close operation on this consumer - * - * @throw ActiveMQException if an error occurs while performing the operation. - */ - void doClose(); - - /** - * Cleans up this objects internal resources. - * - * @throw ActiveMQException if an error occurs while performing the operation. - */ - void dispose(); + public: /** * Get the Consumer information for this consumer @@ -169,58 +101,6 @@ namespace core{ bool isClosed() const; /** - * Has this Consumer Transaction Synchronization been added to the transaction - * @return true if the synchronization has been added. - */ - bool isSynchronizationRegistered() const ; - - /** - * Sets the Synchronization Registered state of this consumer. - * @param value - true if registered false otherwise. - */ - void setSynchronizationRegistered( bool value ); - - /** - * Deliver any pending messages to the registered MessageListener if there - * is one, return true if not all dispatched, or false if no listener or all - * pending messages have been dispatched. - */ - bool iterate(); - - /** - * Forces this consumer to send all pending acks to the broker. - * - * @throw ActiveMQException if an error occurs while performing the operation. - */ - void deliverAcks(); - - /** - * Called on a Failover to clear any pending messages. - */ - void clearMessagesInProgress(); - - /** - * Signals that a Failure occurred and that anything in-progress in the - * consumer should be cleared. - */ - void inProgressClearRequired(); - - /** - * Gets the currently set Last Delivered Sequence Id - * - * @returns long long containing the sequence id of the last delivered Message. - */ - long long getLastDeliveredSequenceId() const; - - /** - * Sets the value of the Last Delivered Sequence Id - * - * @param value - * The new value to assign to the Last Delivered Sequence Id property. - */ - void setLastDeliveredSequenceId( long long value ); - - /** * @returns the number of Message's this consumer is waiting to Dispatch. */ int getMessageAvailableCount() const; @@ -234,7 +114,7 @@ namespace core{ * @param policy * Pointer to a Redelivery Policy object that his Consumer will use. */ - void setRedeliveryPolicy( RedeliveryPolicy* policy ); + void setRedeliveryPolicy(RedeliveryPolicy* policy); /** * Gets a pointer to this Consumer's Redelivery Policy object, the Consumer @@ -245,14 +125,6 @@ namespace core{ RedeliveryPolicy* getRedeliveryPolicy() const; /** - * Sets the Exception that has caused this Consumer to be in a failed state. - * - * @param error - * The error that is to be thrown when a Receive call is made. - */ - void setFailureError( decaf::lang::Exception* error ); - - /** * Gets the error that caused this Consumer to be in a Failed state, or NULL if * there is no Error. * @@ -260,71 +132,6 @@ namespace core{ */ decaf::lang::Exception* getFailureError() const; - protected: - - /** - * Used by synchronous receive methods to wait for messages to come in. - * @param timeout - The maximum number of milliseconds to wait before - * returning. - * - * If -1, it will block until a messages is received or this consumer - * is closed. - * If 0, will not block at all. If > 0, will wait at a maximum the - * specified number of milliseconds before returning. - * @return the message, if received within the allotted time. - * Otherwise NULL. - * - * @throws InvalidStateException if this consumer is closed upon - * entering this method. - */ - Pointer dequeue( long long timeout ); - - /** - * Pre-consume processing - * @param dispatch - the message being consumed. - */ - void beforeMessageIsConsumed( - const Pointer& dispatch ); - - /** - * Post-consume processing - * @param dispatch - the consumed message - * @param messageExpired - flag indicating if the message has expired. - */ - void afterMessageIsConsumed( - const Pointer& dispatch, bool messageExpired ); - - private: - - // Using options from the Destination URI override any settings that are - // defined for this consumer. - void applyDestinationOptions( const Pointer& info ); - - // If supported sends a message pull request to the service provider asking - // for the delivery of a new message. This is used in the case where the - // service provider has been configured with a zero prefetch or is only - // capable of delivering messages on a pull basis. No request is made if - // there are already messages in the unconsumed queue since there's no need - // for a server round-trip in that instance. - void sendPullRequest( long long timeout ); - - // Checks for the closed state and throws if so. - void checkClosed() const; - - // Sends an ack as needed in order to keep them coming in if the current - // ack mode allows the consumer to receive up to the prefetch limit before - // an real ack is sent. - void ackLater( const Pointer& message, int ackType ); - - // Create an Ack Message that acks all messages that have been delivered so far. - Pointer makeAckForAllDeliveredMessages( int type ); - - // Should Acks be sent on each dispatched message - bool isAutoAcknowledgeEach() const; - - // Can Acks be batched for less network overhead. - bool isAutoAcknowledgeBatch() const; - }; }} Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp?rev=1305601&r1=1305600&r2=1305601&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp Mon Mar 26 21:11:12 2012 @@ -17,73 +17,32 @@ #include "ActiveMQProducer.h" #include -#include -#include -#include #include -#include #include -#include -#include -#include -#include using namespace std; using namespace activemq; using namespace activemq::util; using namespace activemq::core; -using namespace activemq::commands; +using namespace activemq::core::kernels; using namespace activemq::exceptions; -using namespace decaf::util; using namespace decaf::lang; using namespace decaf::lang::exceptions; //////////////////////////////////////////////////////////////////////////////// -ActiveMQProducer::ActiveMQProducer( ActiveMQSession* session, - const Pointer& producerId, - const Pointer& destination, - long long sendTimeout ) : disableTimestamps(false), - disableMessageId(false), - defaultDeliveryMode(cms::Message::DEFAULT_DELIVERY_MODE), - defaultPriority(cms::Message::DEFAULT_MSG_PRIORITY), - defaultTimeToLive(cms::Message::DEFAULT_TIME_TO_LIVE), - sendTimeout(sendTimeout), - session(session), - producerInfo(), - closed(false), - memoryUsage(), - destination() { +ActiveMQProducer::ActiveMQProducer(Pointer kernel) : kernel(kernel) { - if( session == NULL || producerId == NULL ) { + if (kernel == NULL) { throw ActiveMQException( __FILE__, __LINE__, - "ActiveMQProducer::ActiveMQProducer - Init with NULL Session" ); + "ActiveMQProducer::ActiveMQProducer - Initialized with NULL Kernel"); } - - this->producerInfo.reset( new ProducerInfo() ); - - this->producerInfo->setProducerId( producerId ); - this->producerInfo->setDestination( destination ); - this->producerInfo->setWindowSize( session->getConnection()->getProducerWindowSize() ); - - // Get any options specified in the destination and apply them to the - // ProducerInfo object. - if( destination != NULL ) { - const ActiveMQProperties& options = destination->getOptions(); - this->producerInfo->setDispatchAsync( Boolean::parseBoolean( - options.getProperty( "producer.dispatchAsync", "false" )) ); - - this->destination = destination.dynamicCast(); - } - - // TODO - Check for need of MemoryUsage if there's a producer Windows size - // and the Protocol version is greater than 3. } //////////////////////////////////////////////////////////////////////////////// -ActiveMQProducer::~ActiveMQProducer() throw() { +ActiveMQProducer::~ActiveMQProducer() { try { - close(); + this->kernel->close(); } AMQ_CATCH_NOTHROW( ActiveMQException ) AMQ_CATCHALL_NOTHROW( ) @@ -93,145 +52,44 @@ ActiveMQProducer::~ActiveMQProducer() th void ActiveMQProducer::close() { try{ - - if( !this->isClosed() ) { - - dispose(); - - // Remove at the Broker Side, if this fails the producer has already - // been removed from the session and connection objects so its safe - // for an exception to be thrown. - Pointer info( new RemoveInfo ); - info->setObjectId( this->producerInfo->getProducerId() ); - this->session->oneway( info ); - } + this->kernel->close(); } AMQ_CATCH_ALL_THROW_CMSEXCEPTION() } //////////////////////////////////////////////////////////////////////////////// -void ActiveMQProducer::dispose() { - - if( !this->isClosed() ) { - this->session->removeProducer( this ); - this->closed = true; - } -} - -//////////////////////////////////////////////////////////////////////////////// -void ActiveMQProducer::send( cms::Message* message ) { +void ActiveMQProducer::send(cms::Message* message) { try { - - this->checkClosed(); - - this->send( this->destination.get(), message ); + this->kernel->send(message); } AMQ_CATCH_ALL_THROW_CMSEXCEPTION() } //////////////////////////////////////////////////////////////////////////////// -void ActiveMQProducer::send( cms::Message* message, int deliveryMode, int priority, long long timeToLive ) { +void ActiveMQProducer::send(cms::Message* message, int deliveryMode, int priority, long long timeToLive) { try { - - this->checkClosed(); - - this->send( this->destination.get(), message, deliveryMode, priority, timeToLive ); + this->kernel->send(message, deliveryMode, priority, timeToLive); } AMQ_CATCH_ALL_THROW_CMSEXCEPTION() } //////////////////////////////////////////////////////////////////////////////// -void ActiveMQProducer::send( const cms::Destination* destination, cms::Message* message ) { +void ActiveMQProducer::send(const cms::Destination* destination, cms::Message* message) { try { - - this->checkClosed(); - - this->send( destination, message, defaultDeliveryMode, defaultPriority, defaultTimeToLive ); + this->kernel->send(destination, message); } AMQ_CATCH_ALL_THROW_CMSEXCEPTION() } //////////////////////////////////////////////////////////////////////////////// -void ActiveMQProducer::send( const cms::Destination* destination, cms::Message* message, - int deliveryMode, int priority, long long timeToLive ) { +void ActiveMQProducer::send(const cms::Destination* destination, cms::Message* message, + int deliveryMode, int priority, long long timeToLive) { try { - - this->checkClosed(); - - if( destination == NULL ) { - - if( this->producerInfo->getDestination() == NULL ) { - throw cms::UnsupportedOperationException( "A destination must be specified.", NULL ); - } - - throw cms::InvalidDestinationException( "Don't understand null destinations", NULL ); - } - - const cms::Destination* dest; - if( destination == dynamic_cast( this->producerInfo->getDestination().get() ) ) { - dest = destination; - } else if( this->producerInfo->getDestination() == NULL ) { - - // TODO - We should apply a Transform so ensure the user hasn't create some - // external cms::Destination implementation. - dest = destination; - } else { - throw cms::UnsupportedOperationException( string( "This producer can only send messages to: " ) + - this->producerInfo->getDestination()->getPhysicalName(), NULL ); - } - - if( dest == NULL ) { - throw cms::CMSException( "No destination specified", NULL ); - } - - // configure the message - message->setCMSDestination( dest ); - message->setCMSDeliveryMode( deliveryMode ); - message->setCMSPriority( priority ); - - long long expiration = 0LL; - - if( !disableTimestamps ) { - - long long timeStamp = System::currentTimeMillis(); - message->setCMSTimestamp( timeStamp ); - if( timeToLive > 0LL ) { - expiration = timeToLive + timeStamp; - } - } - - message->setCMSExpiration( expiration ); - - // Delegate send to the session so that it can choose how to - // send the message. - this->session->send( message, this, this->memoryUsage.get() ); + this->kernel->send(destination, message, deliveryMode, priority, timeToLive); } AMQ_CATCH_ALL_THROW_CMSEXCEPTION() } - -//////////////////////////////////////////////////////////////////////////////// -void ActiveMQProducer::onProducerAck( const commands::ProducerAck& ack ) { - - try{ - - if( this->memoryUsage.get() != NULL ) { - this->memoryUsage->decreaseUsage( ack.getSize() ); - } - } - AMQ_CATCH_RETHROW( ActiveMQException ) - AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) - AMQ_CATCHALL_THROW( ActiveMQException ) -} - -//////////////////////////////////////////////////////////////////////////////// -void ActiveMQProducer::checkClosed() const { - if( closed ) { - throw ActiveMQException( - __FILE__, __LINE__, - "ActiveMQProducer - Producer Already Closed" ); - } -} Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h?rev=1305601&r1=1305600&r2=1305601&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h Mon Mar 26 21:11:12 2012 @@ -23,12 +23,8 @@ #include #include -#include #include -#include -#include - -#include +#include namespace activemq{ namespace core{ @@ -40,85 +36,45 @@ namespace core{ class AMQCPP_API ActiveMQProducer : public cms::MessageProducer { private: - // Disable sending timestamps - bool disableTimestamps; - - // Disable adding a Message Id - bool disableMessageId; - - // The default delivery Mode of this Producer - int defaultDeliveryMode; - - // The default priority Level to send at - int defaultPriority; - - // The default time to live value for messages in milliseconds - long long defaultTimeToLive; - - // The default Send Timeout for this Producer. - long long sendTimeout; - - // Session that this producer sends to. - ActiveMQSession* session; - - // This Producers protocol specific info object - Pointer producerInfo; - - // Boolean that indicates if the consumer has been closed - bool closed; - - // Memory Usage Class, created only if the Producer is tracking its usage. - std::auto_ptr memoryUsage; - - // The Destination assigned at creation, NULL if not assigned. - Pointer destination; + Pointer kernel; private: - ActiveMQProducer( const ActiveMQProducer& ); - ActiveMQProducer& operator= ( const ActiveMQProducer& ); + ActiveMQProducer(const ActiveMQProducer&); + ActiveMQProducer& operator=(const ActiveMQProducer&); public: /** - * Constructor, creates an instance of an ActiveMQProducer + * Constructor, creates an instance of an ActiveMQProducer to wrap the + * provided ActiveMQProducerKernel. * - * @param session - * The Session which is the parent of this Producer. - * @param producerId - * Pointer to a ProducerId object which identifies this producer. - * @param destination - * The assigned Destination this Producer sends to, or null if not set. - * The Producer does not own the Pointer passed. - * @param sendTimeout - * The configured send timeout for this Producer. - */ - ActiveMQProducer( ActiveMQSession* session, - const Pointer& producerId, - const Pointer& destination, - long long sendTimeout ); + * @param kernel + * The Producer kernel pointer that implements the producers functionality. + */ + ActiveMQProducer(Pointer kernel); - virtual ~ActiveMQProducer() throw(); + virtual ~ActiveMQProducer(); public: // cms::MessageProducer methods. virtual void close(); - virtual void send( cms::Message* message ); + virtual void send(cms::Message* message); - virtual void send( cms::Message* message, int deliveryMode, int priority, long long timeToLive ); + virtual void send(cms::Message* message, int deliveryMode, int priority, long long timeToLive); - virtual void send( const cms::Destination* destination, cms::Message* message ); + virtual void send(const cms::Destination* destination, cms::Message* message); - virtual void send( const cms::Destination* destination, cms::Message* message, - int deliveryMode, int priority, long long timeToLive ); + virtual void send(const cms::Destination* destination, cms::Message* message, + int deliveryMode, int priority, long long timeToLive); /** * Sets the delivery mode for this Producer * @param mode - The DeliveryMode to use for Message sends. */ - virtual void setDeliveryMode( int mode ) { - this->defaultDeliveryMode = mode; + virtual void setDeliveryMode(int mode) { + this->kernel->setDeliveryMode(mode); } /** @@ -126,15 +82,15 @@ namespace core{ * @return The DeliveryMode */ virtual int getDeliveryMode() const { - return this->defaultDeliveryMode; + return this->kernel->getDeliveryMode(); } /** * Sets if Message Ids are disabled for this Producer * @param value - boolean indicating enable / disable (true / false) */ - virtual void setDisableMessageID( bool value ) { - this->disableMessageId = value; + virtual void setDisableMessageID(bool value) { + this->kernel->setDisableMessageID(value); } /** @@ -142,15 +98,15 @@ namespace core{ * @return a boolean indicating state enable / disable (true / false) for MessageIds. */ virtual bool getDisableMessageID() const { - return this->disableMessageId; + return this->kernel->getDisableMessageID(); } /** * Sets if Message Time Stamps are disabled for this Producer * @param value - boolean indicating enable / disable (true / false) */ - virtual void setDisableMessageTimeStamp( bool value ) { - this->disableTimestamps = value; + virtual void setDisableMessageTimeStamp(bool value) { + this->kernel->setDisableMessageTimeStamp(value); } /** @@ -158,15 +114,15 @@ namespace core{ * @returns boolean indicating state of enable / disable (true / false) */ virtual bool getDisableMessageTimeStamp() const { - return this->disableTimestamps; + return this->kernel->getDisableMessageTimeStamp(); } /** * Sets the Priority that this Producers sends messages at * @param priority int value for Priority level */ - virtual void setPriority( int priority ) { - this->defaultPriority = priority; + virtual void setPriority(int priority) { + this->kernel->setPriority(priority); } /** @@ -174,15 +130,15 @@ namespace core{ * @return int based priority level */ virtual int getPriority() const { - return this->defaultPriority; + return this->kernel->getPriority(); } /** * Sets the Time to Live that this Producers sends messages with * @param time The new default time to live value in milliseconds. */ - virtual void setTimeToLive( long long time ) { - this->defaultTimeToLive = time; + virtual void setTimeToLive(long long time) { + this->kernel->setTimeToLive(time); } /** @@ -190,15 +146,15 @@ namespace core{ * @return The default time to live value in milliseconds. */ virtual long long getTimeToLive() const { - return this->defaultTimeToLive; + return this->kernel->getTimeToLive(); } /** * Sets the Send Timeout that this Producers sends messages with * @param time The new default send timeout value in milliseconds. */ - virtual void setSendTimeout( long long time ) { - this->sendTimeout = time; + virtual void setSendTimeout(long long time) { + this->kernel->setSendTimeout(time); } /** @@ -206,7 +162,7 @@ namespace core{ * @return The default send timeout value in milliseconds. */ virtual long long getSendTimeout() const { - return this->sendTimeout; + return this->kernel->getSendTimeout(); } public: @@ -215,7 +171,7 @@ namespace core{ * @returns true if this Producer has been closed. */ bool isClosed() const { - return this->closed; + return this->kernel->isClosed(); } /** @@ -223,8 +179,7 @@ namespace core{ * @return ProducerInfo Reference */ const Pointer& getProducerInfo() const { - this->checkClosed(); - return this->producerInfo; + return this->kernel->getProducerInfo(); } /** @@ -232,29 +187,8 @@ namespace core{ * @return ProducerId Reference */ const Pointer& getProducerId() const { - this->checkClosed(); - return this->producerInfo->getProducerId(); + return this->kernel->getProducerId(); } - - /** - * Handles the work of Processing a ProducerAck Command from the Broker. - * @param ack - The ProducerAck message received from the Broker. - */ - virtual void onProducerAck( const commands::ProducerAck& ack ); - - /** - * Performs Producer object cleanup but doesn't attempt to send the Remove command - * to the broker. Called when the parent resource if closed first to avoid the message - * send and avoid any exceptions that might be thrown from an attempt to send a remove - * command to a failed transport. - */ - void dispose(); - - private: - - // Checks for the closed state and throws if so. - void checkClosed() const; - }; }}