activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r763046 [1/2] - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/activemq/core/ test/ test/activemq/core/
Date Tue, 07 Apr 2009 23:59:08 GMT
Author: tabish
Date: Tue Apr  7 23:59:07 2009
New Revision: 763046

URL: http://svn.apache.org/viewvc?rev=763046&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQCPP-209

Major refactoring of the dispatching code: messages are now dispatched one at a time instead of grabbing the dispatch thread for long periods. The logic in the consumer also deals better with rollbacks.

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.h   (with props)
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Tue Apr  7 23:59:07 2009
@@ -337,6 +337,7 @@
     activemq/core/ActiveMQTransactionContext.cpp \
     activemq/core/ActiveMQSession.cpp \
     activemq/core/ActiveMQConnectionSupport.cpp \
+    activemq/core/MessageDispatchChannel.cpp \
     activemq/core/ActiveMQConnectionFactory.cpp \
     activemq/core/ActiveMQProducer.cpp \
     activemq/commands/ActiveMQQueue.cpp \
@@ -877,6 +878,7 @@
     activemq/core/ActiveMQConnection.h \
     activemq/core/ActiveMQConstants.h \
     activemq/core/ActiveMQConnectionFactory.h \
+    activemq/core/MessageDispatchChannel.h \
     activemq/core/ActiveMQSession.h \
     activemq/core/Synchronization.h \
     activemq/core/ActiveMQConnectionMetaData.h \

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Tue Apr  7 23:59:07 2009
@@ -67,8 +67,6 @@
     ActiveMQConnectionSupport( transport, properties ),
     connectionMetaData( new ActiveMQConnectionMetaData() ),
     connectionInfo( new ConnectionInfo() ),
-    started( false ),
-    closed( false ),
     exceptionListener( NULL ) {
 
     // Register for messages and exceptions from the connector.
@@ -154,7 +152,7 @@
         }
 
         // If we're already started, start the session.
