activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1459956 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/activemq/state/ main/activemq/transport/failover/ main/decaf/util/ test/activemq/state/
Date Fri, 22 Mar 2013 19:48:19 GMT
Author: tabish
Date: Fri Mar 22 19:48:18 2013
New Revision: 1459956

URL: http://svn.apache.org/r1459956
Log:
fix for: https://issues.apache.org/jira/browse/AMQCPP-470

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LRUCache.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LinkedHashMap.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp?rev=1459956&r1=1459955&r2=1459956&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp Fri Mar 22 19:48:18 2013
@@ -18,7 +18,11 @@
 #include "ConnectionStateTracker.h"
 
 #include <decaf/lang/Runnable.h>
+#include <decaf/util/HashCode.h>
+#include <decaf/util/LinkedHashMap.h>
+#include <decaf/util/MapEntry.h>
 #include <decaf/util/NoSuchElementException.h>
+#include <decaf/util/concurrent/ConcurrentStlMap.h>
 
 #include <activemq/commands/ConsumerControl.h>
 #include <activemq/commands/ExceptionResponse.h>
@@ -38,9 +42,108 @@ using namespace decaf::io;
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
+namespace decaf {
+namespace util {
+
+    template<>
+    struct HashCode<MessageId> : public HashCodeUnaryBase<const MessageId&> {
+        int operator()(const MessageId& arg) const {
+            return decaf::util::HashCode<std::string>()(arg.toString());
+        }
+    };
+
+}}
+
+////////////////////////////////////////////////////////////////////////////////
 namespace activemq {
 namespace state {
 
+
+    class MessageCache : public LinkedHashMap<Pointer<MessageId>, Pointer<Command>, HashCode< Pointer<MessageId> > > {
+    protected:
+
+        ConnectionStateTracker* parent;
+
+    public:
+
+        int currentCacheSize;
+
+    public:
+
+        MessageCache(ConnectionStateTracker* parent) :
+            LinkedHashMap<Pointer<MessageId>, Pointer<Command> >(), parent(parent), currentCacheSize(0) {
+        }
+
+        virtual ~MessageCache() {}
+
+        virtual bool removeEldestEntry(const MapEntry<Pointer<MessageId>, Pointer<Command> >& eldest) {
+            bool result = currentCacheSize > parent->getMaxMessageCacheSize();
+            if (result) {
+                Pointer<Message> message = eldest.getValue().dynamicCast<Message>();
+                currentCacheSize -= message->getSize();
+            }
+            return result;
+        }
+    };
+
+    class MessagePullCache : public LinkedHashMap<std::string, Pointer<Command> > {
+    protected:
+
+        ConnectionStateTracker* parent;
+
+    public:
+
+        MessagePullCache(ConnectionStateTracker* parent) :
+            LinkedHashMap<std::string, Pointer<Command> >(), parent(parent) {
+        }
+
+        virtual ~MessagePullCache() {}
+
+        virtual bool removeEldestEntry(const MapEntry<std::string, Pointer<Command> >& eldest AMQCPP_UNUSED) {
+            return size() > parent->getMaxMessagePullCacheSize();
+        }
+    };
+
+    class StateTrackerImpl {
+    private:
+
+        StateTrackerImpl(const StateTrackerImpl&);
+        StateTrackerImpl& operator= (const StateTrackerImpl&);
+
+    public:
+
+        /** Parent ConnectionStateTracker */
+        ConnectionStateTracker* parent;
+
+        /** Creates a unique marker for this state tracker */
+        const Pointer<Tracked> TRACKED_RESPONSE_MARKER;
+
+        /** Map holding the ConnectionStates, indexed by the ConnectionId */
+        ConcurrentStlMap<Pointer<ConnectionId>, Pointer<ConnectionState>, ConnectionId::COMPARATOR> connectionStates;
+
+        /** Store Messages if trackMessages == true */
+        MessageCache messageCache;
+
+        /** Store MessagePull commands for replay */
+        MessagePullCache messagePullCache;
+
+        StateTrackerImpl(ConnectionStateTracker * parent) : parent(parent),
+                                                            TRACKED_RESPONSE_MARKER(new Tracked()),
+                                                            connectionStates(),
+                                                            messageCache(parent),
+                                                            messagePullCache(parent) {
+        }
+
+        ~StateTrackerImpl() {
+            try {
+                connectionStates.clear();
+                messageCache.clear();
+                messagePullCache.clear();
+            }
+            AMQ_CATCHALL_NOTHROW()
+        }
+    };
+
     class RemoveTransactionAction : public Runnable {
     private:
 
@@ -62,7 +165,7 @@ namespace state {
 
         virtual void run() {
             Pointer<ConnectionId> connectionId = info->getConnectionId();
-            Pointer<ConnectionState> cs = stateTracker->connectionStates.get(connectionId);
+            Pointer<ConnectionState> cs = stateTracker->impl->connectionStates.get(connectionId);
             Pointer<TransactionState> txState = cs->removeTransactionState(info->getTransactionId());
             if (txState != NULL) {
                 txState->clear();
@@ -73,10 +176,7 @@ namespace state {
 }}
 
 ////////////////////////////////////////////////////////////////////////////////
-ConnectionStateTracker::ConnectionStateTracker() : TRACKED_RESPONSE_MARKER( new Tracked() ),
-                                                   connectionStates(),
-                                                   messageCache(),
-                                                   messagePullCache(),
+ConnectionStateTracker::ConnectionStateTracker() : impl(new StateTrackerImpl(this)),
                                                    trackTransactions(false),
                                                    restoreSessions(true),
                                                    restoreConsumers(true),
@@ -84,12 +184,16 @@ ConnectionStateTracker::ConnectionStateT
                                                    restoreTransaction(true),
                                                    trackMessages(true),
                                                    trackTransactionProducers(true),
-                                                   maxCacheSize(128 * 1024),
-                                                   currentCacheSize(0) {
+                                                   maxMessageCacheSize(128 * 1024),
+                                                   maxMessagePullCacheSize(10) {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 ConnectionStateTracker::~ConnectionStateTracker() {
+    try {
+        delete impl;
+    }
+    AMQ_CATCHALL_NOTHROW()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -117,13 +221,7 @@ void ConnectionStateTracker::trackBack(P
             if (trackMessages && command->isMessage()) {
                 Pointer<Message> message = command.dynamicCast<Message>();
                 if (message->getTransactionId() == NULL) {
-                    currentCacheSize = currentCacheSize + message->getSize();
-                }
-            } else {
-                Pointer<MessagePull> messagePull = command.dynamicCast<MessagePull>();
-                if (messagePull != NULL) {
-                    // just needs to be a rough estimate of size, ~4 identifiers
-                    currentCacheSize += 400;
+                    this->impl->messageCache.currentCacheSize += message->getSize();
                 }
             }
         }
@@ -138,7 +236,9 @@ void ConnectionStateTracker::restore(Poi
 
     try {
 
-        Pointer<Iterator<Pointer<ConnectionState> > > iterator(this->connectionStates.values().iterator());
+        Pointer<Iterator<Pointer<ConnectionState> > > iterator(
+            this->impl->connectionStates.values().iterator());
+
         while (iterator->hasNext()) {
             Pointer<ConnectionState> state = iterator->next();
 
@@ -158,12 +258,12 @@ void ConnectionStateTracker::restore(Poi
         }
 
         // Now we flush messages
-        Pointer<Iterator<Pointer<Command> > > messages(this->messageCache.values().iterator());
+        Pointer<Iterator<Pointer<Command> > > messages(this->impl->messageCache.values().iterator());
         while (messages->hasNext()) {
             transport->oneway(messages->next());
         }
 
-        Pointer<Iterator<Pointer<Command> > > messagePullIter(this->messagePullCache.values().iterator());
+        Pointer<Iterator<Pointer<Command> > > messagePullIter(this->impl->messagePullCache.values().iterator());
         while (messagePullIter->hasNext()) {
             transport->oneway(messagePullIter->next());
         }
@@ -261,7 +361,8 @@ void ConnectionStateTracker::doRestoreCo
     try {
 
         // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete
-        Pointer<ConnectionState> connectionState = connectionStates.get(sessionState->getInfo()->getSessionId()->getParentId());
+        Pointer<ConnectionState> connectionState =
+            this->impl->connectionStates.get(sessionState->getInfo()->getSessionId()->getParentId());
         bool connectionInterruptionProcessingComplete = connectionState->isConnectionInterruptProcessingComplete();
 
         Pointer<Iterator<Pointer<ConsumerState> > > state(sessionState->getConsumerStates().iterator());
@@ -322,12 +423,12 @@ Pointer<Command> ConnectionStateTracker:
 
     try {
         if (info != NULL) {
-            Pointer<ConnectionState> cs = connectionStates.get(info->getConnectionId());
+            Pointer<ConnectionState> cs = this->impl->connectionStates.get(info->getConnectionId());
             if (cs != NULL && info->getDestination()->isTemporary()) {
                 cs->addTempDestination(Pointer<DestinationInfo>(info->cloneDataStructure()));
             }
         }
-        return TRACKED_RESPONSE_MARKER;
+        return this->impl->TRACKED_RESPONSE_MARKER;
     }
     AMQ_CATCH_RETHROW(ActiveMQException)
     AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
@@ -339,12 +440,12 @@ Pointer<Command> ConnectionStateTracker:
 
     try {
         if (info != NULL) {
-            Pointer<ConnectionState> cs = connectionStates.get(info->getConnectionId());
+            Pointer<ConnectionState> cs = this->impl->connectionStates.get(info->getConnectionId());
             if (cs != NULL && info->getDestination()->isTemporary()) {
                 cs->removeTempDestination(info->getDestination());
             }
         }
-        return TRACKED_RESPONSE_MARKER;
+        return this->impl->TRACKED_RESPONSE_MARKER;
     }
     AMQ_CATCH_RETHROW(ActiveMQException)
     AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
@@ -360,7 +461,7 @@ Pointer<Command> ConnectionStateTracker:
             if (sessionId != NULL) {
                 Pointer<ConnectionId> connectionId = sessionId->getParentId();
                 if (connectionId != NULL) {
-                    Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                    Pointer<ConnectionState> cs = this->impl->connectionStates.get(connectionId);
                     if (cs != NULL) {
                         Pointer<SessionState> ss = cs->getSessionState(sessionId);
                         if (ss != NULL) {
@@ -370,7 +471,7 @@ Pointer<Command> ConnectionStateTracker:
                 }
             }
         }
-        return TRACKED_RESPONSE_MARKER;
+        return this->impl->TRACKED_RESPONSE_MARKER;
     }
     AMQ_CATCH_RETHROW(ActiveMQException)
     AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
@@ -386,7 +487,7 @@ Pointer<Command> ConnectionStateTracker:
             if (sessionId != NULL) {
                 Pointer<ConnectionId> connectionId = sessionId->getParentId();
                 if (connectionId != NULL) {
-                    Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                    Pointer<ConnectionState> cs = this->impl->connectionStates.get(connectionId);
                     if (cs != NULL) {
                         Pointer<SessionState> ss = cs->getSessionState(sessionId);
                         if (ss != NULL) {
@@ -396,7 +497,7 @@ Pointer<Command> ConnectionStateTracker:
                 }
             }
         }
-        return TRACKED_RESPONSE_MARKER;
+        return this->impl->TRACKED_RESPONSE_MARKER;
     }
     AMQ_CATCH_RETHROW(ActiveMQException)
     AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
@@ -413,7 +514,7 @@ Pointer<Command> ConnectionStateTracker:
             if (sessionId != NULL) {
                 Pointer<ConnectionId> connectionId = sessionId->getParentId();
                 if (connectionId != NULL) {
-                    Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                    Pointer<ConnectionState> cs = this->impl->connectionStates.get(connectionId);
                     if (cs != NULL) {
                         Pointer<SessionState> ss = cs->getSessionState(sessionId);
                         if (ss != NULL) {
@@ -423,7 +524,7 @@ Pointer<Command> ConnectionStateTracker:
                 }
             }
         }
-        return TRACKED_RESPONSE_MARKER;
+        return this->impl->TRACKED_RESPONSE_MARKER;
     }
     AMQ_CATCH_RETHROW(ActiveMQException)
     AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
@@ -439,7 +540,7 @@ Pointer<Command> ConnectionStateTracker:
             if (sessionId != NULL) {
                 Pointer<ConnectionId> connectionId = sessionId->getParentId();
                 if (connectionId != NULL) {
-                    Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                    Pointer<ConnectionState> cs = this->impl->connectionStates.get(connectionId);
                     if (cs != NULL) {
                         Pointer<SessionState> ss = cs->getSessionState(sessionId);
                         if (ss != NULL) {
@@ -449,7 +550,7 @@ Pointer<Command> ConnectionStateTracker:
                 }
             }
         }
-        return TRACKED_RESPONSE_MARKER;
+        return this->impl->TRACKED_RESPONSE_MARKER;
     }
     AMQ_CATCH_RETHROW(ActiveMQException)
     AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
@@ -464,13 +565,13 @@ Pointer<Command> ConnectionStateTracker:
         if (info != NULL) {
             Pointer<ConnectionId> connectionId = info->getSessionId()->getParentId();
             if (connectionId != NULL) {
-                Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                Pointer<ConnectionState> cs = this->impl->connectionStates.get(connectionId);
                 if (cs != NULL) {
                     cs->addSession(Pointer<SessionInfo>(info->cloneDataStructure()));
                 }
             }
         }
-        return TRACKED_RESPONSE_MARKER;
+        return this->impl->TRACKED_RESPONSE_MARKER;
     }
     AMQ_CATCH_RETHROW(ActiveMQException)
     AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
@@ -485,13 +586,13 @@ Pointer<Command> ConnectionStateTracker:
         if (id != NULL) {
             Pointer<ConnectionId> connectionId = id->getParentId();
             if (connectionId != NULL) {
-                Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                Pointer<ConnectionState> cs = this->impl->connectionStates.get(connectionId);
                 if (cs != NULL) {
                     cs->removeSession(Pointer<SessionId>(id->cloneDataStructure()));
                 }
             }
         }
-        return TRACKED_RESPONSE_MARKER;
+        return this->impl->TRACKED_RESPONSE_MARKER;
     }
     AMQ_CATCH_RETHROW(ActiveMQException)
     AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
@@ -504,9 +605,10 @@ Pointer<Command> ConnectionStateTracker:
     try {
         if (info != NULL) {
             Pointer<ConnectionInfo> infoCopy(info->cloneDataStructure());
-            connectionStates.put(info->getConnectionId(), Pointer<ConnectionState>(new ConnectionState(infoCopy)));
+            this->impl->connectionStates.put(
+                info->getConnectionId(), Pointer<ConnectionState>(new ConnectionState(infoCopy)));
         }
-        return TRACKED_RESPONSE_MARKER;
+        return this->impl->TRACKED_RESPONSE_MARKER;
     }
     AMQ_CATCH_RETHROW(ActiveMQException)
     AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
@@ -518,10 +620,10 @@ Pointer<Command> ConnectionStateTracker:
 
     try {
         if (id != NULL) {
-            connectionStates.remove(Pointer<ConnectionId>(id->cloneDataStructure()));
+            this->impl->connectionStates.remove(Pointer<ConnectionId>(id->cloneDataStructure()));
         }
 
-        return TRACKED_RESPONSE_MARKER;
+        return this->impl->TRACKED_RESPONSE_MARKER;
     }
     AMQ_CATCH_RETHROW(ActiveMQException)
     AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
@@ -539,7 +641,7 @@ Pointer<Command> ConnectionStateTracker:
                 Pointer<ConnectionId> connectionId = producerId->getParentId()->getParentId();
 
                 if (connectionId != NULL) {
-                    Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                    Pointer<ConnectionState> cs = this->impl->connectionStates.get(connectionId);
                     if (cs != NULL) {
                         Pointer<TransactionState> transactionState = cs->getTransactionState(message->getTransactionId());
                         if (transactionState != NULL) {
@@ -554,9 +656,10 @@ Pointer<Command> ConnectionStateTracker:
                         }
                     }
                 }
-                return TRACKED_RESPONSE_MARKER;
+                return this->impl->TRACKED_RESPONSE_MARKER;
             } else if (trackMessages) {
-                messageCache.put(message->getMessageId(), Pointer<Message>(message->cloneDataStructure()));
+                this->impl->messageCache.put(
+                    message->getMessageId(), Pointer<Message>(message->cloneDataStructure()));
             }
         }
 
@@ -575,7 +678,7 @@ Pointer<Command> ConnectionStateTracker:
         if (trackTransactions && info != NULL) {
             Pointer<ConnectionId> connectionId = info->getConnectionId();
             if (connectionId != NULL) {
-                Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                Pointer<ConnectionState> cs = this->impl->connectionStates.get(connectionId);
                 if (cs != NULL) {
                     cs->addTransactionState(info->getTransactionId());
                     Pointer<TransactionState> transactionState = cs->getTransactionState(info->getTransactionId());
@@ -583,7 +686,7 @@ Pointer<Command> ConnectionStateTracker:
                 }
             }
 
-            return TRACKED_RESPONSE_MARKER;
+            return this->impl->TRACKED_RESPONSE_MARKER;
         }
 
         return Pointer<Response>();
@@ -601,7 +704,7 @@ Pointer<Command> ConnectionStateTracker:
         if (trackTransactions && info != NULL) {
             Pointer<ConnectionId> connectionId = info->getConnectionId();
             if (connectionId != NULL) {
-                Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                Pointer<ConnectionState> cs = this->impl->connectionStates.get(connectionId);
                 if (cs != NULL) {
                     Pointer<TransactionState> transactionState = cs->getTransactionState(info->getTransactionId());
                     if (transactionState != NULL) {
@@ -610,7 +713,7 @@ Pointer<Command> ConnectionStateTracker:
                 }
             }
 
-            return TRACKED_RESPONSE_MARKER;
+            return this->impl->TRACKED_RESPONSE_MARKER;
         }
 
         return Pointer<Response>();
@@ -628,7 +731,7 @@ Pointer<Command> ConnectionStateTracker:
         if (trackTransactions && info != NULL) {
             Pointer<ConnectionId> connectionId = info->getConnectionId();
             if (connectionId != NULL) {
-                Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                Pointer<ConnectionState> cs = this->impl->connectionStates.get(connectionId);
                 if (cs != NULL) {
                     Pointer<TransactionState> transactionState = cs->getTransactionState(info->getTransactionId());
                     if (transactionState != NULL) {
@@ -655,7 +758,7 @@ Pointer<Command> ConnectionStateTracker:
         if (trackTransactions && info != NULL) {
             Pointer<ConnectionId> connectionId = info->getConnectionId();
             if (connectionId != NULL) {
-                Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                Pointer<ConnectionState> cs = this->impl->connectionStates.get(connectionId);
                 if (cs != NULL) {
                     Pointer<TransactionState> transactionState = cs->getTransactionState(info->getTransactionId());
                     if (transactionState != NULL) {
@@ -682,7 +785,7 @@ Pointer<Command> ConnectionStateTracker:
         if (trackTransactions && info != NULL) {
             Pointer<ConnectionId> connectionId = info->getConnectionId();
             if (connectionId != NULL) {
-                Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                Pointer<ConnectionState> cs = this->impl->connectionStates.get(connectionId);
                 if (cs != NULL) {
                     Pointer<TransactionState> transactionState = cs->getTransactionState(info->getTransactionId());
                     if (transactionState != NULL) {
@@ -709,7 +812,7 @@ Pointer<Command> ConnectionStateTracker:
         if (trackTransactions && info != NULL) {
             Pointer<ConnectionId> connectionId = info->getConnectionId();
             if (connectionId != NULL) {
-                Pointer<ConnectionState> cs = connectionStates.get(connectionId);
+                Pointer<ConnectionState> cs = this->impl->connectionStates.get(connectionId);
                 if (cs != NULL) {
                     Pointer<TransactionState> transactionState = cs->getTransactionState(info->getTransactionId());
                     if (transactionState != NULL) {
@@ -718,7 +821,7 @@ Pointer<Command> ConnectionStateTracker:
                 }
             }
 
-            return TRACKED_RESPONSE_MARKER;
+            return this->impl->TRACKED_RESPONSE_MARKER;
         }
 
         return Pointer<Response>();
@@ -735,7 +838,7 @@ Pointer<Command> ConnectionStateTracker:
 
         if (pull != NULL && pull->getDestination() != NULL && pull->getConsumerId() != NULL) {
             std::string id = pull->getDestination()->toString() + "::" + pull->getConsumerId()->toString();
-            messagePullCache.put(id, Pointer<Command>(pull->cloneDataStructure()));
+            this->impl->messagePullCache.put(id, Pointer<Command>(pull->cloneDataStructure()));
         }
 
         return Pointer<Command>();
@@ -748,7 +851,7 @@ Pointer<Command> ConnectionStateTracker:
 ////////////////////////////////////////////////////////////////////////////////
 void ConnectionStateTracker::connectionInterruptProcessingComplete(transport::Transport* transport, Pointer<ConnectionId> connectionId) {
 
-    Pointer<ConnectionState> connectionState = connectionStates.get(connectionId);
+    Pointer<ConnectionState> connectionState = this->impl->connectionStates.get(connectionId);
 
     if (connectionState != NULL) {
 
@@ -779,7 +882,7 @@ void ConnectionStateTracker::connectionI
 ////////////////////////////////////////////////////////////////////////////////
 void ConnectionStateTracker::transportInterrupted() {
 
-    Pointer<Iterator<Pointer<ConnectionState> > > state(this->connectionStates.values().iterator());
+    Pointer<Iterator<Pointer<ConnectionState> > > state(this->impl->connectionStates.values().iterator());
     while (state->hasNext()) {
         state->next()->setConnectionInterruptProcessingComplete(false);
     }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h?rev=1459956&r1=1459955&r2=1459956&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h Fri Mar 22 19:48:18 2013
@@ -30,32 +30,18 @@
 #include <activemq/state/Tracked.h>
 #include <activemq/transport/Transport.h>
 
-#include <decaf/util/concurrent/ConcurrentStlMap.h>
 #include <decaf/lang/Pointer.h>
 
 namespace activemq {
 namespace state {
 
     class RemoveTransactionAction;
-    using decaf::lang::Pointer;
-    using decaf::util::concurrent::ConcurrentStlMap;
+    class StateTrackerImpl;
 
     class AMQCPP_API ConnectionStateTracker: public CommandVisitorAdapter {
     private:
 
-        /** Creates a unique marker for this state tracker */
-        const Pointer<Tracked> TRACKED_RESPONSE_MARKER;
-
-        /** Map holding the ConnectionStates, indexed by the ConnectionId */
-        ConcurrentStlMap<Pointer<ConnectionId>, Pointer<ConnectionState>, ConnectionId::COMPARATOR> connectionStates;
-
-        // TODO - The Map doesn't have a way to automatically remove the eldest Entry
-        //        Either we need to implement something similar to LinkedHashMap or find
-        //        some other way of tracking the eldest entry into the map and removing it
-        //        if the cache size is exceeded.
-        ConcurrentStlMap<Pointer<MessageId>, Pointer<Command>, MessageId::COMPARATOR> messageCache;
-
-        ConcurrentStlMap<std::string, Pointer<Command> > messagePullCache;
+        StateTrackerImpl* impl;
 
         bool trackTransactions;
         bool restoreSessions;
@@ -64,8 +50,8 @@ namespace state {
         bool restoreTransaction;
         bool trackMessages;
         bool trackTransactionProducers;
-        int maxCacheSize;
-        int currentCacheSize;
+        int maxMessageCacheSize;
+        int maxMessagePullCacheSize;
 
         friend class RemoveTransactionAction;
 
@@ -77,50 +63,50 @@ namespace state {
 
         Pointer<Tracked> track(Pointer<Command> command);
 
-        void trackBack(Pointer<Command> command);
+        void trackBack(decaf::lang::Pointer<Command> command);
 
-        void restore(Pointer<transport::Transport> transport);
+        void restore(decaf::lang::Pointer<transport::Transport> transport);
 
         void connectionInterruptProcessingComplete(
-            transport::Transport* transport, Pointer<ConnectionId> connectionId);
+            transport::Transport* transport, decaf::lang::Pointer<ConnectionId> connectionId);
 
         void transportInterrupted();
 
-        virtual Pointer<Command> processDestinationInfo(DestinationInfo* info);
+        virtual decaf::lang::Pointer<Command> processDestinationInfo(DestinationInfo* info);
 
-        virtual Pointer<Command> processRemoveDestination(DestinationInfo* info);
+        virtual decaf::lang::Pointer<Command> processRemoveDestination(DestinationInfo* info);
 
-        virtual Pointer<Command> processProducerInfo(ProducerInfo* info);
+        virtual decaf::lang::Pointer<Command> processProducerInfo(ProducerInfo* info);
 
-        virtual Pointer<Command> processRemoveProducer(ProducerId* id);
+        virtual decaf::lang::Pointer<Command> processRemoveProducer(ProducerId* id);
 
-        virtual Pointer<Command> processConsumerInfo(ConsumerInfo* info);
+        virtual decaf::lang::Pointer<Command> processConsumerInfo(ConsumerInfo* info);
 
-        virtual Pointer<Command> processRemoveConsumer(ConsumerId* id);
+        virtual decaf::lang::Pointer<Command> processRemoveConsumer(ConsumerId* id);
 
-        virtual Pointer<Command> processSessionInfo(SessionInfo* info);
+        virtual decaf::lang::Pointer<Command> processSessionInfo(SessionInfo* info);
 
-        virtual Pointer<Command> processRemoveSession(SessionId* id);
+        virtual decaf::lang::Pointer<Command> processRemoveSession(SessionId* id);
 
-        virtual Pointer<Command> processConnectionInfo(ConnectionInfo* info);
+        virtual decaf::lang::Pointer<Command> processConnectionInfo(ConnectionInfo* info);
 
-        virtual Pointer<Command> processRemoveConnection(ConnectionId* id);
+        virtual decaf::lang::Pointer<Command> processRemoveConnection(ConnectionId* id);
 
-        virtual Pointer<Command> processMessage(Message* message);
+        virtual decaf::lang::Pointer<Command> processMessage(Message* message);
 
-        virtual Pointer<Command> processBeginTransaction(TransactionInfo* info);
+        virtual decaf::lang::Pointer<Command> processBeginTransaction(TransactionInfo* info);
 
-        virtual Pointer<Command> processPrepareTransaction(TransactionInfo* info);
+        virtual decaf::lang::Pointer<Command> processPrepareTransaction(TransactionInfo* info);
 
-        virtual Pointer<Command> processCommitTransactionOnePhase(TransactionInfo* info);
+        virtual decaf::lang::Pointer<Command> processCommitTransactionOnePhase(TransactionInfo* info);
 
-        virtual Pointer<Command> processCommitTransactionTwoPhase(TransactionInfo* info);
+        virtual decaf::lang::Pointer<Command> processCommitTransactionTwoPhase(TransactionInfo* info);
 
-        virtual Pointer<Command> processRollbackTransaction(TransactionInfo* info);
+        virtual decaf::lang::Pointer<Command> processRollbackTransaction(TransactionInfo* info);
 
-        virtual Pointer<Command> processEndTransaction(TransactionInfo* info);
+        virtual decaf::lang::Pointer<Command> processEndTransaction(TransactionInfo* info);
 
-        virtual Pointer<Command> processMessagePull(MessagePull* pull);
+        virtual decaf::lang::Pointer<Command> processMessagePull(MessagePull* pull);
 
         bool isRestoreConsumers() const {
             return this->restoreConsumers;
@@ -170,12 +156,20 @@ namespace state {
             this->trackMessages = trackMessages;
         }
 
-        int getMaxCacheSize() const {
-            return this->maxCacheSize;
+        int getMaxMessageCacheSize() const {
+            return this->maxMessageCacheSize;
+        }
+
+        void setMaxMessageCacheSize(int maxMessageCacheSize) {
+            this->maxMessageCacheSize = maxMessageCacheSize;
+        }
+
+        int getMaxMessagePullCacheSize() const {
+            return this->maxMessagePullCacheSize;
         }
 
-        void setMaxCacheSize(int maxCacheSize) {
-            this->maxCacheSize = maxCacheSize;
+        void setMaxMessagePullCacheSize(int maxMessagePullCacheSize) {
+            this->maxMessagePullCacheSize = maxMessagePullCacheSize;
         }
 
         bool isTrackTransactionProducers() const {
@@ -188,15 +182,20 @@ namespace state {
 
     private:
 
-        void doRestoreTransactions(Pointer<transport::Transport> transport, Pointer<ConnectionState> connectionState);
+        void doRestoreTransactions(decaf::lang::Pointer<transport::Transport> transport,
+                                   decaf::lang::Pointer<ConnectionState> connectionState);
 
-        void doRestoreSessions(Pointer<transport::Transport> transport, Pointer<ConnectionState> connectionState);
+        void doRestoreSessions(decaf::lang::Pointer<transport::Transport> transport,
+                               decaf::lang::Pointer<ConnectionState> connectionState);
 
-        void doRestoreConsumers(Pointer<transport::Transport> transport, Pointer<SessionState> sessionState);
+        void doRestoreConsumers(decaf::lang::Pointer<transport::Transport> transport,
+                                decaf::lang::Pointer<SessionState> sessionState);
 
-        void doRestoreProducers(Pointer<transport::Transport> transport, Pointer<SessionState> sessionState);
+        void doRestoreProducers(decaf::lang::Pointer<transport::Transport> transport,
+                                decaf::lang::Pointer<SessionState> sessionState);
 
-        void doRestoreTempDestinations(Pointer<transport::Transport> transport, Pointer<ConnectionState> connectionState);
+        void doRestoreTempDestinations(decaf::lang::Pointer<transport::Transport> transport,
+                                       decaf::lang::Pointer<ConnectionState> connectionState);
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=1459956&r1=1459955&r2=1459956&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp Fri Mar 22 19:48:18 2013
@@ -86,6 +86,7 @@ namespace failover {
         bool trackMessages;
         bool trackTransactionProducers;
         int maxCacheSize;
+        int maxPullCacheSize;
         bool connectionInterruptProcessingComplete;
         bool firstConnection;
         bool updateURIsSupported;
@@ -134,6 +135,7 @@ namespace failover {
             trackMessages(false),
             trackTransactionProducers(true),
             maxCacheSize(128*1024),
+            maxPullCacheSize(10),
             connectionInterruptProcessingComplete(false),
             firstConnection(true),
             updateURIsSupported(true),
@@ -560,7 +562,8 @@ void FailoverTransport::start() {
             }
             this->impl->taskRunner->start();
 
-            stateTracker.setMaxCacheSize(this->getMaxCacheSize());
+            stateTracker.setMaxMessageCacheSize(this->getMaxCacheSize());
+            stateTracker.setMaxMessagePullCacheSize(this->getMaxPullCacheSize());
             stateTracker.setTrackMessages(this->isTrackMessages());
             stateTracker.setTrackTransactionProducers(this->isTrackTransactionProducers());
 
@@ -1256,6 +1259,16 @@ void FailoverTransport::setMaxCacheSize(
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+int FailoverTransport::getMaxPullCacheSize() const {
+    return this->impl->maxPullCacheSize;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setMaxPullCacheSize(int value) {
+    this->impl->maxPullCacheSize = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 bool FailoverTransport::isReconnectSupported() const {
     return this->impl->reconnectSupported;
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h?rev=1459956&r1=1459955&r2=1459956&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h Fri Mar 22 19:48:18 2013
@@ -212,6 +212,10 @@ namespace failover {
 
         void setMaxCacheSize(int value);
 
+        int getMaxPullCacheSize() const;
+
+        void setMaxPullCacheSize(int value);
+
         bool isReconnectSupported() const;
 
         void setReconnectSupported(bool value);

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp?rev=1459956&r1=1459955&r2=1459956&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp Fri Mar 22 19:48:18 2013
@@ -102,6 +102,8 @@ Pointer<Transport> FailoverTransportFact
             Boolean::parseBoolean(topLvlProperties.getProperty("trackMessages", "false")));
         transport->setMaxCacheSize(
             Integer::parseInt(topLvlProperties.getProperty("maxCacheSize", "131072")));
+        transport->setMaxPullCacheSize(
+            Integer::parseInt(topLvlProperties.getProperty("maxPullCacheSize", "10")));
         transport->setUpdateURIsSupported(
             Boolean::parseBoolean(topLvlProperties.getProperty("updateURIsSupported", "true")));
         transport->setPriorityBackup(

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LRUCache.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LRUCache.h?rev=1459956&r1=1459955&r2=1459956&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LRUCache.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LRUCache.h Fri Mar 22 19:48:18 2013
@@ -123,7 +123,7 @@ namespace util {
 
     protected:
 
-        virtual bool removeEldestEntry(const MapEntry<K, V>& eldest) {
+        virtual bool removeEldestEntry(const MapEntry<K, V>& eldest DECAF_UNUSED) {
             if (this->size() > maxCacheSize) {
                 return true;
             }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LinkedHashMap.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LinkedHashMap.h?rev=1459956&r1=1459955&r2=1459956&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LinkedHashMap.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LinkedHashMap.h Fri Mar 22 19:48:18 2013
@@ -678,7 +678,7 @@ namespace util {
          *
          * @return true if the eldest member should be removed.
          */
-        virtual bool removeEldestEntry(const MapEntry<K, V>& eldest) {
+        virtual bool removeEldestEntry(const MapEntry<K, V>& eldest DECAF_UNUSED) {
             return false;
         }
 
@@ -691,7 +691,7 @@ namespace util {
          * @param eldest
          *      The MapEntry value that is about to be removed from the Map.
          */
-        virtual void onEviction(const MapEntry<K, V>& eldest) {}
+        virtual void onEviction(const MapEntry<K, V>& eldest DECAF_UNUSED) {}
 
     public:
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.cpp?rev=1459956&r1=1459955&r2=1459956&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.cpp Fri Mar 22 19:48:18 2013
@@ -17,56 +17,252 @@
 
 #include "ConnectionStateTrackerTest.h"
 
+#include <activemq/transport/Transport.h>
+#include <activemq/wireformat/WireFormat.h>
 #include <activemq/state/ConnectionStateTracker.h>
 #include <activemq/state/ConsumerState.h>
 #include <activemq/state/SessionState.h>
+#include <activemq/commands/ActiveMQTopic.h>
+#include <activemq/commands/Message.h>
 #include <activemq/commands/ConnectionInfo.h>
 #include <activemq/commands/SessionInfo.h>
+#include <activemq/commands/Message.h>
 #include <decaf/lang/Pointer.h>
+#include <decaf/lang/exceptions/UnsupportedOperationException.h>
+#include <decaf/util/LinkedList.h>
 
 using namespace std;
 using namespace activemq;
 using namespace activemq::state;
 using namespace activemq::commands;
+using namespace activemq::transport;
+using namespace activemq::wireformat;
+using namespace decaf::util;
 using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class TrackingTransport : public activemq::transport::Transport {
+    public:
+
+        LinkedList< Pointer<Command> > connections;
+        LinkedList< Pointer<Command> > sessions;
+        LinkedList< Pointer<Command> > producers;
+        LinkedList< Pointer<Command> > consumers;
+        LinkedList< Pointer<Command> > messages;
+        LinkedList< Pointer<Command> > messagePulls;
+
+    public:
+
+        virtual ~TrackingTransport() {}
+
+        virtual void start() {}
+
+        virtual void stop() {}
+
+        virtual void close() {}
+
+        virtual void oneway(const Pointer<Command> command) {
+            if (command->isConnectionInfo()) {
+                connections.add(command);
+            } else if (command->isSessionInfo()) {
+                sessions.add(command);
+            } else if (command->isProducerInfo()) {
+                producers.add(command);
+            } else if (command->isConsumerInfo()) {
+                consumers.add(command);
+            } else if (command->isMessage()) {
+                messages.add(command);
+            } else if (command->isMessagePull()) {
+                messagePulls.add(command);
+            }
+        }
+
+        virtual Pointer<FutureResponse> asyncRequest(const Pointer<Command> command,
+                                                     const Pointer<ResponseCallback> responseCallback) {
+            throw UnsupportedOperationException();
+        }
+
+        virtual Pointer<Response> request(const Pointer<Command> command) {
+            throw UnsupportedOperationException();
+        }
+
+        virtual Pointer<Response> request(const Pointer<Command> command, unsigned int timeout) {
+            throw UnsupportedOperationException();
+        }
+
+        virtual Pointer<wireformat::WireFormat> getWireFormat() const {
+            return Pointer<wireformat::WireFormat>();
+        }
+
+        virtual void setWireFormat(const Pointer<wireformat::WireFormat> wireFormat) {
+        }
+
+        virtual void setTransportListener(TransportListener* listener) {
+        }
+
+        virtual TransportListener* getTransportListener() const {
+            return NULL;
+        }
+
+        virtual Transport* narrow(const std::type_info& typeId) {
+            return NULL;
+        }
+
+        virtual bool isFaultTolerant() const {
+            return false;
+        }
+
+        virtual bool isConnected() const {
+            return true;
+        }
+
+        virtual bool isClosed() const {
+            return false;
+        }
+
+        virtual bool isReconnectSupported() const {
+            return false;
+        }
+
+        virtual bool isUpdateURIsSupported() const {
+            return false;
+        }
+
+        virtual std::string getRemoteAddress() const {
+            return "";
+        }
+
+        virtual void reconnect(const decaf::net::URI& uri)  {
+        }
+
+        virtual void updateURIs(bool rebalance, const decaf::util::List<decaf::net::URI>& uris) {
+        }
+
+    };
+
+    class ConnectionData {
+    public:
+
+        Pointer<ConnectionInfo> connection;
+        Pointer<SessionInfo> session;
+        Pointer<ConsumerInfo> consumer;
+        Pointer<ProducerInfo> producer;
+
+    };
+
+    ConnectionData createConnectionState(ConnectionStateTracker& tracker) {
+
+        ConnectionData conn;
+
+        Pointer<ConnectionId> connectionId(new ConnectionId);
+        connectionId->setValue("CONNECTION");
+        conn.connection.reset(new ConnectionInfo);
+        conn.connection->setConnectionId(connectionId);
+
+        Pointer<SessionId> session_id(new SessionId);
+        session_id->setConnectionId("CONNECTION");
+        session_id->setValue(12345);
+        conn.session.reset(new SessionInfo);
+        conn.session->setSessionId(session_id);
+
+        Pointer<ConsumerId> consumer_id(new ConsumerId);
+        consumer_id->setConnectionId("CONNECTION");
+        consumer_id->setSessionId(12345);
+        consumer_id->setValue(42);
+        conn.consumer.reset(new ConsumerInfo);
+        conn.consumer->setConsumerId(consumer_id);
+
+        Pointer<ProducerId> producer_id(new ProducerId);
+        producer_id->setConnectionId("CONNECTION");
+        producer_id->setSessionId(12345);
+        producer_id->setValue(42);
+        conn.producer.reset(new ProducerInfo);
+        conn.producer->setProducerId(producer_id);
+
+        tracker.processConnectionInfo(conn.connection.get());
+        tracker.processSessionInfo(conn.session.get());
+        tracker.processConsumerInfo(conn.consumer.get());
+        tracker.processProducerInfo(conn.producer.get());
+
+        return conn;
+    }
+
+    void clearConnectionState(ConnectionStateTracker& tracker, ConnectionData& conn) {
+        tracker.processRemoveProducer(conn.producer->getProducerId().get());
+        tracker.processRemoveConsumer(conn.consumer->getConsumerId().get());
+        tracker.processRemoveSession(conn.session->getSessionId().get());
+        tracker.processRemoveConnection(conn.connection->getConnectionId().get());
+    }
+
+}
 
 ////////////////////////////////////////////////////////////////////////////////
 void ConnectionStateTrackerTest::test() {
 
-    Pointer<ConnectionId> conn_id( new ConnectionId );
-    conn_id->setValue( "CONNECTION" );
-    Pointer<ConnectionInfo> conn_info( new ConnectionInfo );
-    conn_info->setConnectionId( conn_id );
-
-    Pointer<SessionId> session_id( new SessionId );
-    session_id->setConnectionId( "CONNECTION" );
-    session_id->setValue( 12345 );
-    Pointer<SessionInfo> session_info( new SessionInfo );
-    session_info->setSessionId( session_id );
-
-    Pointer<ConsumerId> consumer_id( new ConsumerId );
-    consumer_id->setConnectionId( "CONNECTION" );
-    consumer_id->setSessionId( 12345 );
-    consumer_id->setValue( 42 );
-    Pointer<ConsumerInfo> consumer_info( new ConsumerInfo );
-    consumer_info->setConsumerId( consumer_id );
-
-    Pointer<ProducerId> producer_id( new ProducerId );
-    producer_id->setConnectionId( "CONNECTION" );
-    producer_id->setSessionId( 12345 );
-    producer_id->setValue( 42 );
-    Pointer<ProducerInfo> producer_info( new ProducerInfo );
-    producer_info->setProducerId( producer_id );
+    ConnectionStateTracker tracker;
+    ConnectionData conn = createConnectionState(tracker);
+    clearConnectionState(tracker, conn);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionStateTrackerTest::testMessageCache() {
+
+    Pointer<TrackingTransport> transport(new TrackingTransport);
+    ConnectionStateTracker tracker;
+    tracker.setTrackMessages(true);
+
+    ConnectionData conn = createConnectionState(tracker);
 
+    int messageSize;
+    {
+        decaf::lang::Pointer<commands::MessageId> id(new commands::MessageId());
+        id->setProducerId(conn.producer->getProducerId());
+        Pointer<Message> message(new Message);
+        messageSize = message->getSize();
+    }
+
+    tracker.setMaxMessageCacheSize(messageSize * 3);
+
+    int sequenceId = 1;
+
+    for (int i = 0; i < 100; ++i) {
+        decaf::lang::Pointer<commands::MessageId> id(new commands::MessageId());
+        id->setProducerId(conn.producer->getProducerId());
+        id->setProducerSequenceId(sequenceId++);
+        Pointer<Message> message(new Message);
+        message->setMessageId(id);
+
+        tracker.processMessage(message.get());
+        tracker.trackBack(message);
+    }
+
+    tracker.restore(transport);
+
+    CPPUNIT_ASSERT_EQUAL_MESSAGE("Should only be four messages", 4, transport->messages.size());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionStateTrackerTest::testMessagePullCache() {
+
+    Pointer<TrackingTransport> transport(new TrackingTransport);
     ConnectionStateTracker tracker;
-    tracker.processConnectionInfo( conn_info.get() );
-    tracker.processSessionInfo( session_info.get() );
-    tracker.processConsumerInfo( consumer_info.get() );
-    tracker.processProducerInfo( producer_info.get() );
-
-    tracker.processRemoveProducer( producer_id.get() );
-    tracker.processRemoveConsumer( consumer_id.get() );
-    tracker.processRemoveSession( session_id.get() );
-    tracker.processRemoveConnection( conn_id.get() );
+    tracker.setTrackMessages(true);
+
+    ConnectionData conn = createConnectionState(tracker);
+
+    for (int i = 0; i < 100; ++i) {
+        Pointer<commands::MessagePull> pull(new commands::MessagePull());
+        Pointer<ActiveMQDestination> destination(new ActiveMQTopic("TEST" + Integer::toString(i)));
+        pull->setConsumerId(conn.consumer->getConsumerId());
+        pull->setDestination(destination);
+        tracker.processMessagePull(pull.get());
+        tracker.trackBack(pull);
+    }
+
+    tracker.restore(transport);
 
+    CPPUNIT_ASSERT_EQUAL_MESSAGE("Should only be ten message pulls", 10, transport->messagePulls.size());
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.h?rev=1459956&r1=1459955&r2=1459956&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.h Fri Mar 22 19:48:18 2013
@@ -28,6 +28,8 @@ namespace state {
 
         CPPUNIT_TEST_SUITE( ConnectionStateTrackerTest );
         CPPUNIT_TEST( test );
+        CPPUNIT_TEST( testMessageCache );
+        CPPUNIT_TEST( testMessagePullCache );
         CPPUNIT_TEST_SUITE_END();
 
     public:
@@ -36,6 +38,9 @@ namespace state {
         virtual ~ConnectionStateTrackerTest() {}
 
         void test();
+        void testMessageCache();
+        void testMessagePullCache();
+
     };
 
 }}



Mime
View raw message