activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r525314 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core: ActiveMQTransaction.cpp ActiveMQTransaction.h
Date Tue, 03 Apr 2007 22:36:33 GMT
Author: tabish
Date: Tue Apr  3 15:36:32 2007
New Revision: 525314

URL: http://svn.apache.org/viewvc?view=rev&rev=525314
Log:
http://issues.apache.org/activemq/browse/AMQCPP-83

Remove the threading or redeliveries as consumers are all sent messages from one thread in
the session, just dispatch the messages and be done with it

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp?view=diff&rev=525314&r1=525313&r2=525314
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp
Tue Apr  3 15:36:32 2007
@@ -23,8 +23,6 @@
 #include <activemq/core/ActiveMQMessage.h>
 #include <activemq/util/Integer.h>
 
-#include <activemq/concurrent/ThreadPool.h>
-
 using namespace std;
 using namespace cms;
 using namespace activemq;
@@ -52,11 +50,8 @@
         // Store State Data
         this->connection = connection;
         this->session = session;
-        this->taskCount = 0;
 
         // convert from property Strings to int.
-        redeliveryDelay = Integer::parseInt(
-            properties.getProperty( "transaction.redeliveryDelay", "25" ) );
         maxRedeliveries = Integer::parseInt(
             properties.getProperty( "transaction.maxRedeliveryCount", "5" ) );
 
@@ -80,18 +75,6 @@
 
         // Clean up
         clearTransaction();
-
-        // Must allow all the tasks to complete before we destruct otherwise
-        // the callbacks will cause an exception.
-        synchronized( &tasksDone )
-        {
-            while( taskCount != 0 )
-            {
-                tasksDone.wait(1000);
-
-                // TODO - Log Here to get some indication if we are stuck
-            }
-        }
     }
     AMQ_CATCH_NOTHROW( ActiveMQException )
     AMQ_CATCHALL_NOTHROW( )
@@ -263,89 +246,36 @@
         transactionInfo = connection->getConnectionData()->
             getConnector()->startTransaction( session->getSessionInfo() );
 
-        // Create a task for each consumer and copy its message list out
-        // to the Rollback task so we can clear the list for new messages
-        // that might come in next.
-        //  NOTE - This could be turned into a Thread so that the connection
-        //  doesn't have to wait on this method to complete an release its
-        //  mutex so it can dispatch new messages.  That would however requre
-        //  copying the whole map over to the thread.
+        // Start Deliveries
+        session->start();
+
+        // Roolback the messages to the Session, since we have the lock on the
+        // rollbackLock, then no message will added to the transaction unitll we
+        // are done processing all the messages that we to redeliver and the map
+        // is cleared.
         synchronized( &rollbackLock )
         {
             RollbackMap::iterator itr = rollbackMap.begin();
 
             for(; itr != rollbackMap.end(); ++itr)
             {
-                ThreadPool::getInstance()->queueTask( make_pair(
-                    new RollbackTask( itr->first,
-                                      connection,
-                                      session,
-                                      itr->second,
-                                      maxRedeliveries,
-                                      redeliveryDelay ), this ) );
-
-                // Count the tasks started.
-                taskCount++;
+                redeliverMessages( itr->first, itr->second );
             }
 
             // Clear the map.  Ownership of the messages is now handed off
             // to the rollback tasks.
             rollbackMap.clear();
         }