-        if( started ) {
+        if( this->started.get() ) {
             session->start();
         }
 
@@ -245,8 +243,8 @@
 
         // Once current deliveries are done this stops the delivery
         // of any new messages.
-        this->started = false;
-        this->closed = true;
+        this->started.set( false );
+        this->closed.set( true );
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -260,7 +258,7 @@
         // This starts or restarts the delivery of all incomming messages
         // messages delivered while this connection is stopped are dropped
         // and not acknowledged.
-        started = true;
+        this->started.set( true );
 
         // Start all the sessions.
         std::vector<ActiveMQSession*> sessions = activeSessions.toArray();
@@ -280,7 +278,7 @@
 
         // Once current deliveries are done this stops the delivery of any
         // new messages.
-        started = false;
+        this->started.set( false );
 
         std::auto_ptr< Iterator<ActiveMQSession*> > iter( activeSessions.iterator() );
 
@@ -432,8 +430,7 @@
 
         if( command->isMessageDispatch() ) {
 
-            MessageDispatch* dispatch =
-                dynamic_cast<MessageDispatch*>( command.get() );
+            Pointer<MessageDispatch> dispatch = command.dynamicCast<MessageDispatch>();
 
             // Check fo an empty Message, shouldn't ever happen but who knows.
             if( dispatch->getMessage() == NULL ) {
@@ -453,13 +450,10 @@
                 // just closed.
                 if( dispatcher != NULL ) {
 
-                    Pointer<commands::Message> message = dispatch->getMessage();
-                    message->setReadOnlyBody( true );
-                    message->setReadOnlyProperties( true );
-
-                    // Dispatch the message.
-                    DispatchData data( dispatch->getConsumerId(), message );
-                    dispatcher->dispatch( data );
+                    dispatch->getMessage()->setReadOnlyBody( true );
+                    dispatch->getMessage()->setReadOnlyProperties( true );
+
+                    dispatcher->dispatch( dispatch );
                 }
             }
 
@@ -529,6 +523,18 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::transportInterrupted() {
+
+    synchronized( &activeSessions ) {
+        std::auto_ptr< Iterator<ActiveMQSession*> > iter( this->activeSessions.iterator() );
+
+        while( iter->hasNext() ) {
+            iter->next()->clearMessagesInProgress();
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::oneway( Pointer<Command> command )
     throw ( ActiveMQException ) {
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Tue Apr  7 23:59:07 2009
@@ -34,6 +34,7 @@
 #include <decaf/util/Properties.h>
 #include <decaf/util/StlMap.h>
 #include <decaf/util/StlSet.h>
+#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/lang/exceptions/UnsupportedOperationException.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
 #include <decaf/lang/exceptions/IllegalStateException.h>
@@ -45,6 +46,7 @@
 namespace core{
 
     using decaf::lang::Pointer;
+    using decaf::util::concurrent::atomic::AtomicBoolean;
 
     class ActiveMQSession;
     class ActiveMQProducer;
@@ -86,13 +88,13 @@
         /**
          * Indicates if this Connection is started
          */
-        bool started;
+        AtomicBoolean started;
 
         /**
          * Indicates that this connection has been closed, it is no longer
          * usable after this becomes true
          */
-        bool closed;
+        AtomicBoolean closed;
 
         /**
          * Map of message dispatchers indexed by consumer id.
@@ -191,7 +193,15 @@
          * @return true if the connection is closed
          */
         bool isClosed() const {
-            return this->closed;
+            return this->closed.get();
+        }
+
+        /**
+         * Check if this connection has been started.
+         * @return true if the start method has been called.
+         */
+        bool isStarted() const {
+            return this->started.get();
         }
 
         /**
@@ -315,7 +325,7 @@
             exceptionListener = listener;
         };
 
-    public: // commands::CommandListener
+    public: // TransportListener
 
         /**
          * Event handler for the receipt of a non-response command from the
@@ -324,8 +334,6 @@
          */
         virtual void onCommand( const Pointer<commands::Command>& command );
 
-    public: // TransportExceptionListener
-
         /**
          * Event handler for an exception from a command transport.
          * @param source The source of the exception
@@ -333,6 +341,11 @@
          */
         virtual void onException( const decaf::lang::Exception& ex );
 
+        /**
+         * The transport has suffered an interruption from which it hopes to recover
+         */
+        virtual void transportInterrupted();
+
     public:
 
         /**

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h Tue Apr  7 23:59:07 2009
@@ -302,11 +302,6 @@
         }
 
         /**
-         * The transport has suffered an interruption from which it hopes to recover
-         */
-        virtual void transportInterrupted() {}
-
-        /**
          * The transport has resumed after an interruption
          */
         virtual void transportResumed() {}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Tue Apr  7 23:59:07 2009
@@ -19,8 +19,8 @@
 #include <decaf/lang/exceptions/NullPointerException.h>
 #include <decaf/lang/exceptions/InvalidStateException.h>
 #include <decaf/lang/exceptions/IllegalArgumentException.h>
-#include <decaf/util/Date.h>
 #include <decaf/lang/Math.h>
+#include <decaf/lang/System.h>
 #include <activemq/util/Config.h>
 #include <activemq/exceptions/ActiveMQException.h>
 #include <activemq/commands/Message.h>
@@ -134,12 +134,13 @@
     this->session = session;
     this->transaction = transaction;
     this->consumerInfo = consumerInfo;
-    this->closed = false;
     this->lastDeliveredSequenceId = 0;
     this->synchronizationRegistered = false;
     this->additionalWindowSize = 0;
     this->redeliveryDelay = 0;
     this->deliveredCounter = 0;
+    this->clearDispatchList = false;
+    this->listener = NULL;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -153,11 +154,29 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::start() {
+
+    if( this->unconsumedMessages.isClosed() ) {
+        return;
+    }
+
+    this->started.set( true );
+    this->unconsumedMessages.start();
+    this->session->wakeup();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::stop() {
+    this->started.set( false );
+    this->unconsumedMessages.stop();
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumer::close()
     throw ( cms::CMSException ) {
 
     try{
-        if( !closed ) {
+        if( !this->isClosed() ) {
             if( this->transaction != NULL && this->transaction->isInTransaction() ) {
 
                 // TODO - Currently we can do this since the consumer could be
@@ -167,8 +186,13 @@
                 //
                 //Pointer<Synchronization> sync( new CloseSynhcronization( this ) );
                 //this->transaction->addSynchronization( sync );
+
                 doClose();
 
+                throw UnsupportedOperationException(
+                    __FILE__, __LINE__,
+                    "The Consumer is still in an Active Transaction, commit it first." );
+
             } else {
                 doClose();
             }
@@ -192,7 +216,7 @@
             // remove it from the Broker.
             this->session->disposeOf( this->consumerInfo->getConsumerId() );
 
-            this->closed = true;
+            this->started.set( false );
 
             // Identifies any errors encountered during shutdown.
             bool haveException = false;
@@ -200,7 +224,7 @@
 
             // Purge all the pending messages
             try{
-                purgeMessages();
+                unconsumedMessages.clear();
             } catch ( ActiveMQException& ex ){
                 if( !haveException ){
                     ex.setMark( __FILE__, __LINE__ );
@@ -209,10 +233,8 @@
                 }
             }
 
-            // Wakeup any synchronous consumers.
-            synchronized( &unconsumedMessages ) {
-                unconsumedMessages.notifyAll();
-            }
+            // Stop and Wakeup all sync consumers.
+            unconsumedMessages.close();
 
             // If we encountered an error, propagate it.
             if( haveException ){
@@ -238,66 +260,47 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-decaf::lang::Pointer<commands::Message> ActiveMQConsumer::dequeue( int timeout )
+decaf::lang::Pointer<MessageDispatch> ActiveMQConsumer::dequeue( int timeout )
     throw ( cms::CMSException ) {
 
     try {
 
         this->checkClosed();
 
-        synchronized( &unconsumedMessages ) {
-
-            // Calculate the deadline
-            long long deadline = 0;
-            if( timeout > 0 ) {
-                deadline = Date::getCurrentTimeMilliseconds() + timeout;
-            }
-
-            // Loop until the time is up or we get a non-expired message
-            while( true ) {
-
-                // Wait until either the deadline is met, a message arrives, or
-                // we've closed.
-                while( !closed && unconsumedMessages.empty() && timeout != 0 ) {
-
-                    if( timeout < 0 ) {
-                        unconsumedMessages.wait();
-                    } else if( timeout > 0 ) {
-                        unconsumedMessages.wait(timeout);
-                        timeout = std::max((int)(deadline - Date::getCurrentTimeMilliseconds()), 0);
-                    }
-                }
-
-                if( unconsumedMessages.empty() ) {
-                    break;
+        // Calculate the deadline
+        long long deadline = 0;
+        if( timeout > 0 ) {
+            deadline = Date::getCurrentTimeMilliseconds() + timeout;
+        }
+
+        // Loop until the time is up or we get a non-expired message
+        while( true ) {
+
+            Pointer<MessageDispatch> dispatch = unconsumedMessages.dequeue( timeout );
+            if( dispatch == NULL ) {
+
+                if( timeout > 0 && !unconsumedMessages.isClosed() ) {
+                    timeout = Math::max( deadline - System::currentTimeMillis(), 0LL );
+                } else {
+                    return Pointer<MessageDispatch>();
                 }
 
-                // Fetch the Message then copy it so it can be handed off
-                // to the user.
-                DispatchData data = unconsumedMessages.pop();
-
-                Pointer<Message> message = data.getMessage();
-
-                // If it's expired, process the message and then go back to waiting.
-                if( message->isExpired() ) {
-
-                    beforeMessageIsConsumed( message );
-                    afterMessageIsConsumed( message, true );
-                    if( timeout > 0 ) {
-                        timeout = std::max(
-                            (int)( deadline - Date::getCurrentTimeMilliseconds() ), 0 );
-                    }
+            } else if( dispatch->getMessage()->isExpired() ) {
 
-                    // Go back to waiting for a non-expired message.
-                    continue;
+                beforeMessageIsConsumed( dispatch );
+                afterMessageIsConsumed( dispatch, true );
+                if( timeout > 0 ) {
+                    timeout = Math::max( deadline - System::currentTimeMillis(), 0LL );
                 }
 
-                // Return the message.
-                return message;
+                continue;
             }
+
+            // Return the message.
+            return dispatch;
         }
 
-        return Pointer<Message>();
+        return Pointer<MessageDispatch>();
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -313,18 +316,18 @@
         this->sendPullRequest( 0 );
 
         // Wait for the next message.
-        Pointer<Message> message = dequeue( -1 );
+        Pointer<MessageDispatch> message = dequeue( -1 );
         if( message == NULL ) {
             return NULL;
         }
 
-        // Message preprocessing
+        // Message pre-processing
         beforeMessageIsConsumed( message );
 
         // Need to clone the message because the user is responsible for freeing
         // its copy of the message.
         cms::Message* clonedMessage =
-            dynamic_cast<cms::Message*>( message->cloneDataStructure() );
+            dynamic_cast<cms::Message*>( message->getMessage()->cloneDataStructure() );
 
         // Post processing (may result in the message being deleted)
         afterMessageIsConsumed( message, false );
@@ -347,7 +350,7 @@
         this->sendPullRequest( millisecs );
 
         // Wait for the next message.
-        Pointer<Message> message = dequeue( millisecs );
+        Pointer<MessageDispatch> message = dequeue( millisecs );
         if( message == NULL ) {
             return NULL;
         }
@@ -358,7 +361,7 @@
         // Need to clone the message because the user is responsible for freeing
         // its copy of the message.
         cms::Message* clonedMessage =
-            dynamic_cast<cms::Message*>( message->cloneDataStructure() );
+            dynamic_cast<cms::Message*>( message->getMessage()->cloneDataStructure() );
 
         // Post processing (may result in the message being deleted)
         afterMessageIsConsumed( message, false );
@@ -381,7 +384,7 @@
         this->sendPullRequest( -1 );
 
         // Get the next available message, if there is one.
-        Pointer<Message> message = dequeue( 0 );
+        Pointer<MessageDispatch> message = dequeue( 0 );
         if( message == NULL ) {
             return NULL;
         }
@@ -392,7 +395,7 @@
         // Need to clone the message because the user is responsible for freeing
         // its copy of the message.
         cms::Message* clonedMessage =
-            dynamic_cast<cms::Message*>( message->cloneDataStructure() );
+            dynamic_cast<cms::Message*>( message->getMessage()->cloneDataStructure() );
 
         // Post processing (may result in the message being deleted)
         afterMessageIsConsumed( message, false );
@@ -426,7 +429,9 @@
                 session->stop();
             }
 
-            this->listener.set( listener );
+            synchronized( &listenerMutex ) {
+                this->listener = listener;
+            }
 
             session->redispatch( unconsumedMessages );
 
@@ -434,7 +439,9 @@
                 session->start();
             }
         } else {
-            this->listener.set( NULL );
+            synchronized( &listenerMutex ) {
+                this->listener = NULL;
+            }
         }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -443,7 +450,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::beforeMessageIsConsumed( const Pointer<Message>& message ) {
+void ActiveMQConsumer::beforeMessageIsConsumed( const Pointer<MessageDispatch>& dispatch ) {
 
     // If the Session is in ClientAcknowledge mode, then we set the
     // handler in the message to this object and send it out.  Otherwise
@@ -452,13 +459,14 @@
 
         // Register ourself so that we can handle the Message's
         // acknowledge method.
-        message->setAckHandler( this );
+        dispatch->getMessage()->setAckHandler( this );
     }
 
-    this->lastDeliveredSequenceId = message->getMessageId()->getBrokerSequenceId();
+    this->lastDeliveredSequenceId =
+        dispatch->getMessage()->getMessageId()->getBrokerSequenceId();
 
     synchronized( &dispatchedMessages ) {
-        dispatchedMessages.enqueueFront( message );
+        dispatchedMessages.enqueueFront( dispatch );
     }
 
     // If the session is transacted then we hand off the message to it to
@@ -473,16 +481,20 @@
                 "In a Transacted Session but no Transaction Context set." );
         }
 
-        ackLater( message, ActiveMQConstants::ACK_TYPE_DELIVERED );
+        ackLater( dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED );
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::afterMessageIsConsumed( const Pointer<Message>& message,
+void ActiveMQConsumer::afterMessageIsConsumed( const Pointer<MessageDispatch>& message,
                                                bool messageExpired AMQCPP_UNUSED ) {
 
     try{
 
+        if( unconsumedMessages.isClosed() ) {
+            return;
+        }
+
         if( messageExpired == true ) {
             ackLater( message, ActiveMQConstants::ACK_TYPE_DELIVERED );
         }
@@ -557,7 +569,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::ackLater( const Pointer<Message>& message, int ackType )
+void ActiveMQConsumer::ackLater( const Pointer<MessageDispatch>& dispatch, int ackType )
     throw ( ActiveMQException ) {
 
     // Don't acknowledge now, but we may need to let the broker know the
@@ -578,10 +590,10 @@
 
     Pointer<MessageAck> oldPendingAck = pendingAck;
     pendingAck.reset( new MessageAck() );
-    pendingAck->setConsumerId( this->consumerInfo->getConsumerId() );
+    pendingAck->setConsumerId( dispatch->getConsumerId() );
     pendingAck->setAckType( ackType );
-    pendingAck->setDestination( message->getDestination() );
-    pendingAck->setLastMessageId( message->getMessageId() );
+    pendingAck->setDestination( dispatch->getDestination() );
+    pendingAck->setLastMessageId( dispatch->getMessage()->getMessageId() );
     pendingAck->setMessageCount( deliveredCounter );
 
     if( oldPendingAck == NULL ) {
@@ -614,14 +626,14 @@
 
         if( !dispatchedMessages.empty() ) {
 
-            Pointer<Message> message = dispatchedMessages.front();
+            Pointer<Message> message = dispatchedMessages.front()->getMessage();
             Pointer<MessageAck> ack( new MessageAck() );
             ack->setAckType( type );
             ack->setConsumerId( this->consumerInfo->getConsumerId() );
             ack->setDestination( message->getDestination() );
             ack->setMessageCount( (int)dispatchedMessages.size() );
             ack->setLastMessageId( message->getMessageId() );
-            ack->setFirstMessageId( dispatchedMessages.back()->getMessageId() );
+            ack->setFirstMessageId( dispatchedMessages.back()->getMessage()->getMessageId() );
 
             return ack;
         }
@@ -701,18 +713,19 @@
             }
 
             // Only increase the redelivery delay after the first redelivery..
-            Pointer<Message> lastMsg = dispatchedMessages.front();
+            Pointer<Message> lastMsg = dispatchedMessages.front()->getMessage();
             const int currentRedeliveryCount = lastMsg->getRedeliveryCounter();
             if( currentRedeliveryCount > 0 ) {
                 redeliveryDelay = transaction->getRedeliveryDelay();
             }
 
-            Pointer<MessageId> firstMsgId = dispatchedMessages.back()->getMessageId();
+            Pointer<MessageId> firstMsgId =
+                dispatchedMessages.back()->getMessage()->getMessageId();
 
-            std::auto_ptr< Iterator< Pointer<Message> > > iter( dispatchedMessages.iterator() );
+            std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter( dispatchedMessages.iterator() );
 
             while( iter->hasNext() ) {
-                Pointer<Message> message = iter->next();
+                Pointer<Message> message = iter->next()->getMessage();
                 message->setRedeliveryCounter( message->getRedeliveryCounter() + 1 );
             }
 
@@ -749,11 +762,32 @@
                     session->oneway( ack );
                 }
 
-                std::auto_ptr< Iterator< Pointer<Message> > > iter( dispatchedMessages.iterator() );
+                // stop the delivery of messages.
+                unconsumedMessages.stop();
+
+                std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter( dispatchedMessages.iterator() );
 
                 while( iter->hasNext() ) {
-                    DispatchData dispatch( this->consumerInfo->getConsumerId(), iter->next() );
-                    unconsumedMessages.enqueueFront( dispatch );
+                    unconsumedMessages.enqueueFirst( iter->next() );
+                }
+
+                if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
+                    // TODO
+                    // Start up the delivery again a little later.
+                    //scheduler.executeAfterDelay(new Runnable() {
+                    //    public void run() {
+                    //        try {
+                    //            if( !started.get() ) {
+                    //                start();
+                    //            }
+                    //        } catch( CMSException& e ) {
+                    //            session.connection.onAsyncException(e);
+                    //        }
+                    //    }
+                    //}, redeliveryDelay);
+                    start();
+                } else {
+                    start();
                 }
 
             }
@@ -762,61 +796,56 @@
         }
     }
 
-    if( this->listener.get() != NULL ) {
+    if( this->listener != NULL ) {
         session->redispatch( unconsumedMessages );
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::dispatch( DispatchData& data ) {
+void ActiveMQConsumer::dispatch( const Pointer<MessageDispatch>& dispatch ) {
 
     try {
 
-        Pointer<Message> message = data.getMessage();
-
-        // Don't dispatch expired messages, ack it and then destroy it
-        if( message->isExpired() ) {
-            this->ackLater( message, ActiveMQConstants::ACK_TYPE_CONSUMED );
-
-            // stop now, don't queue
-            return;
-        }
+        synchronized( &unconsumedMessages ) {
 
-        cms::MessageListener* cmsListener = this->listener.get();
+            if( this->clearDispatchList ) {
+                // we are reconnecting so lets flush the in progress
+                // messages
+                clearDispatchList = false;
+                unconsumedMessages.clear();
+            }
 
-        // If we have a listener, send the message.
-        if( cmsListener != NULL ) {
+            if( !unconsumedMessages.isClosed() ) {
 
-            // Preprocessing.
-            beforeMessageIsConsumed( message );
+                // Don't dispatch expired messages, ack it and then destroy it
+                if( dispatch->getMessage()->isExpired() ) {
+                    this->ackLater( dispatch, ActiveMQConstants::ACK_TYPE_CONSUMED );
 
-            // Notify the listener
-            cmsListener->onMessage( dynamic_cast<cms::Message*>( message.get() ) );
+                    // stop now, don't queue
+                    return;
+                }
 
-            // Postprocessing
-            afterMessageIsConsumed( message, false );
+                synchronized( &listenerMutex ) {
+                    // If we have a listener, send the message.
+                    if( this->listener != NULL && unconsumedMessages.isRunning() ) {
 
-        } else {
+                        // Preprocessing.
+                        beforeMessageIsConsumed( dispatch );
 
-            // No listener, add it to the unconsumed messages list
-            synchronized( &unconsumedMessages ) {
-                unconsumedMessages.push( data );
-                unconsumedMessages.notifyAll();
-            }
-        }
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
+                        // Notify the listener
+                        this->listener->onMessage(
+                            dynamic_cast<cms::Message*>( dispatch->getMessage().get() ) );
 
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::purgeMessages() throw ( ActiveMQException ) {
+                        // Postprocessing
+                        afterMessageIsConsumed( dispatch, false );
 
-    try {
+                    } else {
 
-        synchronized( &this->unconsumedMessages ) {
-            this->unconsumedMessages.clear();
+                        // No listener, add it to the unconsumed messages list
+                        this->unconsumedMessages.enqueue( dispatch );
+                    }
+                }
+            }
         }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -833,7 +862,7 @@
         this->checkClosed();
 
         // There are still local message, consume them first.
-        if( !this->unconsumedMessages.empty() ) {
+        if( !this->unconsumedMessages.isEmpty() ) {
             return;
         }
 
@@ -860,3 +889,42 @@
             "ActiveMQConsumer - Consumer Already Closed" );
     }
 }
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConsumer::iterate() {
+
+    synchronized( &listenerMutex ) {
+
+        if( this->listener != NULL ) {
+
+            Pointer<MessageDispatch> dispatch = unconsumedMessages.dequeueNoWait();
+            if( dispatch != NULL ) {
+
+                try {
+                    beforeMessageIsConsumed( dispatch );
+                    this->listener->onMessage(
+                        dynamic_cast<cms::Message*>( dispatch->getMessage().get() ) );
+                    afterMessageIsConsumed( dispatch, false );
+                } catch( ActiveMQException& ex ) {
+                    this->session->fire( ex );
+                }
+
+                return true;
+            }
+        }
+    }
+
+    return false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::clearMessagesInProgress() {
+    // we are called from inside the transport reconnection logic
+    // which involves clearing the dispatch lists of all of the
+    // connections' consumers
+    // so rather than trying to grab a mutex (which could be already
+    // owned by the message listener calling the send) we will just set
+    // a flag so that the list can be cleared as soon as the
+    // dispatch thread is ready to flush the dispatch list
+    clearDispatchList = true;
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Tue Apr  7 23:59:07 2009
@@ -26,11 +26,12 @@
 #include <activemq/exceptions/ActiveMQException.h>
 #include <activemq/commands/ConsumerInfo.h>
 #include <activemq/commands/MessageAck.h>
+#include <activemq/commands/MessageDispatch.h>
 #include <activemq/core/ActiveMQAckHandler.h>
 #include <activemq/core/ActiveMQTransactionContext.h>
 #include <activemq/core/Dispatcher.h>
+#include <activemq/core/MessageDispatchChannel.h>
 
-#include <decaf/util/concurrent/atomic/AtomicReference.h>
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/lang/Pointer.h>
 #include <decaf/util/StlQueue.h>
@@ -41,15 +42,14 @@
 namespace core{
 
     using decaf::lang::Pointer;
-    using decaf::util::concurrent::atomic::AtomicReference;
     using decaf::util::concurrent::atomic::AtomicBoolean;
+    using activemq::core::MessageDispatchChannel;
 
     class ActiveMQSession;
 
-    class AMQCPP_API ActiveMQConsumer :
-        public cms::MessageConsumer,
-        public ActiveMQAckHandler,
-        public Dispatcher
+    class AMQCPP_API ActiveMQConsumer : public cms::MessageConsumer,
+                                        public ActiveMQAckHandler,
+                                        public Dispatcher
     {
     private:
 
@@ -71,7 +71,12 @@
         /**
          * The Message Listener for this Consumer
          */
-        AtomicReference<cms::MessageListener> listener;
+        cms::MessageListener* listener;
+
+        /**
+         * Mutex to Protect access to the listener during delivery.
+         */
+        decaf::util::concurrent::Mutex listenerMutex;
 
         /**
          * Is the consumer currently delivering acks.
@@ -79,14 +84,19 @@
         AtomicBoolean deliveringAcks;
 
         /**
+         * Has this Consumer been started yet.
+         */
+        AtomicBoolean started;
+
+        /**
          * Queue of unconsumed messages.
          */
-        decaf::util::StlQueue<DispatchData> unconsumedMessages;
+        MessageDispatchChannel unconsumedMessages;
 
         /**
          * Queue of consumed messages.
          */
-        decaf::util::StlQueue< decaf::lang::Pointer<commands::Message> > dispatchedMessages;
+        decaf::util::StlQueue< decaf::lang::Pointer<commands::MessageDispatch> > dispatchedMessages;
 
         /**
          * The last delivered message's BrokerSequenceId.
@@ -119,9 +129,9 @@
         volatile bool synchronizationRegistered;
 
         /**
-         * Boolean that indicates if the consumer has been closed
+         * Boolean indicating if in progress messages should be cleared.
          */
-        bool closed;
+        bool clearDispatchList;
 
     public:
 
@@ -137,6 +147,19 @@
     public:  // Interface Implementation
 
         /**
+         * Starts the Consumer if not already started and not closed. A consumer
+         * will not deliver messages until started.
+         */
+        virtual void start();
+
+        /**
+         * Stops a Consumer, the Consumer will not deliver any messages that are
+         * dispatched to it until it is started again.  A Closed Consumer is also a
+         * stopped consumer.
+         */
+        virtual void stop();
+
+        /**
          * Closes the Consumer.  This will return all allocated resources
          * and purge any outstanding messages.  This method will block if
          * there is a call to receive in progress, or a dispatch to a
@@ -180,7 +203,7 @@
          * @param MessageListener interface pointer
          */
         virtual cms::MessageListener* getMessageListener() const {
-            return this->listener.get();
+            return this->listener;
         }
 
         /**
@@ -204,9 +227,9 @@
 
         /**
          * Called asynchronously by the session to dispatch a message.
-         * @param message object pointer
+         * @param message dispatch object pointer
          */
-        virtual void dispatch( DispatchData& message );
+        virtual void dispatch( const Pointer<MessageDispatch>& message );
 
     public:  // ActiveMQConsumer Methods
 
@@ -256,7 +279,7 @@
          * @returns if this Consumer has been closed.
          */
         bool isClosed() const {
-            return this->closed;
+            return this->unconsumedMessages.isClosed();
         }
 
         /**
@@ -275,13 +298,25 @@
             this->synchronizationRegistered = value;
         }
 
-    protected:
+        /**
+         * Deliver one pending message to the registered MessageListener if there
+         * is one, return true if a message was delivered, or false if there is no
+         * listener or no message is pending.
+         */
+        bool iterate();
+
+
+        /**
+         * Forces this consumer to send all pending acks to the broker.
+         */
+        void deliverAcks() throw ( exceptions::ActiveMQException );
 
         /**
-         * Purges all messages currently in the queue.  This can be as a
-         * result of a rollback, or of the consumer being shutdown.
+         * Called on a Failover to clear any pending messages.
          */
-        void purgeMessages() throw ( exceptions::ActiveMQException );
+        void clearMessagesInProgress();
+
+    protected:
 
         /**
          * Used by synchronous receive methods to wait for messages to come in.
@@ -296,7 +331,7 @@
          * @throws InvalidStateException if this consumer is closed upon
          * entering this method.
          */
-        Pointer<commands::Message> dequeue( int timeout )
+        Pointer<MessageDispatch> dequeue( int timeout )
             throw ( cms::CMSException );
 
         /**
@@ -304,7 +339,7 @@
          * @param message - the message being consumed.
          */
         void beforeMessageIsConsumed(
-            const Pointer<commands::Message>& message );
+            const Pointer<commands::MessageDispatch>& dispatch );
 
         /**
          * Post-consume processing
@@ -312,7 +347,7 @@
          * @param messageExpired - flag indicating if the message has expired.
          */
         void afterMessageIsConsumed(
-            const Pointer<commands::Message>& message, bool messageExpired );
+            const Pointer<commands::MessageDispatch>& dispatch, bool messageExpired );
 
     private:
 
@@ -334,12 +369,9 @@
         // Sends an ack as needed in order to keep them coming in if the current
         // ack mode allows the consumer to receive up to the prefetch limit before
         // an real ack is sent.
-        void ackLater( const Pointer<commands::Message>& message, int ackType )
+        void ackLater( const Pointer<commands::MessageDispatch>& message, int ackType )
             throw ( exceptions::ActiveMQException );
 
-        // Delivers all pending acks before a consumer is closed
-        void deliverAcks() throw ( exceptions::ActiveMQException );
-
         // Create an Ack Message that acks all messages that have been delivered so far.
         Pointer<commands::MessageAck> makeAckForAllDeliveredMessages( int type );
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Tue Apr  7 23:59:07 2009
@@ -194,20 +194,40 @@
                 "ActiveMQSession::rollback - This Session is not Transacted" );
         }
 
-        bool started = this->executor->isStarted();
+        // Roll back the Transaction
+        this->transaction->rollback();
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::clearMessagesInProgress() {
+
+    if( this->executor.get() != NULL ) {
+        this->executor->clearMessagesInProgress();
+    }
+
+    synchronized( &this->consumers ) {
+        std::vector< ActiveMQConsumer* > consumers = this->consumers.values();
 
-        if( started ) {
-            this->executor->stop();
+        std::vector< ActiveMQConsumer* >::iterator iter = consumers.begin();
+        for( ; iter != consumers.end(); ++iter ) {
+            (*iter)->clearMessagesInProgress();
         }
+    }
+}
 
-        // Roll back the Transaction
-        this->transaction->rollback();
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::deliverAcks() {
+
+    synchronized( &this->consumers ) {
+        std::vector< ActiveMQConsumer* > consumers = this->consumers.values();
 
-        if( started ) {
-            this->executor->start();
+        std::vector< ActiveMQConsumer* >::iterator iter = consumers.begin();
+        for( ; iter != consumers.end(); ++iter ) {
+            (*iter)->deliverAcks();
         }
     }
-    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -275,6 +295,10 @@
         // Send our info to the Broker.
         this->syncRequest( consumerInfo );
 
+        if( this->connection->isStarted() ) {
+            consumer->start();
+        }
+
         return consumer.release();
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -318,6 +342,10 @@
         // Send our info to the Broker.
         this->syncRequest( consumerInfo );
 
+        if( this->connection->isStarted() ) {
+            consumer->start();
+        }
+
         return consumer.release();
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -656,34 +684,36 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::dispatch( DispatchData& message ) {
+void ActiveMQSession::dispatch( const Pointer<MessageDispatch>& dispatch ) {
 
     if( this->executor.get() != NULL ) {
-        this->executor->execute( message );
+        this->executor->execute( dispatch );
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::redispatch( decaf::util::StlQueue<DispatchData>& unconsumedMessages ) {
+void ActiveMQSession::redispatch( MessageDispatchChannel& unconsumedMessages ) {
 
-    decaf::util::StlQueue<DispatchData> reversedList;
+    std::vector< Pointer<MessageDispatch> > messages = unconsumedMessages.removeAll();
+    std::vector< Pointer<MessageDispatch> >::reverse_iterator iter = messages.rbegin();
 
-    // Copy the list in reverse order then clear the original list.
-    synchronized( &unconsumedMessages ) {
-        unconsumedMessages.reverse( reversedList );
-        unconsumedMessages.clear();
-    }
-
-    // Add the list to the front of the executor.
-    while( !reversedList.empty() ) {
-        DispatchData data = reversedList.pop();
-        executor->executeFirst( data );
+    for( ; iter != messages.rend(); ++iter ) {
+        executor->executeFirst( *iter );
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::start() {
 
+    synchronized( &this->consumers ) {
+        std::vector< ActiveMQConsumer* > consumers = this->consumers.values();
+
+        std::vector< ActiveMQConsumer*>::iterator iter = consumers.begin();
+        for( ; iter != consumers.end(); ++iter ) {
+            (*iter)->start();
+        }
+    }
+
     if( this->executor.get() != NULL ) {
         this->executor->start();
     }
@@ -704,7 +734,7 @@
         return false;
     }
 
-    return this->executor->isStarted();
+    return this->executor->isRunning();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -950,28 +980,11 @@
 
             if( this->consumers.containsKey( id ) ) {
 
-                // If the executor thread is currently running, stop it.
-                bool wasStarted = isStarted();
-                if( wasStarted ) {
-                    stop();
-                }
-
                 // Remove this Id both from the Sessions Map of Consumers and from
                 // the Connection.
                 this->connection->removeDispatcher( id );
                 this->connection->disposeOf( id );
                 this->consumers.remove( id );
-
-                // Clean up any resources in the executor for this consumer
-                if( this->executor.get() != NULL ) {
-
-                    // Purge any pending messages for this consumer.
-                    this->executor->purgeConsumerMessages( id );
-                }
-
-                if( wasStarted ) {
-                    start();
-                }
             }
         }
     }
@@ -1004,17 +1017,6 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQConsumer* ActiveMQSession::getConsumer( const Pointer<ConsumerId>& id ) {
-
-    synchronized( &this->consumers ) {
-        if( this->consumers.containsKey( id ) ) {
-            return this->consumers.get( id );
-        }
-    }
-    return NULL;
-}
-
-////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::doStartTransaction() throw ( ActiveMQException ) {
 
     if( !this->isTransacted() ) {
@@ -1023,3 +1025,11 @@
 
     this->transaction->begin();
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::wakeup() {
+
+    if( this->executor.get() != NULL ) {
+        this->executor->wakeup();
+    }
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Tue Apr  7 23:59:07 2009
@@ -30,6 +30,7 @@
 #include <activemq/commands/ProducerId.h>
 #include <activemq/commands/TransactionId.h>
 #include <activemq/core/Dispatcher.h>
+#include <activemq/core/MessageDispatchChannel.h>
 #include <activemq/util/LongSequenceGenerator.h>
 
 #include <decaf/util/StlMap.h>
@@ -63,6 +64,8 @@
                                      ActiveMQProducer*,
                                      commands::ProducerId::COMPARATOR> ProducersMap;
 
+        friend class ActiveMQSessionExecutor;
+
     private:
 
         /**
@@ -130,16 +133,10 @@
         virtual ~ActiveMQSession();
 
         /**
-         * Looks up a consumer of this Session by a Pointer to its Id.
-         * @param id - a Pointer to a ConsumerId to match in the Map of Consumers.
-         */
-        ActiveMQConsumer* getConsumer( const decaf::lang::Pointer<commands::ConsumerId>& id );
-
-        /**
          * Redispatches the given set of unconsumed messages to the consumers.
          * @param unconsumedMessages - unconsumed messages to be redelivered.
          */
-        void redispatch( decaf::util::StlQueue<DispatchData>& unconsumedMessages );
+        void redispatch( MessageDispatchChannel& unconsumedMessages );
 
         /**
          * Stops asynchronous message delivery.
@@ -178,7 +175,7 @@
          * Dispatches a message to a particular consumer.
          * @param message - the message to be dispatched
          */
-        virtual void dispatch( DispatchData& message );
+        virtual void dispatch( const Pointer<MessageDispatch>& message );
 
     public:   // Implements Methods
 
@@ -473,6 +470,23 @@
          */
         void doStartTransaction() throw ( exceptions::ActiveMQException );
 
+        /**
+         * Request that this Session inform all of its consumers to deliver their pending
+         * acks.
+         */
+        void deliverAcks();
+
+        /**
+         * Request that this Session inform all of its consumers to clear all messages that
+         * are currently in progress.
+         */
+        void clearMessagesInProgress();
+
+        /**
+         * Causes the Session to wake up its executor and ensure all messages are dispatched.
+         */
+        void wakeup();
+
    private:
 
        /**

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp Tue Apr  7 23:59:07 2009
@@ -18,11 +18,14 @@
 #include "ActiveMQSessionExecutor.h"
 #include "ActiveMQSession.h"
 #include "ActiveMQConsumer.h"
+
 #include <activemq/commands/ConsumerInfo.h>
+#include <activemq/threads/DedicatedTaskRunner.h>
 
 using namespace std;
 using namespace activemq;
 using namespace activemq::core;
+using namespace activemq::threads;
 using namespace activemq::exceptions;
 using namespace decaf::lang;
 using namespace decaf::util;
@@ -32,8 +35,6 @@
 ActiveMQSessionExecutor::ActiveMQSessionExecutor( ActiveMQSession* session ) {
 
     this->session = session;
-    this->closed = false;
-    this->started = false;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -42,6 +43,9 @@
     try {
 
         // Terminate the thread.
+        stop();
+
+        // Close out the Message Channel.
         close();
 
         // Empty the message queue and destroy any remaining messages.
@@ -51,119 +55,84 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionExecutor::close() {
-
-    synchronized( &mutex ) {
-
-        closed = true;
-        mutex.notifyAll();
-    }
-
-    if( thread != NULL ) {
-        thread->join();
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionExecutor::execute( DispatchData& data ) {
+void ActiveMQSessionExecutor::execute( const Pointer<MessageDispatch>& dispatch ) {
 
     // Add the data to the queue.
-    synchronized( &mutex ) {
-        messageQueue.push_back( data );
-        mutex.notifyAll();
-    }
+    this->messageQueue.enqueue( dispatch );
+    this->wakeup();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionExecutor::executeFirst( DispatchData& data ) {
+void ActiveMQSessionExecutor::executeFirst( const Pointer<MessageDispatch>& dispatch ) {
 
-    // Add the data to the front of the queue.
-    synchronized( &mutex ) {
-        messageQueue.push_front( data );
-        mutex.notifyAll();
-    }
+    // Add the data to the front of the queue.
+    this->messageQueue.enqueueFirst( dispatch );
+    this->wakeup();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionExecutor::purgeConsumerMessages(
-    const decaf::lang::Pointer<commands::ConsumerId>& consumerId )
-{
-    synchronized( &mutex ) {
+void ActiveMQSessionExecutor::wakeup() {
 
-        list<DispatchData>::iterator iter = messageQueue.begin();
-        while( iter != messageQueue.end() ) {
-            list<DispatchData>::iterator currentIter = iter;
-            DispatchData& dispatchData = *iter++;
-            if( consumerId->equals( *( dispatchData.getConsumerId() ) ) ) {
-                messageQueue.erase( currentIter );
-            }
+    Pointer<TaskRunner> taskRunner = this->taskRunner;
+    synchronized( &messageQueue ) {
+        if( this->taskRunner == NULL ) {
+            this->taskRunner.reset( new DedicatedTaskRunner( this ) );
         }
+
+        taskRunner = this->taskRunner;
     }
+
+    taskRunner->wakeup();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionExecutor::start() {
 
-    synchronized( &mutex ) {
-
-        if( closed || started ) {
-            return;
-        }
-
-        started = true;
+    if( !messageQueue.isRunning() ) {
 
-        // Don't create the thread unless we need to.
-        if( thread == NULL ) {
-            thread.reset( new Thread( this ) );
-            thread->start();
+        messageQueue.start();
+        if( hasUncomsumedMessages() ) {
+            this->wakeup();
         }
-
-        mutex.notifyAll();
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionExecutor::stop() {
 
-    // We lock here to make sure that we wait until the thread
-    // is done with an internal dispatch operation, otherwise
-    // we might return before that and cause the caller to be
-    // in an inconsistent state.
-    synchronized( &dispatchMutex ) {
-
-        if( closed || !started ) {
-            return;
+    if( messageQueue.isRunning() ) {
+        messageQueue.stop();
+        Pointer<TaskRunner> taskRunner = this->taskRunner;
+        if( taskRunner != NULL ) {
+            this->taskRunner.reset( NULL );
+            taskRunner->shutdown();
         }
-
-        synchronized( &mutex ) { started = false; }
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionExecutor::clear() {
-
-    synchronized( &mutex ) {
-        this->messageQueue.clear();
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionExecutor::dispatch( DispatchData& data ) {
+void ActiveMQSessionExecutor::dispatch( const Pointer<MessageDispatch>& dispatch ) {
 
     try {
 
-        ActiveMQConsumer* consumer = session->getConsumer( data.getConsumerId() );
+        ActiveMQConsumer* consumer = NULL;
 
-        // If the consumer is not available, just delete the message.
-        // Otherwise, dispatch the message to the consumer.
-        if( consumer != NULL ) {
-            consumer->dispatch( data );
+        synchronized( &( this->session->consumers ) ) {
+            if( this->session->consumers.containsKey( dispatch->getConsumerId() ) ) {
+                consumer = this->session->consumers.get( dispatch->getConsumerId() );
+            }
+
+            // If the consumer is not available, just ignore the message.
+            // Otherwise, dispatch the message to the consumer.
+            if( consumer != NULL ) {
+                consumer->dispatch( dispatch );
+            }
         }
 
-    } catch( ActiveMQException& ex ) {
+    } catch( decaf::lang::Exception& ex ) {
         ex.setMark(__FILE__, __LINE__ );
         ex.printStackTrace();
-    } catch( exception& ex ) {
+    } catch( std::exception& ex ) {
         ActiveMQException amqex( __FILE__, __LINE__, ex.what() );
         amqex.printStackTrace();
     } catch( ... ) {
@@ -173,72 +142,44 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionExecutor::run() {
+bool ActiveMQSessionExecutor::iterate() {
 
     try {
 
-        while( true ) {
+        synchronized( &( this->session->consumers ) ) {
 
-            // Dispatch all currently available messages.
-            dispatchAll();
+            std::vector<ActiveMQConsumer*> consumers = this->session->consumers.values();
+            std::vector<ActiveMQConsumer*>::iterator iter = consumers.begin();
 
-            synchronized( &mutex ) {
-
-                // If we're closing down, exit the thread.
-                if( closed ) {
-                    return;
-                }
-
-                // When stopped we hit this case and wait otherwise
-                // if there are messages we
-                if( ( messageQueue.empty() || !started ) && !closed ) {
-
-                    // Wait for more data or to be woke up.
-                    mutex.wait();
+            // Deliver any messages queued on the consumer to their listeners.
+            for( ; iter != consumers.end(); ++iter ) {
+                if( (*iter)->iterate() ) {
+                    return true;
                 }
             }
         }
 
-    } catch( ActiveMQException& ex ) {
+        // No messages left queued on the listeners.. so now dispatch messages
+        // queued on the session
+        Pointer<MessageDispatch> message = messageQueue.dequeueNoWait();
+        if( message != NULL ) {
+            dispatch( message );
+            return !messageQueue.isEmpty();
+        }
+
+        return false;
+
+    } catch( decaf::lang::Exception& ex ) {
         ex.setMark(__FILE__, __LINE__ );
         session->fire( ex );
-    } catch( exception& stdex ) {
+        return true;
+    } catch( std::exception& stdex ) {
         ActiveMQException ex( __FILE__, __LINE__, stdex.what() );
         session->fire( ex );
+        return true;
     } catch( ... ) {
         ActiveMQException ex(__FILE__, __LINE__, "caught unknown exception" );
         session->fire( ex );
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionExecutor::dispatchAll() {
-
-    // Dispatch all currently available messages.  This lock allows the
-    // main thread to wait while we finish with a dispatch cycle, the
-    // stop method for instance should try and lock this mutex so that
-    // it knows that we've had a chance to read the started flag and
-    // detect that we are stopped, otherwise stop might return while
-    // we are still dispatching messages.
-    synchronized( &dispatchMutex ) {
-
-        // Take out all of the dispatch data currently in the array.
-        list<DispatchData> dataList;
-        synchronized( &mutex ) {
-
-            // If stopped or closed we don't want to start dispatching.
-            if( !started || closed ) {
-                return;
-            }
-
-            dataList = messageQueue;
-            messageQueue.clear();
-        }
-
-        list<DispatchData>::iterator iter = dataList.begin();
-        while( iter != dataList.end() ) {
-            DispatchData& data = *iter++;
-            dispatch( data );
-        }
+        return true;
     }
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h Tue Apr  7 23:59:07 2009
@@ -19,20 +19,18 @@
 #define ACTIVEMQ_CORE_ACTIVEMQSESSIONEXECUTOR_
 
 #include <activemq/util/Config.h>
-#include <activemq/core/Dispatcher.h>
+#include <activemq/core/MessageDispatchChannel.h>
 #include <activemq/commands/ConsumerId.h>
-#include <decaf/lang/Thread.h>
-#include <decaf/lang/Runnable.h>
+#include <activemq/commands/MessageDispatch.h>
+#include <activemq/threads/Task.h>
+#include <activemq/threads/TaskRunner.h>
 #include <decaf/lang/Pointer.h>
-#include <decaf/util/concurrent/Mutex.h>
-#include <decaf/util/StlList.h>
-#include <vector>
-#include <list>
 
 namespace activemq{
 namespace core{
 
     using decaf::lang::Pointer;
+    using activemq::commands::MessageDispatch;
 
     class ActiveMQSession;
     class ActiveMQConsumer;
@@ -41,32 +39,17 @@
      * Delegate dispatcher for a single session.  Contains a thread
      * to provide for asynchronous dispatching.
      */
-    class AMQCPP_API ActiveMQSessionExecutor :
-        public Dispatcher,
-        public decaf::lang::Runnable
-    {
+    class AMQCPP_API ActiveMQSessionExecutor : activemq::threads::Task {
     private:
 
         /** Session that is this executors parent. */
         ActiveMQSession* session;
 
-        /** List used to hold messages waiting to be dispatched. */
-        std::list<DispatchData> messageQueue;
+        /** The Channel that holds the waiting Messages for Dispatching. */
+        MessageDispatchChannel messageQueue;
 
-        /** The Dispatcher Thread */
-        Pointer<decaf::lang::Thread> thread;
-
-        /** Mutex used to lock on access to the Message Queue */
-        decaf::util::concurrent::Mutex mutex;
-
-        /** Locks when messages are being dispatched to consumers. */
-        decaf::util::concurrent::Mutex dispatchMutex;
-
-        /** Has the Start method been called */
-        volatile bool started;
-
-        /** Has the Close method been called */
-        volatile bool closed;
+        /** The Dispatcher TaskRunner */
+        Pointer<activemq::threads::TaskRunner> taskRunner;
 
     public:
 
@@ -85,21 +68,33 @@
          * end of the queue.
          * @param data - the data to be dispatched.
          */
-        virtual void execute( DispatchData& data );
+        virtual void execute( const Pointer<MessageDispatch>& data );
 
         /**
          * Executes the dispatch.  Adds the given data to the
          * beginning of the queue.
          * @param data - the data to be dispatched.
          */
-        virtual void executeFirst( DispatchData& data );
+        virtual void executeFirst( const Pointer<MessageDispatch>& data );
+
+        /**
+         * Removes all messages in the Dispatch Channel so that none are delivered.
+         */
+        virtual void clearMessagesInProgress() {
+            this->messageQueue.clear();
+        }
+
+        /**
+         * @return true if there are any pending messages in the dispatch channel.
+         */
+        virtual bool hasUncomsumedMessages() const {
+            return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty();
+        }
 
         /**
-         * Removes all messages for the given consumer from the.
-         * @param consumerId - the subject consumer
+         * Wake up this executor and dispatch any pending messages.
          */
-        virtual void purgeConsumerMessages(
-            const decaf::lang::Pointer<commands::ConsumerId>& consumerId );
+        virtual void wakeup();
 
         /**
          * Starts the dispatching.
@@ -115,39 +110,55 @@
          * Terminates the dispatching thread.  Once this is called, the executor is no longer
          * usable.
          */
-        virtual void close();
+        virtual void close() {
+            this->messageQueue.close();
+        }
+
+        /**
+         * @return true if the executor is started
+         */
+        virtual bool isRunning() const {
+            return this->messageQueue.isRunning();
+        }
 
         /**
-         * Indicates if the executor is started
+         * @return true if there are no messages in the Dispatch Channel.
          */
-        virtual bool isStarted() const {
-            return started;
+        virtual bool isEmpty() {
+            return messageQueue.isEmpty();
         }
 
         /**
          * Removes all queued messages and destroys them.
          */
-        virtual void clear();
+        virtual void clear() {
+            this->messageQueue.clear();
+        }
 
         /**
-         * Dispatches a message to a particular consumer.
-         * @param consumer - The consumer to dispatch to.
-         * @param msg - The message to be dispatched.
+         * Iterates on the MessageDispatchChannel sending all pending messages
+         * to the Consumers they are destined for.
+         *
+         * @return false if there are no more messages to dispatch.
          */
-        virtual void dispatch( DispatchData& data );
+        virtual bool iterate();
 
         /**
-         * Run method - called by the Thread class in the context
-         * of the thread.
+         * @returns a vector containing all the unconsumed messages, this clears the
+         *          Message Dispatch Channel when called.
          */
-        virtual void run();
+        std::vector< Pointer<MessageDispatch> > getUnconsumedMessages() {
+            return messageQueue.removeAll();
+        }
 
     private:
 
         /**
-         * Dispatches all messages currently in the queue.
+         * Dispatches a message to a particular consumer.
+         * @param data - The message to be dispatched.
          */
-        void dispatchAll();
+        virtual void dispatch( const Pointer<MessageDispatch>& data );
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp Tue Apr  7 23:59:07 2009
@@ -89,9 +89,12 @@
     throw ( activemq::exceptions::ActiveMQException ) {
 
     try{
+
         if( !isInTransaction() ) {
 
-            this->synchronizations.clear();
+            synchronized( &synchronizations ) {
+                this->synchronizations.clear();
+            }
 
             // Create the Id
             Pointer<LocalTransactionId> id( new LocalTransactionId() );

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h Tue Apr  7 23:59:07 2009
@@ -18,28 +18,33 @@
 #ifndef ACTIVEMQ_CORE_DISPATCHER_H_
 #define ACTIVEMQ_CORE_DISPATCHER_H_
 
-#include <activemq/core/DispatchData.h>
+#include <activemq/commands/MessageDispatch.h>
 #include <activemq/util/Config.h>
+#include <decaf/lang/Pointer.h>
 
 namespace activemq{
 namespace core{
-    
+
+    using decaf::lang::Pointer;
+    using activemq::commands::MessageDispatch;
+
     /**
-     * Interface for an object responsible for dispatching messages to 
+     * Interface for an object responsible for dispatching messages to
      * consumers.
      */
     class AMQCPP_API Dispatcher {
     public:
-    
+
         virtual ~Dispatcher(){}
-        
+
         /**
          * Dispatches a message to a particular consumer.
          * @param message - the message to be dispatched.
          */
-        virtual void dispatch( DispatchData& message ) = 0;
+        virtual void dispatch( const Pointer<MessageDispatch>& message ) = 0;
+
     };
-    
+
 }}
 
 #endif /*ACTIVEMQ_CORE_DISPATCHER_H_*/

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.cpp?rev=763046&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.cpp Tue Apr  7 23:59:07 2009
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MessageDispatchChannel.h"
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::commands;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+MessageDispatchChannel::MessageDispatchChannel() {
+
+    this->running = false;
+    this->closed = false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+MessageDispatchChannel::~MessageDispatchChannel() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannel::enqueue( const Pointer<MessageDispatch>& message ) {
+    synchronized( &channel ) {
+        channel.push( message );
+        channel.notify();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannel::enqueueFirst( const Pointer<MessageDispatch>& message ) {
+    synchronized( &channel ) {
+        channel.enqueueFront( message );
+        channel.notify();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool MessageDispatchChannel::isEmpty() const {
+    synchronized( &channel ) {
+        return channel.empty();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<MessageDispatch> MessageDispatchChannel::dequeue( long long timeout )
+    throw( exceptions::ActiveMQException ) {
+
+    synchronized( &channel ) {
+        // Wait until the channel is ready to deliver messages.
+        while( timeout != 0 && !closed && ( channel.empty() || !running ) ) {
+            if( timeout == -1 ) {
+                channel.wait();
+            } else {
+                channel.wait( timeout );
+                break;
+            }
+        }
+
+        if( closed || !running || channel.empty() ) {
+            return Pointer<MessageDispatch>();
+        }
+
+        return channel.pop();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<MessageDispatch> MessageDispatchChannel::dequeueNoWait() {
+    synchronized( &channel ) {
+        if( closed || !running || channel.empty() ) {
+            return Pointer<MessageDispatch>();
+        }
+        return channel.pop();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<MessageDispatch> MessageDispatchChannel::peek() const {
+    synchronized( &channel ) {
+        if( closed || !running || channel.empty() ) {
+            return Pointer<MessageDispatch>();
+        }
+        return channel.front();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannel::start() {
+    synchronized( &channel ) {
+        if( !closed ) {
+            running = true;
+            channel.notifyAll();
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannel::stop() {
+    synchronized( &channel ) {
+        running = false;
+        channel.notifyAll();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannel::close() {
+    synchronized( &channel ) {
+        if( !closed ) {
+            running = false;
+            closed = true;
+        }
+        channel.notifyAll();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannel::clear() {
+    synchronized( &channel ) {
+        channel.clear();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int MessageDispatchChannel::size() const {
+    synchronized( &channel ) {
+        return channel.size();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::vector< Pointer<MessageDispatch> > MessageDispatchChannel::removeAll() {
+    synchronized( &channel ) {
+        std::vector< Pointer<MessageDispatch> > result = channel.toArray();
+        channel.clear();
+        return result;
+    }
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h?rev=763046&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h Tue Apr  7 23:59:07 2009
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_MESSAGEDISPATCHCHANNEL_H_
+#define _ACTIVEMQ_CORE_MESSAGEDISPATCHCHANNEL_H_
+
+#include <activemq/util/Config.h>
+#include <activemq/commands/MessageDispatch.h>
+
+#include <decaf/util/concurrent/Mutex.h>
+#include <decaf/util/concurrent/Synchronizable.h>
+#include <decaf/util/StlQueue.h>
+#include <decaf/lang/Pointer.h>
+
+namespace activemq {
+namespace core {
+
+    using decaf::lang::Pointer;
+    using activemq::commands::MessageDispatch;
+
+    class AMQCPP_API MessageDispatchChannel : public decaf::util::concurrent::Synchronizable {
+    private:
+
+        bool closed;
+        bool running;
+
+        mutable decaf::util::StlQueue< Pointer<MessageDispatch> > channel;
+
+    public:
+
+        MessageDispatchChannel();
+        virtual ~MessageDispatchChannel();
+
+        /**
+         * Add a Message to the Channel behind all pending messages.
+         *
+         * @param message - The message to add to the Channel.
+         */
+        void enqueue( const Pointer<MessageDispatch>& message );
+
+        /**
+         * Add a message to the front of the Channel.
+         *
+         * @param message - The Message to add to the front of the Channel.
+         */
+        void enqueueFirst( const Pointer<MessageDispatch>& message );
+
+        /**
+         * @return true if there are no messages in the Channel.
+         */
+        bool isEmpty() const;
+
+        /**
+         * @return true if the Channel has been closed.
+         */
+        bool isClosed() const {
+            return this->closed;
+        }
+
+        /**
+         * @return true if the Channel is currently running and will dequeue messages.
+         */
+        bool isRunning() const {
+            return this->running;
+        }
+
+        /**
+         * Used to get an enqueued message. The amount of time this method blocks is
+         * based on the timeout value. - if timeout==-1 then it blocks until a
+         * message is received. - if timeout==0 then it tries to not block at
+         * all, it returns a message if it is available - if timeout>0 then it
+         * blocks up to timeout amount of time. Expired messages will be consumed by
+         * this method.
+         *
+         * @return null if we timeout or if the consumer is closed.
+         * @throws ActiveMQException
+         */
+        Pointer<MessageDispatch> dequeue( long long timeout )
+            throw( exceptions::ActiveMQException );
+
+        /**
+         * Used to get an enqueued message if there is one queued right now.  If there is
+         * no waiting message then this method returns Null.
+         *
+         * @return a message if there is one in the queue.
+         */
+        Pointer<MessageDispatch> dequeueNoWait();
+
+        /**
+         * Peek in the Queue and return the first message in the Channel without removing
+         * it from the channel.
+         *
+         * @return a message if there is one in the queue.
+         */
+        Pointer<MessageDispatch> peek() const;
+
+        /**
+         * Starts dispatch of messages from the Channel.
+         */
+        void start();
+
+        /**
+         * Stops dispatch of message from the Channel.
+         */
+        void stop();
+
+        /**
+         * Close this channel; no messages will be dispatched after this method is called.
+         */
+        void close();
+
+        /**
+         * Clear the Channel, all pending messages are removed.
+         */
+        void clear();
+
+        /**
+         * @return the number of Messages currently in the Channel.
+         */
+        int size() const;
+
+        /**
+         * Remove all messages that are currently in the Channel and return them as
+         * a list of Messages.
+         *
+         * @return a list of Messages that was previously in the Channel.
+         */
+        std::vector< Pointer<MessageDispatch> > removeAll();
+
+    public:
+
+        /**
+         * Locks the object.
+         */
+        virtual void lock() throw( decaf::lang::Exception ){
+            channel.lock();
+        }
+
+        /**
+         * Unlocks the object.
+         */
+        virtual void unlock() throw( decaf::lang::Exception ){
+            channel.unlock();
+        }
+
+        /**
+         * Waits on a signal from this object, which is generated
+         * by a call to Notify.  Must have this object locked before
+         * calling.
+         */
+        virtual void wait() throw( decaf::lang::Exception ){
+            channel.wait();
+        }
+
+        /**
+         * Waits on a signal from this object, which is generated
+         * by a call to Notify.  Must have this object locked before
+         * calling.  This wait will timeout after the specified time
+         * interval.
+         * @param millisecs time to wait, or WAIT_INFINITE
+         * @throws decaf::lang::Exception
+         */
+        virtual void wait( unsigned long millisecs ) throw( decaf::lang::Exception ) {
+            channel.wait( millisecs );
+        }
+
+        /**
+         * Signals a waiter on this object that it can now wake
+         * up and continue.  Must have this object locked before
+         * calling.
+         */
+        virtual void notify() throw( decaf::lang::Exception ){
+            channel.notify();
+        }
+
+        /**
+         * Signals the waiters on this object that it can now wake
+         * up and continue.  Must have this object locked before
+         * calling.
+         */
+        virtual void notifyAll() throw( decaf::lang::Exception ){
+            channel.notifyAll();
+        }
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_CORE_MESSAGEDISPATCHCHANNEL_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am Tue Apr  7 23:59:07 2009
@@ -111,6 +111,7 @@
     activemq/core/ActiveMQSessionTest.cpp \
     activemq/core/ActiveMQConnectionFactoryTest.cpp \
     activemq/core/ActiveMQConnectionTest.cpp \
+    activemq/core/MessageDispatchChannelTest.cpp \
     activemq/commands/ActiveMQTextMessageTest.cpp \
     activemq/commands/BrokerIdTest.cpp \
     activemq/commands/BrokerInfoTest.cpp \
@@ -226,6 +227,7 @@
     activemq/core/ActiveMQConnectionFactoryTest.h \
     activemq/core/ActiveMQConnectionTest.h \
     activemq/core/ActiveMQSessionTest.h \
+    activemq/core/MessageDispatchChannelTest.h \
     activemq/commands/ActiveMQTempTopicTest.h \
     activemq/commands/ActiveMQQueueTest.h \
     activemq/commands/ActiveMQBytesMessageTest.h \

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.cpp?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.cpp Tue Apr  7 23:59:07 2009
@@ -24,10 +24,16 @@
 #include <decaf/lang/Pointer.h>
 #include <activemq/core/ActiveMQConnectionFactory.h>
 #include <activemq/core/ActiveMQConnection.h>
+#include <activemq/transport/Transport.h>
+#include <activemq/transport/DefaultTransportListener.h>
 #include <activemq/transport/mock/MockTransport.h>
 #include <activemq/transport/mock/MockTransportFactory.h>
 #include <activemq/transport/TransportRegistry.h>
 #include <activemq/util/Config.h>
+#include <activemq/commands/Message.h>
+
+#include <cms/Connection.h>
+#include <cms/ExceptionListener.h>
 
 using namespace activemq;
 using namespace activemq::core;
@@ -36,6 +42,58 @@
 using namespace decaf::util;
 using namespace decaf::lang;
 
+namespace activemq {
+namespace core{
+
+    class MyCommandListener : public transport::DefaultTransportListener{
+    public:
+
+        commands::Command* cmd;
+
+    public:
+
+        MyCommandListener(){
+            cmd = NULL;
+        }
+        virtual ~MyCommandListener(){}
+
+        virtual void onCommand( commands::Command* command ){
+            cmd = command;
+        }
+    };
+
+    class MyExceptionListener : public cms::ExceptionListener{
+    public:
+
+        bool caughtOne;
+
+    public:
+
+        MyExceptionListener(){ caughtOne = false; }
+        virtual ~MyExceptionListener(){}
+
+        virtual void onException(const cms::CMSException& ex AMQCPP_UNUSED){
+            caughtOne = true;
+        }
+    };
+
+    class MyDispatcher : public Dispatcher
+    {
+    public:
+
+        std::vector< decaf::lang::Pointer<commands::Message> > messages;
+
+    public:
+        virtual ~MyDispatcher(){}
+
+        virtual void dispatch( const decaf::lang::Pointer<commands::MessageDispatch>& data )
+            throw ( exceptions::ActiveMQException )
+        {
+            messages.push_back( data->getMessage() );
+        }
+    };
+}}
+
 ////////////////////////////////////////////////////////////////////////////////
 //void ActiveMQConnectionTest::test1WithStomp()
 //{

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h Tue Apr  7 23:59:07 2009
@@ -21,15 +21,6 @@
 #include <cppunit/TestFixture.h>
 #include <cppunit/extensions/HelperMacros.h>
 
-#include <cms/Connection.h>
-#include <cms/ExceptionListener.h>
-
-#include <activemq/transport/Transport.h>
-#include <activemq/transport/DefaultTransportListener.h>
-#include <activemq/util/Config.h>
-#include <activemq/core/ActiveMQConnection.h>
-#include <activemq/commands/Message.h>
-
 namespace activemq{
 namespace core{
 
@@ -46,54 +37,6 @@
         ActiveMQConnectionTest() {};
         virtual ~ActiveMQConnectionTest() {}
 
-        class MyCommandListener : public transport::DefaultTransportListener{
-        public:
-
-            commands::Command* cmd;
-
-        public:
-
-            MyCommandListener(){
-                cmd = NULL;
-            }
-            virtual ~MyCommandListener(){}
-
-            virtual void onCommand( commands::Command* command ){
-                cmd = command;
-            }
-        };
-
-        class MyExceptionListener : public cms::ExceptionListener{
-        public:
-
-            bool caughtOne;
-
-        public:
-
-            MyExceptionListener(){ caughtOne = false; }
-            virtual ~MyExceptionListener(){}
-
-            virtual void onException(const cms::CMSException& ex AMQCPP_UNUSED){
-                caughtOne = true;
-            }
-        };
-
-        class MyDispatcher : public Dispatcher
-        {
-        public:
-
-            std::vector< decaf::lang::Pointer<commands::Message> > messages;
-
-        public:
-            virtual ~MyDispatcher(){}
-
-            virtual void dispatch( DispatchData& data )
-                throw ( exceptions::ActiveMQException )
-            {
-                messages.push_back( data.getMessage() );
-            }
-        };
-
 //        void test1WithStomp();
 //        void test2WithStomp();
         void test2WithOpenwire();



Mime
View raw message