Author: tabish
Date: Wed Nov 9 21:59:51 2011
New Revision: 1199994
URL: http://svn.apache.org/viewvc?rev=1199994&view=rev
Log:
Apply patch for: https://issues.apache.org/jira/browse/AMQCPP-384
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp?rev=1199994&r1=1199993&r2=1199994&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
Wed Nov 9 21:59:51 2011
@@ -108,11 +108,18 @@ Pointer<Tracked> ConnectionStateTracker:
void ConnectionStateTracker::trackBack( const Pointer<Command>& command ) {
try{
- if( trackMessages && command != NULL && command->isMessage() )
{
- Pointer<Message> message =
- command.dynamicCast<Message>();
- if( message->getTransactionId() == NULL ) {
- currentCacheSize = currentCacheSize + message->getSize();
+ if (command != NULL) {
+ if (trackMessages && command->isMessage()) {
+ Pointer<Message> message = command.dynamicCast<Message>();
+ if (message->getTransactionId() == NULL) {
+ currentCacheSize = currentCacheSize + message->getSize();
+ }
+ } else {
+ Pointer<MessagePull> messagePull = command.dynamicCast<MessagePull>();
+ if (messagePull != NULL) {
+ // just needs to be a rough estimate of size, ~4 identifiers
+ currentCacheSize += 400;
+ }
}
}
}
@@ -148,12 +155,19 @@ void ConnectionStateTracker::restore( co
}
// Now we flush messages
- std::vector< Pointer<Message> > messages = messageCache.values();
- std::vector< Pointer<Message> >::const_iterator messageIter = messages.begin();
+ std::vector< Pointer<Command> > messages = messageCache.values();
+ std::vector< Pointer<Command> >::const_iterator messageIter = messages.begin();
for( ; messageIter != messages.end(); ++messageIter ) {
transport->oneway( *messageIter );
}
+
+ std::vector< Pointer<Command> > messagePulls = messagePullCache.values();
+ std::vector< Pointer<Command> >::const_iterator messagePullIter = messagePulls.begin();
+
+ for(; messagePullIter != messagePulls.end(); ++messagePullIter) {
+ transport->oneway(*messagePullIter);
+ }
}
AMQ_CATCH_RETHROW( IOException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
@@ -790,8 +804,25 @@ Pointer<Command> ConnectionStateTracker:
}
////////////////////////////////////////////////////////////////////////////////
+Pointer<Command> ConnectionStateTracker::processMessagePull(MessagePull* pull) {
+
+ try{
+
+ if (pull != NULL && pull->getDestination() != NULL && pull->getConsumerId()
!= NULL) {
+ std::string id = pull->getDestination()->toString() + "::" + pull->getConsumerId()->toString();
+ messagePullCache.put(id, Pointer<Command>(pull->cloneDataStructure()));
+ }
+
+ return Pointer<Command>();
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ConnectionStateTracker::connectionInterruptProcessingComplete(
- transport::Transport* transport, const Pointer<ConnectionId>& connectionId
) {
+ transport::Transport* transport, const Pointer<ConnectionId>& connectionId)
{
Pointer<ConnectionState> connectionState = connectionStates.get( connectionId );
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h?rev=1199994&r1=1199993&r2=1199994&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h
Wed Nov 9 21:59:51 2011
@@ -54,9 +54,11 @@ namespace state {
// Either we need to implement something similar to LinkedHashMap or find
// some other way of tracking the eldest entry into the map and removing it
// if the cache size is exceeded.
- ConcurrentStlMap< Pointer<MessageId>, Pointer<Message>,
+ ConcurrentStlMap< Pointer<MessageId>, Pointer<Command>,
MessageId::COMPARATOR > messageCache;
+ ConcurrentStlMap< std::string, Pointer<Command> > messagePullCache;
+
bool trackTransactions;
bool restoreSessions;
bool restoreConsumers;
@@ -122,6 +124,8 @@ namespace state {
virtual Pointer<Command> processEndTransaction( TransactionInfo* info );
+ virtual Pointer<Command> processMessagePull( MessagePull* pull );
+
bool isRestoreConsumers() const {
return this->restoreConsumers;
}
|