Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 24C0EFAFA for ; Mon, 1 Apr 2013 21:56:26 +0000 (UTC) Received: (qmail 56993 invoked by uid 500); 1 Apr 2013 21:56:26 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 56918 invoked by uid 500); 1 Apr 2013 21:56:26 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 56907 invoked by uid 99); 1 Apr 2013 21:56:26 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Apr 2013 21:56:26 +0000 X-ASF-Spam-Status: No, hits=-1998.9 required=5.0 tests=ADVANCE_FEE_2_NEW_FORM,ALL_TRUSTED,FILL_THIS_FORM X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Apr 2013 21:56:22 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 98FBE238899C; Mon, 1 Apr 2013 21:56:02 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1463311 [2/2] - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/activemq/core/ main/activemq/core/kernels/ test/activemq/core/ Date: Mon, 01 Apr 2013 21:56:01 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130401215602.98FBE238899C@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp?rev=1463311&r1=1463310&r2=1463311&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp Mon Apr 1 21:56:01 2013 @@ -25,6 +25,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -63,10 +66,22 @@ using namespace decaf::util; using namespace decaf::util::concurrent; //////////////////////////////////////////////////////////////////////////////// -namespace activemq{ +namespace activemq { namespace core { namespace kernels { + class PreviouslyDeliveredMap : public HashMap, bool> { + public: + + Pointer transactionId; + + PreviouslyDeliveredMap(Pointer transactionId) : + transactionId(transactionId) { + } + + virtual ~PreviouslyDeliveredMap() {} + }; + class ActiveMQConsumerKernelConfig { private: @@ -95,6 +110,21 @@ namespace kernels { Pointer failureError; Pointer scheduler; int hashCode; + Pointer previouslyDeliveredMessages; + long long failoverRedeliveryWaitPeriod; + bool transactedIndividualAck; + bool nonBlockingRedelivery; + bool optimizeAcknowledge; + long long optimizeAckTimestamp; + long long optimizeAcknowledgeTimeOut; + long long optimizedAckScheduledAckInterval; + Runnable* optimizedAckTask; + int ackCounter; + int dispatchedCount; + Pointer executor; + ActiveMQSessionKernel* session; + ActiveMQConsumerKernel* parent; + Pointer info; ActiveMQConsumerKernelConfig() : listener(NULL), messageAvailableListener(NULL), @@ -115,8 +145,185 @@ namespace kernels { redeliveryPolicy(), failureError(), scheduler(), - hashCode() { + hashCode(), + previouslyDeliveredMessages(), + failoverRedeliveryWaitPeriod(0), + transactedIndividualAck(false), + nonBlockingRedelivery(false), + optimizeAcknowledge(false), + optimizeAckTimestamp(System::currentTimeMillis()), + optimizeAcknowledgeTimeOut(), + optimizedAckScheduledAckInterval(), + optimizedAckTask(), + ackCounter(), + dispatchedCount(), + executor(), + session(), + parent(), + info() { + } + + bool isTimeForOptimizedAck(int prefetchSize) const { + if (ackCounter + deliveredCounter >= (prefetchSize * 0.65)) { + return true; + } + + long long nextAckTime = optimizeAckTimestamp + optimizeAcknowledgeTimeOut; + + if (optimizeAcknowledgeTimeOut > 0 && System::currentTimeMillis() >= nextAckTime) { + return true; + } + + return false; + } + + void doClearMessagesInProgress() { + if (this->inProgressClearRequiredFlag) { + synchronized(this->unconsumedMessages.get()) { + if (this->inProgressClearRequiredFlag) { + + // ensure messages that were not yet consumed are rolled back up front as they + // may get redelivered to another consumer by the Broker. + std::vector< Pointer > list = this->unconsumedMessages->removeAll(); + if (!this->info->isBrowser()) { + std::vector >::const_iterator iter = list.begin(); + + for (; iter != list.end(); ++iter) { + Pointer md = *iter; + this->session->getConnection()->rollbackDuplicate(this->parent, md->getMessage()); + } + } + + // allow dispatch on this connection to resume + this->session->getConnection()->setTransportInterruptionProcessingComplete(); + this->inProgressClearRequiredFlag = false; + + // Wake up any blockers and allow them to recheck state. + this->unconsumedMessages->notifyAll(); + } + } + } + } + + void doClearDispatchList() { + if (clearDispatchList) { + synchronized (&this->dispatchedMessages) { + if (clearDispatchList) { + if (!dispatchedMessages.isEmpty()) { + if (session->isTransacted()) { + if (previouslyDeliveredMessages == NULL) { + previouslyDeliveredMessages.reset(new PreviouslyDeliveredMap( + session->getTransactionContext()->getTransactionId())); + } + + Pointer > > iter(dispatchedMessages.iterator()); + + while (iter->hasNext()) { + Pointer dispatch = iter->next(); + previouslyDeliveredMessages->put(dispatch->getMessage()->getMessageId(), false); + } + } else { + dispatchedMessages.clear(); + pendingAck.reset(NULL); + } + } + clearDispatchList = false; + } + } + } + } + + void doClearPreviouslyDelivered() { + if (previouslyDeliveredMessages != NULL) { + previouslyDeliveredMessages->clear(); + previouslyDeliveredMessages.reset(NULL); + } } + + // called with deliveredMessages locked + void removeFromDeliveredMessages(Pointer key) { + Pointer< Iterator< Pointer > > iter(this->dispatchedMessages.iterator()); + while (iter->hasNext()) { + Pointer candidate = iter->next(); + if (key->equals(candidate->getMessage()->getMessageId().get())) { + session->getConnection()->rollbackDuplicate(this->parent, candidate->getMessage()); + iter->remove(); + break; + } + } + } + + // called with unconsumedMessages && deliveredMessages locked remove any message + // not re-delivered as they can't be replayed to this consumer on rollback + void rollbackPreviouslyDeliveredAndNotRedelivered() { + if (previouslyDeliveredMessages != NULL) { + + Set, bool> >& entries = previouslyDeliveredMessages->entrySet(); + Pointer, bool> > > iter(entries.iterator()); + while (iter->hasNext()) { + MapEntry, bool> entry = iter->next(); + if (!entry.getValue()) { + removeFromDeliveredMessages(entry.getKey()); + } + } + + doClearPreviouslyDelivered(); + } + } + + void rollbackOnFailedRecoveryRedelivery() { + if (previouslyDeliveredMessages != NULL) { + // if any previously delivered messages was not re-delivered, transaction is invalid + // and must roll back as messages have been dispatched elsewhere. + int numberNotReplayed = 0; + Set, bool> >& entries = previouslyDeliveredMessages->entrySet(); + Pointer, bool> > > iter(entries.iterator()); + while (iter->hasNext()) { + MapEntry, bool> entry = iter->next(); + if (!entry.getValue()) { + numberNotReplayed++; + } + } + if (numberNotReplayed > 0) { + std::string message = std::string("rolling back transaction (") + + previouslyDeliveredMessages->transactionId->toString() + + ") post failover recovery. " + Integer::toString(numberNotReplayed) + + " previously delivered message(s) not replayed to consumer: " + + info->getConsumerId()->toString(); + throw cms::TransactionRolledBackException(message); + } + } + } + + void waitForRedeliveries() { + if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != NULL) { + long expiry = System::currentTimeMillis() + failoverRedeliveryWaitPeriod; + int numberNotReplayed; + do { + numberNotReplayed = 0; + synchronized (&this->dispatchedMessages) { + if (previouslyDeliveredMessages != NULL) { + Set, bool> >& entries = previouslyDeliveredMessages->entrySet(); + Pointer, bool> > > iter(entries.iterator()); + while (iter->hasNext()) { + MapEntry, bool> entry = iter->next(); + if (!entry.getValue()) { + numberNotReplayed++; + } + } + } + } + if (numberNotReplayed > 0) { + try { + Thread::sleep(Math::max(500LL, failoverRedeliveryWaitPeriod/4)); + } catch (InterruptedException& ex) { + break; + } + } + } while (numberNotReplayed > 0 && expiry < System::currentTimeMillis()); + } + } + }; }}} @@ -150,7 +357,16 @@ namespace { virtual ~TransactionSynhcronization() {} virtual void beforeEnd() { - consumer->acknowledge(); + + if (false) { // TODO transactedIndividualAck) { +// consumer->clearDispatchList(); +// consumer->waitForRedeliveries(); +// synchronized(deliveredMessages) { +// consumer->rollbackOnFailedRecoveryRedelivery(); +// } + } else { + consumer->acknowledge(); + } consumer->setSynchronizationRegistered(false); } @@ -276,6 +492,7 @@ namespace { private: Pointer consumer; + ActiveMQSessionKernel* session; private: @@ -284,7 +501,9 @@ namespace { public: - StartConsumerTask(Pointer consumer) : Runnable(), consumer(consumer) { + StartConsumerTask(Pointer consumer, ActiveMQSessionKernel* session) : + Runnable(), consumer(consumer), session(session) { + if (consumer == NULL) { throw NullPointerException( __FILE__, __LINE__, "Synchronization Created with NULL Consumer."); @@ -299,11 +518,105 @@ namespace { this->consumer->start(); } } catch(cms::CMSException& ex) { - // TODO - Need Connection onAsyncException method. + // TODO + // this->session->getConnection()->onAsyncException(ex); } } }; + class AsyncMessageAckTask : public Runnable { + private: + + Pointer ack; + ActiveMQSessionKernel* session; + ActiveMQConsumerKernelConfig* impl; + + private: + + AsyncMessageAckTask(const AsyncMessageAckTask&); + AsyncMessageAckTask& operator=(const AsyncMessageAckTask&); + + public: + + AsyncMessageAckTask(Pointer ack, ActiveMQSessionKernel* session, ActiveMQConsumerKernelConfig* impl) : + Runnable(), ack(ack), session(session), impl(impl) {} + virtual ~AsyncMessageAckTask() {} + + virtual void run() { + try { + this->session->sendAck(ack, true); + this->impl->deliveringAcks.set(false); + } catch(Exception& ex) { + this->impl->deliveringAcks.set(false); + } catch(cms::CMSException& ex) { + this->impl->deliveringAcks.set(false); + } + } + }; + + class OptimizedAckTask : public Runnable { + private: + + ActiveMQConsumerKernel* consumer; + ActiveMQConsumerKernelConfig* impl; + + private: + + OptimizedAckTask(const OptimizedAckTask&); + OptimizedAckTask& operator=(const OptimizedAckTask&); + + public: + + OptimizedAckTask(ActiveMQConsumerKernel* consumer, ActiveMQConsumerKernelConfig* impl) : + Runnable(), consumer(consumer), impl(impl) {} + virtual ~OptimizedAckTask() {} + + virtual void run() { + try { + if (impl->optimizeAcknowledge && !impl->unconsumedMessages->isClosed()) { + this->consumer->deliverAcks(); + } + } catch(Exception& ex) { + } + } + }; + + class NonBlockingRedeliveryTask : public Runnable { + private: + + ActiveMQSessionKernel* session; + Pointer consumer; + ActiveMQConsumerKernelConfig* impl; + LinkedList > redeliveries; + + private: + + NonBlockingRedeliveryTask(const NonBlockingRedeliveryTask&); + NonBlockingRedeliveryTask& operator=(const NonBlockingRedeliveryTask&); + + public: + + NonBlockingRedeliveryTask(ActiveMQSessionKernel* session, Pointer consumer, ActiveMQConsumerKernelConfig* impl) : + Runnable(), session(session), consumer(consumer), impl(impl), redeliveries() { + + this->redeliveries.copy(impl->dispatchedMessages); + } + virtual ~NonBlockingRedeliveryTask() {} + + virtual void run() { + try { + if (!impl->unconsumedMessages->isClosed()) { + Pointer > > iter(redeliveries.iterator()); + while (iter->hasNext() && !impl->unconsumedMessages->isClosed()) { + Pointer dispatch = iter->next(); + session->dispatch(dispatch); + } + } + } catch (Exception& e) { + session->getConnection()->onAsyncException(e); + } + } + }; } //////////////////////////////////////////////////////////////////////////////// @@ -356,14 +669,20 @@ ActiveMQConsumerKernel::ActiveMQConsumer consumerInfo->setSubscriptionName(name); consumerInfo->setSelector(selector); consumerInfo->setPrefetchSize(prefetch); + consumerInfo->setCurrentPrefetchSize(prefetch); consumerInfo->setMaximumPendingMessageLimit(maxPendingMessageCount); consumerInfo->setBrowser(browser); consumerInfo->setDispatchAsync(dispatchAsync); consumerInfo->setNoLocal(noLocal); + consumerInfo->setExclusive(session->getConnection()->isExclusiveConsumer()); + consumerInfo->setRetroactive(session->getConnection()->isUseRetroactiveConsumer()); // Initialize Consumer Data this->session = session; this->consumerInfo = consumerInfo; + this->internal->session = session; + this->internal->parent = this; + this->internal->info = consumerInfo; this->internal->hashCode = id->getHashCode(); this->internal->lastDeliveredSequenceId = -1; this->internal->synchronizationRegistered = false; @@ -388,6 +707,23 @@ ActiveMQConsumerKernel::ActiveMQConsumer applyDestinationOptions(this->consumerInfo); + if (session->getConnection()->isOptimizeAcknowledge() && session->isAutoAcknowledge() && !consumerInfo->isBrowser()) { + this->internal->optimizeAcknowledge = true; + } + + if (this->internal->optimizeAcknowledge) { + this->internal->optimizeAcknowledgeTimeOut = session->getConnection()->getOptimizeAcknowledgeTimeOut(); + this->setOptimizedAckScheduledAckInterval( + session->getConnection()->getOptimizedAckScheduledAckInterval()); + } + + consumerInfo->setOptimizedAcknowledge(this->internal->optimizeAcknowledge); + this->internal->failoverRedeliveryWaitPeriod = + session->getConnection()->getConsumerFailoverRedeliveryWaitPeriod(); + this->internal->nonBlockingRedelivery = session->getConnection()->isNonBlockingRedelivery(); + this->internal->transactedIndividualAck = + session->getConnection()->isTransactedIndividualAck() || this->internal->nonBlockingRedelivery; + if (this->consumerInfo->getPrefetchSize() < 0) { delete this->internal; throw IllegalArgumentException( @@ -423,7 +759,7 @@ void ActiveMQConsumerKernel::start() { //////////////////////////////////////////////////////////////////////////////// void ActiveMQConsumerKernel::stop() { - this->internal->started.set( false ); + this->internal->started.set(false); this->internal->unconsumedMessages->stop(); } @@ -443,8 +779,6 @@ void ActiveMQConsumerKernel::close() { Pointer sync(new CloseSynhcronization(this)); this->session->getTransactionContext()->addSynchronization(sync); - - doClose(); } else { doClose(); } @@ -457,6 +791,9 @@ void ActiveMQConsumerKernel::close() { void ActiveMQConsumerKernel::doClose() { try { + // Store interrupted state and clear so that Transport operations don't + // throw InterruptedException and we ensure that resources are clened up. + bool interrupted = Thread::interrupted(); dispose(); // Remove at the Broker Side, consumer has been removed from the local @@ -466,6 +803,9 @@ void ActiveMQConsumerKernel::doClose() { info->setObjectId(this->consumerInfo->getConsumerId()); info->setLastDeliveredSequenceId(this->internal->lastDeliveredSequenceId); this->session->oneway(info); + if (interrupted) { + Thread::currentThread()->interrupt(); + } } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) @@ -481,10 +821,40 @@ void ActiveMQConsumerKernel::dispose() { if (!session->isTransacted()) { deliverAcks(); + if (isAutoAcknowledgeBatch()) { + acknowledge(); + } } this->internal->started.set(false); + if (this->internal->executor != NULL) { + this->internal->executor->shutdown(); + this->internal->executor->awaitTermination(60, TimeUnit::SECONDS); + this->internal->executor.reset(NULL); + } + + if (this->internal->optimizedAckTask != NULL) { + this->session->getScheduler()->cancel(this->internal->optimizedAckTask); + this->internal->optimizedAckTask = NULL; + } + + if (session->isClientAcknowledge()) { + if (!this->consumerInfo->isBrowser()) { + // roll back duplicates that aren't acknowledged + ArrayList< Pointer > tmp; + synchronized(&this->internal->dispatchedMessages) { + tmp.copy(this->internal->dispatchedMessages); + } + Pointer< Iterator > > iter(tmp.iterator()); + while (iter->hasNext()) { + Pointer msg = iter->next(); + this->session->getConnection()->rollbackDuplicate(this, msg->getMessage()); + } + tmp.clear(); + } + } + // Identifies any errors encountered during shutdown. bool haveException = false; ActiveMQException error; @@ -553,8 +923,6 @@ decaf::lang::Pointer Ac try { - this->checkClosed(); - // Calculate the deadline long long deadline = 0; if (timeout > 0) { @@ -566,7 +934,6 @@ decaf::lang::Pointer Ac Pointer dispatch = this->internal->unconsumedMessages->dequeue(timeout); if (dispatch == NULL) { - if (timeout > 0 && !this->internal->unconsumedMessages->isClosed()) { timeout = Math::max(deadline - System::currentTimeMillis(), 0LL); } else { @@ -576,13 +943,9 @@ decaf::lang::Pointer Ac return Pointer (); } } - } else if (dispatch->getMessage() == NULL) { - return Pointer (); - } else if (dispatch->getMessage()->isExpired()) { - beforeMessageIsConsumed(dispatch); afterMessageIsConsumed(dispatch, true); if (timeout > 0) { @@ -597,6 +960,9 @@ decaf::lang::Pointer Ac } return Pointer(); + } catch (InterruptedException& ex) { + Thread::currentThread()->interrupt(); + throw CMSExceptionSupport::create(ex); } AMQ_CATCH_ALL_THROW_CMSEXCEPTION() } @@ -607,6 +973,7 @@ cms::Message* ActiveMQConsumerKernel::re try { this->checkClosed(); + this->checkMessageListener(); // Send a request for a new message if needed this->sendPullRequest(0); @@ -633,6 +1000,7 @@ cms::Message* ActiveMQConsumerKernel::re try { this->checkClosed(); + this->checkMessageListener(); // Send a request for a new message if needed this->sendPullRequest(millisecs); @@ -659,6 +1027,7 @@ cms::Message* ActiveMQConsumerKernel::re try { this->checkClosed(); + this->checkMessageListener(); // Send a request for a new message if needed this->sendPullRequest(-1); @@ -718,8 +1087,7 @@ void ActiveMQConsumerKernel::setMessageL } //////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumerKernel::beforeMessageIsConsumed(const Pointer& dispatch) { - +void ActiveMQConsumerKernel::beforeMessageIsConsumed(Pointer dispatch) { this->internal->lastDeliveredSequenceId = dispatch->getMessage()->getMessageId()->getBrokerSequenceId(); if (!isAutoAcknowledgeBatch()) { @@ -730,54 +1098,100 @@ void ActiveMQConsumerKernel::beforeMessa } if (this->session->isTransacted()) { - ackLater(dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED); + if (this->internal->transactedIndividualAck) { + immediateIndividualTransactedAck(dispatch); + } else { + ackLater(dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED); + } } } } //////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumerKernel::afterMessageIsConsumed(const Pointer& message, bool messageExpired ) { +void ActiveMQConsumerKernel::immediateIndividualTransactedAck(Pointer dispatch) { + // acks accumulate on the broker pending transaction completion to indicate delivery status + registerSync(); + + Pointer ack(new MessageAck); + + ack->setAckType(ActiveMQConstants::ACK_TYPE_INDIVIDUAL); + ack->setConsumerId(dispatch->getConsumerId()); + ack->setDestination(dispatch->getDestination()); + ack->setLastMessageId(dispatch->getMessage()->getMessageId()); + ack->setMessageCount(1); + ack->setTransactionId(this->session->getTransactionContext()->getTransactionId()); + + this->session->syncRequest(ack); +} + +//////////////////////////////////////////////////////////////////////////////// +void ActiveMQConsumerKernel::registerSync() { + this->session->doStartTransaction(); + if (!this->internal->synchronizationRegistered) { + this->internal->synchronizationRegistered = true; + Pointer sync(new TransactionSynhcronization(this)); + this->session->getTransactionContext()->addSynchronization(sync); + } +} + +//////////////////////////////////////////////////////////////////////////////// +void ActiveMQConsumerKernel::afterMessageIsConsumed(Pointer message, bool messageExpired ) { try { if (this->internal->unconsumedMessages->isClosed()) { return; - } - - if (messageExpired == true) { - synchronized(&this->internal->dispatchedMessages) { - this->internal->dispatchedMessages.remove(message); - } + } else if (messageExpired == true) { ackLater(message, ActiveMQConstants::ACK_TYPE_DELIVERED); return; - } - - if (session->isTransacted()) { + } else if (session->isTransacted()) { return; - } else if (isAutoAcknowledgeEach()) { + } + if (isAutoAcknowledgeEach()) { if (this->internal->deliveringAcks.compareAndSet(false, true)) { - synchronized(&this->internal->dispatchedMessages) { if (!this->internal->dispatchedMessages.isEmpty()) { - Pointer ack = makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED); + if (this->internal->optimizeAcknowledge) { - if (ack != NULL) { - this->internal->dispatchedMessages.clear(); - session->oneway(ack); + this->internal->ackCounter++; + if (this->internal->isTimeForOptimizedAck(this->consumerInfo->getPrefetchSize())) { + Pointer ack = + makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED); + if (ack != NULL) { + this->internal->dispatchedMessages.clear(); + this->internal->ackCounter = 0; + this->session->sendAck(ack); + this->internal->optimizeAckTimestamp = System::currentTimeMillis(); + } + + // As further optimization send ack for expired messages when there + // are any. This resets the deliveredCounter to 0 so that we won't + // send standard acks with every message just because the deliveredCounter + // just below 0.5 * prefetch as used in ackLater() + if (this->internal->pendingAck != NULL && this->internal->deliveredCounter > 0) { + this->session->sendAck(this->internal->pendingAck); + this->internal->pendingAck.reset(NULL); + this->internal->deliveredCounter = 0; + } + } + } else { + Pointer ack = + makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED); + if (ack != NULL) { + this->internal->dispatchedMessages.clear(); + session->oneway(ack); + } } } } this->internal->deliveringAcks.set(false); } - } else if (isAutoAcknowledgeBatch()) { ackLater(message, ActiveMQConstants::ACK_TYPE_CONSUMED); } else if (session->isClientAcknowledge() || session->isIndividualAcknowledge()) { - bool messageUnackedByConsumer = false; - synchronized(&this->internal->dispatchedMessages) { messageUnackedByConsumer = this->internal->dispatchedMessages.contains(message); } @@ -785,9 +1199,8 @@ void ActiveMQConsumerKernel::afterMessag if (messageUnackedByConsumer) { this->ackLater(message, ActiveMQConstants::ACK_TYPE_DELIVERED); } - } else { - throw IllegalStateException( __FILE__, __LINE__, "Invalid Session State" ); + throw IllegalStateException(__FILE__, __LINE__, "Invalid Session State"); } } AMQ_CATCH_RETHROW(ActiveMQException) @@ -801,34 +1214,30 @@ void ActiveMQConsumerKernel::deliverAcks try { Pointer ack; - if (this->internal->deliveringAcks.compareAndSet(false, true)) { if (isAutoAcknowledgeEach()) { - synchronized(&this->internal->dispatchedMessages) { - ack = makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED); - if (ack != NULL) { this->internal->dispatchedMessages.clear(); + this->internal->ackCounter = 0; } else { ack.swap(internal->pendingAck); } } - - } else if (this->internal->pendingAck != NULL && this->internal->pendingAck->getAckType() == ActiveMQConstants::ACK_TYPE_CONSUMED) { - + } else if (this->internal->pendingAck != NULL && + this->internal->pendingAck->getAckType() == ActiveMQConstants::ACK_TYPE_CONSUMED) { ack.swap(this->internal->pendingAck); } if (ack != NULL) { - - try { - this->session->oneway(ack); - } catch (...) { + if (this->internal->executor == NULL) { + this->internal->executor.reset(Executors::newSingleThreadExecutor()); } + this->internal->executor->submit( + new AsyncMessageAckTask(ack, this->session, this->internal), true); } else { this->internal->deliveringAcks.set(false); } @@ -840,18 +1249,12 @@ void ActiveMQConsumerKernel::deliverAcks } //////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumerKernel::ackLater(const Pointer& dispatch, int ackType) { +void ActiveMQConsumerKernel::ackLater(Pointer dispatch, int ackType) { // Don't acknowledge now, but we may need to let the broker know the // consumer got the message to expand the pre-fetch window if (session->isTransacted()) { - session->doStartTransaction(); - if (!this->internal->synchronizationRegistered) { - this->internal->synchronizationRegistered = true; - - Pointer sync(new TransactionSynhcronization(this)); - this->session->getTransactionContext()->addSynchronization(sync); - } + registerSync(); } // The delivered message list is only needed for the recover method @@ -882,8 +1285,10 @@ void ActiveMQConsumerKernel::ackLater(co this->internal->pendingAck->setTransactionId(this->session->getTransactionContext()->getTransactionId()); } - if ((0.5 * this->consumerInfo->getPrefetchSize()) <= (internal->deliveredCounter - internal->additionalWindowSize)) { - session->oneway(this->internal->pendingAck); + // Need to evaluate both expired and normal messages as otherwise consumer may get stalled + int pendingAcks = (internal->deliveredCounter + internal->ackCounter) - internal->additionalWindowSize; + if ((0.5 * this->consumerInfo->getPrefetchSize()) <= pendingAcks) { + session->sendAck(this->internal->pendingAck); this->internal->pendingAck.reset(NULL); this->internal->deliveredCounter = 0; this->internal->additionalWindowSize = 0; @@ -915,36 +1320,27 @@ Pointer ActiveMQConsumerKern } //////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumerKernel::acknowledge(const Pointer& dispatch) { - - try { - - this->checkClosed(); +void ActiveMQConsumerKernel::acknowledge(Pointer dispatch) { + this->acknowledge(dispatch, ActiveMQConstants::ACK_TYPE_INDIVIDUAL); +} - if (this->session->isIndividualAcknowledge()) { +//////////////////////////////////////////////////////////////////////////////// +void ActiveMQConsumerKernel::acknowledge(Pointer dispatch, int ackType) { - Pointer ack(new MessageAck()); - ack->setAckType(ActiveMQConstants::ACK_TYPE_CONSUMED); - ack->setConsumerId(this->consumerInfo->getConsumerId()); - ack->setDestination(this->consumerInfo->getDestination()); - ack->setMessageCount(1); - ack->setLastMessageId(dispatch->getMessage()->getMessageId()); - ack->setFirstMessageId(dispatch->getMessage()->getMessageId()); + try { - session->oneway(ack); + Pointer ack(new MessageAck()); + ack->setAckType((unsigned char) ackType); + ack->setConsumerId(this->consumerInfo->getConsumerId()); + ack->setDestination(this->consumerInfo->getDestination()); + ack->setMessageCount(1); + ack->setLastMessageId(dispatch->getMessage()->getMessageId()); + ack->setFirstMessageId(dispatch->getMessage()->getMessageId()); - synchronized(&this->internal->dispatchedMessages) { - std::auto_ptr< Iterator< Pointer > > iter(this->internal->dispatchedMessages.iterator()); - while(iter->hasNext()) { - if (iter->next() == dispatch) { - iter->remove(); - break; - } - } - } + session->sendAck(ack); - } else { - throw IllegalStateException(__FILE__, __LINE__, "Session is not in IndividualAcknowledge mode." ); + synchronized(&this->internal->dispatchedMessages) { + this->internal->dispatchedMessages.remove(dispatch); } } AMQ_CATCH_ALL_THROW_CMSEXCEPTION() @@ -955,6 +1351,9 @@ void ActiveMQConsumerKernel::acknowledge try { + clearDispatchList(); + this->internal->waitForRedeliveries(); + synchronized(&this->internal->dispatchedMessages) { // Acknowledge all messages so far. @@ -965,12 +1364,13 @@ void ActiveMQConsumerKernel::acknowledge } if (session->isTransacted()) { + this->internal->rollbackOnFailedRecoveryRedelivery(); session->doStartTransaction(); ack->setTransactionId(session->getTransactionContext()->getTransactionId()); } - session->oneway(ack); this->internal->pendingAck.reset(NULL); + session->sendAck(ack); // Adjust the counters this->internal->deliveredCounter = Math::max(0, this->internal->deliveredCounter - (int) this->internal->dispatchedMessages.size()); @@ -998,7 +1398,22 @@ void ActiveMQConsumerKernel::rollback() synchronized(this->internal->unconsumedMessages.get()) { + if (this->internal->optimizeAcknowledge) { + // remove messages read but not acknowledged at the broker yet through optimizeAcknowledge + if (!this->consumerInfo->isBrowser()) { + synchronized(&this->internal->dispatchedMessages) { + for (int i = 0; (i < this->internal->dispatchedMessages.size()) && + (i < this->internal->ackCounter); i++) { + // ensure we don't filter this as a duplicate + Pointer md = this->internal->dispatchedMessages.removeLast(); + session->getConnection()->rollbackDuplicate(this, md->getMessage()); + } + } + } + } + synchronized(&this->internal->dispatchedMessages) { + this->internal->rollbackPreviouslyDeliveredAndNotRedelivered(); if (this->internal->dispatchedMessages.isEmpty()) { return; } @@ -1014,15 +1429,16 @@ void ActiveMQConsumerKernel::rollback() Pointer firstMsgId = this->internal->dispatchedMessages.getLast()->getMessage()->getMessageId(); - std::auto_ptr > > iter(internal->dispatchedMessages.iterator()); - + Pointer > > iter(internal->dispatchedMessages.iterator()); while (iter->hasNext()) { Pointer message = iter->next()->getMessage(); message->setRedeliveryCounter(message->getRedeliveryCounter() + 1); + // ensure we don't filter this as a duplicate + session->getConnection()->rollbackDuplicate(this, message); } - if (this->internal->redeliveryPolicy->getMaximumRedeliveries() != RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES - && lastMsg->getMessage()->getRedeliveryCounter() > this->internal->redeliveryPolicy->getMaximumRedeliveries()) { + if (this->internal->redeliveryPolicy->getMaximumRedeliveries() != RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES && + lastMsg->getMessage()->getRedeliveryCounter() > this->internal->redeliveryPolicy->getMaximumRedeliveries()) { // We need to NACK the messages so that they get sent to the DLQ. // Acknowledge the last message. @@ -1033,8 +1449,9 @@ void ActiveMQConsumerKernel::rollback() ack->setMessageCount((int) this->internal->dispatchedMessages.size()); ack->setLastMessageId(lastMsg->getMessage()->getMessageId()); ack->setFirstMessageId(firstMsgId); + // TODO - ack->setPoisonCause() - session->oneway(ack); + session->sendAck(ack, true); // Adjust the window size. this->internal->additionalWindowSize = Math::max(0, this->internal->additionalWindowSize - (int) this->internal->dispatchedMessages.size()); @@ -1055,22 +1472,33 @@ void ActiveMQConsumerKernel::rollback() session->oneway(ack); } - // stop the delivery of messages. - this->internal->unconsumedMessages->stop(); - - std::auto_ptr > > iter( - this->internal->dispatchedMessages.iterator()); - while (iter->hasNext()) { - this->internal->unconsumedMessages->enqueueFirst(iter->next()); - } + if (this->internal->nonBlockingRedelivery) { - if (internal->redeliveryDelay > 0 && !this->internal->unconsumedMessages->isClosed()) { - Pointer self = - this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId()); - this->internal->scheduler->executeAfterDelay( - new StartConsumerTask(self), internal->redeliveryDelay); + if (!this->internal->unconsumedMessages->isClosed()) { + Pointer self = + this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId()); + this->session->getScheduler()->executeAfterDelay( + new NonBlockingRedeliveryTask(session, self, this->internal), + this->internal->redeliveryDelay); + } } else { - start(); + // stop the delivery of messages. + this->internal->unconsumedMessages->stop(); + + std::auto_ptr > > iter( + this->internal->dispatchedMessages.iterator()); + while (iter->hasNext()) { + this->internal->unconsumedMessages->enqueueFirst(iter->next()); + } + + if (internal->redeliveryDelay > 0 && !this->internal->unconsumedMessages->isClosed()) { + Pointer self = + this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId()); + this->internal->scheduler->executeAfterDelay( + new StartConsumerTask(self, session), internal->redeliveryDelay); + } else { + start(); + } } } this->internal->deliveredCounter -= (int) internal->dispatchedMessages.size(); @@ -1088,40 +1516,94 @@ void ActiveMQConsumerKernel::dispatch(co try { - synchronized(this->internal->unconsumedMessages.get()) { + clearMessagesInProgress(); + clearDispatchList(); - clearMessagesInProgress(); - if (this->internal->clearDispatchList) { - // we are reconnecting so lets flush the in progress messages - this->internal->clearDispatchList = false; - this->internal->unconsumedMessages->clear(); - } + synchronized(this->internal->unconsumedMessages.get()) { if (!this->internal->unconsumedMessages->isClosed()) { - // Don't dispatch expired messages, ack it and then destroy it - if (dispatch->getMessage() != NULL && dispatch->getMessage()->isExpired()) { - this->ackLater(dispatch, ActiveMQConstants::ACK_TYPE_CONSUMED); - return; - } - - synchronized(&this->internal->listenerMutex) { + if (this->consumerInfo->isBrowser() || !session->getConnection()->isDuplicate(this, dispatch->getMessage())) { - // If we have a listener, send the message. - if (this->internal->listener != NULL && internal->unconsumedMessages->isRunning()) { - - Pointer message = createCMSMessage(dispatch); - beforeMessageIsConsumed(dispatch); - this->internal->listener->onMessage(message.get()); - afterMessageIsConsumed(dispatch, false); + synchronized(&this->internal->listenerMutex) { + if (this->internal->listener != NULL && this->internal->unconsumedMessages->isRunning()) { + Pointer message = createCMSMessage(dispatch); + beforeMessageIsConsumed(dispatch); + try { + bool expired = dispatch->getMessage()->isExpired(); + if (!expired) { + this->internal->listener->onMessage(message.get()); + } + afterMessageIsConsumed(dispatch, expired); + } catch (RuntimeException& e) { + if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session->isIndividualAcknowledge()) { + // Schedule redelivery and possible DLQ processing + // dispatch->setRollbackCause(e); // TODO + rollback(); + } else { + // Transacted or Client ack: Deliver the next message. + afterMessageIsConsumed(dispatch, false); + } + } + } else { + if (!this->internal->unconsumedMessages->isRunning()) { + // delayed redelivery, ensure it can be re delivered + session->getConnection()->rollbackDuplicate(this, dispatch->getMessage()); + } + this->internal->unconsumedMessages->enqueue(dispatch); + if (this->internal->messageAvailableListener != NULL) { + this->internal->messageAvailableListener->onMessageAvailable(this); + } + } + } + } else { + if (!session->isTransacted()) { + Pointer ack(new MessageAck()); + ack->setAckType(ActiveMQConstants::ACK_TYPE_INDIVIDUAL); + ack->setConsumerId(this->consumerInfo->getConsumerId()); + ack->setDestination(dispatch->getDestination()); + ack->setMessageCount(1); + ack->setLastMessageId(dispatch->getMessage()->getMessageId()); + ack->setFirstMessageId(dispatch->getMessage()->getMessageId()); + session->sendAck(ack); } else { - - // No listener, add it to the unconsumed messages list it will get pushed on the - // next receive call or when a new listener is added. - this->internal->unconsumedMessages->enqueue(dispatch); - if (this->internal->messageAvailableListener != NULL) { - this->internal->messageAvailableListener->onMessageAvailable(this); + bool needsPoisonAck = false; + synchronized (&this->internal->dispatchedMessages) { + if (this->internal->previouslyDeliveredMessages != NULL) { + this->internal->previouslyDeliveredMessages->put( + dispatch->getMessage()->getMessageId(), true); + } else { + // delivery while pending redelivery to another consumer on the same connection + // not waiting for redelivery will help here + needsPoisonAck = true; + } + } + if (needsPoisonAck) { + Pointer poisonAck(new MessageAck()); + poisonAck->setAckType(ActiveMQConstants::ACK_TYPE_POISON); + poisonAck->setConsumerId(this->consumerInfo->getConsumerId()); + poisonAck->setDestination(dispatch->getDestination()); + poisonAck->setMessageCount(1); + poisonAck->setLastMessageId(dispatch->getMessage()->getMessageId()); + poisonAck->setFirstMessageId(dispatch->getMessage()->getMessageId()); +// TODO +// poisonAck.setPoisonCause(new JMSException("Duplicate dispatch with transacted redeliver pending on another consumer, connection: " +// + session.getConnection().getConnectionInfo().getConnectionId())); + session->sendAck(poisonAck); + } else { + if (this->internal->transactedIndividualAck) { + immediateIndividualTransactedAck(dispatch); + } else { + Pointer ack(new MessageAck()); + ack->setAckType(ActiveMQConstants::ACK_TYPE_DELIVERED); + ack->setConsumerId(this->consumerInfo->getConsumerId()); + ack->setDestination(dispatch->getDestination()); + ack->setMessageCount(1); + ack->setLastMessageId(dispatch->getMessage()->getMessageId()); + ack->setFirstMessageId(dispatch->getMessage()->getMessageId()); + session->sendAck(ack); + } } } } @@ -1187,7 +1669,7 @@ void ActiveMQConsumerKernel::sendPullReq try { - this->checkClosed(); + clearDispatchList(); // There are still local message, consume them first. if (!this->internal->unconsumedMessages->isEmpty()) { @@ -1217,6 +1699,16 @@ void ActiveMQConsumerKernel::checkClosed } //////////////////////////////////////////////////////////////////////////////// +void ActiveMQConsumerKernel::checkMessageListener() const { + if (this->internal->listener != NULL) { + throw cms::IllegalStateException( + "Cannot synchronously receive a message when a MessageListener is set"); + } + + this->session->checkMessageListener(); +} + +//////////////////////////////////////////////////////////////////////////////// bool ActiveMQConsumerKernel::iterate() { synchronized(&this->internal->listenerMutex) { @@ -1234,7 +1726,6 @@ bool ActiveMQConsumerKernel::iterate() { //////////////////////////////////////////////////////////////////////////////// void ActiveMQConsumerKernel::inProgressClearRequired() { - this->internal->inProgressClearRequiredFlag = true; // Clears dispatched messages async to avoid lock contention with inprogress acks. this->internal->clearDispatchList = true; @@ -1246,17 +1737,35 @@ void ActiveMQConsumerKernel::clearMessag synchronized(this->internal->unconsumedMessages.get()) { if (this->internal->inProgressClearRequiredFlag) { - // TODO - Rollback duplicates. + // ensure messages that were not yet consumed are rolled back up front as they + // may get redelivered to another consumer by the Broker. + std::vector< Pointer > list = this->internal->unconsumedMessages->removeAll(); + if (!this->consumerInfo->isBrowser()) { + std::vector< Pointer >::const_iterator iter = list.begin(); + + for (; iter != list.end(); ++iter) { + Pointer md = *iter; + this->session->getConnection()->rollbackDuplicate(this, md->getMessage()); + } + } // allow dispatch on this connection to resume this->session->getConnection()->setTransportInterruptionProcessingComplete(); this->internal->inProgressClearRequiredFlag = false; + + // Wake up any blockers and allow them to recheck state. + this->internal->unconsumedMessages->notifyAll(); } } } } //////////////////////////////////////////////////////////////////////////////// +void ActiveMQConsumerKernel::clearDispatchList() { + this->internal->doClearDispatchList(); +} + +//////////////////////////////////////////////////////////////////////////////// bool ActiveMQConsumerKernel::isAutoAcknowledgeEach() const { return this->session->isAutoAcknowledge() || (this->session->isDupsOkAcknowledge() && this->consumerInfo->getDestination()->isQueue()); @@ -1273,12 +1782,11 @@ int ActiveMQConsumerKernel::getMessageAv } //////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumerKernel::applyDestinationOptions(const Pointer& info) { +void ActiveMQConsumerKernel::applyDestinationOptions(Pointer info) { decaf::lang::Pointer amqDestination = info->getDestination(); - // Get any options specified in the destination and apply them to the - // ConsumerInfo object. + // Get any options specified in the destination and apply them to the ConsumerInfo object. const ActiveMQProperties& options = amqDestination->getOptions(); std::string noLocalStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_NOLOCAL); @@ -1307,7 +1815,6 @@ void ActiveMQConsumerKernel::applyDestin } std::string maxPendingMsgLimitStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CUNSUMER_MAXPENDINGMSGLIMIT); - if (options.hasProperty(maxPendingMsgLimitStr)) { info->setMaximumPendingMessageLimit(Integer::parseInt(options.getProperty(maxPendingMsgLimitStr))); } @@ -1322,11 +1829,10 @@ void ActiveMQConsumerKernel::applyDestin info->setRetroactive(Boolean::parseBoolean(options.getProperty(retroactiveStr))); } - std::string networkSubscriptionStr = "consumer.networkSubscription"; - - if (options.hasProperty(networkSubscriptionStr)) { - info->setNetworkSubscription(Boolean::parseBoolean(options.getProperty(networkSubscriptionStr))); - } + this->internal->nonBlockingRedelivery = Boolean::parseBoolean( + options.getProperty("consumer.nonBlockingRedelivery", "false")); + this->internal->nonBlockingRedelivery = Boolean::parseBoolean( + options.getProperty("consumer.transactedIndividualAck", "false")); } //////////////////////////////////////////////////////////////////////////////// @@ -1372,7 +1878,7 @@ bool ActiveMQConsumerKernel::isSynchroni } //////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumerKernel::setSynchronizationRegistered( bool value ) { +void ActiveMQConsumerKernel::setSynchronizationRegistered(bool value) { this->internal->synchronizationRegistered = value; } @@ -1382,11 +1888,31 @@ long long ActiveMQConsumerKernel::getLas } //////////////////////////////////////////////////////////////////////////////// -void ActiveMQConsumerKernel::setLastDeliveredSequenceId( long long value ) { +void ActiveMQConsumerKernel::setLastDeliveredSequenceId(long long value) { this->internal->lastDeliveredSequenceId = value; } //////////////////////////////////////////////////////////////////////////////// +bool ActiveMQConsumerKernel::isTransactedIndividualAck() const { + return this->internal->transactedIndividualAck; +} + +//////////////////////////////////////////////////////////////////////////////// +void ActiveMQConsumerKernel::setTransactedIndividualAck(bool value) { + this->internal->transactedIndividualAck = value; +} + +//////////////////////////////////////////////////////////////////////////////// +long long ActiveMQConsumerKernel::setFailoverRedeliveryWaitPeriod() const { + return this->internal->failoverRedeliveryWaitPeriod; +} + +//////////////////////////////////////////////////////////////////////////////// +void ActiveMQConsumerKernel::setFailoverRedeliveryWaitPeriod(long long value) { + this->internal->failoverRedeliveryWaitPeriod = value; +} + +//////////////////////////////////////////////////////////////////////////////// void ActiveMQConsumerKernel::setFailureError(decaf::lang::Exception* error) { if (error != NULL) { this->internal->failureError.reset(error->clone()); @@ -1427,3 +1953,49 @@ cms::MessageAvailableListener* ActiveMQC int ActiveMQConsumerKernel::getHashCode() const { return this->internal->hashCode; } + +//////////////////////////////////////////////////////////////////////////////// +long long ActiveMQConsumerKernel::getOptimizedAckScheduledAckInterval() const { + return this->internal->optimizedAckScheduledAckInterval; +} + +//////////////////////////////////////////////////////////////////////////////// +void ActiveMQConsumerKernel::setOptimizedAckScheduledAckInterval(long long value) { + this->internal->optimizedAckScheduledAckInterval = value; + + if (this->internal->optimizedAckTask != NULL) { + try { + this->session->getScheduler()->cancel(this->internal->optimizedAckTask); + this->internal->optimizedAckTask = NULL; + } catch (Exception& e) { + this->internal->optimizedAckTask = NULL; + throw CMSExceptionSupport::create(e); + } + } + + // Should we periodically send out all outstanding acks. + if (this->internal->optimizeAcknowledge && this->internal->optimizedAckScheduledAckInterval > 0) { + this->internal->optimizedAckTask = new OptimizedAckTask(this, this->internal); + + try { + this->session->getScheduler()->executePeriodically( + this->internal->optimizedAckTask, this->internal->optimizedAckScheduledAckInterval); + } catch (Exception& e) { + throw CMSExceptionSupport::create(e); + } + } +} + +//////////////////////////////////////////////////////////////////////////////// +bool ActiveMQConsumerKernel::isOptimizeAcknowledge() const { + return this->internal->optimizeAcknowledge; +} + +//////////////////////////////////////////////////////////////////////////////// +void ActiveMQConsumerKernel::setOptimizeAcknowledge(bool value) { + if (this->internal->optimizeAcknowledge && !value) { + deliverAcks(); + } + + this->internal->optimizeAcknowledge = value; +} Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h?rev=1463311&r1=1463310&r2=1463311&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h Mon Apr 1 21:56:01 2013 @@ -109,8 +109,6 @@ namespace kernels { virtual std::string getMessageSelector() const; - virtual void acknowledge(const Pointer& dispatch); - virtual void setMessageTransformer(cms::MessageTransformer* transformer); virtual cms::MessageTransformer* getMessageTransformer() const; @@ -131,6 +129,20 @@ namespace kernels { void acknowledge(); /** + * Method called to acknowledge the Message contained in the given MessageDispatch + * + * @throw CMSException if an error occurs while ack'ing the message. + */ + void acknowledge(Pointer dispatch); + + /** + * Method called to acknowledge the Message contained in the given MessageDispatch + * + * @throw CMSException if an error occurs while ack'ing the message. + */ + void acknowledge(Pointer dispatch, int ackType); + + /** * Called to Commit the current set of messages in this Transaction * * @throw ActiveMQException if an error occurs while performing the operation. @@ -220,6 +232,37 @@ namespace kernels { long long getLastDeliveredSequenceId() const; /** + * Will Message's in a transaction be acknowledged using the Individual Acknowledge mode. + * + * @return true if individual transacted acknowledge is enabled. + */ + bool isTransactedIndividualAck() const; + + /** + * Set if Message's in a transaction be acknowledged using the Individual Acknowledge mode. + * + * @param value + * True if individual transacted acknowledge is enabled. + */ + void setTransactedIndividualAck(bool value); + + /** + * Returns the delay after a failover before Message redelivery starts. + * + * @returns time in milliseconds to wait after failover. + */ + long long setFailoverRedeliveryWaitPeriod() const; + + /** + * Sets the time in milliseconds to delay after failover before starting + * message redelivery. + * + * @param value + * Time in milliseconds to delay after failover for redelivery start. + */ + void setFailoverRedeliveryWaitPeriod(long long value); + + /** * Sets the value of the Last Delivered Sequence Id * * @param value @@ -280,6 +323,37 @@ namespace kernels { */ bool isInUse(Pointer destination) const; + /** + * Time in Milliseconds before an automatic acknowledge is done for any outstanding + * delivered Messages. A value less than one means no task is scheduled. + * + * @returns time in milliseconds for the scheduled ack task. + */ + long long getOptimizedAckScheduledAckInterval() const; + + /** + * Sets the time in Milliseconds to schedule an automatic acknowledge of outstanding + * messages when optimize acknowledge is enabled. A value less than one means disable + * any scheduled tasks. + * + * @param value + * The time interval to send scheduled acks. + */ + void setOptimizedAckScheduledAckInterval(long long value); + + /** + * @returns true if this consumer is using optimize acknowledge mode. + */ + bool isOptimizeAcknowledge() const; + + /** + * Enable or disable optimized acknowledge for this consumer. + * + * @param value + * True if optimize acknowledge is enabled, false otherwise. + */ + void setOptimizeAcknowledge(bool value); + protected: /** @@ -303,50 +377,41 @@ namespace kernels { * Pre-consume processing * @param dispatch - the message being consumed. */ - void beforeMessageIsConsumed(const Pointer& dispatch); + void beforeMessageIsConsumed(Pointer dispatch); /** * Post-consume processing * @param dispatch - the consumed message * @param messageExpired - flag indicating if the message has expired. */ - void afterMessageIsConsumed(const Pointer& dispatch, bool messageExpired); + void afterMessageIsConsumed(Pointer dispatch, bool messageExpired); private: - // Creates a deliverable cms::Message from a received MessageDispatch, transforming if needed - // and configuring appropriate ack handlers. Pointer createCMSMessage(Pointer dispatch); - // Using options from the Destination URI override any settings that are - // defined for this consumer. - void applyDestinationOptions(const Pointer& info); - - // If supported sends a message pull request to the service provider asking - // for the delivery of a new message. This is used in the case where the - // service provider has been configured with a zero prefetch or is only - // capable of delivering messages on a pull basis. No request is made if - // there are already messages in the unconsumed queue since there's no need - // for a server round-trip in that instance. + void applyDestinationOptions(Pointer info); + void sendPullRequest(long long timeout); - // Checks for the closed state and throws if so. void checkClosed() const; - // Sends an ack as needed in order to keep them coming in if the current - // ack mode allows the consumer to receive up to the prefetch limit before - // an real ack is sent. - void ackLater(const Pointer& message, int ackType); + void checkMessageListener() const; + + void ackLater(Pointer message, int ackType); + + void immediateIndividualTransactedAck(Pointer dispatch); - // Create an Ack Message that acks all messages that have been delivered so far. Pointer makeAckForAllDeliveredMessages(int type); - // Should Acks be sent on each dispatched message bool isAutoAcknowledgeEach() const; - // Can Acks be batched for less network overhead. bool isAutoAcknowledgeBatch() const; + void registerSync(); + + void clearDispatchList(); + }; }}} Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp?rev=1463311&r1=1463310&r2=1463311&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp Mon Apr 1 21:56:01 2013 @@ -105,7 +105,8 @@ namespace kernels{ SessionConfig() : synchronizationRegistered(false), producerLock(), producers(), consumerLock(), consumers(), - scheduler(), closeSync(), sendMutex(), transformer(NULL), hashCode() {} + scheduler(), closeSync(), sendMutex(), transformer(NULL), + hashCode() {} ~SessionConfig() {} }; @@ -1455,3 +1456,32 @@ Pointer ActiveMQSe int ActiveMQSessionKernel::getHashCode() const { return this->config->hashCode; } + +//////////////////////////////////////////////////////////////////////////////// +void ActiveMQSessionKernel::checkMessageListener() const { + + this->config->consumerLock.readLock().lock(); + try { + Pointer > > iter(this->config->consumers.iterator()); + while (iter->hasNext()) { + Pointer consumer = iter->next(); + if (consumer->getMessageListener() != NULL) { + throw cms::IllegalStateException( + "Cannot synchronously receive a message when a MessageListener is set"); + } + } + this->config->consumerLock.readLock().unlock(); + } catch (Exception& ex) { + this->config->consumerLock.readLock().unlock(); + throw; + } +} + +//////////////////////////////////////////////////////////////////////////////// +void ActiveMQSessionKernel::sendAck(Pointer ack, bool async) { + if (async || this->connection->isSendAcksAsync() || this->isTransacted()) { + this->connection->oneway(ack); + } else { + this->connection->syncRequest(ack); + } +} Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h?rev=1463311&r1=1463310&r2=1463311&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h Mon Apr 1 21:56:01 2013 @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -542,8 +543,31 @@ namespace kernels { */ bool iterateConsumers(); + /** + * Checks if any MessageConsumer owned by this Session has a set MessageListener + * and throws an exception if so. This enforces the rule that the MessageConsumers + * belonging to a Session either operate in sync or async receive as a group. + */ + void checkMessageListener() const; + + /** + * Returns a Hash Code for this Session based on its SessionId. + * + * @returns an int hash code based on the string balue of SessionId. + */ virtual int getHashCode() const; + /** + * Sends the given MessageAck command to the Broker either via Synchronous call or + * an Asynchronous call depending on the value of the async parameter. + * + * @param ack + * The MessageAck command to send. + * @param async + * True if the command can be sent asynchronously. + */ + void sendAck(decaf::lang::Pointer ack, bool async = false); + private: /** Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp?rev=1463311&r1=1463310&r2=1463311&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp Mon Apr 1 21:56:01 2013 @@ -18,6 +18,7 @@ #include "ConnectionAuditTest.h" #include +#include #include #include #include @@ -104,7 +105,7 @@ void ConnectionAuditTest::testIsDuplicat list.add(id); message->setMessageId(id); - CPPUNIT_ASSERT(!audit.isDuplicate(dispatcher, message)); + CPPUNIT_ASSERT(!audit.isDuplicate(dispatcher.get(), message)); } int index = list.size() -1 -audit.getAuditDepth(); @@ -112,7 +113,7 @@ void ConnectionAuditTest::testIsDuplicat Pointer id = list.get(index); message->setMessageId(id); CPPUNIT_ASSERT_MESSAGE(std::string() + "duplicate msg:" + id->toString(), - audit.isDuplicate(dispatcher, message)); + audit.isDuplicate(dispatcher.get(), message)); } } @@ -140,7 +141,7 @@ void ConnectionAuditTest::testRollbackDu list.add(id); message->setMessageId(id); - CPPUNIT_ASSERT(!audit.isDuplicate(dispatcher, message)); + CPPUNIT_ASSERT(!audit.isDuplicate(dispatcher.get(), message)); } int index = list.size() -1 -audit.getAuditDepth(); @@ -148,9 +149,9 @@ void ConnectionAuditTest::testRollbackDu Pointer id = list.get(index); message->setMessageId(id); CPPUNIT_ASSERT_MESSAGE(std::string() + "duplicate msg:" + id->toString(), - audit.isDuplicate(dispatcher, message)); - audit.rollbackDuplicate(dispatcher, message); + audit.isDuplicate(dispatcher.get(), message)); + audit.rollbackDuplicate(dispatcher.get(), message); CPPUNIT_ASSERT_MESSAGE(std::string() + "duplicate msg:" + id->toString(), - !audit.isDuplicate(dispatcher, message)); + !audit.isDuplicate(dispatcher.get(), message)); } }