activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1199994 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state: ConnectionStateTracker.cpp ConnectionStateTracker.h
Date Wed, 09 Nov 2011 21:59:51 GMT
Author: tabish
Date: Wed Nov  9 21:59:51 2011
New Revision: 1199994

URL: http://svn.apache.org/viewvc?rev=1199994&view=rev
Log:
Apply patch for: https://issues.apache.org/jira/browse/AMQCPP-384

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp?rev=1199994&r1=1199993&r2=1199994&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp Wed Nov  9 21:59:51 2011
@@ -108,11 +108,18 @@ Pointer<Tracked> ConnectionStateTracker:
 void ConnectionStateTracker::trackBack( const Pointer<Command>& command ) {
 
     try{
-        if( trackMessages && command != NULL && command->isMessage() ) {
-            Pointer<Message> message =
-                command.dynamicCast<Message>();
-            if( message->getTransactionId() == NULL ) {
-                currentCacheSize = currentCacheSize + message->getSize();
+        if (command != NULL) {
+            if (trackMessages && command->isMessage()) {
+                Pointer<Message> message = command.dynamicCast<Message>();
+                if (message->getTransactionId() == NULL) {
+                    currentCacheSize = currentCacheSize + message->getSize();
+                }
+            } else {
+                Pointer<MessagePull> messagePull = command.dynamicCast<MessagePull>();
+                if (messagePull != NULL) {
+                    // just needs to be a rough estimate of size, ~4 identifiers
+                    currentCacheSize += 400;
+                }
             }
         }
     }
@@ -148,12 +155,19 @@ void ConnectionStateTracker::restore( co
         }
 
         // Now we flush messages
-        std::vector< Pointer<Message> > messages = messageCache.values();
-        std::vector< Pointer<Message> >::const_iterator messageIter = messages.begin();
+        std::vector< Pointer<Command> > messages = messageCache.values();
+        std::vector< Pointer<Command> >::const_iterator messageIter = messages.begin();
 
         for( ; messageIter != messages.end(); ++messageIter ) {
             transport->oneway( *messageIter );
         }
+
+        std::vector< Pointer<Command> > messagePulls = messagePullCache.values();
+        std::vector< Pointer<Command> >::const_iterator messagePullIter = messagePulls.begin();
+
+        for(; messagePullIter != messagePulls.end(); ++messagePullIter) {
+            transport->oneway(*messagePullIter);
+        }
     }
     AMQ_CATCH_RETHROW( IOException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
@@ -790,8 +804,25 @@ Pointer<Command> ConnectionStateTracker:
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+Pointer<Command> ConnectionStateTracker::processMessagePull(MessagePull* pull) {
+
+    try{
+
+        if (pull != NULL && pull->getDestination() != NULL && pull->getConsumerId() != NULL) {
+            std::string id = pull->getDestination()->toString() + "::" + pull->getConsumerId()->toString();
+            messagePullCache.put(id, Pointer<Command>(pull->cloneDataStructure()));
+        }
+
+        return Pointer<Command>();
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ConnectionStateTracker::connectionInterruptProcessingComplete(
-    transport::Transport* transport, const Pointer<ConnectionId>& connectionId ) {
+    transport::Transport* transport, const Pointer<ConnectionId>& connectionId) {
 
     Pointer<ConnectionState> connectionState = connectionStates.get( connectionId );
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h?rev=1199994&r1=1199993&r2=1199994&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h Wed Nov  9 21:59:51 2011
@@ -54,9 +54,11 @@ namespace state {
         //        Either we need to implement something similar to LinkedHashMap or find
         //        some other way of tracking the eldest entry into the map and removing it
         //        if the cache size is exceeded.
-        ConcurrentStlMap< Pointer<MessageId>, Pointer<Message>,
+        ConcurrentStlMap< Pointer<MessageId>, Pointer<Command>,
                           MessageId::COMPARATOR > messageCache;
 
+        ConcurrentStlMap< std::string, Pointer<Command> > messagePullCache;
+
         bool trackTransactions;
         bool restoreSessions;
         bool restoreConsumers;
@@ -122,6 +124,8 @@ namespace state {
 
         virtual Pointer<Command> processEndTransaction( TransactionInfo* info );
 
+        virtual Pointer<Command> processMessagePull( MessagePull* pull );
+
         bool isRestoreConsumers() const {
             return this->restoreConsumers;
         }



Mime
View raw message