activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1025714 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core: ActiveMQConsumer.cpp ActiveMQConsumer.h
Date Wed, 20 Oct 2010 20:46:54 GMT
Author: tabish
Date: Wed Oct 20 20:46:54 2010
New Revision: 1025714

URL: http://svn.apache.org/viewvc?rev=1025714&view=rev
Log:
Refactored to internalize the member data and add a failure state.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=1025714&r1=1025713&r2=1025714&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Wed Oct 20 20:46:54 2010
@@ -60,6 +60,38 @@ using namespace decaf::util::concurrent;
 namespace activemq{
 namespace core {
 
+    class ActiveMQConsumerMembers {
+    public:
+
+        cms::MessageListener* listener;
+        decaf::util::concurrent::Mutex listenerMutex;
+        AtomicBoolean deliveringAcks;
+        AtomicBoolean started;
+        Pointer<MessageDispatchChannel> unconsumedMessages;
+        decaf::util::StlQueue< decaf::lang::Pointer<commands::MessageDispatch> >
dispatchedMessages;
+        long long lastDeliveredSequenceId;
+        Pointer<commands::MessageAck> pendingAck;
+        int deliveredCounter;
+        int additionalWindowSize;
+        volatile bool synchronizationRegistered;
+        bool clearDispatchList;
+        bool inProgressClearRequiredFlag;
+        long long redeliveryDelay;
+        Pointer<RedeliveryPolicy> redeliveryPolicy;
+        Pointer<Exception> failureError;
+
+        ActiveMQConsumerMembers() : listener(NULL),
+                                    lastDeliveredSequenceId(0),
+                                    deliveredCounter(0),
+                                    additionalWindowSize(0),
+                                    synchronizationRegistered(false),
+                                    clearDispatchList(false),
+                                    inProgressClearRequiredFlag(false),
+                                    redeliveryDelay(0) {
+        }
+
+    };
+
     /**
      * Class used to deal with consumers in an active transaction.  This
      * class calls back into the consumer when the transaction is Committed or
@@ -223,6 +255,8 @@ ActiveMQConsumer::ActiveMQConsumer( Acti
             "ActiveMQConsumer::ActiveMQConsumer - Destination given has no Physical Name."
);
     }
 
+    this->internal = new ActiveMQConsumerMembers();
+
     Pointer<ConsumerInfo> consumerInfo( new ConsumerInfo() );
 
     consumerInfo->setConsumerId( id );
@@ -238,20 +272,20 @@ ActiveMQConsumer::ActiveMQConsumer( Acti
     // Initialize Consumer Data
     this->session = session;
     this->consumerInfo = consumerInfo;
-    this->lastDeliveredSequenceId = -1;
-    this->synchronizationRegistered = false;
-    this->additionalWindowSize = 0;
-    this->deliveredCounter = 0;
-    this->clearDispatchList = false;
-    this->inProgressClearRequiredFlag = false;
-    this->listener = NULL;
-    this->redeliveryDelay = 0;
-    this->redeliveryPolicy.reset( this->session->getConnection()->getRedeliveryPolicy()->clone()
);
+    this->internal->lastDeliveredSequenceId = -1;
+    this->internal->synchronizationRegistered = false;
+    this->internal->additionalWindowSize = 0;
+    this->internal->deliveredCounter = 0;
+    this->internal->clearDispatchList = false;
+    this->internal->inProgressClearRequiredFlag = false;
+    this->internal->listener = NULL;
+    this->internal->redeliveryDelay = 0;
+    this->internal->redeliveryPolicy.reset( this->session->getConnection()->getRedeliveryPolicy()->clone()
);
 
     if( this->session->getConnection()->isMessagePrioritySupported() ) {
-        this->unconsumedMessages.reset( new SimplePriorityMessageDispatchChannel() );
+        this->internal->unconsumedMessages.reset( new SimplePriorityMessageDispatchChannel()
);
     } else {
-        this->unconsumedMessages.reset( new FifoMessageDispatchChannel() );
+        this->internal->unconsumedMessages.reset( new FifoMessageDispatchChannel()
);
     }
 
     if( listener != NULL ) {
@@ -263,7 +297,12 @@ ActiveMQConsumer::ActiveMQConsumer( Acti
 ActiveMQConsumer::~ActiveMQConsumer() throw() {
 
     try {
-        close();
+
+        try{
+            this->close();
+        } catch(...) {}
+
+        delete this->internal;
     }
     AMQ_CATCH_NOTHROW( ActiveMQException )
     AMQ_CATCHALL_NOTHROW( )
@@ -272,24 +311,24 @@ ActiveMQConsumer::~ActiveMQConsumer() th
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumer::start() {
 
-    if( this->unconsumedMessages->isClosed() ) {
+    if( this->internal->unconsumedMessages->isClosed() ) {
         return;
     }
 
-    this->started.set( true );
-    this->unconsumedMessages->start();
+    this->internal->started.set( true );
+    this->internal->unconsumedMessages->start();
     this->session->wakeup();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumer::stop() {
-    this->started.set( false );
-    this->unconsumedMessages->stop();
+    this->internal->started.set( false );
+    this->internal->unconsumedMessages->stop();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool ActiveMQConsumer::isClosed() const {
-    return this->unconsumedMessages->isClosed();
+    return this->internal->unconsumedMessages->isClosed();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -333,7 +372,7 @@ void ActiveMQConsumer::doClose() {
                 deliverAcks();
             }
 
-            this->started.set( false );
+            this->internal->started.set( false );
 
             // Identifies any errors encountered during shutdown.
             bool haveException = false;
@@ -341,7 +380,7 @@ void ActiveMQConsumer::doClose() {
 
             // Purge all the pending messages
             try{
-                unconsumedMessages->clear();
+                internal->unconsumedMessages->clear();
             } catch ( ActiveMQException& ex ){
                 if( !haveException ){
                     ex.setMark( __FILE__, __LINE__ );
@@ -351,30 +390,30 @@ void ActiveMQConsumer::doClose() {
             }
 
             // Stop and Wakeup all sync consumers.
-            unconsumedMessages->close();
+            internal->unconsumedMessages->close();
 
             if( this->session->isIndividualAcknowledge() ) {
                 // For IndividualAck Mode we need to unlink the ack handler to remove a
                 // cyclic reference to the MessageDispatch that brought the message to us.
-                synchronized( &dispatchedMessages ) {
-                    std::auto_ptr< Iterator< Pointer<MessageDispatch> > >
iter( this->dispatchedMessages.iterator() );
+                synchronized( &internal->dispatchedMessages ) {
+                    std::auto_ptr< Iterator< Pointer<MessageDispatch> > >
iter( this->internal->dispatchedMessages.iterator() );
                     while( iter->hasNext() ) {
                         iter->next()->getMessage()->setAckHandler( Pointer<ActiveMQAckHandler>()
);
                     }
 
-                    dispatchedMessages.clear();
+                    this->internal->dispatchedMessages.clear();
                 }
             }
 
             // Remove this Consumer from the Connections set of Dispatchers
-            this->session->removeConsumer( this->consumerInfo->getConsumerId(),
lastDeliveredSequenceId );
+            this->session->removeConsumer( this->consumerInfo->getConsumerId(),
this->internal->lastDeliveredSequenceId );
 
             // Remove at the Broker Side, consumer has been removed from the local
             // Session and Connection objects so if the remote call to remove throws
             // it is okay to propagate to the client.
             Pointer<RemoveInfo> info( new RemoveInfo );
             info->setObjectId( this->consumerInfo->getConsumerId() );
-            info->setLastDeliveredSequenceId( lastDeliveredSequenceId );
+            info->setLastDeliveredSequenceId( this->internal->lastDeliveredSequenceId
);
             this->session->oneway( info );
 
             // If we encountered an error, propagate it.
@@ -415,13 +454,17 @@ decaf::lang::Pointer<MessageDispatch> Ac
         // Loop until the time is up or we get a non-expired message
         while( true ) {
 
-            Pointer<MessageDispatch> dispatch = unconsumedMessages->dequeue( timeout
);
+            Pointer<MessageDispatch> dispatch = this->internal->unconsumedMessages->dequeue(
timeout );
             if( dispatch == NULL ) {
 
-                if( timeout > 0 && !unconsumedMessages->isClosed() ) {
+                if( timeout > 0 && !this->internal->unconsumedMessages->isClosed()
) {
                     timeout = Math::max( deadline - System::currentTimeMillis(), 0LL );
                 } else {
-                    return Pointer<MessageDispatch>();
+                    if( this->internal->failureError != NULL ) {
+                        throw CMSExceptionSupport::create(*this->internal->failureError);
+                    } else {
+                        return Pointer<MessageDispatch>();
+                    }
                 }
 
             } else if( dispatch->getMessage() == NULL ) {
@@ -570,18 +613,18 @@ void ActiveMQConsumer::setMessageListene
                 session->stop();
             }
 
-            synchronized( &listenerMutex ) {
-                this->listener = listener;
+            synchronized( &(this->internal->listenerMutex) ) {
+                this->internal->listener = listener;
             }
 
-            session->redispatch( *unconsumedMessages );
+            this->session->redispatch( *(this->internal->unconsumedMessages)
);
 
             if( wasStarted ) {
-                session->start();
+                this->session->start();
             }
         } else {
-            synchronized( &listenerMutex ) {
-                this->listener = NULL;
+            synchronized( &(this->internal->listenerMutex) ) {
+                this->internal->listener = NULL;
             }
         }
     }
@@ -601,14 +644,14 @@ void ActiveMQConsumer::beforeMessageIsCo
         dispatch->getMessage()->setAckHandler( ackHandler );
     }
 
-    this->lastDeliveredSequenceId =
+    this->internal->lastDeliveredSequenceId =
         dispatch->getMessage()->getMessageId()->getBrokerSequenceId();
 
     if( !isAutoAcknowledgeBatch() ) {
 
         // When not in an Auto
-        synchronized( &dispatchedMessages ) {
-            dispatchedMessages.enqueueFront( dispatch );
+        synchronized( &internal->dispatchedMessages ) {
+            internal->dispatchedMessages.enqueueFront( dispatch );
         }
 
         if( this->session->isTransacted() ) {
@@ -623,7 +666,7 @@ void ActiveMQConsumer::afterMessageIsCon
 
     try{
 
-        if( unconsumedMessages->isClosed() ) {
+        if( internal->unconsumedMessages->isClosed() ) {
             return;
         }
 
@@ -635,21 +678,21 @@ void ActiveMQConsumer::afterMessageIsCon
             return;
         } else if( isAutoAcknowledgeEach() ) {
 
-            if( this->deliveringAcks.compareAndSet( false, true ) ) {
+            if( this->internal->deliveringAcks.compareAndSet( false, true ) ) {
 
-                synchronized( &dispatchedMessages ) {
-                    if( !dispatchedMessages.empty() ) {
+                synchronized( &internal->dispatchedMessages ) {
+                    if( !internal->dispatchedMessages.empty() ) {
                         Pointer<MessageAck> ack = makeAckForAllDeliveredMessages(
                             ActiveMQConstants::ACK_TYPE_CONSUMED );
 
                         if( ack != NULL ) {
-                            dispatchedMessages.clear();
+                            internal->dispatchedMessages.clear();
                             session->oneway( ack );
                         }
                     }
                 }
 
-                this->deliveringAcks.set( false );
+                this->internal->deliveringAcks.set( false );
             }
 
         } else if( isAutoAcknowledgeBatch() ) {
@@ -659,8 +702,8 @@ void ActiveMQConsumer::afterMessageIsCon
 
             bool messageUnackedByConsumer = false;
 
-            synchronized( &dispatchedMessages ) {
-                std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
this->dispatchedMessages.iterator() );
+            synchronized( &internal->dispatchedMessages ) {
+                std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
this->internal->dispatchedMessages.iterator() );
                 while( iter->hasNext() ) {
                     if( iter->next() == message ) {
                         messageUnackedByConsumer = true;
@@ -689,25 +732,25 @@ void ActiveMQConsumer::deliverAcks() {
 
         Pointer<MessageAck> ack;
 
-        if( this->deliveringAcks.compareAndSet( false, true ) ) {
+        if( this->internal->deliveringAcks.compareAndSet( false, true ) ) {
 
             if( isAutoAcknowledgeEach() ) {
 
-                synchronized( &dispatchedMessages ) {
+                synchronized( &internal->dispatchedMessages ) {
 
                     ack = makeAckForAllDeliveredMessages( ActiveMQConstants::ACK_TYPE_CONSUMED
);
 
                     if( ack != NULL ) {
-                        dispatchedMessages.clear();
+                        internal->dispatchedMessages.clear();
                     } else {
-                        ack.swap( pendingAck );
+                        ack.swap( internal->pendingAck );
                     }
                 }
 
-            } else if( pendingAck != NULL &&
-                       pendingAck->getAckType() == ActiveMQConstants::ACK_TYPE_CONSUMED
) {
+            } else if( internal->pendingAck != NULL &&
+                       internal->pendingAck->getAckType() == ActiveMQConstants::ACK_TYPE_CONSUMED
) {
 
-                ack.swap( pendingAck );
+                ack.swap( internal->pendingAck );
             }
 
             if( ack != NULL ) {
@@ -717,7 +760,7 @@ void ActiveMQConsumer::deliverAcks() {
                 } catch(...) {}
 
             } else {
-                this->deliveringAcks.set( false );
+                this->internal->deliveringAcks.set( false );
             }
         }
     }
@@ -733,8 +776,8 @@ void ActiveMQConsumer::ackLater( const P
     // consumer got the message to expand the pre-fetch window
     if( session->isTransacted() ) {
         session->doStartTransaction();
-        if( !synchronizationRegistered ) {
-            synchronizationRegistered = true;
+        if( !internal->synchronizationRegistered ) {
+            internal->synchronizationRegistered = true;
 
             Pointer<Synchronization> sync( new TransactionSynhcronization( this ) );
             this->session->getTransactionContext()->addSynchronization( sync );
@@ -743,20 +786,20 @@ void ActiveMQConsumer::ackLater( const P
 
     // The delivered message list is only needed for the recover method
     // which is only used with client ack.
-    deliveredCounter++;
+    internal->deliveredCounter++;
 
-    Pointer<MessageAck> oldPendingAck = pendingAck;
-    pendingAck.reset( new MessageAck() );
-    pendingAck->setConsumerId( dispatch->getConsumerId() );
-    pendingAck->setAckType( (unsigned char)ackType );
-    pendingAck->setDestination( dispatch->getDestination() );
-    pendingAck->setLastMessageId( dispatch->getMessage()->getMessageId() );
-    pendingAck->setMessageCount( deliveredCounter );
+    Pointer<MessageAck> oldPendingAck = internal->pendingAck;
+    internal->pendingAck.reset( new MessageAck() );
+    internal->pendingAck->setConsumerId( dispatch->getConsumerId() );
+    internal->pendingAck->setAckType( (unsigned char)ackType );
+    internal->pendingAck->setDestination( dispatch->getDestination() );
+    internal->pendingAck->setLastMessageId( dispatch->getMessage()->getMessageId()
);
+    internal->pendingAck->setMessageCount( internal->deliveredCounter );
 
     if( oldPendingAck == NULL ) {
-        pendingAck->setFirstMessageId( pendingAck->getLastMessageId() );
-    } else if ( oldPendingAck->getAckType() == pendingAck->getAckType() ) {
-        pendingAck->setFirstMessageId( oldPendingAck->getFirstMessageId() );
+        internal->pendingAck->setFirstMessageId( internal->pendingAck->getLastMessageId()
);
+    } else if ( oldPendingAck->getAckType() == internal->pendingAck->getAckType()
) {
+        internal->pendingAck->setFirstMessageId( oldPendingAck->getFirstMessageId()
);
     } else {
         // old pending ack being superseded by ack of another type, if it is not a delivered
         // ack and hence important, send it now so it is not lost.
@@ -766,33 +809,33 @@ void ActiveMQConsumer::ackLater( const P
     }
 
     if( session->isTransacted() ) {
-        pendingAck->setTransactionId( this->session->getTransactionContext()->getTransactionId()
);
+        internal->pendingAck->setTransactionId( this->session->getTransactionContext()->getTransactionId()
);
     }
 
-    if( ( 0.5 * this->consumerInfo->getPrefetchSize() ) <= ( deliveredCounter -
additionalWindowSize ) ) {
-        session->oneway( pendingAck );
-        pendingAck.reset( NULL );
-        deliveredCounter = 0;
-        additionalWindowSize = 0;
+    if( ( 0.5 * this->consumerInfo->getPrefetchSize() ) <= ( internal->deliveredCounter
- internal->additionalWindowSize ) ) {
+        session->oneway( this->internal->pendingAck );
+        internal->pendingAck.reset( NULL );
+        internal->deliveredCounter = 0;
+        internal->additionalWindowSize = 0;
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 Pointer<MessageAck> ActiveMQConsumer::makeAckForAllDeliveredMessages( int type ) {
 
-    synchronized( &dispatchedMessages ) {
+    synchronized( &internal->dispatchedMessages ) {
 
-        if( !dispatchedMessages.empty() ) {
+        if( !internal->dispatchedMessages.empty() ) {
 
-            Pointer<MessageDispatch> dispatched = dispatchedMessages.front();
+            Pointer<MessageDispatch> dispatched = internal->dispatchedMessages.front();
 
             Pointer<MessageAck> ack( new MessageAck() );
             ack->setAckType( (unsigned char)type );
             ack->setConsumerId( dispatched->getConsumerId() );
             ack->setDestination( dispatched->getDestination() );
-            ack->setMessageCount( (int)dispatchedMessages.size() );
+            ack->setMessageCount( (int)internal->dispatchedMessages.size() );
             ack->setLastMessageId( dispatched->getMessage()->getMessageId() );
-            ack->setFirstMessageId( dispatchedMessages.back()->getMessage()->getMessageId()
);
+            ack->setFirstMessageId( internal->dispatchedMessages.back()->getMessage()->getMessageId()
);
 
             return ack;
         }
@@ -820,8 +863,8 @@ void ActiveMQConsumer::acknowledge( cons
 
             session->oneway( ack );
 
-            synchronized( &dispatchedMessages ) {
-                std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
this->dispatchedMessages.iterator() );
+            synchronized( &internal->dispatchedMessages ) {
+                std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
this->internal->dispatchedMessages.iterator() );
                 while( iter->hasNext() ) {
                     if( iter->next() == dispatch ) {
                         iter->remove();
@@ -844,7 +887,7 @@ void ActiveMQConsumer::acknowledge() {
 
     try{
 
-        synchronized( &dispatchedMessages ) {
+        synchronized( &internal->dispatchedMessages ) {
 
             // Acknowledge all messages so far.
             Pointer<MessageAck> ack =
@@ -860,14 +903,16 @@ void ActiveMQConsumer::acknowledge() {
             }
 
             session->oneway( ack );
-            pendingAck.reset( NULL );
+            this->internal->pendingAck.reset( NULL );
 
             // Adjust the counters
-            deliveredCounter = Math::max( 0, deliveredCounter - (int)dispatchedMessages.size());
-            additionalWindowSize = Math::max(0, additionalWindowSize - (int)dispatchedMessages.size());
+            this->internal->deliveredCounter =
+                Math::max( 0, internal->deliveredCounter - (int)internal->dispatchedMessages.size());
+            this->internal->additionalWindowSize =
+                Math::max(0, internal->additionalWindowSize - (int)internal->dispatchedMessages.size());
 
             if( !session->isTransacted() ) {
-                dispatchedMessages.clear();
+                this->internal->dispatchedMessages.clear();
             }
         }
     }
@@ -877,43 +922,43 @@ void ActiveMQConsumer::acknowledge() {
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumer::commit() {
 
-    synchronized( &dispatchedMessages ) {
-        dispatchedMessages.clear();
+    synchronized( &(this->internal->dispatchedMessages) ) {
+        this->internal->dispatchedMessages.clear();
     }
-    redeliveryDelay = 0;
+    this->internal->redeliveryDelay = 0;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumer::rollback() {
 
-    synchronized( unconsumedMessages.get() ) {
+    synchronized( internal->unconsumedMessages.get() ) {
 
-        synchronized( &dispatchedMessages ) {
-            if( dispatchedMessages.empty() ) {
+        synchronized( &internal->dispatchedMessages ) {
+            if( internal->dispatchedMessages.empty() ) {
                 return;
             }
 
             // Only increase the redelivery delay after the first redelivery..
-            Pointer<MessageDispatch> lastMsg = dispatchedMessages.front();
+            Pointer<MessageDispatch> lastMsg = internal->dispatchedMessages.front();
             const int currentRedeliveryCount = lastMsg->getMessage()->getRedeliveryCounter();
             if( currentRedeliveryCount > 0 ) {
-                redeliveryDelay = this->redeliveryPolicy->getNextRedeliveryDelay( redeliveryDelay
);
+                internal->redeliveryDelay = this->internal->redeliveryPolicy->getNextRedeliveryDelay(
internal->redeliveryDelay );
             } else {
-                redeliveryDelay = this->redeliveryPolicy->getInitialRedeliveryDelay();
+                internal->redeliveryDelay = this->internal->redeliveryPolicy->getInitialRedeliveryDelay();
             }
 
             Pointer<MessageId> firstMsgId =
-                dispatchedMessages.back()->getMessage()->getMessageId();
+                internal->dispatchedMessages.back()->getMessage()->getMessageId();
 
-            std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
dispatchedMessages.iterator() );
+            std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
internal->dispatchedMessages.iterator() );
 
             while( iter->hasNext() ) {
                 Pointer<Message> message = iter->next()->getMessage();
                 message->setRedeliveryCounter( message->getRedeliveryCounter() + 1
);
             }
 
-            if( this->redeliveryPolicy->getMaximumRedeliveries() != RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES
&&
-                lastMsg->getRedeliveryCounter() > this->redeliveryPolicy->getMaximumRedeliveries()
) {
+            if( this->internal->redeliveryPolicy->getMaximumRedeliveries() != RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES
&&
+                lastMsg->getRedeliveryCounter() > this->internal->redeliveryPolicy->getMaximumRedeliveries()
) {
 
                 // We need to NACK the messages so that they get sent to the DLQ.
                 // Acknowledge the last message.
@@ -921,15 +966,15 @@ void ActiveMQConsumer::rollback() {
                 ack->setAckType( ActiveMQConstants::ACK_TYPE_POISON );
                 ack->setConsumerId( this->consumerInfo->getConsumerId() );
                 ack->setDestination( lastMsg->getDestination() );
-                ack->setMessageCount( (int)dispatchedMessages.size() );
+                ack->setMessageCount( (int)internal->dispatchedMessages.size() );
                 ack->setLastMessageId( lastMsg->getMessage()->getMessageId() );
                 ack->setFirstMessageId( firstMsgId );
 
                 session->oneway( ack );
                 // Adjust the window size.
-                additionalWindowSize =
-                    Math::max( 0, additionalWindowSize - (int)dispatchedMessages.size() );
-                redeliveryDelay = 0;
+                internal->additionalWindowSize =
+                    Math::max( 0, internal->additionalWindowSize - (int)internal->dispatchedMessages.size()
);
+                internal->redeliveryDelay = 0;
 
             } else {
 
@@ -939,7 +984,7 @@ void ActiveMQConsumer::rollback() {
                     ack->setAckType( ActiveMQConstants::ACK_TYPE_REDELIVERED );
                     ack->setConsumerId( this->consumerInfo->getConsumerId() );
                     ack->setDestination( lastMsg->getDestination() );
-                    ack->setMessageCount( (int)dispatchedMessages.size() );
+                    ack->setMessageCount( (int)internal->dispatchedMessages.size()
);
                     ack->setLastMessageId( lastMsg->getMessage()->getMessageId()
);
                     ack->setFirstMessageId( firstMsgId );
 
@@ -947,15 +992,15 @@ void ActiveMQConsumer::rollback() {
                 }
 
                 // stop the delivery of messages.
-                unconsumedMessages->stop();
+                internal->unconsumedMessages->stop();
 
-                std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
dispatchedMessages.iterator() );
+                std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
internal->dispatchedMessages.iterator() );
 
                 while( iter->hasNext() ) {
-                    unconsumedMessages->enqueueFirst( iter->next() );
+                    internal->unconsumedMessages->enqueueFirst( iter->next() );
                 }
 
-                if (redeliveryDelay > 0 && !unconsumedMessages->isClosed())
{
+                if( internal->redeliveryDelay > 0 && !internal->unconsumedMessages->isClosed()
) {
                     // TODO
                     // Start up the delivery again a little later.
                     //scheduler.executeAfterDelay(new Runnable() {
@@ -975,13 +1020,13 @@ void ActiveMQConsumer::rollback() {
                 }
 
             }
-            deliveredCounter -= (int)dispatchedMessages.size();
-            dispatchedMessages.clear();
+            internal->deliveredCounter -= (int)internal->dispatchedMessages.size();
+            internal->dispatchedMessages.clear();
         }
     }
 
-    if( this->listener != NULL ) {
-        session->redispatch( *unconsumedMessages );
+    if( this->internal->listener != NULL ) {
+        session->redispatch( *internal->unconsumedMessages );
     }
 }
 
@@ -990,17 +1035,17 @@ void ActiveMQConsumer::dispatch( const P
 
     try {
 
-        synchronized( unconsumedMessages.get() ) {
+        synchronized( internal->unconsumedMessages.get() ) {
 
             clearMessagesInProgress();
-            if( this->clearDispatchList ) {
+            if( this->internal->clearDispatchList ) {
                 // we are reconnecting so lets flush the in progress
                 // messages
-                clearDispatchList = false;
-                unconsumedMessages->clear();
+                internal->clearDispatchList = false;
+                internal->unconsumedMessages->clear();
             }
 
-            if( !unconsumedMessages->isClosed() ) {
+            if( !internal->unconsumedMessages->isClosed() ) {
 
                 // Don't dispatch expired messages, ack it and then destroy it
                 if( dispatch->getMessage()->isExpired() ) {
@@ -1010,15 +1055,15 @@ void ActiveMQConsumer::dispatch( const P
                     return;
                 }
 
-                synchronized( &listenerMutex ) {
+                synchronized( &internal->listenerMutex ) {
                     // If we have a listener, send the message.
-                    if( this->listener != NULL && unconsumedMessages->isRunning()
) {
+                    if( this->internal->listener != NULL && internal->unconsumedMessages->isRunning()
) {
 
                         // Preprocessing.
                         beforeMessageIsConsumed( dispatch );
 
                         // Notify the listener
-                        this->listener->onMessage(
+                        this->internal->listener->onMessage(
                             dynamic_cast<cms::Message*>( dispatch->getMessage().get()
) );
 
                         // Postprocessing
@@ -1027,7 +1072,7 @@ void ActiveMQConsumer::dispatch( const P
                     } else {
 
                         // No listener, add it to the unconsumed messages list
-                        this->unconsumedMessages->enqueue( dispatch );
+                        this->internal->unconsumedMessages->enqueue( dispatch );
                     }
                 }
             }
@@ -1046,7 +1091,7 @@ void ActiveMQConsumer::sendPullRequest( 
         this->checkClosed();
 
         // There are still local message, consume them first.
-        if( !this->unconsumedMessages->isEmpty() ) {
+        if( !this->internal->unconsumedMessages->isEmpty() ) {
             return;
         }
 
@@ -1077,16 +1122,16 @@ void ActiveMQConsumer::checkClosed() con
 ////////////////////////////////////////////////////////////////////////////////
 bool ActiveMQConsumer::iterate() {
 
-    synchronized( &listenerMutex ) {
+    synchronized( &internal->listenerMutex ) {
 
-        if( this->listener != NULL ) {
+        if( this->internal->listener != NULL ) {
 
-            Pointer<MessageDispatch> dispatch = unconsumedMessages->dequeueNoWait();
+            Pointer<MessageDispatch> dispatch = internal->unconsumedMessages->dequeueNoWait();
             if( dispatch != NULL ) {
 
                 try {
                     beforeMessageIsConsumed( dispatch );
-                    this->listener->onMessage(
+                    this->internal->listener->onMessage(
                         dynamic_cast<cms::Message*>( dispatch->getMessage().get()
) );
                     afterMessageIsConsumed( dispatch, false );
                 } catch( ActiveMQException& ex ) {
@@ -1104,22 +1149,22 @@ bool ActiveMQConsumer::iterate() {
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumer::inProgressClearRequired() {
 
-    inProgressClearRequiredFlag = true;
+    this->internal->inProgressClearRequiredFlag = true;
     // Clears dispatched messages async to avoid lock contention with inprogress acks.
-    clearDispatchList = true;
+    this->internal->clearDispatchList = true;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumer::clearMessagesInProgress() {
-    if( inProgressClearRequiredFlag ) {
-        synchronized( unconsumedMessages.get() ) {
-            if( inProgressClearRequiredFlag ) {
+    if( this->internal->inProgressClearRequiredFlag ) {
+        synchronized( this->internal->unconsumedMessages.get() ) {
+            if( this->internal->inProgressClearRequiredFlag ) {
 
                 // TODO - Rollback duplicates.
 
                 // allow dispatch on this connection to resume
                 this->session->getConnection()->setTransportInterruptionProcessingComplete();
-                inProgressClearRequiredFlag = false;
+                this->internal->inProgressClearRequiredFlag = false;
             }
         }
     }
@@ -1138,7 +1183,7 @@ bool ActiveMQConsumer::isAutoAcknowledge
 
 ////////////////////////////////////////////////////////////////////////////////
 int ActiveMQConsumer::getMessageAvailableCount() const {
-    return this->unconsumedMessages->size();
+    return this->internal->unconsumedMessages->size();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1219,11 +1264,64 @@ void ActiveMQConsumer::applyDestinationO
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumer::setRedeliveryPolicy( RedeliveryPolicy* policy ) {
     if( policy != NULL ) {
-        this->redeliveryPolicy.reset( policy );
+        this->internal->redeliveryPolicy.reset( policy );
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 RedeliveryPolicy* ActiveMQConsumer::getRedeliveryPolicy() const {
-    return this->redeliveryPolicy.get();
+    return this->internal->redeliveryPolicy.get();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::MessageListener* ActiveMQConsumer::getMessageListener() const {
+    return this->internal->listener;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+const Pointer<commands::ConsumerInfo>& ActiveMQConsumer::getConsumerInfo() const {
+    this->checkClosed();
+    return this->consumerInfo;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+const Pointer<commands::ConsumerId>& ActiveMQConsumer::getConsumerId() const {
+    this->checkClosed();
+    return this->consumerInfo->getConsumerId();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConsumer::isSynchronizationRegistered() const {
+    return this->internal->synchronizationRegistered;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::setSynchronizationRegistered( bool value ) {
+    this->internal->synchronizationRegistered = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long ActiveMQConsumer::getLastDeliveredSequenceId() const {
+    return this->internal->lastDeliveredSequenceId;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::setLastDeliveredSequenceId( long long value ) {
+    this->internal->lastDeliveredSequenceId = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::setFailureError( decaf::lang::Exception* error ) {
+    if( error != NULL ) {
+        this->internal->failureError.reset( error->clone() );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+decaf::lang::Exception* ActiveMQConsumer::getFailureError() const {
+    if( this->internal->failureError == NULL ) {
+        return NULL;
+    }
+
+    return this->internal->failureError.get();
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=1025714&r1=1025713&r2=1025714&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Wed Oct 20 20:46:54 2010
@@ -43,6 +43,7 @@ namespace core{
     using decaf::util::concurrent::atomic::AtomicBoolean;
 
     class ActiveMQSession;
+    class ActiveMQConsumerMembers;
 
     class AMQCPP_API ActiveMQConsumer : public cms::MessageConsumer,
                                         public Dispatcher
@@ -50,89 +51,19 @@ namespace core{
     private:
 
         /**
-         * The session that owns this Consumer
+         * Internal Class that holds Members of this class, allows for changes without API breakage.
          */
-        ActiveMQSession* session;
-
-        /**
-         * The Consumer info for this Consumer
-         */
-        Pointer<commands::ConsumerInfo> consumerInfo;
-
-        /**
-         * The Message Listener for this Consumer
-         */
-        cms::MessageListener* listener;
-
-        /**
-         * Mutex to Protect access to the listener during delivery.
-         */
-        decaf::util::concurrent::Mutex listenerMutex;
-
-        /**
-         * Is the consumer currently delivering acks.
-         */
-        AtomicBoolean deliveringAcks;
-
-        /**
-         * Has this Consumer been started yet.
-         */
-        AtomicBoolean started;
-
-        /**
-         * Queue of unconsumed messages.
-         */
-        Pointer<MessageDispatchChannel> unconsumedMessages;
-
-        /**
-         * Queue of consumed messages.
-         */
-        decaf::util::StlQueue< decaf::lang::Pointer<commands::MessageDispatch> > dispatchedMessages;
-
-        /**
-         * The last delivered message's BrokerSequenceId.
-         */
-        long long lastDeliveredSequenceId;
-
-        /**
-         * Next Ack to go out.
-         */
-        Pointer<commands::MessageAck> pendingAck;
-
-        /**
-         * How many message's have been delivered so far since the last Ack was sent.
-         */
-        int deliveredCounter;
-
-        /**
-         * How big to grow the ack window next time.
-         */
-        int additionalWindowSize;
-
-        /**
-         * Has the Synchronization been added for this transaction
-         */
-        volatile bool synchronizationRegistered;
-
-        /**
-         * Boolean indicating if in progress messages should be cleared.
-         */
-        bool clearDispatchList;
+        ActiveMQConsumerMembers* internal;
 
         /**
-         * Indicates if inprogress messages are to be cleared.
+         * The ActiveMQSession that owns this class instance.
          */
-        bool inProgressClearRequiredFlag;
-
-        /**
-         * The redelivery delay used for the last set of redeliveries.
-         */
-        long long redeliveryDelay;
+        ActiveMQSession* session;
 
         /**
-         * The policy to use when Message Redelivery is in progress.
+         * The ConsumerInfo object for this class instance.
          */
-        Pointer<RedeliveryPolicy> redeliveryPolicy;
+        Pointer<commands::ConsumerInfo> consumerInfo;
 
     private:
 
@@ -174,9 +105,7 @@ namespace core{
 
         virtual void setMessageListener( cms::MessageListener* listener );
 
-        virtual cms::MessageListener* getMessageListener() const {
-            return this->listener;
-        }
+        virtual cms::MessageListener* getMessageListener() const;
 
         virtual std::string getMessageSelector() const;
 
@@ -220,19 +149,13 @@ namespace core{
          * Get the Consumer information for this consumer
          * @return Reference to a Consumer Info Object
          */
-        const Pointer<commands::ConsumerInfo>& getConsumerInfo() const {
-            this->checkClosed();
-            return this->consumerInfo;
-        }
+        const Pointer<commands::ConsumerInfo>& getConsumerInfo() const;
 
         /**
          * Get the Consumer Id for this consumer
          * @return Reference to a Consumer Id Object
          */
-        const Pointer<commands::ConsumerId>& getConsumerId() const {
-            this->checkClosed();
-            return this->consumerInfo->getConsumerId();
-        }
+        const Pointer<commands::ConsumerId>& getConsumerId() const;
 
         /**
          * @returns if this Consumer has been closed.
@@ -243,17 +166,13 @@ namespace core{
          * Has this Consumer Transaction Synchronization been added to the transaction
          * @return true if the synchronization has been added.
          */
-        bool isSynchronizationRegistered() const {
-            return this->synchronizationRegistered;
-        }
+        bool isSynchronizationRegistered() const ;
 
         /**
          * Sets the Synchronization Registered state of this consumer.
          * @param value - true if registered false otherwise.
          */
-        void setSynchronizationRegistered( bool value ) {
-            this->synchronizationRegistered = value;
-        }
+        void setSynchronizationRegistered( bool value );
 
         /**
          * Deliver any pending messages to the registered MessageListener if there
@@ -285,9 +204,7 @@ namespace core{
          *
          * @returns long long containing the sequence id of the last delivered Message.
          */
-        long long getLastDeliveredSequenceId() const {
-            return this->lastDeliveredSequenceId;
-        }
+        long long getLastDeliveredSequenceId() const;
 
         /**
          * Sets the value of the Last Delivered Sequence Id
@@ -295,9 +212,7 @@ namespace core{
          * @param value
          *      The new value to assign to the Last Delivered Sequence Id property.
          */
-        void setLastDeliveredSequenceId( long long value ) {
-            this->lastDeliveredSequenceId = value;
-        }
+        void setLastDeliveredSequenceId( long long value );
 
         /**
          * @returns the number of Message's this consumer is waiting to Dispatch.
@@ -323,6 +238,22 @@ namespace core{
          */
         RedeliveryPolicy* getRedeliveryPolicy() const;
 
+        /**
+         * Sets the Exception that has caused this Consumer to be in a failed state.
+         *
+         * @param error
+         *      The error that is to be thrown when a Receive call is made.
+         */
+        void setFailureError( decaf::lang::Exception* error );
+
+        /**
+         * Gets the error that caused this Consumer to be in a Failed state, or NULL if
+         * there is no Error.
+         *
+         * @returns pointer to the error that faulted this Consumer or NULL.
+         */
+        decaf::lang::Exception* getFailureError() const;
+
     protected:
 
         /**



Mime
View raw message