Author: tabish
Date: Tue Apr 7 23:59:07 2009
New Revision: 763046
URL: http://svn.apache.org/viewvc?rev=763046&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQCPP-209
Major refactoring of the dispatching code: messages are now dispatched one at a time instead of holding the dispatch thread for long periods. The consumer logic also handles rollbacks better.
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.h (with props)
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Tue Apr 7 23:59:07 2009
@@ -337,6 +337,7 @@
activemq/core/ActiveMQTransactionContext.cpp \
activemq/core/ActiveMQSession.cpp \
activemq/core/ActiveMQConnectionSupport.cpp \
+ activemq/core/MessageDispatchChannel.cpp \
activemq/core/ActiveMQConnectionFactory.cpp \
activemq/core/ActiveMQProducer.cpp \
activemq/commands/ActiveMQQueue.cpp \
@@ -877,6 +878,7 @@
activemq/core/ActiveMQConnection.h \
activemq/core/ActiveMQConstants.h \
activemq/core/ActiveMQConnectionFactory.h \
+ activemq/core/MessageDispatchChannel.h \
activemq/core/ActiveMQSession.h \
activemq/core/Synchronization.h \
activemq/core/ActiveMQConnectionMetaData.h \
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Tue Apr 7 23:59:07 2009
@@ -67,8 +67,6 @@
ActiveMQConnectionSupport( transport, properties ),
connectionMetaData( new ActiveMQConnectionMetaData() ),
connectionInfo( new ConnectionInfo() ),
- started( false ),
- closed( false ),
exceptionListener( NULL ) {
// Register for messages and exceptions from the connector.
@@ -154,7 +152,7 @@
}
// If we're already started, start the session.
- if( started ) {
+ if( this->started.get() ) {
session->start();
}
@@ -245,8 +243,8 @@
// Once current deliveries are done this stops the delivery
// of any new messages.
- this->started = false;
- this->closed = true;
+ this->started.set( false );
+ this->closed.set( true );
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
@@ -260,7 +258,7 @@
// This starts or restarts the delivery of all incomming messages
// messages delivered while this connection is stopped are dropped
// and not acknowledged.
- started = true;
+ this->started.set( true );
// Start all the sessions.
std::vector<ActiveMQSession*> sessions = activeSessions.toArray();
@@ -280,7 +278,7 @@
// Once current deliveries are done this stops the delivery of any
// new messages.
- started = false;
+ this->started.set( false );
std::auto_ptr< Iterator<ActiveMQSession*> > iter( activeSessions.iterator() );
@@ -432,8 +430,7 @@
if( command->isMessageDispatch() ) {
- MessageDispatch* dispatch =
- dynamic_cast<MessageDispatch*>( command.get() );
+ Pointer<MessageDispatch> dispatch = command.dynamicCast<MessageDispatch>();
// Check fo an empty Message, shouldn't ever happen but who knows.
if( dispatch->getMessage() == NULL ) {
@@ -453,13 +450,10 @@
// just closed.
if( dispatcher != NULL ) {
- Pointer<commands::Message> message = dispatch->getMessage();
- message->setReadOnlyBody( true );
- message->setReadOnlyProperties( true );
-
- // Dispatch the message.
- DispatchData data( dispatch->getConsumerId(), message );
- dispatcher->dispatch( data );
+ dispatch->getMessage()->setReadOnlyBody( true );
+ dispatch->getMessage()->setReadOnlyProperties( true );
+
+ dispatcher->dispatch( dispatch );
}
}
@@ -529,6 +523,18 @@
}
////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::transportInterrupted() {
+
+ synchronized( &activeSessions ) {
+ std::auto_ptr< Iterator<ActiveMQSession*> > iter( this->activeSessions.iterator() );
+
+ while( iter->hasNext() ) {
+ iter->next()->clearMessagesInProgress();
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::oneway( Pointer<Command> command )
throw ( ActiveMQException ) {
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Tue Apr 7 23:59:07 2009
@@ -34,6 +34,7 @@
#include <decaf/util/Properties.h>
#include <decaf/util/StlMap.h>
#include <decaf/util/StlSet.h>
+#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
#include <decaf/lang/exceptions/UnsupportedOperationException.h>
#include <decaf/lang/exceptions/NullPointerException.h>
#include <decaf/lang/exceptions/IllegalStateException.h>
@@ -45,6 +46,7 @@
namespace core{
using decaf::lang::Pointer;
+ using decaf::util::concurrent::atomic::AtomicBoolean;
class ActiveMQSession;
class ActiveMQProducer;
@@ -86,13 +88,13 @@
/**
* Indicates if this Connection is started
*/
- bool started;
+ AtomicBoolean started;
/**
* Indicates that this connection has been closed, it is no longer
* usable after this becomes true
*/
- bool closed;
+ AtomicBoolean closed;
/**
* Map of message dispatchers indexed by consumer id.
@@ -191,7 +193,15 @@
* @return true if the connection is closed
*/
bool isClosed() const {
- return this->closed;
+ return this->closed.get();
+ }
+
+ /**
+ * Check if this connection has been started.
+ * @return true if the start method has been called.
+ */
+ bool isStarted() const {
+ return this->started.get();
}
/**
@@ -315,7 +325,7 @@
exceptionListener = listener;
};
- public: // commands::CommandListener
+ public: // TransportListener
/**
* Event handler for the receipt of a non-response command from the
@@ -324,8 +334,6 @@
*/
virtual void onCommand( const Pointer<commands::Command>& command );
- public: // TransportExceptionListener
-
/**
* Event handler for an exception from a command transport.
* @param source The source of the exception
@@ -333,6 +341,11 @@
*/
virtual void onException( const decaf::lang::Exception& ex );
+ /**
+ * The transport has suffered an interruption from which it hopes to recover
+ */
+ virtual void transportInterrupted();
+
public:
/**
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h Tue Apr 7 23:59:07 2009
@@ -302,11 +302,6 @@
}
/**
- * The transport has suffered an interruption from which it hopes to recover
- */
- virtual void transportInterrupted() {}
-
- /**
* The transport has resumed after an interruption
*/
virtual void transportResumed() {}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Tue Apr 7 23:59:07 2009
@@ -19,8 +19,8 @@
#include <decaf/lang/exceptions/NullPointerException.h>
#include <decaf/lang/exceptions/InvalidStateException.h>
#include <decaf/lang/exceptions/IllegalArgumentException.h>
-#include <decaf/util/Date.h>
#include <decaf/lang/Math.h>
+#include <decaf/lang/System.h>
#include <activemq/util/Config.h>
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/commands/Message.h>
@@ -134,12 +134,13 @@
this->session = session;
this->transaction = transaction;
this->consumerInfo = consumerInfo;
- this->closed = false;
this->lastDeliveredSequenceId = 0;
this->synchronizationRegistered = false;
this->additionalWindowSize = 0;
this->redeliveryDelay = 0;
this->deliveredCounter = 0;
+ this->clearDispatchList = false;
+ this->listener = NULL;
}
////////////////////////////////////////////////////////////////////////////////
@@ -153,11 +154,29 @@
}
////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::start() {
+
+ if( this->unconsumedMessages.isClosed() ) {
+ return;
+ }
+
+ this->started.set( true );
+ this->unconsumedMessages.start();
+ this->session->wakeup();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::stop() {
+ this->started.set( false );
+ this->unconsumedMessages.stop();
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::close()
throw ( cms::CMSException ) {
try{
- if( !closed ) {
+ if( !this->isClosed() ) {
if( this->transaction != NULL && this->transaction->isInTransaction() ) {
// TODO - Currently we can do this since the consumer could be
@@ -167,8 +186,13 @@
//
//Pointer<Synchronization> sync( new CloseSynhcronization( this ) );
//this->transaction->addSynchronization( sync );
+
doClose();
+ throw UnsupportedOperationException(
+ __FILE__, __LINE__,
+ "The Consumer is still in an Active Transaction, commit it first." );
+
} else {
doClose();
}
@@ -192,7 +216,7 @@
// remove it from the Broker.
this->session->disposeOf( this->consumerInfo->getConsumerId() );
- this->closed = true;
+ this->started.set( false );
// Identifies any errors encountered during shutdown.
bool haveException = false;
@@ -200,7 +224,7 @@
// Purge all the pending messages
try{
- purgeMessages();
+ unconsumedMessages.clear();
} catch ( ActiveMQException& ex ){
if( !haveException ){
ex.setMark( __FILE__, __LINE__ );
@@ -209,10 +233,8 @@
}
}
- // Wakeup any synchronous consumers.
- synchronized( &unconsumedMessages ) {
- unconsumedMessages.notifyAll();
- }
+ // Stop and Wakeup all sync consumers.
+ unconsumedMessages.close();
// If we encountered an error, propagate it.
if( haveException ){
@@ -238,66 +260,47 @@
}
////////////////////////////////////////////////////////////////////////////////
-decaf::lang::Pointer<commands::Message> ActiveMQConsumer::dequeue( int timeout )
+decaf::lang::Pointer<MessageDispatch> ActiveMQConsumer::dequeue( int timeout )
throw ( cms::CMSException ) {
try {
this->checkClosed();
- synchronized( &unconsumedMessages ) {
-
- // Calculate the deadline
- long long deadline = 0;
- if( timeout > 0 ) {
- deadline = Date::getCurrentTimeMilliseconds() + timeout;
- }
-
- // Loop until the time is up or we get a non-expired message
- while( true ) {
-
- // Wait until either the deadline is met, a message arrives, or
- // we've closed.
- while( !closed && unconsumedMessages.empty() && timeout != 0 ) {
-
- if( timeout < 0 ) {
- unconsumedMessages.wait();
- } else if( timeout > 0 ) {
- unconsumedMessages.wait(timeout);
- timeout = std::max((int)(deadline - Date::getCurrentTimeMilliseconds()), 0);
- }
- }
-
- if( unconsumedMessages.empty() ) {
- break;
+ // Calculate the deadline
+ long long deadline = 0;
+ if( timeout > 0 ) {
+ deadline = Date::getCurrentTimeMilliseconds() + timeout;
+ }
+
+ // Loop until the time is up or we get a non-expired message
+ while( true ) {
+
+ Pointer<MessageDispatch> dispatch = unconsumedMessages.dequeue( timeout );
+ if( dispatch == NULL ) {
+
+ if( timeout > 0 && !unconsumedMessages.isClosed() ) {
+ timeout = Math::max( deadline - System::currentTimeMillis(), 0LL );
+ } else {
+ return Pointer<MessageDispatch>();
}
- // Fetch the Message then copy it so it can be handed off
- // to the user.
- DispatchData data = unconsumedMessages.pop();
-
- Pointer<Message> message = data.getMessage();
-
- // If it's expired, process the message and then go back to waiting.
- if( message->isExpired() ) {
-
- beforeMessageIsConsumed( message );
- afterMessageIsConsumed( message, true );
- if( timeout > 0 ) {
- timeout = std::max(
- (int)( deadline - Date::getCurrentTimeMilliseconds() ), 0 );
- }
+ } else if( dispatch->getMessage()->isExpired() ) {
- // Go back to waiting for a non-expired message.
- continue;
+ beforeMessageIsConsumed( dispatch );
+ afterMessageIsConsumed( dispatch, true );
+ if( timeout > 0 ) {
+ timeout = Math::max( deadline - System::currentTimeMillis(), 0LL );
}
- // Return the message.
- return message;
+ continue;
}
+
+ // Return the message.
+ return dispatch;
}
- return Pointer<Message>();
+ return Pointer<MessageDispatch>();
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
@@ -313,18 +316,18 @@
this->sendPullRequest( 0 );
// Wait for the next message.
- Pointer<Message> message = dequeue( -1 );
+ Pointer<MessageDispatch> message = dequeue( -1 );
if( message == NULL ) {
return NULL;
}
- // Message preprocessing
+ // Message pre-processing
beforeMessageIsConsumed( message );
// Need to clone the message because the user is responsible for freeing
// its copy of the message.
cms::Message* clonedMessage =
- dynamic_cast<cms::Message*>( message->cloneDataStructure() );
+ dynamic_cast<cms::Message*>( message->getMessage()->cloneDataStructure() );
// Post processing (may result in the message being deleted)
afterMessageIsConsumed( message, false );
@@ -347,7 +350,7 @@
this->sendPullRequest( millisecs );
// Wait for the next message.
- Pointer<Message> message = dequeue( millisecs );
+ Pointer<MessageDispatch> message = dequeue( millisecs );
if( message == NULL ) {
return NULL;
}
@@ -358,7 +361,7 @@
// Need to clone the message because the user is responsible for freeing
// its copy of the message.
cms::Message* clonedMessage =
- dynamic_cast<cms::Message*>( message->cloneDataStructure() );
+ dynamic_cast<cms::Message*>( message->getMessage()->cloneDataStructure() );
// Post processing (may result in the message being deleted)
afterMessageIsConsumed( message, false );
@@ -381,7 +384,7 @@
this->sendPullRequest( -1 );
// Get the next available message, if there is one.
- Pointer<Message> message = dequeue( 0 );
+ Pointer<MessageDispatch> message = dequeue( 0 );
if( message == NULL ) {
return NULL;
}
@@ -392,7 +395,7 @@
// Need to clone the message because the user is responsible for freeing
// its copy of the message.
cms::Message* clonedMessage =
- dynamic_cast<cms::Message*>( message->cloneDataStructure() );
+ dynamic_cast<cms::Message*>( message->getMessage()->cloneDataStructure() );
// Post processing (may result in the message being deleted)
afterMessageIsConsumed( message, false );
@@ -426,7 +429,9 @@
session->stop();
}
- this->listener.set( listener );
+ synchronized( &listenerMutex ) {
+ this->listener = listener;
+ }
session->redispatch( unconsumedMessages );
@@ -434,7 +439,9 @@
session->start();
}
} else {
- this->listener.set( NULL );
+ synchronized( &listenerMutex ) {
+ this->listener = NULL;
+ }
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
@@ -443,7 +450,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::beforeMessageIsConsumed( const Pointer<Message>& message ) {
+void ActiveMQConsumer::beforeMessageIsConsumed( const Pointer<MessageDispatch>& dispatch ) {
// If the Session is in ClientAcknowledge mode, then we set the
// handler in the message to this object and send it out. Otherwise
@@ -452,13 +459,14 @@
// Register ourself so that we can handle the Message's
// acknowledge method.
- message->setAckHandler( this );
+ dispatch->getMessage()->setAckHandler( this );
}
- this->lastDeliveredSequenceId = message->getMessageId()->getBrokerSequenceId();
+ this->lastDeliveredSequenceId =
+ dispatch->getMessage()->getMessageId()->getBrokerSequenceId();
synchronized( &dispatchedMessages ) {
- dispatchedMessages.enqueueFront( message );
+ dispatchedMessages.enqueueFront( dispatch );
}
// If the session is transacted then we hand off the message to it to
@@ -473,16 +481,20 @@
"In a Transacted Session but no Transaction Context set." );
}
- ackLater( message, ActiveMQConstants::ACK_TYPE_DELIVERED );
+ ackLater( dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED );
}
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::afterMessageIsConsumed( const Pointer<Message>& message,
+void ActiveMQConsumer::afterMessageIsConsumed( const Pointer<MessageDispatch>& message,
bool messageExpired AMQCPP_UNUSED ) {
try{
+ if( unconsumedMessages.isClosed() ) {
+ return;
+ }
+
if( messageExpired == true ) {
ackLater( message, ActiveMQConstants::ACK_TYPE_DELIVERED );
}
@@ -557,7 +569,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::ackLater( const Pointer<Message>& message, int ackType )
+void ActiveMQConsumer::ackLater( const Pointer<MessageDispatch>& dispatch, int ackType )
throw ( ActiveMQException ) {
// Don't acknowledge now, but we may need to let the broker know the
@@ -578,10 +590,10 @@
Pointer<MessageAck> oldPendingAck = pendingAck;
pendingAck.reset( new MessageAck() );
- pendingAck->setConsumerId( this->consumerInfo->getConsumerId() );
+ pendingAck->setConsumerId( dispatch->getConsumerId() );
pendingAck->setAckType( ackType );
- pendingAck->setDestination( message->getDestination() );
- pendingAck->setLastMessageId( message->getMessageId() );
+ pendingAck->setDestination( dispatch->getDestination() );
+ pendingAck->setLastMessageId( dispatch->getMessage()->getMessageId() );
pendingAck->setMessageCount( deliveredCounter );
if( oldPendingAck == NULL ) {
@@ -614,14 +626,14 @@
if( !dispatchedMessages.empty() ) {
- Pointer<Message> message = dispatchedMessages.front();
+ Pointer<Message> message = dispatchedMessages.front()->getMessage();
Pointer<MessageAck> ack( new MessageAck() );
ack->setAckType( type );
ack->setConsumerId( this->consumerInfo->getConsumerId() );
ack->setDestination( message->getDestination() );
ack->setMessageCount( (int)dispatchedMessages.size() );
ack->setLastMessageId( message->getMessageId() );
- ack->setFirstMessageId( dispatchedMessages.back()->getMessageId() );
+ ack->setFirstMessageId( dispatchedMessages.back()->getMessage()->getMessageId() );
return ack;
}
@@ -701,18 +713,19 @@
}
// Only increase the redelivery delay after the first redelivery..
- Pointer<Message> lastMsg = dispatchedMessages.front();
+ Pointer<Message> lastMsg = dispatchedMessages.front()->getMessage();
const int currentRedeliveryCount = lastMsg->getRedeliveryCounter();
if( currentRedeliveryCount > 0 ) {
redeliveryDelay = transaction->getRedeliveryDelay();
}
- Pointer<MessageId> firstMsgId = dispatchedMessages.back()->getMessageId();
+ Pointer<MessageId> firstMsgId =
+ dispatchedMessages.back()->getMessage()->getMessageId();
- std::auto_ptr< Iterator< Pointer<Message> > > iter( dispatchedMessages.iterator() );
+ std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter( dispatchedMessages.iterator() );
while( iter->hasNext() ) {
- Pointer<Message> message = iter->next();
+ Pointer<Message> message = iter->next()->getMessage();
message->setRedeliveryCounter( message->getRedeliveryCounter() + 1 );
}
@@ -749,11 +762,32 @@
session->oneway( ack );
}
- std::auto_ptr< Iterator< Pointer<Message> > > iter( dispatchedMessages.iterator() );
+ // stop the delivery of messages.
+ unconsumedMessages.stop();
+
+ std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter( dispatchedMessages.iterator() );
while( iter->hasNext() ) {
- DispatchData dispatch( this->consumerInfo->getConsumerId(), iter->next() );
- unconsumedMessages.enqueueFront( dispatch );
+ unconsumedMessages.enqueueFirst( iter->next() );
+ }
+
+ if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
+ // TODO
+ // Start up the delivery again a little later.
+ //scheduler.executeAfterDelay(new Runnable() {
+ // public void run() {
+ // try {
+ // if( !started.get() ) {
+ // start();
+ // }
+ // } catch( CMSException& e ) {
+ // session.connection.onAsyncException(e);
+ // }
+ // }
+ //}, redeliveryDelay);
+ start();
+ } else {
+ start();
}
}
@@ -762,61 +796,56 @@
}
}
- if( this->listener.get() != NULL ) {
+ if( this->listener != NULL ) {
session->redispatch( unconsumedMessages );
}
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::dispatch( DispatchData& data ) {
+void ActiveMQConsumer::dispatch( const Pointer<MessageDispatch>& dispatch ) {
try {
- Pointer<Message> message = data.getMessage();
-
- // Don't dispatch expired messages, ack it and then destroy it
- if( message->isExpired() ) {
- this->ackLater( message, ActiveMQConstants::ACK_TYPE_CONSUMED );
-
- // stop now, don't queue
- return;
- }
+ synchronized( &unconsumedMessages ) {
- cms::MessageListener* cmsListener = this->listener.get();
+ if( this->clearDispatchList ) {
+ // we are reconnecting so lets flush the in progress
+ // messages
+ clearDispatchList = false;
+ unconsumedMessages.clear();
+ }
- // If we have a listener, send the message.
- if( cmsListener != NULL ) {
+ if( !unconsumedMessages.isClosed() ) {
- // Preprocessing.
- beforeMessageIsConsumed( message );
+ // Don't dispatch expired messages, ack it and then destroy it
+ if( dispatch->getMessage()->isExpired() ) {
+ this->ackLater( dispatch, ActiveMQConstants::ACK_TYPE_CONSUMED );
- // Notify the listener
- cmsListener->onMessage( dynamic_cast<cms::Message*>( message.get() ) );
+ // stop now, don't queue
+ return;
+ }
- // Postprocessing
- afterMessageIsConsumed( message, false );
+ synchronized( &listenerMutex ) {
+ // If we have a listener, send the message.
+ if( this->listener != NULL && unconsumedMessages.isRunning() ) {
- } else {
+ // Preprocessing.
+ beforeMessageIsConsumed( dispatch );
- // No listener, add it to the unconsumed messages list
- synchronized( &unconsumedMessages ) {
- unconsumedMessages.push( data );
- unconsumedMessages.notifyAll();
- }
- }
- }
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
-}
+ // Notify the listener
+ this->listener->onMessage(
+ dynamic_cast<cms::Message*>( dispatch->getMessage().get() ) );
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::purgeMessages() throw ( ActiveMQException ) {
+ // Postprocessing
+ afterMessageIsConsumed( dispatch, false );
- try {
+ } else {
- synchronized( &this->unconsumedMessages ) {
- this->unconsumedMessages.clear();
+ // No listener, add it to the unconsumed messages list
+ this->unconsumedMessages.enqueue( dispatch );
+ }
+ }
+ }
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
@@ -833,7 +862,7 @@
this->checkClosed();
// There are still local message, consume them first.
- if( !this->unconsumedMessages.empty() ) {
+ if( !this->unconsumedMessages.isEmpty() ) {
return;
}
@@ -860,3 +889,42 @@
"ActiveMQConsumer - Consumer Already Closed" );
}
}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConsumer::iterate() {
+
+ synchronized( &listenerMutex ) {
+
+ if( this->listener != NULL ) {
+
+ Pointer<MessageDispatch> dispatch = unconsumedMessages.dequeueNoWait();
+ if( dispatch != NULL ) {
+
+ try {
+ beforeMessageIsConsumed( dispatch );
+ this->listener->onMessage(
+ dynamic_cast<cms::Message*>( dispatch->getMessage().get() ) );
+ afterMessageIsConsumed( dispatch, false );
+ } catch( ActiveMQException& ex ) {
+ this->session->fire( ex );
+ }
+
+ return true;
+ }
+ }
+ }
+
+ return false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::clearMessagesInProgress() {
+ // We are called from inside the transport reconnection logic,
+ // which clears the dispatch lists of all of the connection's
+ // consumers.
+ // Rather than trying to grab a mutex (which could already be
+ // owned by the message listener calling send), we just set
+ // a flag so that the list can be cleared as soon as the
+ // dispatch thread is ready to flush the dispatch list.
+ clearDispatchList = true;
+}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Tue Apr 7 23:59:07 2009
@@ -26,11 +26,12 @@
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/commands/ConsumerInfo.h>
#include <activemq/commands/MessageAck.h>
+#include <activemq/commands/MessageDispatch.h>
#include <activemq/core/ActiveMQAckHandler.h>
#include <activemq/core/ActiveMQTransactionContext.h>
#include <activemq/core/Dispatcher.h>
+#include <activemq/core/MessageDispatchChannel.h>
-#include <decaf/util/concurrent/atomic/AtomicReference.h>
#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
#include <decaf/lang/Pointer.h>
#include <decaf/util/StlQueue.h>
@@ -41,15 +42,14 @@
namespace core{
using decaf::lang::Pointer;
- using decaf::util::concurrent::atomic::AtomicReference;
using decaf::util::concurrent::atomic::AtomicBoolean;
+ using activemq::core::MessageDispatchChannel;
class ActiveMQSession;
- class AMQCPP_API ActiveMQConsumer :
- public cms::MessageConsumer,
- public ActiveMQAckHandler,
- public Dispatcher
+ class AMQCPP_API ActiveMQConsumer : public cms::MessageConsumer,
+ public ActiveMQAckHandler,
+ public Dispatcher
{
private:
@@ -71,7 +71,12 @@
/**
* The Message Listener for this Consumer
*/
- AtomicReference<cms::MessageListener> listener;
+ cms::MessageListener* listener;
+
+ /**
+ * Mutex to Protect access to the listener during delivery.
+ */
+ decaf::util::concurrent::Mutex listenerMutex;
/**
* Is the consumer currently delivering acks.
@@ -79,14 +84,19 @@
AtomicBoolean deliveringAcks;
/**
+ * Has this Consumer been started yet.
+ */
+ AtomicBoolean started;
+
+ /**
* Queue of unconsumed messages.
*/
- decaf::util::StlQueue<DispatchData> unconsumedMessages;
+ MessageDispatchChannel unconsumedMessages;
/**
* Queue of consumed messages.
*/
- decaf::util::StlQueue< decaf::lang::Pointer<commands::Message> > dispatchedMessages;
+ decaf::util::StlQueue< decaf::lang::Pointer<commands::MessageDispatch> > dispatchedMessages;
/**
* The last delivered message's BrokerSequenceId.
@@ -119,9 +129,9 @@
volatile bool synchronizationRegistered;
/**
- * Boolean that indicates if the consumer has been closed
+ * Boolean indicating if in progress messages should be cleared.
*/
- bool closed;
+ bool clearDispatchList;
public:
@@ -137,6 +147,19 @@
public: // Interface Implementation
/**
+ * Starts the Consumer if not already started and not closed. A consumer
+ * will not deliver messages until started.
+ */
+ virtual void start();
+
+ /**
+ * Stops a Consumer, the Consumer will not deliver any messages that are
+ * dispatched to it until it is started again. A Closed Consumer is also a
+ * stopped consumer.
+ */
+ virtual void stop();
+
+ /**
* Closes the Consumer. This will return all allocated resources
* and purge any outstanding messages. This method will block if
* there is a call to receive in progress, or a dispatch to a
@@ -180,7 +203,7 @@
* @param MessageListener interface pointer
*/
virtual cms::MessageListener* getMessageListener() const {
- return this->listener.get();
+ return this->listener;
}
/**
@@ -204,9 +227,9 @@
/**
* Called asynchronously by the session to dispatch a message.
- * @param message object pointer
+ * @param message dispatch object pointer
*/
- virtual void dispatch( DispatchData& message );
+ virtual void dispatch( const Pointer<MessageDispatch>& message );
public: // ActiveMQConsumer Methods
@@ -256,7 +279,7 @@
* @returns if this Consumer has been closed.
*/
bool isClosed() const {
- return this->closed;
+ return this->unconsumedMessages.isClosed();
}
/**
@@ -275,13 +298,25 @@
this->synchronizationRegistered = value;
}
- protected:
+ /**
+ * Delivers one pending message to the registered MessageListener, if there
+ * is one. Returns true if a message was delivered (more may still be pending),
+ * or false if there is no listener or no pending message was available.
+ */
+ bool iterate();
+
+
+ /**
+ * Forces this consumer to send all pending acks to the broker.
+ */
+ void deliverAcks() throw ( exceptions::ActiveMQException );
/**
- * Purges all messages currently in the queue. This can be as a
- * result of a rollback, or of the consumer being shutdown.
+ * Called on a Failover to clear any pending messages.
*/
- void purgeMessages() throw ( exceptions::ActiveMQException );
+ void clearMessagesInProgress();
+
+ protected:
/**
* Used by synchronous receive methods to wait for messages to come in.
@@ -296,7 +331,7 @@
* @throws InvalidStateException if this consumer is closed upon
* entering this method.
*/
- Pointer<commands::Message> dequeue( int timeout )
+ Pointer<MessageDispatch> dequeue( int timeout )
throw ( cms::CMSException );
/**
@@ -304,7 +339,7 @@
* @param message - the message being consumed.
*/
void beforeMessageIsConsumed(
- const Pointer<commands::Message>& message );
+ const Pointer<commands::MessageDispatch>& dispatch );
/**
* Post-consume processing
@@ -312,7 +347,7 @@
* @param messageExpired - flag indicating if the message has expired.
*/
void afterMessageIsConsumed(
- const Pointer<commands::Message>& message, bool messageExpired );
+ const Pointer<commands::MessageDispatch>& dispatch, bool messageExpired );
private:
@@ -334,12 +369,9 @@
// Sends an ack as needed in order to keep them coming in if the current
// ack mode allows the consumer to receive up to the prefetch limit before
// an real ack is sent.
- void ackLater( const Pointer<commands::Message>& message, int ackType )
+ void ackLater( const Pointer<commands::MessageDispatch>& message, int ackType )
throw ( exceptions::ActiveMQException );
- // Delivers all pending acks before a consumer is closed
- void deliverAcks() throw ( exceptions::ActiveMQException );
-
// Create an Ack Message that acks all messages that have been delivered so far.
Pointer<commands::MessageAck> makeAckForAllDeliveredMessages( int type );
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Tue Apr 7 23:59:07 2009
@@ -194,20 +194,40 @@
"ActiveMQSession::rollback - This Session is not Transacted" );
}
- bool started = this->executor->isStarted();
+ // Roll back the Transaction
+ this->transaction->rollback();
+ }
+ AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::clearMessagesInProgress() {
+
+ if( this->executor.get() != NULL ) {
+ this->executor->clearMessagesInProgress();
+ }
+
+ synchronized( &this->consumers ) {
+ std::vector< ActiveMQConsumer* > consumers = this->consumers.values();
- if( started ) {
- this->executor->stop();
+ std::vector< ActiveMQConsumer* >::iterator iter = consumers.begin();
+ for( ; iter != consumers.end(); ++iter ) {
+ (*iter)->clearMessagesInProgress();
}
+ }
+}
- // Roll back the Transaction
- this->transaction->rollback();
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::deliverAcks() {
+
+ synchronized( &this->consumers ) {
+ std::vector< ActiveMQConsumer* > consumers = this->consumers.values();
- if( started ) {
- this->executor->start();
+ std::vector< ActiveMQConsumer* >::iterator iter = consumers.begin();
+ for( ; iter != consumers.end(); ++iter ) {
+ (*iter)->deliverAcks();
}
}
- AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
@@ -275,6 +295,10 @@
// Send our info to the Broker.
this->syncRequest( consumerInfo );
+ if( this->connection->isStarted() ) {
+ consumer->start();
+ }
+
return consumer.release();
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -318,6 +342,10 @@
// Send our info to the Broker.
this->syncRequest( consumerInfo );
+ if( this->connection->isStarted() ) {
+ consumer->start();
+ }
+
return consumer.release();
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -656,34 +684,36 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::dispatch( DispatchData& message ) {
+void ActiveMQSession::dispatch( const Pointer<MessageDispatch>& dispatch ) {
if( this->executor.get() != NULL ) {
- this->executor->execute( message );
+ this->executor->execute( dispatch );
}
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::redispatch( decaf::util::StlQueue<DispatchData>& unconsumedMessages ) {
+void ActiveMQSession::redispatch( MessageDispatchChannel& unconsumedMessages ) {
- decaf::util::StlQueue<DispatchData> reversedList;
+ std::vector< Pointer<MessageDispatch> > messages = unconsumedMessages.removeAll();
+ std::vector< Pointer<MessageDispatch> >::reverse_iterator iter = messages.rbegin();
- // Copy the list in reverse order then clear the original list.
- synchronized( &unconsumedMessages ) {
- unconsumedMessages.reverse( reversedList );
- unconsumedMessages.clear();
- }
-
- // Add the list to the front of the executor.
- while( !reversedList.empty() ) {
- DispatchData data = reversedList.pop();
- executor->executeFirst( data );
+ for( ; iter != messages.rend(); ++iter ) {
+ executor->executeFirst( *iter );
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSession::start() {
+ synchronized( &this->consumers ) {
+ std::vector< ActiveMQConsumer* > consumers = this->consumers.values();
+
+ std::vector< ActiveMQConsumer*>::iterator iter = consumers.begin();
+ for( ; iter != consumers.end(); ++iter ) {
+ (*iter)->start();
+ }
+ }
+
if( this->executor.get() != NULL ) {
this->executor->start();
}
@@ -704,7 +734,7 @@
return false;
}
- return this->executor->isStarted();
+ return this->executor->isRunning();
}
////////////////////////////////////////////////////////////////////////////////
@@ -950,28 +980,11 @@
if( this->consumers.containsKey( id ) ) {
- // If the executor thread is currently running, stop it.
- bool wasStarted = isStarted();
- if( wasStarted ) {
- stop();
- }
-
// Remove this Id both from the Sessions Map of Consumers and from
// the Connection.
this->connection->removeDispatcher( id );
this->connection->disposeOf( id );
this->consumers.remove( id );
-
- // Clean up any resources in the executor for this consumer
- if( this->executor.get() != NULL ) {
-
- // Purge any pending messages for this consumer.
- this->executor->purgeConsumerMessages( id );
- }
-
- if( wasStarted ) {
- start();
- }
}
}
}
@@ -1004,17 +1017,6 @@
}
////////////////////////////////////////////////////////////////////////////////
-ActiveMQConsumer* ActiveMQSession::getConsumer( const Pointer<ConsumerId>& id ) {
-
- synchronized( &this->consumers ) {
- if( this->consumers.containsKey( id ) ) {
- return this->consumers.get( id );
- }
- }
- return NULL;
-}
-
-////////////////////////////////////////////////////////////////////////////////
void ActiveMQSession::doStartTransaction() throw ( ActiveMQException ) {
if( !this->isTransacted() ) {
@@ -1023,3 +1025,11 @@
this->transaction->begin();
}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::wakeup() {
+
+ if( this->executor.get() != NULL ) {
+ this->executor->wakeup();
+ }
+}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Tue Apr 7 23:59:07 2009
@@ -30,6 +30,7 @@
#include <activemq/commands/ProducerId.h>
#include <activemq/commands/TransactionId.h>
#include <activemq/core/Dispatcher.h>
+#include <activemq/core/MessageDispatchChannel.h>
#include <activemq/util/LongSequenceGenerator.h>
#include <decaf/util/StlMap.h>
@@ -63,6 +64,8 @@
ActiveMQProducer*,
commands::ProducerId::COMPARATOR> ProducersMap;
+ friend class ActiveMQSessionExecutor;
+
private:
/**
@@ -130,16 +133,10 @@
virtual ~ActiveMQSession();
/**
- * Looks up a consumer of this Session by a Pointer to its Id.
- * @param id - a Pointer to a ConsumerId to match in the Map of Consumers.
- */
- ActiveMQConsumer* getConsumer( const decaf::lang::Pointer<commands::ConsumerId>& id );
-
- /**
* Redispatches the given set of unconsumed messages to the consumers.
* @param unconsumedMessages - unconsumed messages to be redelivered.
*/
- void redispatch( decaf::util::StlQueue<DispatchData>& unconsumedMessages );
+ void redispatch( MessageDispatchChannel& unconsumedMessages );
/**
* Stops asynchronous message delivery.
@@ -178,7 +175,7 @@
* Dispatches a message to a particular consumer.
* @param message - the message to be dispatched
*/
- virtual void dispatch( DispatchData& message );
+ virtual void dispatch( const Pointer<MessageDispatch>& message );
public: // Implements Methods
@@ -473,6 +470,23 @@
*/
void doStartTransaction() throw ( exceptions::ActiveMQException );
+ /**
+ * Request that this Session inform all of its consumers to deliver their pending
+ * acks.
+ */
+ void deliverAcks();
+
+ /**
+ * Request that this Session inform all of its consumers to clear all messages that
+ * are currently in progress.
+ */
+ void clearMessagesInProgress();
+
+ /**
+ * Causes the Session to wake up its executor and ensure all messages are dispatched.
+ */
+ void wakeup();
+
private:
/**
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp Tue Apr 7 23:59:07 2009
@@ -18,11 +18,14 @@
#include "ActiveMQSessionExecutor.h"
#include "ActiveMQSession.h"
#include "ActiveMQConsumer.h"
+
#include <activemq/commands/ConsumerInfo.h>
+#include <activemq/threads/DedicatedTaskRunner.h>
using namespace std;
using namespace activemq;
using namespace activemq::core;
+using namespace activemq::threads;
using namespace activemq::exceptions;
using namespace decaf::lang;
using namespace decaf::util;
@@ -32,8 +35,6 @@
ActiveMQSessionExecutor::ActiveMQSessionExecutor( ActiveMQSession* session ) {
this->session = session;
- this->closed = false;
- this->started = false;
}
////////////////////////////////////////////////////////////////////////////////
@@ -42,6 +43,9 @@
try {
// Terminate the thread.
+ stop();
+
+ // Close out the Message Channel.
close();
// Empty the message queue and destroy any remaining messages.
@@ -51,119 +55,84 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionExecutor::close() {
-
- synchronized( &mutex ) {
-
- closed = true;
- mutex.notifyAll();
- }
-
- if( thread != NULL ) {
- thread->join();
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionExecutor::execute( DispatchData& data ) {
+void ActiveMQSessionExecutor::execute( const Pointer<MessageDispatch>& dispatch ) {
// Add the data to the queue.
- synchronized( &mutex ) {
- messageQueue.push_back( data );
- mutex.notifyAll();
- }
+ this->messageQueue.enqueue( dispatch );
+ this->wakeup();
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionExecutor::executeFirst( DispatchData& data ) {
+void ActiveMQSessionExecutor::executeFirst( const Pointer<MessageDispatch>& dispatch ) {
- // Add the data to the front of the queue.
- synchronized( &mutex ) {
- messageQueue.push_front( data );
- mutex.notifyAll();
- }
+ // Add the data to the queue.
+ this->messageQueue.enqueueFirst( dispatch );
+ this->wakeup();
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionExecutor::purgeConsumerMessages(
- const decaf::lang::Pointer<commands::ConsumerId>& consumerId )
-{
- synchronized( &mutex ) {
+void ActiveMQSessionExecutor::wakeup() {
- list<DispatchData>::iterator iter = messageQueue.begin();
- while( iter != messageQueue.end() ) {
- list<DispatchData>::iterator currentIter = iter;
- DispatchData& dispatchData = *iter++;
- if( consumerId->equals( *( dispatchData.getConsumerId() ) ) ) {
- messageQueue.erase( currentIter );
- }
+ Pointer<TaskRunner> taskRunner = this->taskRunner;
+ synchronized( &messageQueue ) {
+ if( this->taskRunner == NULL ) {
+ this->taskRunner.reset( new DedicatedTaskRunner( this ) );
}
+
+ taskRunner = this->taskRunner;
}
+
+ taskRunner->wakeup();
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionExecutor::start() {
- synchronized( &mutex ) {
-
- if( closed || started ) {
- return;
- }
-
- started = true;
+ if( !messageQueue.isRunning() ) {
- // Don't create the thread unless we need to.
- if( thread == NULL ) {
- thread.reset( new Thread( this ) );
- thread->start();
+ messageQueue.start();
+ if( hasUncomsumedMessages() ) {
+ this->wakeup();
}
-
- mutex.notifyAll();
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionExecutor::stop() {
- // We lock here to make sure that we wait until the thread
- // is done with an internal dispatch operation, otherwise
- // we might return before that and cause the caller to be
- // in an inconsistent state.
- synchronized( &dispatchMutex ) {
-
- if( closed || !started ) {
- return;
+ if( messageQueue.isRunning() ) {
+ messageQueue.stop();
+ Pointer<TaskRunner> taskRunner = this->taskRunner;
+ if( taskRunner != NULL ) {
+ this->taskRunner.reset( NULL );
+ taskRunner->shutdown();
}
-
- synchronized( &mutex ) { started = false; }
}
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionExecutor::clear() {
-
- synchronized( &mutex ) {
- this->messageQueue.clear();
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionExecutor::dispatch( DispatchData& data ) {
+void ActiveMQSessionExecutor::dispatch( const Pointer<MessageDispatch>& dispatch ) {
try {
- ActiveMQConsumer* consumer = session->getConsumer( data.getConsumerId() );
+ ActiveMQConsumer* consumer = NULL;
- // If the consumer is not available, just delete the message.
- // Otherwise, dispatch the message to the consumer.
- if( consumer != NULL ) {
- consumer->dispatch( data );
+ synchronized( &( this->session->consumers ) ) {
+ if( this->session->consumers.containsKey( dispatch->getConsumerId() ) ) {
+ consumer = this->session->consumers.get( dispatch->getConsumerId() );
+ }
+
+ // If the consumer is not available, just ignore the message.
+ // Otherwise, dispatch the message to the consumer.
+ if( consumer != NULL ) {
+ consumer->dispatch( dispatch );
+ }
}
- } catch( ActiveMQException& ex ) {
+ } catch( decaf::lang::Exception& ex ) {
ex.setMark(__FILE__, __LINE__ );
ex.printStackTrace();
- } catch( exception& ex ) {
+ } catch( std::exception& ex ) {
ActiveMQException amqex( __FILE__, __LINE__, ex.what() );
amqex.printStackTrace();
} catch( ... ) {
@@ -173,72 +142,44 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionExecutor::run() {
+bool ActiveMQSessionExecutor::iterate() {
try {
- while( true ) {
+ synchronized( &( this->session->consumers ) ) {
- // Dispatch all currently available messages.
- dispatchAll();
+ std::vector<ActiveMQConsumer*> consumers = this->session->consumers.values();
+ std::vector<ActiveMQConsumer*>::iterator iter = consumers.begin();
- synchronized( &mutex ) {
-
- // If we're closing down, exit the thread.
- if( closed ) {
- return;
- }
-
- // When stopped we hit this case and wait otherwise
- // if there are messages we
- if( ( messageQueue.empty() || !started ) && !closed ) {
-
- // Wait for more data or to be woke up.
- mutex.wait();
+ // Deliver any messages queued on the consumer to their listeners.
+ for( ; iter != consumers.end(); ++iter ) {
+ if( (*iter)->iterate() ) {
+ return true;
}
}
}
- } catch( ActiveMQException& ex ) {
+ // No messages left queued on the listeners.. so now dispatch messages
+ // queued on the session
+ Pointer<MessageDispatch> message = messageQueue.dequeueNoWait();
+ if( message != NULL ) {
+ dispatch( message );
+ return !messageQueue.isEmpty();
+ }
+
+ return false;
+
+ } catch( decaf::lang::Exception& ex ) {
ex.setMark(__FILE__, __LINE__ );
session->fire( ex );
- } catch( exception& stdex ) {
+ return true;
+ } catch( std::exception& stdex ) {
ActiveMQException ex( __FILE__, __LINE__, stdex.what() );
session->fire( ex );
+ return true;
} catch( ... ) {
ActiveMQException ex(__FILE__, __LINE__, "caught unknown exception" );
session->fire( ex );
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionExecutor::dispatchAll() {
-
- // Dispatch all currently available messages. This lock allows the
- // main thread to wait while we finish with a dispatch cycle, the
- // stop method for instance should try and lock this mutex so that
- // it knows that we've had a chance to read the started flag and
- // detect that we are stopped, otherwise stop might return while
- // we are still dispatching messages.
- synchronized( &dispatchMutex ) {
-
- // Take out all of the dispatch data currently in the array.
- list<DispatchData> dataList;
- synchronized( &mutex ) {
-
- // If stopped or closed we don't want to start dispatching.
- if( !started || closed ) {
- return;
- }
-
- dataList = messageQueue;
- messageQueue.clear();
- }
-
- list<DispatchData>::iterator iter = dataList.begin();
- while( iter != dataList.end() ) {
- DispatchData& data = *iter++;
- dispatch( data );
- }
+ return true;
}
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h Tue Apr 7 23:59:07 2009
@@ -19,20 +19,18 @@
#define ACTIVEMQ_CORE_ACTIVEMQSESSIONEXECUTOR_
#include <activemq/util/Config.h>
-#include <activemq/core/Dispatcher.h>
+#include <activemq/core/MessageDispatchChannel.h>
#include <activemq/commands/ConsumerId.h>
-#include <decaf/lang/Thread.h>
-#include <decaf/lang/Runnable.h>
+#include <activemq/commands/MessageDispatch.h>
+#include <activemq/threads/Task.h>
+#include <activemq/threads/TaskRunner.h>
#include <decaf/lang/Pointer.h>
-#include <decaf/util/concurrent/Mutex.h>
-#include <decaf/util/StlList.h>
-#include <vector>
-#include <list>
namespace activemq{
namespace core{
using decaf::lang::Pointer;
+ using activemq::commands::MessageDispatch;
class ActiveMQSession;
class ActiveMQConsumer;
@@ -41,32 +39,17 @@
* Delegate dispatcher for a single session. Contains a thread
* to provide for asynchronous dispatching.
*/
- class AMQCPP_API ActiveMQSessionExecutor :
- public Dispatcher,
- public decaf::lang::Runnable
- {
+ class AMQCPP_API ActiveMQSessionExecutor : activemq::threads::Task {
private:
/** Session that is this executors parent. */
ActiveMQSession* session;
- /** List used to hold messages waiting to be dispatched. */
- std::list<DispatchData> messageQueue;
+ /** The Channel that holds the waiting Messages for Dispatching. */
+ MessageDispatchChannel messageQueue;
- /** The Dispatcher Thread */
- Pointer<decaf::lang::Thread> thread;
-
- /** Mutex used to lock on access to the Message Queue */
- decaf::util::concurrent::Mutex mutex;
-
- /** Locks when messages are being dispatched to consumers. */
- decaf::util::concurrent::Mutex dispatchMutex;
-
- /** Has the Start method been called */
- volatile bool started;
-
- /** Has the Close method been called */
- volatile bool closed;
+ /** The Dispatcher TaskRunner */
+ Pointer<activemq::threads::TaskRunner> taskRunner;
public:
@@ -85,21 +68,33 @@
* end of the queue.
* @param data - the data to be dispatched.
*/
- virtual void execute( DispatchData& data );
+ virtual void execute( const Pointer<MessageDispatch>& data );
/**
* Executes the dispatch. Adds the given data to the
* beginning of the queue.
* @param data - the data to be dispatched.
*/
- virtual void executeFirst( DispatchData& data );
+ virtual void executeFirst( const Pointer<MessageDispatch>& data );
+
+ /**
+ * Removes all messages in the Dispatch Channel so that none are delivered.
+ */
+ virtual void clearMessagesInProgress() {
+ this->messageQueue.clear();
+ }
+
+ /**
+ * @return true if there are any pending messages in the dispatch channel.
+ */
+ virtual bool hasUncomsumedMessages() const {
+ return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty();
+ }
/**
- * Removes all messages for the given consumer from the.
- * @param consumerId - the subject consumer
+ * Wake up this executor and dispatch any pending messages.
*/
- virtual void purgeConsumerMessages(
- const decaf::lang::Pointer<commands::ConsumerId>& consumerId );
+ virtual void wakeup();
/**
* Starts the dispatching.
@@ -115,39 +110,55 @@
* Terminates the dispatching thread. Once this is called, the executor is no longer
* usable.
*/
- virtual void close();
+ virtual void close() {
+ this->messageQueue.close();
+ }
+
+ /**
+ * @return true if the executor is started
+ */
+ virtual bool isRunning() const {
+ return this->messageQueue.isRunning();
+ }
/**
- * Indicates if the executor is started
+ * @return true if there are no messages in the Dispatch Channel.
*/
- virtual bool isStarted() const {
- return started;
+ virtual bool isEmpty() {
+ return messageQueue.isEmpty();
}
/**
* Removes all queued messages and destroys them.
*/
- virtual void clear();
+ virtual void clear() {
+ this->messageQueue.clear();
+ }
/**
- * Dispatches a message to a particular consumer.
- * @param consumer - The consumer to dispatch to.
- * @param msg - The message to be dispatched.
+ * Iterates on the MessageDispatchChannel sending all pending messages
+ * to the Consumers they are destined for.
+ *
+ * @return false if there are no more messages to dispatch.
*/
- virtual void dispatch( DispatchData& data );
+ virtual bool iterate();
/**
- * Run method - called by the Thread class in the context
- * of the thread.
+ * @returns a vector containing all the unconsumed messages, this clears the
+ * Message Dispatch Channel when called.
*/
- virtual void run();
+ std::vector< Pointer<MessageDispatch> > getUnconsumedMessages() {
+ return messageQueue.removeAll();
+ }
private:
/**
- * Dispatches all messages currently in the queue.
+ * Dispatches a message to a particular consumer.
+ * @param data - The message to be dispatched.
*/
- void dispatchAll();
+ virtual void dispatch( const Pointer<MessageDispatch>& data );
+
};
}}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp Tue Apr 7 23:59:07 2009
@@ -89,9 +89,12 @@
throw ( activemq::exceptions::ActiveMQException ) {
try{
+
if( !isInTransaction() ) {
- this->synchronizations.clear();
+ synchronized( &synchronizations ) {
+ this->synchronizations.clear();
+ }
// Create the Id
Pointer<LocalTransactionId> id( new LocalTransactionId() );
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h Tue Apr 7 23:59:07 2009
@@ -18,28 +18,33 @@
#ifndef ACTIVEMQ_CORE_DISPATCHER_H_
#define ACTIVEMQ_CORE_DISPATCHER_H_
-#include <activemq/core/DispatchData.h>
+#include <activemq/commands/MessageDispatch.h>
#include <activemq/util/Config.h>
+#include <decaf/lang/Pointer.h>
namespace activemq{
namespace core{
-
+
+ using decaf::lang::Pointer;
+ using activemq::commands::MessageDispatch;
+
/**
- * Interface for an object responsible for dispatching messages to
+ * Interface for an object responsible for dispatching messages to
* consumers.
*/
class AMQCPP_API Dispatcher {
public:
-
+
virtual ~Dispatcher(){}
-
+
/**
* Dispatches a message to a particular consumer.
* @param message - the message to be dispatched.
*/
- virtual void dispatch( DispatchData& message ) = 0;
+ virtual void dispatch( const Pointer<MessageDispatch>& message ) = 0;
+
};
-
+
}}
#endif /*ACTIVEMQ_CORE_DISPATCHER_H_*/
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.cpp?rev=763046&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.cpp Tue Apr 7 23:59:07 2009
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MessageDispatchChannel.h"
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::commands;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+MessageDispatchChannel::MessageDispatchChannel() {
+
+ this->running = false;
+ this->closed = false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+MessageDispatchChannel::~MessageDispatchChannel() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannel::enqueue( const Pointer<MessageDispatch>& message ) {
+ synchronized( &channel ) {
+ channel.push( message );
+ channel.notify();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannel::enqueueFirst( const Pointer<MessageDispatch>& message ) {
+ synchronized( &channel ) {
+ channel.enqueueFront( message );
+ channel.notify();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool MessageDispatchChannel::isEmpty() const {
+ synchronized( &channel ) {
+ return channel.empty();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<MessageDispatch> MessageDispatchChannel::dequeue( long long timeout )
+ throw( exceptions::ActiveMQException ) {
+
+ synchronized( &channel ) {
+ // Wait until the channel is ready to deliver messages.
+ while( timeout != 0 && !closed && ( channel.empty() || !running ) ) {
+ if( timeout == -1 ) {
+ channel.wait();
+ } else {
+ channel.wait( timeout );
+ break;
+ }
+ }
+
+ if( closed || !running || channel.empty() ) {
+ return Pointer<MessageDispatch>();
+ }
+
+ return channel.pop();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<MessageDispatch> MessageDispatchChannel::dequeueNoWait() {
+ synchronized( &channel ) {
+ if( closed || !running || channel.empty() ) {
+ return Pointer<MessageDispatch>();
+ }
+ return channel.pop();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<MessageDispatch> MessageDispatchChannel::peek() const {
+ synchronized( &channel ) {
+ if( closed || !running || channel.empty() ) {
+ return Pointer<MessageDispatch>();
+ }
+ return channel.front();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannel::start() {
+ synchronized( &channel ) {
+ if( !closed ) {
+ running = true;
+ channel.notifyAll();
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannel::stop() {
+ synchronized( &channel ) {
+ running = false;
+ channel.notifyAll();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannel::close() {
+ synchronized( &channel ) {
+ if( !closed ) {
+ running = false;
+ closed = true;
+ }
+ channel.notifyAll();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessageDispatchChannel::clear() {
+ synchronized( &channel ) {
+ channel.clear();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int MessageDispatchChannel::size() const {
+ synchronized( &channel ) {
+ return channel.size();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::vector< Pointer<MessageDispatch> > MessageDispatchChannel::removeAll() {
+ synchronized( &channel ) {
+ std::vector< Pointer<MessageDispatch> > result = channel.toArray();
+ channel.clear();
+ return result;
+ }
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h?rev=763046&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h Tue Apr 7 23:59:07 2009
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_MESSAGEDISPATCHCHANNEL_H_
+#define _ACTIVEMQ_CORE_MESSAGEDISPATCHCHANNEL_H_
+
+#include <activemq/util/Config.h>
+#include <activemq/commands/MessageDispatch.h>
+
+#include <decaf/util/concurrent/Mutex.h>
+#include <decaf/util/concurrent/Synchronizable.h>
+#include <decaf/util/StlQueue.h>
+#include <decaf/lang/Pointer.h>
+
+namespace activemq {
+namespace core {
+
+ using decaf::lang::Pointer;
+ using activemq::commands::MessageDispatch;
+
+ /**
+ * Holds MessageDispatch objects that are waiting to be dispatched, together
+ * with the running / closed flags maintained by start(), stop() and close().
+ * The Synchronizable interface is implemented by forwarding every call to
+ * the internal queue.
+ */
+ class AMQCPP_API MessageDispatchChannel : public decaf::util::concurrent::Synchronizable {
+ private:
+
+ // Set to true by close().
+ bool closed;
+ // Set to true by start(), set to false by stop() and close().
+ bool running;
+
+ // Queue of pending messages; every Synchronizable call below is forwarded to it.
+ mutable decaf::util::StlQueue< Pointer<MessageDispatch> > channel;
+
+ public:
+
+ MessageDispatchChannel();
+ virtual ~MessageDispatchChannel();
+
+ /**
+ * Add a Message to the Channel behind all pending messages.
+ *
+ * @param message - The message to add to the Channel.
+ */
+ void enqueue( const Pointer<MessageDispatch>& message );
+
+ /**
+ * Add a message to the front of the Channel.
+ *
+ * @param message - The Message to add to the front of the Channel.
+ */
+ void enqueueFirst( const Pointer<MessageDispatch>& message );
+
+ /**
+ * @return true if there are no messages in the Channel.
+ */
+ bool isEmpty() const;
+
+ /**
+ * Returns the closed flag directly, without synchronizing on the Channel.
+ *
+ * @return true if the Channel has been closed.
+ */
+ bool isClosed() const {
+ return this->closed;
+ }
+
+ /**
+ * Returns the running flag directly, without synchronizing on the Channel.
+ *
+ * @return true if the Channel is currently running and will dequeue messages.
+ */
+ bool isRunning() const {
+ return this->running;
+ }
+
+ /**
+ * Used to get an enqueued message. How long this method blocks depends on
+ * the timeout value: if timeout == -1 it blocks until a message is
+ * received; if timeout == 0 it tries not to block at all and returns a
+ * message only if one is available; if timeout > 0 it blocks for up to
+ * timeout amount of time. Expired messages will be consumed by this method
+ * (NOTE(review): claim carried over from the original comment; confirm
+ * against the implementation).
+ *
+ * @param timeout - how long to wait for a message, as described above.
+ * @return null if we timeout or if the consumer is closed.
+ * @throws ActiveMQException
+ */
+ Pointer<MessageDispatch> dequeue( long long timeout )
+ throw( exceptions::ActiveMQException );
+
+ /**
+ * Used to get an enqueued message if there is one queued right now. If there is
+ * no waiting message then this method returns Null.
+ *
+ * @return a message if there is one in the queue.
+ */
+ Pointer<MessageDispatch> dequeueNoWait();
+
+ /**
+ * Peek in the Queue and return the first message in the Channel without removing
+ * it from the channel.
+ *
+ * @return a message if there is one in the queue.
+ */
+ Pointer<MessageDispatch> peek() const;
+
+ /**
+ * Starts dispatch of messages from the Channel.
+ */
+ void start();
+
+ /**
+ * Stops dispatch of messages from the Channel.
+ */
+ void stop();
+
+ /**
+ * Close this channel; no messages will be dispatched after this method is called.
+ */
+ void close();
+
+ /**
+ * Clear the Channel, all pending messages are removed.
+ */
+ void clear();
+
+ /**
+ * @return the number of Messages currently in the Channel.
+ */
+ int size() const;
+
+ /**
+ * Remove all messages that are currently in the Channel and return them as
+ * a list of Messages.
+ *
+ * @return a list of Messages that were previously in the Channel.
+ */
+ std::vector< Pointer<MessageDispatch> > removeAll();
+
+ public:
+
+ /**
+ * Locks the object. Forwards to the internal queue.
+ */
+ virtual void lock() throw( decaf::lang::Exception ){
+ channel.lock();
+ }
+
+ /**
+ * Unlocks the object. Forwards to the internal queue.
+ */
+ virtual void unlock() throw( decaf::lang::Exception ){
+ channel.unlock();
+ }
+
+ /**
+ * Waits on a signal from this object, which is generated
+ * by a call to Notify. Must have this object locked before
+ * calling. Forwards to the internal queue.
+ */
+ virtual void wait() throw( decaf::lang::Exception ){
+ channel.wait();
+ }
+
+ /**
+ * Waits on a signal from this object, which is generated
+ * by a call to Notify. Must have this object locked before
+ * calling. This wait will timeout after the specified time
+ * interval. Forwards to the internal queue.
+ * @param millisecs time to wait, or WAIT_INFINITE
+ * @throws decaf::lang::Exception
+ */
+ virtual void wait( unsigned long millisecs ) throw( decaf::lang::Exception ) {
+ channel.wait( millisecs );
+ }
+
+ /**
+ * Signals a waiter on this object that it can now wake
+ * up and continue. Must have this object locked before
+ * calling. Forwards to the internal queue.
+ */
+ virtual void notify() throw( decaf::lang::Exception ){
+ channel.notify();
+ }
+
+ /**
+ * Signals the waiters on this object that it can now wake
+ * up and continue. Must have this object locked before
+ * calling. Forwards to the internal queue.
+ */
+ virtual void notifyAll() throw( decaf::lang::Exception ){
+ channel.notifyAll();
+ }
+
+ };
+
+}}
+
+#endif /* _ACTIVEMQ_CORE_MESSAGEDISPATCHCHANNEL_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am Tue Apr 7 23:59:07 2009
@@ -111,6 +111,7 @@
activemq/core/ActiveMQSessionTest.cpp \
activemq/core/ActiveMQConnectionFactoryTest.cpp \
activemq/core/ActiveMQConnectionTest.cpp \
+ activemq/core/MessageDispatchChannelTest.cpp \
activemq/commands/ActiveMQTextMessageTest.cpp \
activemq/commands/BrokerIdTest.cpp \
activemq/commands/BrokerInfoTest.cpp \
@@ -226,6 +227,7 @@
activemq/core/ActiveMQConnectionFactoryTest.h \
activemq/core/ActiveMQConnectionTest.h \
activemq/core/ActiveMQSessionTest.h \
+ activemq/core/MessageDispatchChannelTest.h \
activemq/commands/ActiveMQTempTopicTest.h \
activemq/commands/ActiveMQQueueTest.h \
activemq/commands/ActiveMQBytesMessageTest.h \
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.cpp?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.cpp Tue Apr 7 23:59:07 2009
@@ -24,10 +24,16 @@
#include <decaf/lang/Pointer.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/transport/Transport.h>
+#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/transport/mock/MockTransport.h>
#include <activemq/transport/mock/MockTransportFactory.h>
#include <activemq/transport/TransportRegistry.h>
#include <activemq/util/Config.h>
+#include <activemq/commands/Message.h>
+
+#include <cms/Connection.h>
+#include <cms/ExceptionListener.h>
using namespace activemq;
using namespace activemq::core;
@@ -36,6 +42,58 @@
using namespace decaf::util;
using namespace decaf::lang;
+namespace activemq {
+namespace core{
+
+ /**
+ * Transport listener for the tests that remembers the command most
+ * recently handed to onCommand().
+ */
+ class MyCommandListener : public transport::DefaultTransportListener{
+ public:
+
+ // Last command received; NULL until onCommand() has been called.
+ commands::Command* cmd;
+
+ MyCommandListener() : cmd( NULL ) {}
+
+ virtual ~MyCommandListener(){}
+
+ // Stores the pointer as given; no copy of the command is made.
+ virtual void onCommand( commands::Command* command ){
+ this->cmd = command;
+ }
+ };
+
+ /**
+ * Exception listener for the tests that records whether onException()
+ * has been invoked at least once.
+ */
+ class MyExceptionListener : public cms::ExceptionListener{
+ public:
+
+ // False until onException() is called, true afterwards.
+ bool caughtOne;
+
+ MyExceptionListener() : caughtOne( false ) {}
+
+ virtual ~MyExceptionListener(){}
+
+ // The exception itself is ignored; only the fact that one arrived is kept.
+ virtual void onException(const cms::CMSException& ex AMQCPP_UNUSED){
+ this->caughtOne = true;
+ }
+ };
+
+ /**
+ * Dispatcher for the tests that collects the Message carried by every
+ * MessageDispatch passed to dispatch(), in arrival order.
+ */
+ class MyDispatcher : public Dispatcher
+ {
+ public:
+
+ // Messages extracted from the dispatches seen so far.
+ std::vector< decaf::lang::Pointer<commands::Message> > messages;
+
+ virtual ~MyDispatcher(){}
+
+ virtual void dispatch( const decaf::lang::Pointer<commands::MessageDispatch>& data )
+ throw ( exceptions::ActiveMQException )
+ {
+ decaf::lang::Pointer<commands::Message> received = data->getMessage();
+ messages.push_back( received );
+ }
+ };
+}}
+
////////////////////////////////////////////////////////////////////////////////
//void ActiveMQConnectionTest::test1WithStomp()
//{
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h?rev=763046&r1=763045&r2=763046&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h Tue Apr 7 23:59:07 2009
@@ -21,15 +21,6 @@
#include <cppunit/TestFixture.h>
#include <cppunit/extensions/HelperMacros.h>
-#include <cms/Connection.h>
-#include <cms/ExceptionListener.h>
-
-#include <activemq/transport/Transport.h>
-#include <activemq/transport/DefaultTransportListener.h>
-#include <activemq/util/Config.h>
-#include <activemq/core/ActiveMQConnection.h>
-#include <activemq/commands/Message.h>
-
namespace activemq{
namespace core{
@@ -46,54 +37,6 @@
ActiveMQConnectionTest() {};
virtual ~ActiveMQConnectionTest() {}
- class MyCommandListener : public transport::DefaultTransportListener{
- public:
-
- commands::Command* cmd;
-
- public:
-
- MyCommandListener(){
- cmd = NULL;
- }
- virtual ~MyCommandListener(){}
-
- virtual void onCommand( commands::Command* command ){
- cmd = command;
- }
- };
-
- class MyExceptionListener : public cms::ExceptionListener{
- public:
-
- bool caughtOne;
-
- public:
-
- MyExceptionListener(){ caughtOne = false; }
- virtual ~MyExceptionListener(){}
-
- virtual void onException(const cms::CMSException& ex AMQCPP_UNUSED){
- caughtOne = true;
- }
- };
-
- class MyDispatcher : public Dispatcher
- {
- public:
-
- std::vector< decaf::lang::Pointer<commands::Message> > messages;
-
- public:
- virtual ~MyDispatcher(){}
-
- virtual void dispatch( DispatchData& data )
- throw ( exceptions::ActiveMQException )
- {
- messages.push_back( data.getMessage() );
- }
- };
-
// void test1WithStomp();
// void test2WithStomp();
void test2WithOpenwire();
|