-
-        // Start Deliveries
-        session->start();
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransaction::onTaskComplete( Runnable* task )
-{
-    try
-    {
-        // Delete the task
-        delete task;
+void ActiveMQTransaction::redeliverMessages( ActiveMQConsumer* consumer,
+                                             MessageList& messages ) 
+    throw ( exceptions::ActiveMQException ) {
 
-        taskCount--;
-
-        if( taskCount == 0 )
-        {
-            synchronized( &tasksDone )
-            {
-                tasksDone.notifyAll();
-            }
-        }
-    }
-    AMQ_CATCH_NOTHROW( ActiveMQException )
-    AMQ_CATCHALL_NOTHROW( )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransaction::onTaskException( Runnable* task,
-                                           exceptions::ActiveMQException& ex )
-{
-    try
-    {
-        // Delegate
-        onTaskComplete( task );
-
-        // Route the Error
-        ExceptionListener* listener = connection->getExceptionListener();
-
-        if( listener != NULL )
-        {
-            listener->onException( ex );
-        }
-    }
-    AMQ_CATCH_NOTHROW( ActiveMQException )
-    AMQ_CATCHALL_NOTHROW( )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransaction::RollbackTask::run(void)
-{
     try
     {
         MessageList::iterator itr = messages.begin();
@@ -354,9 +284,6 @@
         {
             ActiveMQMessage* message = *itr;
             message->setRedeliveryCount( message->getRedeliveryCount() + 1 );
-
-            // Redeliver Messages at some point in the future
-            Thread::sleep( redeliveryDelay );
 
             if( message->getRedeliveryCount() >= maxRedeliveries )
             {

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h?view=diff&rev=525314&r1=525313&r2=525314
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h
Tue Apr  3 15:36:32 2007
@@ -24,8 +24,6 @@
 #include <cms/CMSException.h>
 
 #include <activemq/concurrent/Mutex.h>
-#include <activemq/concurrent/TaskListener.h>
-#include <activemq/concurrent/Runnable.h>
 #include <activemq/connector/TransactionInfo.h>
 #include <activemq/exceptions/InvalidStateException.h>
 #include <activemq/exceptions/IllegalArgumentException.h>
@@ -48,15 +46,11 @@
      *
      * Configuration options
      *
-     * transaction.redeliveryDelay
-     *   Wait time between the redelivery of each message
-     *
      * transaction.maxRedeliveryCount
      *   Max number of times a message can be redelivered, if the session is
      *   rolled back more than this many time, the message is dropped.
      */
-    class ActiveMQTransaction : public concurrent::TaskListener,
-                                public connector::TransactionInfo
+    class ActiveMQTransaction : public connector::TransactionInfo
     {
     private:
 
@@ -87,15 +81,9 @@
         // Max number of redeliveries before we quit
         int maxRedeliveries;
 
-        // Wait time between sends of message on a rollback
-        int redeliveryDelay;
-
         // Mutex that is signaled when all tasks complete.
         concurrent::Mutex tasksDone;
 
-        // Count of Tasks that are outstanding
-        int taskCount;
-
     public:
 
         /**
@@ -190,28 +178,6 @@
             transactionInfo->setSessionInfo( session );
         }
 
-    protected:   // Task Listener Interface
-
-        /**
-         * Called when a queued task has completed, the task that
-         * finished is passed along for user consumption.  The task is
-         * deleted and the count of outstanding tasks is reduced.
-         * @param task - Runnable Pointer to the task that finished
-         */
-        virtual void onTaskComplete( concurrent::Runnable* task );
-
-         /**
-          * Called when a queued task has thrown an exception while
-          * being run.  The Callee should assume that this was an
-          * unrecoverable exeption and that this task is now defunct.
-          * Deletes the Task and notifies the connection that the
-          * exception has occurred.  Reduce the outstanding task count.
-          * @param task - Runnable Pointer to the task
-          * @param ex - The ActiveMQException that was thrown.
-          */
-         virtual void onTaskException( concurrent::Runnable* task,
-                                       exceptions::ActiveMQException& ex );
-
     protected:
 
         /**
@@ -222,55 +188,17 @@
          */
         virtual void clearTransaction();
 
-    private:
-
-        // Internal class that is used to redeliver one consumers worth
-        // of messages from this transaction.
-        class RollbackTask : public concurrent::Runnable
-        {
-        private:
-
-            // Wait time before redelivery in millisecs
-            int redeliveryDelay;
-
-            // Max number of time to redeliver this message
-            int maxRedeliveries;
-
-            // Messages to Redeliver
-            MessageList messages;
-
-            // Consumer we are redelivering to
-            ActiveMQConsumer* consumer;
-
-            // Connection to use for sending message acks
-            ActiveMQConnection* connection;
-
-            // Session for this Transaction
-            ActiveMQSession* session;
-
-        public:
-
-            RollbackTask( ActiveMQConsumer* consumer,
-                          ActiveMQConnection* connection,
-                          ActiveMQSession* session,
-                          MessageList& messages,
-                          int maxRedeliveries,
-                          int redeliveryDelay ){
-
-                // Store State Data.
-                this->messages        = messages;
-                this->consumer        = consumer;
-                this->redeliveryDelay = redeliveryDelay;
-                this->maxRedeliveries = maxRedeliveries;
-                this->session         = session;
-                this->connection      = connection;
-            }
-
-            // Dispatches the Messages to the Consumer.
-            virtual void run();
-
-        };
-
+        /**
+         * Redelivers each message that is in the Message List to the specified
+         * consumer, throwing messages away as they hit their max redilviery 
+         * count.
+         * @param consumer - the ActiveMQConsumer to redeliver to
+         * @param messages - the list of messages that should be sent.
+         * @throws ActiveMQException if an error occurs.
+         */
+        virtual void redeliverMessages( ActiveMQConsumer* consumer,
+                                        MessageList& messages ) 
+                                            throw ( exceptions::ActiveMQException );
     };
 
 }}



Mime
View raw message