Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 921DCFB24 for ; Fri, 22 Mar 2013 19:48:50 +0000 (UTC) Received: (qmail 31486 invoked by uid 500); 22 Mar 2013 19:48:50 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 31445 invoked by uid 500); 22 Mar 2013 19:48:50 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 31432 invoked by uid 99); 22 Mar 2013 19:48:50 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Mar 2013 19:48:50 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_FILL_THIS_FORM_SHORT X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Mar 2013 19:48:43 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id D488B23888EA; Fri, 22 Mar 2013 19:48:20 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1459956 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/activemq/state/ main/activemq/transport/failover/ main/decaf/util/ test/activemq/state/ Date: Fri, 22 Mar 2013 19:48:19 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130322194820.D488B23888EA@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tabish Date: Fri Mar 22 19:48:18 2013 New Revision: 1459956 URL: http://svn.apache.org/r1459956 Log: fix for: https://issues.apache.org/jira/browse/AMQCPP-470 Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LRUCache.h activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LinkedHashMap.h activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.h Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp?rev=1459956&r1=1459955&r2=1459956&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp Fri Mar 22 19:48:18 2013 @@ -18,7 +18,11 @@ #include "ConnectionStateTracker.h" #include +#include +#include +#include #include +#include #include #include @@ -38,9 +42,108 @@ using namespace decaf::io; using namespace decaf::lang::exceptions; //////////////////////////////////////////////////////////////////////////////// +namespace decaf { +namespace util { + + template<> + struct HashCode : public HashCodeUnaryBase { + int operator()(const MessageId& arg) const { + return decaf::util::HashCode()(arg.toString()); + } + }; + +}} + +//////////////////////////////////////////////////////////////////////////////// namespace activemq { namespace state { + + class MessageCache : public LinkedHashMap, Pointer, HashCode< Pointer > > { + protected: + + ConnectionStateTracker* parent; + + public: + + int currentCacheSize; + + public: + + MessageCache(ConnectionStateTracker* parent) : + LinkedHashMap, Pointer >(), parent(parent), currentCacheSize(0) { + } + + virtual ~MessageCache() {} + + virtual bool removeEldestEntry(const MapEntry, Pointer >& eldest) { + bool result = currentCacheSize > parent->getMaxMessageCacheSize(); + if (result) { + Pointer message = eldest.getValue().dynamicCast(); + currentCacheSize -= message->getSize(); + } + return result; + } + }; + + class MessagePullCache : public LinkedHashMap > { + protected: + + ConnectionStateTracker* parent; + + public: + + MessagePullCache(ConnectionStateTracker* parent) : + LinkedHashMap >(), parent(parent) { + } + + virtual ~MessagePullCache() {} + + virtual bool removeEldestEntry(const MapEntry >& eldest AMQCPP_UNUSED) { + return size() > parent->getMaxMessagePullCacheSize(); + } + }; + + class StateTrackerImpl { + private: + + StateTrackerImpl(const StateTrackerImpl&); + StateTrackerImpl& operator= (const StateTrackerImpl&); + + public: + + /** Parent ConnectionStateTracker */ + ConnectionStateTracker* parent; + + /** Creates a unique marker for this state tracker */ + const Pointer TRACKED_RESPONSE_MARKER; + + /** Map holding the ConnectionStates, indexed by the ConnectionId */ + ConcurrentStlMap, Pointer, ConnectionId::COMPARATOR> connectionStates; + + /** Store Messages if trackMessages == true */ + MessageCache messageCache; + + /** Store MessagePull commands for replay */ + MessagePullCache messagePullCache; + + StateTrackerImpl(ConnectionStateTracker * parent) : parent(parent), + TRACKED_RESPONSE_MARKER(new Tracked()), + connectionStates(), + messageCache(parent), + messagePullCache(parent) { + } + + ~StateTrackerImpl() { + try { + connectionStates.clear(); + messageCache.clear(); + messagePullCache.clear(); + } + AMQ_CATCHALL_NOTHROW() + } + }; + class RemoveTransactionAction : public Runnable { private: @@ -62,7 +165,7 @@ namespace state { virtual void run() { Pointer connectionId = info->getConnectionId(); - Pointer cs = stateTracker->connectionStates.get(connectionId); + Pointer cs = stateTracker->impl->connectionStates.get(connectionId); Pointer txState = cs->removeTransactionState(info->getTransactionId()); if (txState != NULL) { txState->clear(); @@ -73,10 +176,7 @@ namespace state { }} //////////////////////////////////////////////////////////////////////////////// -ConnectionStateTracker::ConnectionStateTracker() : TRACKED_RESPONSE_MARKER( new Tracked() ), - connectionStates(), - messageCache(), - messagePullCache(), +ConnectionStateTracker::ConnectionStateTracker() : impl(new StateTrackerImpl(this)), trackTransactions(false), restoreSessions(true), restoreConsumers(true), @@ -84,12 +184,16 @@ ConnectionStateTracker::ConnectionStateT restoreTransaction(true), trackMessages(true), trackTransactionProducers(true), - maxCacheSize(128 * 1024), - currentCacheSize(0) { + maxMessageCacheSize(128 * 1024), + maxMessagePullCacheSize(10) { } //////////////////////////////////////////////////////////////////////////////// ConnectionStateTracker::~ConnectionStateTracker() { + try { + delete impl; + } + AMQ_CATCHALL_NOTHROW() } //////////////////////////////////////////////////////////////////////////////// @@ -117,13 +221,7 @@ void ConnectionStateTracker::trackBack(P if (trackMessages && command->isMessage()) { Pointer message = command.dynamicCast(); if (message->getTransactionId() == NULL) { - currentCacheSize = currentCacheSize + message->getSize(); - } - } else { - Pointer messagePull = command.dynamicCast(); - if (messagePull != NULL) { - // just needs to be a rough estimate of size, ~4 identifiers - currentCacheSize += 400; + this->impl->messageCache.currentCacheSize += message->getSize(); } } } @@ -138,7 +236,9 @@ void ConnectionStateTracker::restore(Poi try { - Pointer > > iterator(this->connectionStates.values().iterator()); + Pointer > > iterator( + this->impl->connectionStates.values().iterator()); + while (iterator->hasNext()) { Pointer state = iterator->next(); @@ -158,12 +258,12 @@ void ConnectionStateTracker::restore(Poi } // Now we flush messages - Pointer > > messages(this->messageCache.values().iterator()); + Pointer > > messages(this->impl->messageCache.values().iterator()); while (messages->hasNext()) { transport->oneway(messages->next()); } - Pointer > > messagePullIter(this->messagePullCache.values().iterator()); + Pointer > > messagePullIter(this->impl->messagePullCache.values().iterator()); while (messagePullIter->hasNext()) { transport->oneway(messagePullIter->next()); } @@ -261,7 +361,8 @@ void ConnectionStateTracker::doRestoreCo try { // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete - Pointer connectionState = connectionStates.get(sessionState->getInfo()->getSessionId()->getParentId()); + Pointer connectionState = + this->impl->connectionStates.get(sessionState->getInfo()->getSessionId()->getParentId()); bool connectionInterruptionProcessingComplete = connectionState->isConnectionInterruptProcessingComplete(); Pointer > > state(sessionState->getConsumerStates().iterator()); @@ -322,12 +423,12 @@ Pointer ConnectionStateTracker: try { if (info != NULL) { - Pointer cs = connectionStates.get(info->getConnectionId()); + Pointer cs = this->impl->connectionStates.get(info->getConnectionId()); if (cs != NULL && info->getDestination()->isTemporary()) { cs->addTempDestination(Pointer(info->cloneDataStructure())); } } - return TRACKED_RESPONSE_MARKER; + return this->impl->TRACKED_RESPONSE_MARKER; } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) @@ -339,12 +440,12 @@ Pointer ConnectionStateTracker: try { if (info != NULL) { - Pointer cs = connectionStates.get(info->getConnectionId()); + Pointer cs = this->impl->connectionStates.get(info->getConnectionId()); if (cs != NULL && info->getDestination()->isTemporary()) { cs->removeTempDestination(info->getDestination()); } } - return TRACKED_RESPONSE_MARKER; + return this->impl->TRACKED_RESPONSE_MARKER; } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) @@ -360,7 +461,7 @@ Pointer ConnectionStateTracker: if (sessionId != NULL) { Pointer connectionId = sessionId->getParentId(); if (connectionId != NULL) { - Pointer cs = connectionStates.get(connectionId); + Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { Pointer ss = cs->getSessionState(sessionId); if (ss != NULL) { @@ -370,7 +471,7 @@ Pointer ConnectionStateTracker: } } } - return TRACKED_RESPONSE_MARKER; + return this->impl->TRACKED_RESPONSE_MARKER; } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) @@ -386,7 +487,7 @@ Pointer ConnectionStateTracker: if (sessionId != NULL) { Pointer connectionId = sessionId->getParentId(); if (connectionId != NULL) { - Pointer cs = connectionStates.get(connectionId); + Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { Pointer ss = cs->getSessionState(sessionId); if (ss != NULL) { @@ -396,7 +497,7 @@ Pointer ConnectionStateTracker: } } } - return TRACKED_RESPONSE_MARKER; + return this->impl->TRACKED_RESPONSE_MARKER; } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) @@ -413,7 +514,7 @@ Pointer ConnectionStateTracker: if (sessionId != NULL) { Pointer connectionId = sessionId->getParentId(); if (connectionId != NULL) { - Pointer cs = connectionStates.get(connectionId); + Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { Pointer ss = cs->getSessionState(sessionId); if (ss != NULL) { @@ -423,7 +524,7 @@ Pointer ConnectionStateTracker: } } } - return TRACKED_RESPONSE_MARKER; + return this->impl->TRACKED_RESPONSE_MARKER; } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) @@ -439,7 +540,7 @@ Pointer ConnectionStateTracker: if (sessionId != NULL) { Pointer connectionId = sessionId->getParentId(); if (connectionId != NULL) { - Pointer cs = connectionStates.get(connectionId); + Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { Pointer ss = cs->getSessionState(sessionId); if (ss != NULL) { @@ -449,7 +550,7 @@ Pointer ConnectionStateTracker: } } } - return TRACKED_RESPONSE_MARKER; + return this->impl->TRACKED_RESPONSE_MARKER; } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) @@ -464,13 +565,13 @@ Pointer ConnectionStateTracker: if (info != NULL) { Pointer connectionId = info->getSessionId()->getParentId(); if (connectionId != NULL) { - Pointer cs = connectionStates.get(connectionId); + Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { cs->addSession(Pointer(info->cloneDataStructure())); } } } - return TRACKED_RESPONSE_MARKER; + return this->impl->TRACKED_RESPONSE_MARKER; } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) @@ -485,13 +586,13 @@ Pointer ConnectionStateTracker: if (id != NULL) { Pointer connectionId = id->getParentId(); if (connectionId != NULL) { - Pointer cs = connectionStates.get(connectionId); + Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { cs->removeSession(Pointer(id->cloneDataStructure())); } } } - return TRACKED_RESPONSE_MARKER; + return this->impl->TRACKED_RESPONSE_MARKER; } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) @@ -504,9 +605,10 @@ Pointer ConnectionStateTracker: try { if (info != NULL) { Pointer infoCopy(info->cloneDataStructure()); - connectionStates.put(info->getConnectionId(), Pointer(new ConnectionState(infoCopy))); + this->impl->connectionStates.put( + info->getConnectionId(), Pointer(new ConnectionState(infoCopy))); } - return TRACKED_RESPONSE_MARKER; + return this->impl->TRACKED_RESPONSE_MARKER; } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) @@ -518,10 +620,10 @@ Pointer ConnectionStateTracker: try { if (id != NULL) { - connectionStates.remove(Pointer(id->cloneDataStructure())); + this->impl->connectionStates.remove(Pointer(id->cloneDataStructure())); } - return TRACKED_RESPONSE_MARKER; + return this->impl->TRACKED_RESPONSE_MARKER; } AMQ_CATCH_RETHROW(ActiveMQException) AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) @@ -539,7 +641,7 @@ Pointer ConnectionStateTracker: Pointer connectionId = producerId->getParentId()->getParentId(); if (connectionId != NULL) { - Pointer cs = connectionStates.get(connectionId); + Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { Pointer transactionState = cs->getTransactionState(message->getTransactionId()); if (transactionState != NULL) { @@ -554,9 +656,10 @@ Pointer ConnectionStateTracker: } } } - return TRACKED_RESPONSE_MARKER; + return this->impl->TRACKED_RESPONSE_MARKER; } else if (trackMessages) { - messageCache.put(message->getMessageId(), Pointer(message->cloneDataStructure())); + this->impl->messageCache.put( + message->getMessageId(), Pointer(message->cloneDataStructure())); } } @@ -575,7 +678,7 @@ Pointer ConnectionStateTracker: if (trackTransactions && info != NULL) { Pointer connectionId = info->getConnectionId(); if (connectionId != NULL) { - Pointer cs = connectionStates.get(connectionId); + Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { cs->addTransactionState(info->getTransactionId()); Pointer transactionState = cs->getTransactionState(info->getTransactionId()); @@ -583,7 +686,7 @@ Pointer ConnectionStateTracker: } } - return TRACKED_RESPONSE_MARKER; + return this->impl->TRACKED_RESPONSE_MARKER; } return Pointer(); @@ -601,7 +704,7 @@ Pointer ConnectionStateTracker: if (trackTransactions && info != NULL) { Pointer connectionId = info->getConnectionId(); if (connectionId != NULL) { - Pointer cs = connectionStates.get(connectionId); + Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { Pointer transactionState = cs->getTransactionState(info->getTransactionId()); if (transactionState != NULL) { @@ -610,7 +713,7 @@ Pointer ConnectionStateTracker: } } - return TRACKED_RESPONSE_MARKER; + return this->impl->TRACKED_RESPONSE_MARKER; } return Pointer(); @@ -628,7 +731,7 @@ Pointer ConnectionStateTracker: if (trackTransactions && info != NULL) { Pointer connectionId = info->getConnectionId(); if (connectionId != NULL) { - Pointer cs = connectionStates.get(connectionId); + Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { Pointer transactionState = cs->getTransactionState(info->getTransactionId()); if (transactionState != NULL) { @@ -655,7 +758,7 @@ Pointer ConnectionStateTracker: if (trackTransactions && info != NULL) { Pointer connectionId = info->getConnectionId(); if (connectionId != NULL) { - Pointer cs = connectionStates.get(connectionId); + Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { Pointer transactionState = cs->getTransactionState(info->getTransactionId()); if (transactionState != NULL) { @@ -682,7 +785,7 @@ Pointer ConnectionStateTracker: if (trackTransactions && info != NULL) { Pointer connectionId = info->getConnectionId(); if (connectionId != NULL) { - Pointer cs = connectionStates.get(connectionId); + Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { Pointer transactionState = cs->getTransactionState(info->getTransactionId()); if (transactionState != NULL) { @@ -709,7 +812,7 @@ Pointer ConnectionStateTracker: if (trackTransactions && info != NULL) { Pointer connectionId = info->getConnectionId(); if (connectionId != NULL) { - Pointer cs = connectionStates.get(connectionId); + Pointer cs = this->impl->connectionStates.get(connectionId); if (cs != NULL) { Pointer transactionState = cs->getTransactionState(info->getTransactionId()); if (transactionState != NULL) { @@ -718,7 +821,7 @@ Pointer ConnectionStateTracker: } } - return TRACKED_RESPONSE_MARKER; + return this->impl->TRACKED_RESPONSE_MARKER; } return Pointer(); @@ -735,7 +838,7 @@ Pointer ConnectionStateTracker: if (pull != NULL && pull->getDestination() != NULL && pull->getConsumerId() != NULL) { std::string id = pull->getDestination()->toString() + "::" + pull->getConsumerId()->toString(); - messagePullCache.put(id, Pointer(pull->cloneDataStructure())); + this->impl->messagePullCache.put(id, Pointer(pull->cloneDataStructure())); } return Pointer(); @@ -748,7 +851,7 @@ Pointer ConnectionStateTracker: //////////////////////////////////////////////////////////////////////////////// void ConnectionStateTracker::connectionInterruptProcessingComplete(transport::Transport* transport, Pointer connectionId) { - Pointer connectionState = connectionStates.get(connectionId); + Pointer connectionState = this->impl->connectionStates.get(connectionId); if (connectionState != NULL) { @@ -779,7 +882,7 @@ void ConnectionStateTracker::connectionI //////////////////////////////////////////////////////////////////////////////// void ConnectionStateTracker::transportInterrupted() { - Pointer > > state(this->connectionStates.values().iterator()); + Pointer > > state(this->impl->connectionStates.values().iterator()); while (state->hasNext()) { state->next()->setConnectionInterruptProcessingComplete(false); } Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h?rev=1459956&r1=1459955&r2=1459956&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h Fri Mar 22 19:48:18 2013 @@ -30,32 +30,18 @@ #include #include -#include #include namespace activemq { namespace state { class RemoveTransactionAction; - using decaf::lang::Pointer; - using decaf::util::concurrent::ConcurrentStlMap; + class StateTrackerImpl; class AMQCPP_API ConnectionStateTracker: public CommandVisitorAdapter { private: - /** Creates a unique marker for this state tracker */ - const Pointer TRACKED_RESPONSE_MARKER; - - /** Map holding the ConnectionStates, indexed by the ConnectionId */ - ConcurrentStlMap, Pointer, ConnectionId::COMPARATOR> connectionStates; - - // TODO - The Map doesn't have a way to automatically remove the eldest Entry - // Either we need to implement something similar to LinkedHashMap or find - // some other way of tracking the eldest entry into the map and removing it - // if the cache size is exceeded. - ConcurrentStlMap, Pointer, MessageId::COMPARATOR> messageCache; - - ConcurrentStlMap > messagePullCache; + StateTrackerImpl* impl; bool trackTransactions; bool restoreSessions; @@ -64,8 +50,8 @@ namespace state { bool restoreTransaction; bool trackMessages; bool trackTransactionProducers; - int maxCacheSize; - int currentCacheSize; + int maxMessageCacheSize; + int maxMessagePullCacheSize; friend class RemoveTransactionAction; @@ -77,50 +63,50 @@ namespace state { Pointer track(Pointer command); - void trackBack(Pointer command); + void trackBack(decaf::lang::Pointer command); - void restore(Pointer transport); + void restore(decaf::lang::Pointer transport); void connectionInterruptProcessingComplete( - transport::Transport* transport, Pointer connectionId); + transport::Transport* transport, decaf::lang::Pointer connectionId); void transportInterrupted(); - virtual Pointer processDestinationInfo(DestinationInfo* info); + virtual decaf::lang::Pointer processDestinationInfo(DestinationInfo* info); - virtual Pointer processRemoveDestination(DestinationInfo* info); + virtual decaf::lang::Pointer processRemoveDestination(DestinationInfo* info); - virtual Pointer processProducerInfo(ProducerInfo* info); + virtual decaf::lang::Pointer processProducerInfo(ProducerInfo* info); - virtual Pointer processRemoveProducer(ProducerId* id); + virtual decaf::lang::Pointer processRemoveProducer(ProducerId* id); - virtual Pointer processConsumerInfo(ConsumerInfo* info); + virtual decaf::lang::Pointer processConsumerInfo(ConsumerInfo* info); - virtual Pointer processRemoveConsumer(ConsumerId* id); + virtual decaf::lang::Pointer processRemoveConsumer(ConsumerId* id); - virtual Pointer processSessionInfo(SessionInfo* info); + virtual decaf::lang::Pointer processSessionInfo(SessionInfo* info); - virtual Pointer processRemoveSession(SessionId* id); + virtual decaf::lang::Pointer processRemoveSession(SessionId* id); - virtual Pointer processConnectionInfo(ConnectionInfo* info); + virtual decaf::lang::Pointer processConnectionInfo(ConnectionInfo* info); - virtual Pointer processRemoveConnection(ConnectionId* id); + virtual decaf::lang::Pointer processRemoveConnection(ConnectionId* id); - virtual Pointer processMessage(Message* message); + virtual decaf::lang::Pointer processMessage(Message* message); - virtual Pointer processBeginTransaction(TransactionInfo* info); + virtual decaf::lang::Pointer processBeginTransaction(TransactionInfo* info); - virtual Pointer processPrepareTransaction(TransactionInfo* info); + virtual decaf::lang::Pointer processPrepareTransaction(TransactionInfo* info); - virtual Pointer processCommitTransactionOnePhase(TransactionInfo* info); + virtual decaf::lang::Pointer processCommitTransactionOnePhase(TransactionInfo* info); - virtual Pointer processCommitTransactionTwoPhase(TransactionInfo* info); + virtual decaf::lang::Pointer processCommitTransactionTwoPhase(TransactionInfo* info); - virtual Pointer processRollbackTransaction(TransactionInfo* info); + virtual decaf::lang::Pointer processRollbackTransaction(TransactionInfo* info); - virtual Pointer processEndTransaction(TransactionInfo* info); + virtual decaf::lang::Pointer processEndTransaction(TransactionInfo* info); - virtual Pointer processMessagePull(MessagePull* pull); + virtual decaf::lang::Pointer processMessagePull(MessagePull* pull); bool isRestoreConsumers() const { return this->restoreConsumers; @@ -170,12 +156,20 @@ namespace state { this->trackMessages = trackMessages; } - int getMaxCacheSize() const { - return this->maxCacheSize; + int getMaxMessageCacheSize() const { + return this->maxMessageCacheSize; + } + + void setMaxMessageCacheSize(int maxMessageCacheSize) { + this->maxMessageCacheSize = maxMessageCacheSize; + } + + int getMaxMessagePullCacheSize() const { + return this->maxMessagePullCacheSize; } - void setMaxCacheSize(int maxCacheSize) { - this->maxCacheSize = maxCacheSize; + void setMaxMessagePullCacheSize(int maxMessagePullCacheSize) { + this->maxMessagePullCacheSize = maxMessagePullCacheSize; } bool isTrackTransactionProducers() const { @@ -188,15 +182,20 @@ namespace state { private: - void doRestoreTransactions(Pointer transport, Pointer connectionState); + void doRestoreTransactions(decaf::lang::Pointer transport, + decaf::lang::Pointer connectionState); - void doRestoreSessions(Pointer transport, Pointer connectionState); + void doRestoreSessions(decaf::lang::Pointer transport, + decaf::lang::Pointer connectionState); - void doRestoreConsumers(Pointer transport, Pointer sessionState); + void doRestoreConsumers(decaf::lang::Pointer transport, + decaf::lang::Pointer sessionState); - void doRestoreProducers(Pointer transport, Pointer sessionState); + void doRestoreProducers(decaf::lang::Pointer transport, + decaf::lang::Pointer sessionState); - void doRestoreTempDestinations(Pointer transport, Pointer connectionState); + void doRestoreTempDestinations(decaf::lang::Pointer transport, + decaf::lang::Pointer connectionState); }; Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=1459956&r1=1459955&r2=1459956&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp Fri Mar 22 19:48:18 2013 @@ -86,6 +86,7 @@ namespace failover { bool trackMessages; bool trackTransactionProducers; int maxCacheSize; + int maxPullCacheSize; bool connectionInterruptProcessingComplete; bool firstConnection; bool updateURIsSupported; @@ -134,6 +135,7 @@ namespace failover { trackMessages(false), trackTransactionProducers(true), maxCacheSize(128*1024), + maxPullCacheSize(10), connectionInterruptProcessingComplete(false), firstConnection(true), updateURIsSupported(true), @@ -560,7 +562,8 @@ void FailoverTransport::start() { } this->impl->taskRunner->start(); - stateTracker.setMaxCacheSize(this->getMaxCacheSize()); + stateTracker.setMaxMessageCacheSize(this->getMaxCacheSize()); + stateTracker.setMaxMessagePullCacheSize(this->getMaxPullCacheSize()); stateTracker.setTrackMessages(this->isTrackMessages()); stateTracker.setTrackTransactionProducers(this->isTrackTransactionProducers()); @@ -1256,6 +1259,16 @@ void FailoverTransport::setMaxCacheSize( } //////////////////////////////////////////////////////////////////////////////// +int FailoverTransport::getMaxPullCacheSize() const { + return this->impl->maxPullCacheSize; +} + +//////////////////////////////////////////////////////////////////////////////// +void FailoverTransport::setMaxPullCacheSize(int value) { + this->impl->maxPullCacheSize = value; +} + +//////////////////////////////////////////////////////////////////////////////// bool FailoverTransport::isReconnectSupported() const { return this->impl->reconnectSupported; } Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h?rev=1459956&r1=1459955&r2=1459956&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h Fri Mar 22 19:48:18 2013 @@ -212,6 +212,10 @@ namespace failover { void setMaxCacheSize(int value); + int getMaxPullCacheSize() const; + + void setMaxPullCacheSize(int value); + bool isReconnectSupported() const; void setReconnectSupported(bool value); Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp?rev=1459956&r1=1459955&r2=1459956&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp Fri Mar 22 19:48:18 2013 @@ -102,6 +102,8 @@ Pointer FailoverTransportFact Boolean::parseBoolean(topLvlProperties.getProperty("trackMessages", "false"))); transport->setMaxCacheSize( Integer::parseInt(topLvlProperties.getProperty("maxCacheSize", "131072"))); + transport->setMaxPullCacheSize( + Integer::parseInt(topLvlProperties.getProperty("maxPullCacheSize", "10"))); transport->setUpdateURIsSupported( Boolean::parseBoolean(topLvlProperties.getProperty("updateURIsSupported", "true"))); transport->setPriorityBackup( Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LRUCache.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LRUCache.h?rev=1459956&r1=1459955&r2=1459956&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LRUCache.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LRUCache.h Fri Mar 22 19:48:18 2013 @@ -123,7 +123,7 @@ namespace util { protected: - virtual bool removeEldestEntry(const MapEntry& eldest) { + virtual bool removeEldestEntry(const MapEntry& eldest DECAF_UNUSED) { if (this->size() > maxCacheSize) { return true; } Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LinkedHashMap.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LinkedHashMap.h?rev=1459956&r1=1459955&r2=1459956&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LinkedHashMap.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LinkedHashMap.h Fri Mar 22 19:48:18 2013 @@ -678,7 +678,7 @@ namespace util { * * @return true if the eldest member should be removed. */ - virtual bool removeEldestEntry(const MapEntry& eldest) { + virtual bool removeEldestEntry(const MapEntry& eldest DECAF_UNUSED) { return false; } @@ -691,7 +691,7 @@ namespace util { * @param eldest * The MapEntry value that is about to be removed from the Map. */ - virtual void onEviction(const MapEntry& eldest) {} + virtual void onEviction(const MapEntry& eldest DECAF_UNUSED) {} public: Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.cpp?rev=1459956&r1=1459955&r2=1459956&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.cpp Fri Mar 22 19:48:18 2013 @@ -17,56 +17,252 @@ #include "ConnectionStateTrackerTest.h" +#include +#include #include #include #include +#include +#include #include #include +#include #include +#include +#include using namespace std; using namespace activemq; using namespace activemq::state; using namespace activemq::commands; +using namespace activemq::transport; +using namespace activemq::wireformat; +using namespace decaf::util; using namespace decaf::lang; +using namespace decaf::lang::exceptions; + +//////////////////////////////////////////////////////////////////////////////// +namespace { + + class TrackingTransport : public activemq::transport::Transport { + public: + + LinkedList< Pointer > connections; + LinkedList< Pointer > sessions; + LinkedList< Pointer > producers; + LinkedList< Pointer > consumers; + LinkedList< Pointer > messages; + LinkedList< Pointer > messagePulls; + + public: + + virtual ~TrackingTransport() {} + + virtual void start() {} + + virtual void stop() {} + + virtual void close() {} + + virtual void oneway(const Pointer command) { + if (command->isConnectionInfo()) { + connections.add(command); + } else if (command->isSessionInfo()) { + sessions.add(command); + } else if (command->isProducerInfo()) { + producers.add(command); + } else if (command->isConsumerInfo()) { + consumers.add(command); + } else if (command->isMessage()) { + messages.add(command); + } else if (command->isMessagePull()) { + messagePulls.add(command); + } + } + + virtual Pointer asyncRequest(const Pointer command, + const Pointer responseCallback) { + throw UnsupportedOperationException(); + } + + virtual Pointer request(const Pointer command) { + throw UnsupportedOperationException(); + } + + virtual Pointer request(const Pointer command, unsigned int timeout) { + throw UnsupportedOperationException(); + } + + virtual Pointer getWireFormat() const { + return Pointer(); + } + + virtual void setWireFormat(const Pointer wireFormat) { + } + + virtual void setTransportListener(TransportListener* listener) { + } + + virtual TransportListener* getTransportListener() const { + return NULL; + } + + virtual Transport* narrow(const std::type_info& typeId) { + return NULL; + } + + virtual bool isFaultTolerant() const { + return false; + } + + virtual bool isConnected() const { + return true; + } + + virtual bool isClosed() const { + return false; + } + + virtual bool isReconnectSupported() const { + return false; + } + + virtual bool isUpdateURIsSupported() const { + return false; + } + + virtual std::string getRemoteAddress() const { + return ""; + } + + virtual void reconnect(const decaf::net::URI& uri) { + } + + virtual void updateURIs(bool rebalance, const decaf::util::List& uris) { + } + + }; + + class ConnectionData { + public: + + Pointer connection; + Pointer session; + Pointer consumer; + Pointer producer; + + }; + + ConnectionData createConnectionState(ConnectionStateTracker& tracker) { + + ConnectionData conn; + + Pointer connectionId(new ConnectionId); + connectionId->setValue("CONNECTION"); + conn.connection.reset(new ConnectionInfo); + conn.connection->setConnectionId(connectionId); + + Pointer session_id(new SessionId); + session_id->setConnectionId("CONNECTION"); + session_id->setValue(12345); + conn.session.reset(new SessionInfo); + conn.session->setSessionId(session_id); + + Pointer consumer_id(new ConsumerId); + consumer_id->setConnectionId("CONNECTION"); + consumer_id->setSessionId(12345); + consumer_id->setValue(42); + conn.consumer.reset(new ConsumerInfo); + conn.consumer->setConsumerId(consumer_id); + + Pointer producer_id(new ProducerId); + producer_id->setConnectionId("CONNECTION"); + producer_id->setSessionId(12345); + producer_id->setValue(42); + conn.producer.reset(new ProducerInfo); + conn.producer->setProducerId(producer_id); + + tracker.processConnectionInfo(conn.connection.get()); + tracker.processSessionInfo(conn.session.get()); + tracker.processConsumerInfo(conn.consumer.get()); + tracker.processProducerInfo(conn.producer.get()); + + return conn; + } + + void clearConnectionState(ConnectionStateTracker& tracker, ConnectionData& conn) { + tracker.processRemoveProducer(conn.producer->getProducerId().get()); + tracker.processRemoveConsumer(conn.consumer->getConsumerId().get()); + tracker.processRemoveSession(conn.session->getSessionId().get()); + tracker.processRemoveConnection(conn.connection->getConnectionId().get()); + } + +} //////////////////////////////////////////////////////////////////////////////// void ConnectionStateTrackerTest::test() { - Pointer conn_id( new ConnectionId ); - conn_id->setValue( "CONNECTION" ); - Pointer conn_info( new ConnectionInfo ); - conn_info->setConnectionId( conn_id ); - - Pointer session_id( new SessionId ); - session_id->setConnectionId( "CONNECTION" ); - session_id->setValue( 12345 ); - Pointer session_info( new SessionInfo ); - session_info->setSessionId( session_id ); - - Pointer consumer_id( new ConsumerId ); - consumer_id->setConnectionId( "CONNECTION" ); - consumer_id->setSessionId( 12345 ); - consumer_id->setValue( 42 ); - Pointer consumer_info( new ConsumerInfo ); - consumer_info->setConsumerId( consumer_id ); - - Pointer producer_id( new ProducerId ); - producer_id->setConnectionId( "CONNECTION" ); - producer_id->setSessionId( 12345 ); - producer_id->setValue( 42 ); - Pointer producer_info( new ProducerInfo ); - producer_info->setProducerId( producer_id ); + ConnectionStateTracker tracker; + ConnectionData conn = createConnectionState(tracker); + clearConnectionState(tracker, conn); +} + +//////////////////////////////////////////////////////////////////////////////// +void ConnectionStateTrackerTest::testMessageCache() { + + Pointer transport(new TrackingTransport); + ConnectionStateTracker tracker; + tracker.setTrackMessages(true); + + ConnectionData conn = createConnectionState(tracker); + int messageSize; + { + decaf::lang::Pointer id(new commands::MessageId()); + id->setProducerId(conn.producer->getProducerId()); + Pointer message(new Message); + messageSize = message->getSize(); + } + + tracker.setMaxMessageCacheSize(messageSize * 3); + + int sequenceId = 1; + + for (int i = 0; i < 100; ++i) { + decaf::lang::Pointer id(new commands::MessageId()); + id->setProducerId(conn.producer->getProducerId()); + id->setProducerSequenceId(sequenceId++); + Pointer message(new Message); + message->setMessageId(id); + + tracker.processMessage(message.get()); + tracker.trackBack(message); + } + + tracker.restore(transport); + + CPPUNIT_ASSERT_EQUAL_MESSAGE("Should only be three messages", 4, transport->messages.size()); +} + +//////////////////////////////////////////////////////////////////////////////// +void ConnectionStateTrackerTest::testMessagePullCache() { + + Pointer transport(new TrackingTransport); ConnectionStateTracker tracker; - tracker.processConnectionInfo( conn_info.get() ); - tracker.processSessionInfo( session_info.get() ); - tracker.processConsumerInfo( consumer_info.get() ); - tracker.processProducerInfo( producer_info.get() ); - - tracker.processRemoveProducer( producer_id.get() ); - tracker.processRemoveConsumer( consumer_id.get() ); - tracker.processRemoveSession( session_id.get() ); - tracker.processRemoveConnection( conn_id.get() ); + tracker.setTrackMessages(true); + + ConnectionData conn = createConnectionState(tracker); + + for (int i = 0; i < 100; ++i) { + Pointer pull(new commands::MessagePull()); + Pointer destination(new ActiveMQTopic("TEST" + Integer::toString(i))); + pull->setConsumerId(conn.consumer->getConsumerId()); + pull->setDestination(destination); + tracker.processMessagePull(pull.get()); + tracker.trackBack(pull); + } + + tracker.restore(transport); + CPPUNIT_ASSERT_EQUAL_MESSAGE("Should only be three message pulls", 10, transport->messagePulls.size()); } Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.h?rev=1459956&r1=1459955&r2=1459956&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.h Fri Mar 22 19:48:18 2013 @@ -28,6 +28,8 @@ namespace state { CPPUNIT_TEST_SUITE( ConnectionStateTrackerTest ); CPPUNIT_TEST( test ); + CPPUNIT_TEST( testMessageCache ); + CPPUNIT_TEST( testMessagePullCache ); CPPUNIT_TEST_SUITE_END(); public: @@ -36,6 +38,9 @@ namespace state { virtual ~ConnectionStateTrackerTest() {} void test(); + void testMessageCache(); + void testMessagePullCache(); + }; }}