activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r951143 - /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/
Date Thu, 03 Jun 2010 20:27:51 GMT
Author: tabish
Date: Thu Jun  3 20:27:50 2010
New Revision: 951143

URL: http://svn.apache.org/viewvc?rev=951143&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-293

Make use of the new Redelivery and Prefetch policies and clean up some older settings that
are no longer used.  

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=951143&r1=951142&r2=951143&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Thu
Jun  3 20:27:50 2010
@@ -240,10 +240,11 @@ ActiveMQConsumer::ActiveMQConsumer( Acti
     this->lastDeliveredSequenceId = -1;
     this->synchronizationRegistered = false;
     this->additionalWindowSize = 0;
-    this->redeliveryDelay = 0;
     this->deliveredCounter = 0;
     this->clearDispatchList = false;
     this->listener = NULL;
+    this->redeliveryDelay = 0;
+    this->redeliveryPolicy.reset( this->session->getConnection()->getRedeliveryPolicy()->clone() );
 
     if( listener != NULL ) {
         this->setMessageListener( listener );
@@ -891,7 +892,7 @@ void ActiveMQConsumer::rollback() throw(
             Pointer<MessageDispatch> lastMsg = dispatchedMessages.front();
             const int currentRedeliveryCount = lastMsg->getMessage()->getRedeliveryCounter();
             if( currentRedeliveryCount > 0 ) {
-                redeliveryDelay = session->getTransactionContext()->getRedeliveryDelay();
+                redeliveryDelay = this->redeliveryPolicy->getRedeliveryDelay( redeliveryDelay );
             }
 
             Pointer<MessageId> firstMsgId =
@@ -904,7 +905,8 @@ void ActiveMQConsumer::rollback() throw(
                 message->setRedeliveryCounter( message->getRedeliveryCounter() + 1 );
             }
 
-            if( lastMsg->getRedeliveryCounter() > this->session->getTransactionContext()->getMaximumRedeliveries() ) {
+            if( this->redeliveryPolicy->getMaximumRedeliveries() != RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES &&
+                lastMsg->getRedeliveryCounter() > this->redeliveryPolicy->getMaximumRedeliveries() ) {
 
                 // We need to NACK the messages so that they get sent to the DLQ.
                 // Acknowledge the last message.

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=951143&r1=951142&r2=951143&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Thu
Jun  3 20:27:50 2010
@@ -29,6 +29,7 @@
 #include <activemq/commands/MessageDispatch.h>
 #include <activemq/core/Dispatcher.h>
 #include <activemq/core/MessageDispatchChannel.h>
+#include <activemq/core/RedeliveryPolicy.h>
 
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/lang/Pointer.h>
@@ -111,11 +112,6 @@ namespace core{
         int additionalWindowSize;
 
         /**
-         * Time to wait before restarting delivery of rollback messages.
-         */
-        long long redeliveryDelay;
-
-        /**
          * Has the Synchronization been added for this transaction
          */
         volatile bool synchronizationRegistered;
@@ -125,6 +121,16 @@ namespace core{
          */
         bool clearDispatchList;
 
+        /**
+         * The redelivery delay used for the last set of redeliveries.
+         */
+        long long redeliveryDelay;
+
+        /**
+         * The policy to use when Message Redelivery is in progress.
+         */
+        std::auto_ptr<RedeliveryPolicy> redeliveryPolicy;
+
     private:
 
         ActiveMQConsumer( const ActiveMQConsumer& );
@@ -344,6 +350,31 @@ namespace core{
          */
         int getMessageAvailableCount() const;
 
+        /**
+         * Sets the RedeliveryPolicy this Consumer should use when a rollback is
+         * performed on a transacted Consumer.  The Consumer takes ownership of the
+         * passed pointer.  The Consumer's redelivery policy can never be null, a
+         * call to this method with a NULL pointer is ignored.
+         *
+         * @param policy
+         *      Pointer to a Redelivery Policy object that this Consumer will use.
+         */
+        void setRedeliveryPolicy( RedeliveryPolicy* policy ) {
+            if( policy != NULL ) {
+                this->redeliveryPolicy.reset( policy );
+            }
+        }
+
+        /**
+         * Gets a pointer to this Consumer's Redelivery Policy object, the Consumer
+         * retains ownership of this pointer so the caller should not delete it.
+         *
+         * @returns a Pointer to a RedeliveryPolicy that is in use by this Consumer.
+         */
+        RedeliveryPolicy* getRedeliveryPolicy() const {
+            return this->redeliveryPolicy.get();
+        }
+
     protected:
 
         /**

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp?rev=951143&r1=951142&r2=951143&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp
Thu Jun  3 20:27:50 2010
@@ -23,7 +23,7 @@
 #include <activemq/core/ActiveMQConnection.h>
 #include <activemq/core/ActiveMQConsumer.h>
 #include <activemq/core/ActiveMQSession.h>
-#include <activemq/core/ActiveMQTransactionContext.h>
+#include <activemq/core/PrefetchPolicy.h>
 #include <activemq/exceptions/ActiveMQException.h>
 #include <activemq/util/CMSExceptionSupport.h>
 
@@ -246,10 +246,12 @@ void ActiveMQQueueBrowser::waitForMessag
 ActiveMQConsumer* ActiveMQQueueBrowser::createConsumer() {
 
     this->browseDone.set( false );
-    // TODO - get config options from connection and prefetch policy.
+
+    int prefetch = this->session->getConnection()->getPrefetchPolicy()->getQueueBrowserPrefetch();
+
     std::auto_ptr<ActiveMQConsumer> consumer(
         new Browser( this, session, consumerId, destination, "", selector,
-                     500, 0, false, true, false, NULL ) );
+                     prefetch, 0, false, true, dispatchAsync, NULL ) );
 
     try{
         this->session->addConsumer( consumer.get() );

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=951143&r1=951142&r2=951143&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Thu
Jun  3 20:27:50 2010
@@ -24,6 +24,7 @@
 #include <activemq/core/ActiveMQProducer.h>
 #include <activemq/core/ActiveMQQueueBrowser.h>
 #include <activemq/core/ActiveMQSessionExecutor.h>
+#include <activemq/core/PrefetchPolicy.h>
 #include <activemq/util/ActiveMQProperties.h>
 #include <activemq/util/CMSExceptionSupport.h>
 
@@ -317,10 +318,17 @@ cms::MessageConsumer* ActiveMQSession::c
 
         Pointer<ActiveMQDestination> dest( amqDestination->cloneDataStructure() );
 
+        int prefetch = 0;
+        if( dest->isTopic() ) {
+            prefetch = this->connection->getPrefetchPolicy()->getTopicPrefetch();
+        } else {
+            prefetch = this->connection->getPrefetchPolicy()->getQueuePrefetch();
+        }
+
         // Create the consumer instance.
         std::auto_ptr<ActiveMQConsumer> consumer(
             new ActiveMQConsumer( this, this->getNextConsumerId(),
-                                  dest, "", selector, 1000, 0, noLocal,
+                                  dest, "", selector, prefetch, 0, noLocal,
                                   false, this->connection->isDispatchAsync(), NULL ) );
 
         try{
@@ -368,8 +376,9 @@ cms::MessageConsumer* ActiveMQSession::c
         // Create the consumer instance.
         std::auto_ptr<ActiveMQConsumer> consumer(
             new ActiveMQConsumer( this, this->getNextConsumerId(),
-                                  dest, name, selector, 1000, 0, noLocal,
-                                  false, this->connection->isDispatchAsync(), NULL ) );
+                                  dest, name, selector,
+                                  this->connection->getPrefetchPolicy()->getDurableTopicPrefetch(),
+                                  0, noLocal, false, this->connection->isDispatchAsync(), NULL ) );
 
         try{
             this->addConsumer( consumer.get() );
@@ -468,7 +477,8 @@ cms::QueueBrowser* ActiveMQSession::crea
 
         // Create the QueueBrowser instance
         std::auto_ptr<ActiveMQQueueBrowser> browser(
-            new ActiveMQQueueBrowser( this, this->getNextConsumerId(), dest, selector, false ) );
+            new ActiveMQQueueBrowser( this, this->getNextConsumerId(), dest,
+                                      selector, this->connection->isDispatchAsync() ) );
 
         return browser.release();
     }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp?rev=951143&r1=951142&r2=951143&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
Thu Jun  3 20:27:50 2010
@@ -52,11 +52,6 @@ ActiveMQTransactionContext::ActiveMQTran
         // Store State Data
         this->session = session;
         this->connection = session->getConnection();
-
-        maximumRedeliveries = Integer::parseInt(
-            properties.getProperty( "transaction.maxRedeliveryCount", "5" ) );
-        redeliveryDelay = Long::parseLong(
-            properties.getProperty( "transaction.redeliveryDelay", "0" ) );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -243,13 +238,3 @@ const Pointer<TransactionId>& ActiveMQTr
 bool ActiveMQTransactionContext::isInTransaction() const {
     return this->transactionId != NULL;
 }
-
-////////////////////////////////////////////////////////////////////////////////
-int ActiveMQTransactionContext::getMaximumRedeliveries() const {
-    return this->maximumRedeliveries;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-long long ActiveMQTransactionContext::getRedeliveryDelay() const {
-    return this->redeliveryDelay;
-}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h?rev=951143&r1=951142&r2=951143&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h
Thu Jun  3 20:27:50 2010
@@ -47,16 +47,6 @@ namespace core{
      * creates a new transaction for the next set of messages.  The only
      * way to permanently end this transaction is to delete it.
      *
-     * Configuration options
-     *
-     * transaction.maxRedeliveryCount
-     *   Max number of times a message can be re-delivered, if the session is
-     *   rolled back more than this many time, the message is dropped.
-     *
-     * transaction.redeliveryDelay
-     *   Time in Milliseconds between message redelivery for rolled back
-     *   transactions.
-     *
      * @since 2.0
      */
     class AMQCPP_API ActiveMQTransactionContext {
@@ -74,13 +64,6 @@ namespace core{
         // List of Registered Synchronizations
         decaf::util::StlSet< Pointer<Synchronization> > synchronizations;
 
-        // Maximum number of time to redeliver a message when a Transaction is
-        // rolled back.
-        int maximumRedeliveries;
-
-        // Time to wait before starting delivery again.
-        long long redeliveryDelay;
-
     private:
 
         ActiveMQTransactionContext( const ActiveMQTransactionContext& );
@@ -147,19 +130,6 @@ namespace core{
          */
         virtual bool isInTransaction() const;
 
-        /**
-         * @returns The Maximum number of time the client will attempt to redeliver a
-         * message from a rolled back transaction before marking the message as not
-         * consumed by this client.
-         */
-        virtual int getMaximumRedeliveries() const;
-
-        /**
-         * @returns The time in Milliseconds that this client is configured to wait in
-         * between redelivery attempts for a Message in a rolled back transaction.
-         */
-        virtual long long getRedeliveryDelay() const;
-
     private:
 
         void beforeEnd();

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.cpp?rev=951143&r1=951142&r2=951143&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.cpp Thu
Jun  3 20:27:50 2010
@@ -29,6 +29,9 @@ using namespace decaf;
 using namespace decaf::lang;
 
 ////////////////////////////////////////////////////////////////////////////////
+const long long RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES = -1;
+
+////////////////////////////////////////////////////////////////////////////////
 RedeliveryPolicy::RedeliveryPolicy() {
 }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.h?rev=951143&r1=951142&r2=951143&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/RedeliveryPolicy.h Thu
Jun  3 20:27:50 2010
@@ -32,6 +32,10 @@ namespace core {
      * @since 3.2.0
      */
     class AMQCPP_API RedeliveryPolicy {
+    public:
+
+        static const long long NO_MAXIMUM_REDELIVERIES;
+
     private:
 
         RedeliveryPolicy( const RedeliveryPolicy& );



Mime
View raw message