From commits-return-13693-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Thu Jun 03 20:28:13 2010 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 4106 invoked from network); 3 Jun 2010 20:28:13 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 3 Jun 2010 20:28:13 -0000 Received: (qmail 7959 invoked by uid 500); 3 Jun 2010 20:28:13 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 7931 invoked by uid 500); 3 Jun 2010 20:28:13 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 7920 invoked by uid 99); 3 Jun 2010 20:28:13 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 03 Jun 2010 20:28:13 +0000 X-ASF-Spam-Status: No, hits=-1383.0 required=10.0 tests=ALL_TRUSTED,AWL X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 03 Jun 2010 20:28:11 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 876D1238897D; Thu, 3 Jun 2010 20:27:51 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r951143 - /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ Date: Thu, 03 Jun 2010 20:27:51 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100603202751.876D1238897D@eris.apache.org> Author: tabish Date: Thu Jun 3 20:27:50 2010 New Revision: 951143 URL: http://svn.apache.org/viewvc?rev=951143&view=rev Log: https://issues.apache.org/activemq/browse/AMQCPP-293 Make use of the new Redelivery and Prefetch policies and clean up some older settings that are no longer used. Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.h Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=951143&r1=951142&r2=951143&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Thu Jun 3 20:27:50 2010 @@ -240,10 +240,11 @@ ActiveMQConsumer::ActiveMQConsumer( Acti this->lastDeliveredSequenceId = -1; this->synchronizationRegistered = false; this->additionalWindowSize = 0; - this->redeliveryDelay = 0; this->deliveredCounter = 0; this->clearDispatchList = false; this->listener = NULL; + this->redeliveryDelay = 0; + this->redeliveryPolicy.reset( this->session->getConnection()->getRedeliveryPolicy()->clone() ); if( listener != NULL ) { this->setMessageListener( listener ); @@ -891,7 +892,7 @@ void ActiveMQConsumer::rollback() throw( Pointer lastMsg = dispatchedMessages.front(); const int currentRedeliveryCount = lastMsg->getMessage()->getRedeliveryCounter(); if( currentRedeliveryCount > 0 ) { - redeliveryDelay = session->getTransactionContext()->getRedeliveryDelay(); + redeliveryDelay = this->redeliveryPolicy->getRedeliveryDelay( redeliveryDelay ); } Pointer firstMsgId = @@ -904,7 +905,8 @@ void ActiveMQConsumer::rollback() throw( message->setRedeliveryCounter( message->getRedeliveryCounter() + 1 ); } - if( lastMsg->getRedeliveryCounter() > this->session->getTransactionContext()->getMaximumRedeliveries() ) { + if( this->redeliveryPolicy->getMaximumRedeliveries() != RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES && + lastMsg->getRedeliveryCounter() > this->redeliveryPolicy->getMaximumRedeliveries() ) { // We need to NACK the messages so that they get sent to the DLQ. // Acknowledge the last message. Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=951143&r1=951142&r2=951143&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Thu Jun 3 20:27:50 2010 @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -111,11 +112,6 @@ namespace core{ int additionalWindowSize; /** - * Time to wait before restarting delivery of rollback messages. - */ - long long redeliveryDelay; - - /** * Has the Synchronization been added for this transaction */ volatile bool synchronizationRegistered; @@ -125,6 +121,16 @@ namespace core{ */ bool clearDispatchList; + /** + * The redelivery delay used for the last set of redeliveries. + */ + long long redeliveryDelay; + + /** + * The policy to use when Message Redelivery is in progress. + */ + std::auto_ptr redeliveryPolicy; + private: ActiveMQConsumer( const ActiveMQConsumer& ); @@ -344,6 +350,31 @@ namespace core{ */ int getMessageAvailableCount() const; + /** + * Sets the RedeliveryPolicy this Consumer should use when a rollback is + * performed on a transacted Consumer. The Consumer takes ownership of the + * passed pointer. The Consumer's redelivery policy can never be null, a + * call to this method with a NULL pointer is ignored. + * + * @param policy + * Pointer to a Redelivery Policy object that his Consumer will use. + */ + void setRedeliveryPolicy( RedeliveryPolicy* policy ) { + if( policy != NULL ) { + this->redeliveryPolicy.reset( policy ); + } + } + + /** + * Gets a pointer to this Consumer's Redelivery Policy object, the Consumer + * retains ownership of this pointer so the caller should not delete it. + * + * @returns a Pointer to a RedeliveryPolicy that is in use by this Consumer. + */ + RedeliveryPolicy* getRedeliveryPolicy() const { + return this->redeliveryPolicy.get(); + } + protected: /** Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp?rev=951143&r1=951142&r2=951143&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp Thu Jun 3 20:27:50 2010 @@ -23,7 +23,7 @@ #include #include #include -#include +#include #include #include @@ -246,10 +246,12 @@ void ActiveMQQueueBrowser::waitForMessag ActiveMQConsumer* ActiveMQQueueBrowser::createConsumer() { this->browseDone.set( false ); - // TODO - get config options from connection and prefetch policy. + + int prefetch = this->session->getConnection()->getPrefetchPolicy()->getQueueBrowserPrefetch(); + std::auto_ptr consumer( new Browser( this, session, consumerId, destination, "", selector, - 500, 0, false, true, false, NULL ) ); + prefetch, 0, false, true, dispatchAsync, NULL ) ); try{ this->session->addConsumer( consumer.get() ); Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=951143&r1=951142&r2=951143&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Thu Jun 3 20:27:50 2010 @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -317,10 +318,17 @@ cms::MessageConsumer* ActiveMQSession::c Pointer dest( amqDestination->cloneDataStructure() ); + int prefetch = 0; + if( dest->isTopic() ) { + prefetch = this->connection->getPrefetchPolicy()->getTopicPrefetch(); + } else { + prefetch = this->connection->getPrefetchPolicy()->getQueuePrefetch(); + } + // Create the consumer instance. std::auto_ptr consumer( new ActiveMQConsumer( this, this->getNextConsumerId(), - dest, "", selector, 1000, 0, noLocal, + dest, "", selector, prefetch, 0, noLocal, false, this->connection->isDispatchAsync(), NULL ) ); try{ @@ -368,8 +376,9 @@ cms::MessageConsumer* ActiveMQSession::c // Create the consumer instance. std::auto_ptr consumer( new ActiveMQConsumer( this, this->getNextConsumerId(), - dest, name, selector, 1000, 0, noLocal, - false, this->connection->isDispatchAsync(), NULL ) ); + dest, name, selector, + this->connection->getPrefetchPolicy()->getDurableTopicPrefetch(), + 0, noLocal, false, this->connection->isDispatchAsync(), NULL ) ); try{ this->addConsumer( consumer.get() ); @@ -468,7 +477,8 @@ cms::QueueBrowser* ActiveMQSession::crea // Create the QueueBrowser instance std::auto_ptr browser( - new ActiveMQQueueBrowser( this, this->getNextConsumerId(), dest, selector, false ) ); + new ActiveMQQueueBrowser( this, this->getNextConsumerId(), dest, + selector, this->connection->isDispatchAsync() ) ); return browser.release(); } Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp?rev=951143&r1=951142&r2=951143&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp Thu Jun 3 20:27:50 2010 @@ -52,11 +52,6 @@ ActiveMQTransactionContext::ActiveMQTran // Store State Data this->session = session; this->connection = session->getConnection(); - - maximumRedeliveries = Integer::parseInt( - properties.getProperty( "transaction.maxRedeliveryCount", "5" ) ); - redeliveryDelay = Long::parseLong( - properties.getProperty( "transaction.redeliveryDelay", "0" ) ); } AMQ_CATCH_RETHROW( ActiveMQException ) AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) @@ -243,13 +238,3 @@ const Pointer& ActiveMQTr bool ActiveMQTransactionContext::isInTransaction() const { return this->transactionId != NULL; } - -//////////////////////////////////////////////////////////////////////////////// -int ActiveMQTransactionContext::getMaximumRedeliveries() const { - return this->maximumRedeliveries; -} - -//////////////////////////////////////////////////////////////////////////////// -long long ActiveMQTransactionContext::getRedeliveryDelay() const { - return this->redeliveryDelay; -} Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h?rev=951143&r1=951142&r2=951143&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h Thu Jun 3 20:27:50 2010 @@ -47,16 +47,6 @@ namespace core{ * creates a new transaction for the next set of messages. The only * way to permanently end this transaction is to delete it. * - * Configuration options - * - * transaction.maxRedeliveryCount - * Max number of times a message can be re-delivered, if the session is - * rolled back more than this many time, the message is dropped. - * - * transaction.redeliveryDelay - * Time in Milliseconds between message redelivery for rolled back - * transactions. - * * @since 2.0 */ class AMQCPP_API ActiveMQTransactionContext { @@ -74,13 +64,6 @@ namespace core{ // List of Registered Synchronizations decaf::util::StlSet< Pointer > synchronizations; - // Maximum number of time to redeliver a message when a Transaction is - // rolled back. - int maximumRedeliveries; - - // Time to wait before starting delivery again. - long long redeliveryDelay; - private: ActiveMQTransactionContext( const ActiveMQTransactionContext& ); @@ -147,19 +130,6 @@ namespace core{ */ virtual bool isInTransaction() const; - /** - * @returns The Maximum number of time the client will attempt to redeliver a - * message from a rolled back transaction before marking the message as not - * consumed by this client. - */ - virtual int getMaximumRedeliveries() const; - - /** - * @returns The time in Milliseconds that this client is configured to wait in - * between redelivery attempts for a Message in a rolled back transaction. - */ - virtual long long getRedeliveryDelay() const; - private: void beforeEnd(); Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.cpp?rev=951143&r1=951142&r2=951143&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.cpp Thu Jun 3 20:27:50 2010 @@ -29,6 +29,9 @@ using namespace decaf; using namespace decaf::lang; //////////////////////////////////////////////////////////////////////////////// +const long long RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES = -1; + +//////////////////////////////////////////////////////////////////////////////// RedeliveryPolicy::RedeliveryPolicy() { } Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.h?rev=951143&r1=951142&r2=951143&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.h Thu Jun 3 20:27:50 2010 @@ -32,6 +32,10 @@ namespace core { * @since 3.2.0 */ class AMQCPP_API RedeliveryPolicy { + public: + + static const long long NO_MAXIMUM_REDELIVERIES; + private: RedeliveryPolicy( const RedeliveryPolicy& );