activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1326223 - /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
Date Sat, 14 Apr 2012 22:43:44 GMT
Author: tabish
Date: Sat Apr 14 22:43:44 2012
New Revision: 1326223

URL: http://svn.apache.org/viewvc?rev=1326223&view=rev
Log:
Schedule the consumer to start when the redelivery delay says we should.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp?rev=1326223&r1=1326222&r2=1326223&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp Sat Apr 14 22:43:44 2012
@@ -232,15 +232,15 @@ namespace kernels {
 
     private:
 
-        IndividualAckHandler( const IndividualAckHandler& );
-        IndividualAckHandler& operator= ( const IndividualAckHandler& );
+        IndividualAckHandler(const IndividualAckHandler&);
+        IndividualAckHandler& operator=(const IndividualAckHandler&);
 
     public:
 
-        IndividualAckHandler( ActiveMQConsumerKernel* consumer, const Pointer<MessageDispatch>& dispatch ) :
+        IndividualAckHandler(ActiveMQConsumerKernel* consumer, const Pointer<MessageDispatch>& dispatch) :
             consumer(consumer), dispatch(dispatch) {
 
-            if( consumer == NULL ) {
+            if (consumer == NULL) {
                 throw NullPointerException(
                     __FILE__, __LINE__, "Ack Handler Created with NULL consumer.");
             }
@@ -270,12 +270,12 @@ namespace kernels {
 
     private:
 
-        StartConsumerTask( const StartConsumerTask& );
-        StartConsumerTask& operator= ( const StartConsumerTask& );
+        StartConsumerTask(const StartConsumerTask&);
+        StartConsumerTask& operator=(const StartConsumerTask&);
 
     public:
 
-        StartConsumerTask( ActiveMQConsumerKernel* consumer ) : Runnable(), consumer(NULL) {
+        StartConsumerTask(ActiveMQConsumerKernel* consumer) : Runnable(), consumer(NULL) {
 
             if (consumer == NULL) {
                 throw NullPointerException(
@@ -407,7 +407,7 @@ bool ActiveMQConsumerKernel::isClosed() 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumerKernel::close() {
 
-    try{
+    try {
         if (!this->isClosed()) {
 
             if (this->session->getTransactionContext() != NULL &&
@@ -447,7 +447,7 @@ void ActiveMQConsumerKernel::doClose() {
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumerKernel::dispose() {
 
-    try{
+    try {
         if (!this->isClosed()) {
 
             if (!session->isTransacted()) {
@@ -568,7 +568,7 @@ decaf::lang::Pointer<MessageDispatch> Ac
 ////////////////////////////////////////////////////////////////////////////////
 cms::Message* ActiveMQConsumerKernel::receive() {
 
-    try{
+    try {
 
         this->checkClosed();
 
@@ -667,7 +667,7 @@ cms::Message* ActiveMQConsumerKernel::re
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumerKernel::setMessageListener( cms::MessageListener* listener ) {
 
-    try{
+    try {
 
         this->checkClosed();
 
@@ -679,9 +679,7 @@ void ActiveMQConsumerKernel::setMessageL
 
         if( listener != NULL ) {
 
-            // Now that we have a valid message listener,
-            // redispatch all the messages that it missed.
-
+            // Now that we have a valid message listener, redispatch all the messages that it missed.
             bool wasStarted = session->isStarted();
             if( wasStarted ) {
                 session->stop();
@@ -706,7 +704,7 @@ void ActiveMQConsumerKernel::setMessageL
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::beforeMessageIsConsumed( const Pointer<MessageDispatch>& dispatch ) {
+void ActiveMQConsumerKernel::beforeMessageIsConsumed(const Pointer<MessageDispatch>& dispatch) {
 
     // If the Session is in ClientAcknowledge or IndividualAcknowledge mode, then
     // we set the handler in the message to this object and send it out.
@@ -735,52 +733,50 @@ void ActiveMQConsumerKernel::beforeMessa
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::afterMessageIsConsumed( const Pointer<MessageDispatch>&
message,
-                                               bool messageExpired ) {
+void ActiveMQConsumerKernel::afterMessageIsConsumed(const Pointer<MessageDispatch>&
message, bool messageExpired ) {
 
-    try{
+    try {
 
-        if( this->internal->unconsumedMessages->isClosed() ) {
+        if (this->internal->unconsumedMessages->isClosed()) {
             return;
         }
 
-        if( messageExpired == true ) {
-            ackLater( message, ActiveMQConstants::ACK_TYPE_DELIVERED );
+        if (messageExpired == true) {
+            ackLater(message, ActiveMQConstants::ACK_TYPE_DELIVERED);
         }
 
-        if( session->isTransacted() ) {
+        if (session->isTransacted()) {
             return;
-        } else if( isAutoAcknowledgeEach() ) {
+        } else if (isAutoAcknowledgeEach()) {
 
-            if( this->internal->deliveringAcks.compareAndSet( false, true ) ) {
+            if (this->internal->deliveringAcks.compareAndSet(false, true)) {
 
-                synchronized( &this->internal->dispatchedMessages ) {
-                    if( !this->internal->dispatchedMessages.isEmpty() ) {
-                        Pointer<MessageAck> ack = makeAckForAllDeliveredMessages(
-                            ActiveMQConstants::ACK_TYPE_CONSUMED );
+                synchronized(&this->internal->dispatchedMessages) {
+                    if (!this->internal->dispatchedMessages.isEmpty()) {
+                        Pointer<MessageAck> ack = makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
 
-                        if( ack != NULL ) {
+                        if (ack != NULL) {
                             this->internal->dispatchedMessages.clear();
-                            session->oneway( ack );
+                            session->oneway(ack);
                         }
                     }
                 }
 
-                this->internal->deliveringAcks.set( false );
+                this->internal->deliveringAcks.set(false);
             }
 
-        } else if( isAutoAcknowledgeBatch() ) {
-            ackLater( message, ActiveMQConstants::ACK_TYPE_CONSUMED );
-        } else if( session->isClientAcknowledge() || session->isIndividualAcknowledge()
) {
+        } else if (isAutoAcknowledgeBatch()) {
+            ackLater(message, ActiveMQConstants::ACK_TYPE_CONSUMED);
+        } else if (session->isClientAcknowledge() || session->isIndividualAcknowledge())
{
 
             bool messageUnackedByConsumer = false;
 
-            synchronized( &this->internal->dispatchedMessages ) {
+            synchronized(&this->internal->dispatchedMessages) {
                 messageUnackedByConsumer = this->internal->dispatchedMessages.contains(message);
             }
 
-            if( messageUnackedByConsumer ) {
-                this->ackLater( message, ActiveMQConstants::ACK_TYPE_DELIVERED );
+            if (messageUnackedByConsumer) {
+                this->ackLater(message, ActiveMQConstants::ACK_TYPE_DELIVERED);
             }
 
         } else {
@@ -795,39 +791,39 @@ void ActiveMQConsumerKernel::afterMessag
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumerKernel::deliverAcks() {
 
-    try{
+    try {
 
         Pointer<MessageAck> ack;
 
-        if( this->internal->deliveringAcks.compareAndSet( false, true ) ) {
+        if (this->internal->deliveringAcks.compareAndSet(false, true)) {
 
-            if( isAutoAcknowledgeEach() ) {
+            if (isAutoAcknowledgeEach()) {
 
-                synchronized( &this->internal->dispatchedMessages ) {
+                synchronized(&this->internal->dispatchedMessages) {
 
-                    ack = makeAckForAllDeliveredMessages( ActiveMQConstants::ACK_TYPE_CONSUMED
);
+                    ack = makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
 
-                    if( ack != NULL ) {
+                    if (ack != NULL) {
                         this->internal->dispatchedMessages.clear();
                     } else {
-                        ack.swap( internal->pendingAck );
+                        ack.swap(internal->pendingAck);
                     }
                 }
 
-            } else if( this->internal->pendingAck != NULL &&
-                       this->internal->pendingAck->getAckType() == ActiveMQConstants::ACK_TYPE_CONSUMED
) {
+            } else if (this->internal->pendingAck != NULL && this->internal->pendingAck->getAckType()
== ActiveMQConstants::ACK_TYPE_CONSUMED) {
 
-                ack.swap( this->internal->pendingAck );
+                ack.swap(this->internal->pendingAck);
             }
 
-            if( ack != NULL ) {
+            if (ack != NULL) {
 
-                try{
-                    this->session->oneway( ack );
-                } catch(...) {}
+                try {
+                    this->session->oneway(ack);
+                } catch (...) {
+                }
 
             } else {
-                this->internal->deliveringAcks.set( false );
+                this->internal->deliveringAcks.set(false);
             }
         }
     }
@@ -837,17 +833,17 @@ void ActiveMQConsumerKernel::deliverAcks
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::ackLater( const Pointer<MessageDispatch>& dispatch, int ackType ) {
+void ActiveMQConsumerKernel::ackLater(const Pointer<MessageDispatch>& dispatch, int ackType) {
 
     // Don't acknowledge now, but we may need to let the broker know the
     // consumer got the message to expand the pre-fetch window
-    if( session->isTransacted() ) {
+    if (session->isTransacted()) {
         session->doStartTransaction();
-        if( !this->internal->synchronizationRegistered ) {
+        if (!this->internal->synchronizationRegistered) {
             this->internal->synchronizationRegistered = true;
 
-            Pointer<Synchronization> sync( new TransactionSynhcronization( this ) );
-            this->session->getTransactionContext()->addSynchronization( sync );
+            Pointer<Synchronization> sync(new TransactionSynhcronization(this));
+            this->session->getTransactionContext()->addSynchronization(sync);
         }
     }
 
@@ -856,53 +852,53 @@ void ActiveMQConsumerKernel::ackLater( c
     this->internal->deliveredCounter++;
 
     Pointer<MessageAck> oldPendingAck = this->internal->pendingAck;
-    this->internal->pendingAck.reset( new MessageAck() );
-    this->internal->pendingAck->setConsumerId( dispatch->getConsumerId() );
-    this->internal->pendingAck->setAckType( (unsigned char)ackType );
-    this->internal->pendingAck->setDestination( dispatch->getDestination() );
-    this->internal->pendingAck->setLastMessageId( dispatch->getMessage()->getMessageId()
);
-    this->internal->pendingAck->setMessageCount( internal->deliveredCounter );
-
-    if( oldPendingAck == NULL ) {
-        this->internal->pendingAck->setFirstMessageId( this->internal->pendingAck->getLastMessageId()
);
-    } else if ( oldPendingAck->getAckType() == this->internal->pendingAck->getAckType()
) {
-        this->internal->pendingAck->setFirstMessageId( oldPendingAck->getFirstMessageId()
);
+    this->internal->pendingAck.reset(new MessageAck());
+    this->internal->pendingAck->setConsumerId(dispatch->getConsumerId());
+    this->internal->pendingAck->setAckType((unsigned char) ackType);
+    this->internal->pendingAck->setDestination(dispatch->getDestination());
+    this->internal->pendingAck->setLastMessageId(dispatch->getMessage()->getMessageId());
+    this->internal->pendingAck->setMessageCount(internal->deliveredCounter);
+
+    if (oldPendingAck == NULL) {
+        this->internal->pendingAck->setFirstMessageId(this->internal->pendingAck->getLastMessageId());
+    } else if (oldPendingAck->getAckType() == this->internal->pendingAck->getAckType())
{
+        this->internal->pendingAck->setFirstMessageId(oldPendingAck->getFirstMessageId());
     } else {
         // old pending ack being superseded by ack of another type, if is is not a delivered
         // ack and hence important, send it now so it is not lost.
-        if( oldPendingAck->getAckType() != ActiveMQConstants::ACK_TYPE_DELIVERED ) {
-            session->oneway( oldPendingAck );
+        if (oldPendingAck->getAckType() != ActiveMQConstants::ACK_TYPE_DELIVERED) {
+            session->oneway(oldPendingAck);
         }
     }
 
-    if( session->isTransacted() ) {
-        this->internal->pendingAck->setTransactionId( this->session->getTransactionContext()->getTransactionId()
);
+    if (session->isTransacted()) {
+        this->internal->pendingAck->setTransactionId(this->session->getTransactionContext()->getTransactionId());
     }
 
-    if( ( 0.5 * this->consumerInfo->getPrefetchSize() ) <= ( internal->deliveredCounter
- internal->additionalWindowSize ) ) {
-        session->oneway( this->internal->pendingAck );
-        this->internal->pendingAck.reset( NULL );
+    if ((0.5 * this->consumerInfo->getPrefetchSize()) <= (internal->deliveredCounter
- internal->additionalWindowSize)) {
+        session->oneway(this->internal->pendingAck);
+        this->internal->pendingAck.reset(NULL);
         this->internal->deliveredCounter = 0;
         this->internal->additionalWindowSize = 0;
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<MessageAck> ActiveMQConsumerKernel::makeAckForAllDeliveredMessages( int type
) {
+Pointer<MessageAck> ActiveMQConsumerKernel::makeAckForAllDeliveredMessages(int type)
{
 
     synchronized( &this->internal->dispatchedMessages ) {
 
-        if( !this->internal->dispatchedMessages.isEmpty() ) {
+        if (!this->internal->dispatchedMessages.isEmpty()) {
 
             Pointer<MessageDispatch> dispatched = this->internal->dispatchedMessages.getFirst();
 
-            Pointer<MessageAck> ack( new MessageAck() );
-            ack->setAckType( (unsigned char)type );
-            ack->setConsumerId( dispatched->getConsumerId() );
-            ack->setDestination( dispatched->getDestination() );
-            ack->setMessageCount( (int)this->internal->dispatchedMessages.size()
);
-            ack->setLastMessageId( dispatched->getMessage()->getMessageId() );
-            ack->setFirstMessageId( this->internal->dispatchedMessages.getLast()->getMessage()->getMessageId()
);
+            Pointer<MessageAck> ack(new MessageAck());
+            ack->setAckType((unsigned char) type);
+            ack->setConsumerId(dispatched->getConsumerId());
+            ack->setDestination(dispatched->getDestination());
+            ack->setMessageCount((int) this->internal->dispatchedMessages.size());
+            ack->setLastMessageId(dispatched->getMessage()->getMessageId());
+            ack->setFirstMessageId(this->internal->dispatchedMessages.getLast()->getMessage()->getMessageId());
 
             return ack;
         }
@@ -912,28 +908,28 @@ Pointer<MessageAck> ActiveMQConsumerKern
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::acknowledge( const Pointer<commands::MessageDispatch>&
dispatch ) {
+void ActiveMQConsumerKernel::acknowledge(const Pointer<commands::MessageDispatch>&
dispatch) {
 
-    try{
+    try {
 
         this->checkClosed();
 
-        if( this->session->isIndividualAcknowledge() ) {
+        if (this->session->isIndividualAcknowledge()) {
 
-            Pointer<MessageAck> ack( new MessageAck() );
-            ack->setAckType( ActiveMQConstants::ACK_TYPE_CONSUMED );
-            ack->setConsumerId( this->consumerInfo->getConsumerId() );
-            ack->setDestination( this->consumerInfo->getDestination() );
-            ack->setMessageCount( 1 );
-            ack->setLastMessageId( dispatch->getMessage()->getMessageId() );
-            ack->setFirstMessageId( dispatch->getMessage()->getMessageId() );
-
-            session->oneway( ack );
-
-            synchronized( &this->internal->dispatchedMessages ) {
-                std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
this->internal->dispatchedMessages.iterator() );
-                while( iter->hasNext() ) {
-                    if( iter->next() == dispatch ) {
+            Pointer<MessageAck> ack(new MessageAck());
+            ack->setAckType(ActiveMQConstants::ACK_TYPE_CONSUMED);
+            ack->setConsumerId(this->consumerInfo->getConsumerId());
+            ack->setDestination(this->consumerInfo->getDestination());
+            ack->setMessageCount(1);
+            ack->setLastMessageId(dispatch->getMessage()->getMessageId());
+            ack->setFirstMessageId(dispatch->getMessage()->getMessageId());
+
+            session->oneway(ack);
+
+            synchronized(&this->internal->dispatchedMessages) {
+                std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(this->internal->dispatchedMessages.iterator());
+                while(iter->hasNext()) {
+                    if (iter->next() == dispatch) {
                         iter->remove();
                         break;
                     }
@@ -941,9 +937,7 @@ void ActiveMQConsumerKernel::acknowledge
             }
 
         } else {
-            throw IllegalStateException(
-                __FILE__, __LINE__,
-                "Session is not in IndividualAcknowledge mode." );
+            throw IllegalStateException(__FILE__, __LINE__, "Session is not in IndividualAcknowledge mode." );
         }
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -952,33 +946,30 @@ void ActiveMQConsumerKernel::acknowledge
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumerKernel::acknowledge() {
 
-    try{
+    try {
 
-        synchronized( &this->internal->dispatchedMessages ) {
+        synchronized(&this->internal->dispatchedMessages) {
 
             // Acknowledge all messages so far.
-            Pointer<MessageAck> ack =
-                makeAckForAllDeliveredMessages( ActiveMQConstants::ACK_TYPE_CONSUMED );
+            Pointer<MessageAck> ack = makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
 
-            if( ack == NULL ) {
+            if (ack == NULL) {
                 return;
             }
 
-            if( session->isTransacted() ) {
+            if (session->isTransacted()) {
                 session->doStartTransaction();
-                ack->setTransactionId( session->getTransactionContext()->getTransactionId()
);
+                ack->setTransactionId(session->getTransactionContext()->getTransactionId());
             }
 
-            session->oneway( ack );
-            this->internal->pendingAck.reset( NULL );
+            session->oneway(ack);
+            this->internal->pendingAck.reset(NULL);
 
             // Adjust the counters
-            this->internal->deliveredCounter =
-                Math::max( 0, this->internal->deliveredCounter - (int)this->internal->dispatchedMessages.size());
-            this->internal->additionalWindowSize =
-                Math::max(0, this->internal->additionalWindowSize - (int)this->internal->dispatchedMessages.size());
+            this->internal->deliveredCounter = Math::max(0, this->internal->deliveredCounter
- (int) this->internal->dispatchedMessages.size());
+            this->internal->additionalWindowSize = Math::max(0, this->internal->additionalWindowSize
- (int) this->internal->dispatchedMessages.size());
 
-            if( !session->isTransacted() ) {
+            if (!session->isTransacted()) {
                 this->internal->dispatchedMessages.clear();
             }
         }
@@ -989,7 +980,7 @@ void ActiveMQConsumerKernel::acknowledge
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumerKernel::commit() {
 
-    synchronized( &(this->internal->dispatchedMessages) ) {
+    synchronized(&(this->internal->dispatchedMessages)) {
         this->internal->dispatchedMessages.clear();
     }
     this->internal->redeliveryDelay = 0;
@@ -998,139 +989,131 @@ void ActiveMQConsumerKernel::commit() {
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumerKernel::rollback() {
 
-    synchronized( this->internal->unconsumedMessages.get() ) {
+    synchronized(this->internal->unconsumedMessages.get()) {
 
-        synchronized( &this->internal->dispatchedMessages ) {
-            if( this->internal->dispatchedMessages.isEmpty() ) {
+        synchronized(&this->internal->dispatchedMessages) {
+            if (this->internal->dispatchedMessages.isEmpty()) {
                 return;
             }
 
             // Only increase the redelivery delay after the first redelivery..
             Pointer<MessageDispatch> lastMsg = this->internal->dispatchedMessages.getFirst();
             const int currentRedeliveryCount = lastMsg->getMessage()->getRedeliveryCounter();
-            if( currentRedeliveryCount > 0 ) {
-                this->internal->redeliveryDelay = this->internal->redeliveryPolicy->getNextRedeliveryDelay(
internal->redeliveryDelay );
+            if (currentRedeliveryCount > 0) {
+                this->internal->redeliveryDelay = this->internal->redeliveryPolicy->getNextRedeliveryDelay(internal->redeliveryDelay);
             } else {
                 this->internal->redeliveryDelay = this->internal->redeliveryPolicy->getInitialRedeliveryDelay();
             }
 
-            Pointer<MessageId> firstMsgId =
-                this->internal->dispatchedMessages.getLast()->getMessage()->getMessageId();
+            Pointer<MessageId> firstMsgId = this->internal->dispatchedMessages.getLast()->getMessage()->getMessageId();
 
-            std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
internal->dispatchedMessages.iterator() );
+            std::auto_ptr<Iterator<Pointer<MessageDispatch> > > iter(internal->dispatchedMessages.iterator());
 
-            while( iter->hasNext() ) {
+            while (iter->hasNext()) {
                 Pointer<Message> message = iter->next()->getMessage();
-                message->setRedeliveryCounter( message->getRedeliveryCounter() + 1
);
+                message->setRedeliveryCounter(message->getRedeliveryCounter() + 1);
             }
 
-            if( this->internal->redeliveryPolicy->getMaximumRedeliveries() != RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES
&&
-                lastMsg->getMessage()->getRedeliveryCounter() > this->internal->redeliveryPolicy->getMaximumRedeliveries()
) {
+            if (this->internal->redeliveryPolicy->getMaximumRedeliveries() != RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES
+                && lastMsg->getMessage()->getRedeliveryCounter() > this->internal->redeliveryPolicy->getMaximumRedeliveries())
{
 
                 // We need to NACK the messages so that they get sent to the DLQ.
                 // Acknowledge the last message.
-                Pointer<MessageAck> ack( new MessageAck() );
-                ack->setAckType( ActiveMQConstants::ACK_TYPE_POISON );
-                ack->setConsumerId( this->consumerInfo->getConsumerId() );
-                ack->setDestination( lastMsg->getDestination() );
-                ack->setMessageCount( (int)this->internal->dispatchedMessages.size()
);
-                ack->setLastMessageId( lastMsg->getMessage()->getMessageId() );
-                ack->setFirstMessageId( firstMsgId );
+                Pointer<MessageAck> ack(new MessageAck());
+                ack->setAckType(ActiveMQConstants::ACK_TYPE_POISON);
+                ack->setConsumerId(this->consumerInfo->getConsumerId());
+                ack->setDestination(lastMsg->getDestination());
+                ack->setMessageCount((int) this->internal->dispatchedMessages.size());
+                ack->setLastMessageId(lastMsg->getMessage()->getMessageId());
+                ack->setFirstMessageId(firstMsgId);
 
-                session->oneway( ack );
+                session->oneway(ack);
                 // Adjust the window size.
-                this->internal->additionalWindowSize =
-                    Math::max( 0, this->internal->additionalWindowSize - (int)this->internal->dispatchedMessages.size()
);
+                this->internal->additionalWindowSize = Math::max(0,
+                    this->internal->additionalWindowSize - (int) this->internal->dispatchedMessages.size());
                 this->internal->redeliveryDelay = 0;
 
             } else {
 
                 // only redelivery_ack after first delivery
-                if( currentRedeliveryCount > 0 ) {
-                    Pointer<MessageAck> ack( new MessageAck() );
-                    ack->setAckType( ActiveMQConstants::ACK_TYPE_REDELIVERED );
-                    ack->setConsumerId( this->consumerInfo->getConsumerId() );
-                    ack->setDestination( lastMsg->getDestination() );
-                    ack->setMessageCount( (int)this->internal->dispatchedMessages.size()
);
-                    ack->setLastMessageId( lastMsg->getMessage()->getMessageId()
);
-                    ack->setFirstMessageId( firstMsgId );
+                if (currentRedeliveryCount > 0) {
+                    Pointer<MessageAck> ack(new MessageAck());
+                    ack->setAckType(ActiveMQConstants::ACK_TYPE_REDELIVERED);
+                    ack->setConsumerId(this->consumerInfo->getConsumerId());
+                    ack->setDestination(lastMsg->getDestination());
+                    ack->setMessageCount((int) this->internal->dispatchedMessages.size());
+                    ack->setLastMessageId(lastMsg->getMessage()->getMessageId());
+                    ack->setFirstMessageId(firstMsgId);
 
-                    session->oneway( ack );
+                    session->oneway(ack);
                 }
 
                 // stop the delivery of messages.
                 this->internal->unconsumedMessages->stop();
 
-                std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
this->internal->dispatchedMessages.iterator() );
-
-                while( iter->hasNext() ) {
-                    this->internal->unconsumedMessages->enqueueFirst( iter->next()
);
+                std::auto_ptr<Iterator<Pointer<MessageDispatch> > > iter(
+                    this->internal->dispatchedMessages.iterator());
+                while (iter->hasNext()) {
+                    this->internal->unconsumedMessages->enqueueFirst(iter->next());
                 }
 
-                if( internal->redeliveryDelay > 0 && !this->internal->unconsumedMessages->isClosed() ) {
-                    // TODO - Can't do this until we can control object lifetime.
-                    // Start up the delivery again a little later.
-                    // this->internal->scheduler->executeAfterDelay(
-                    //    new StartConsumerTask(this), internal->redeliveryDelay);
-                    start();
+                if (internal->redeliveryDelay > 0 && !this->internal->unconsumedMessages->isClosed()) {
+                    this->internal->scheduler->executeAfterDelay(
+                        new StartConsumerTask(this), internal->redeliveryDelay);
                 } else {
                     start();
                 }
-
             }
-            this->internal->deliveredCounter -= (int)internal->dispatchedMessages.size();
+            this->internal->deliveredCounter -= (int) internal->dispatchedMessages.size();
             this->internal->dispatchedMessages.clear();
         }
     }
 
-    if( this->internal->listener != NULL ) {
-        session->redispatch( *this->internal->unconsumedMessages );
+    if (this->internal->listener != NULL) {
+        session->redispatch(*this->internal->unconsumedMessages);
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::dispatch( const Pointer<MessageDispatch>& dispatch ) {
+void ActiveMQConsumerKernel::dispatch(const Pointer<MessageDispatch>& dispatch) {
 
     try {
 
-        synchronized( this->internal->unconsumedMessages.get() ) {
+        synchronized(this->internal->unconsumedMessages.get()) {
 
             clearMessagesInProgress();
-            if( this->internal->clearDispatchList ) {
+            if (this->internal->clearDispatchList) {
                 // we are reconnecting so lets flush the in progress
                 // messages
                 this->internal->clearDispatchList = false;
                 this->internal->unconsumedMessages->clear();
             }
 
-            if( !this->internal->unconsumedMessages->isClosed() ) {
+            if (!this->internal->unconsumedMessages->isClosed()) {
 
                 // Don't dispatch expired messages, ack it and then destroy it
-                if( dispatch->getMessage() != NULL && dispatch->getMessage()->isExpired()
) {
-                    this->ackLater( dispatch, ActiveMQConstants::ACK_TYPE_CONSUMED );
+                if (dispatch->getMessage() != NULL && dispatch->getMessage()->isExpired())
{
+                    this->ackLater(dispatch, ActiveMQConstants::ACK_TYPE_CONSUMED);
 
                     // stop now, don't queue
                     return;
                 }
 
-                synchronized( &this->internal->listenerMutex ) {
-                    // If we have a listener, send the message.
-                    if( this->internal->listener != NULL && internal->unconsumedMessages->isRunning()
) {
+                synchronized(&this->internal->listenerMutex) {
 
-                        // Preprocessing.
-                        beforeMessageIsConsumed( dispatch );
+                    // If we have a listener, send the message.
+                    if (this->internal->listener != NULL && internal->unconsumedMessages->isRunning())
{
 
-                        // Notify the listener
+                        beforeMessageIsConsumed(dispatch);
                         this->internal->listener->onMessage(
-                            dynamic_cast<cms::Message*>( dispatch->getMessage().get()
) );
-
-                        // Postprocessing
-                        afterMessageIsConsumed( dispatch, false );
+                            dynamic_cast<cms::Message*> (dispatch->getMessage().get()));
+                        afterMessageIsConsumed(dispatch, false);
 
                     } else {
 
-                        // No listener, add it to the unconsumed messages list
-                        this->internal->unconsumedMessages->enqueue( dispatch );
+                        // No listener, add it to the unconsumed messages list it will get pushed on the
+                        // next receive call or when a new listener is added.
+                        this->internal->unconsumedMessages->enqueue(dispatch);
                     }
                 }
             }
@@ -1142,25 +1125,25 @@ void ActiveMQConsumerKernel::dispatch( c
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::sendPullRequest( long long timeout ) {
+void ActiveMQConsumerKernel::sendPullRequest(long long timeout) {
 
     try {
 
         this->checkClosed();
 
         // There are still local message, consume them first.
-        if( !this->internal->unconsumedMessages->isEmpty() ) {
+        if (!this->internal->unconsumedMessages->isEmpty()) {
             return;
         }
 
-        if( this->consumerInfo->getPrefetchSize() == 0 ) {
+        if (this->consumerInfo->getPrefetchSize() == 0) {
 
-            Pointer<MessagePull> messagePull( new MessagePull() );
-            messagePull->setConsumerId( this->consumerInfo->getConsumerId() );
-            messagePull->setDestination( this->consumerInfo->getDestination() );
-            messagePull->setTimeout( timeout );
+            Pointer<MessagePull> messagePull(new MessagePull());
+            messagePull->setConsumerId(this->consumerInfo->getConsumerId());
+            messagePull->setDestination(this->consumerInfo->getDestination());
+            messagePull->setTimeout(timeout);
 
-            this->session->oneway( messagePull );
+            this->session->oneway(messagePull);
         }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -1170,30 +1153,26 @@ void ActiveMQConsumerKernel::sendPullReq
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumerKernel::checkClosed() const {
-    if( this->isClosed() ) {
-        throw ActiveMQException(
-            __FILE__, __LINE__,
-            "ActiveMQConsumerKernel - Consumer Already Closed" );
+    if (this->isClosed()) {
+        throw ActiveMQException(__FILE__, __LINE__, "Consumer Already Closed" );
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool ActiveMQConsumerKernel::iterate() {
 
-    synchronized( &this->internal->listenerMutex ) {
-
-        if( this->internal->listener != NULL ) {
-
+    synchronized(&this->internal->listenerMutex) {
+        if (this->internal->listener != NULL) {
             Pointer<MessageDispatch> dispatch = internal->unconsumedMessages->dequeueNoWait();
-            if( dispatch != NULL ) {
+            if (dispatch != NULL) {
 
                 try {
-                    beforeMessageIsConsumed( dispatch );
+                    beforeMessageIsConsumed(dispatch);
                     this->internal->listener->onMessage(
-                        dynamic_cast<cms::Message*>( dispatch->getMessage().get() ) );
-                    afterMessageIsConsumed( dispatch, false );
-                } catch( ActiveMQException& ex ) {
-                    this->session->fire( ex );
+                        dynamic_cast<cms::Message*> (dispatch->getMessage().get()));
+                    afterMessageIsConsumed(dispatch, false);
+                } catch (ActiveMQException& ex) {
+                    this->session->fire(ex);
                 }
 
                 return true;
@@ -1214,9 +1193,9 @@ void ActiveMQConsumerKernel::inProgressC
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumerKernel::clearMessagesInProgress() {
-    if( this->internal->inProgressClearRequiredFlag ) {
-        synchronized( this->internal->unconsumedMessages.get() ) {
-            if( this->internal->inProgressClearRequiredFlag ) {
+    if (this->internal->inProgressClearRequiredFlag) {
+        synchronized(this->internal->unconsumedMessages.get()) {
+            if (this->internal->inProgressClearRequiredFlag) {
 
                 // TODO - Rollback duplicates.
 
@@ -1245,7 +1224,7 @@ int ActiveMQConsumerKernel::getMessageAv
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::applyDestinationOptions( const Pointer<ConsumerInfo>& info ) {
+void ActiveMQConsumerKernel::applyDestinationOptions(const Pointer<ConsumerInfo>& info) {
 
     decaf::lang::Pointer<commands::ActiveMQDestination> amqDestination = info->getDestination();
 
@@ -1302,8 +1281,8 @@ void ActiveMQConsumerKernel::applyDestin
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::setRedeliveryPolicy( RedeliveryPolicy* policy ) {
-    if( policy != NULL ) {
+void ActiveMQConsumerKernel::setRedeliveryPolicy(RedeliveryPolicy* policy) {
+    if (policy != NULL) {
         this->internal->redeliveryPolicy.reset(policy);
     }
 }
@@ -1351,9 +1330,9 @@ void ActiveMQConsumerKernel::setLastDeli
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::setFailureError( decaf::lang::Exception* error ) {
+void ActiveMQConsumerKernel::setFailureError(decaf::lang::Exception* error) {
     if (error != NULL) {
-        this->internal->failureError.reset( error->clone() );
+        this->internal->failureError.reset(error->clone());
     }
 }
 



Mime
View raw message