activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r748196 - in /activemq/activemq-cpp/trunk/src/main/activemq/state: ConnectionState.h ConnectionStateTracker.cpp ConnectionStateTracker.h SessionState.h TransactionState.h
Date Thu, 26 Feb 2009 16:24:46 GMT
Author: tabish
Date: Thu Feb 26 16:24:42 2009
New Revision: 748196

URL: http://svn.apache.org/viewvc?rev=748196&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQCPP-100

A mostly functional Connection State Tracker

Modified:
    activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionState.h
    activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionStateTracker.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionStateTracker.h
    activemq/activemq-cpp/trunk/src/main/activemq/state/SessionState.h
    activemq/activemq-cpp/trunk/src/main/activemq/state/TransactionState.h

Modified: activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionState.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionState.h?rev=748196&r1=748195&r2=748196&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionState.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionState.h Thu Feb 26 16:24:42 2009
@@ -42,7 +42,7 @@
 namespace state {
 
     using decaf::lang::Pointer;
-    using decaf::util::concurrent::ConcurrentStlMap;
+    using namespace decaf::util;
     using namespace activemq::commands;
 
     class ConnectionState {
@@ -55,7 +55,7 @@
         ConcurrentStlMap< Pointer<SessionId>,
                           Pointer<SessionState>,
                           SessionId::COMPARATOR > sessions;
-        decaf::util::StlList< Pointer<DestinationInfo> > tempDestinations;
+        StlList< Pointer<DestinationInfo> > tempDestinations;
         decaf::util::concurrent::atomic::AtomicBoolean disposed;
 
     public:
@@ -103,9 +103,9 @@
             return transactions.get( id );
         }
 
-//        Collection<TransactionState> getTransactionStates() {
-//            return transactions.values();
-//        }
+        std::vector< Pointer<TransactionState> > getTransactionStates() const {
+            return transactions.values();
+        }
 
         Pointer<TransactionState> removeTransactionState( const Pointer<TransactionId>& id ) {
             return transactions.remove( id );
@@ -128,14 +128,14 @@
 //        Set<SessionId> getSessionIds() {
 //            return sessions.keySet();
 //        }
-//
-//        List<DestinationInfo> getTempDesinations() {
-//            return tempDestinations;
-//        }
-//
-//        Collection<SessionState> getSessionStates() {
-//            return sessions.values();
-//        }
+
+        const StlList< Pointer<DestinationInfo> >& getTempDesinations() const {
+            return tempDestinations;
+        }
+
+        std::vector< Pointer<SessionState> > getSessionStates() const {
+            return sessions.values();
+        }
 
     };
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionStateTracker.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionStateTracker.cpp?rev=748196&r1=748195&r2=748196&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionStateTracker.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionStateTracker.cpp Thu Feb 26 16:24:42 2009
@@ -26,6 +26,7 @@
 using namespace activemq::exceptions;
 using namespace decaf;
 using namespace decaf::lang;
+using namespace decaf::io;
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -76,15 +77,462 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+Pointer<Tracked> ConnectionStateTracker::track( const Pointer<Command>& command )
+    throw( decaf::io::IOException ) {
+
+    try{
+        return command->visit( this ).dynamicCast<Tracked, Pointer<Tracked>::CounterType>();
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionStateTracker::trackBack( const Pointer<Command>& command ) {
+
+    try{
+        if( trackMessages && command != NULL && command->isMessage() ) {
+            Pointer<Message> message =
+                command.dynamicCast<Message, Pointer<Message>::CounterType>();
+            if( message->getTransactionId() == NULL ) {
+                currentCacheSize = currentCacheSize + message->getSize();
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionStateTracker::restore( const Pointer<transport::Transport>& transport )
+    throw( decaf::io::IOException ) {
+
+    try{
+
+        std::vector< Pointer<ConnectionState> > connectionStates = this->connectionStates.values();
+        std::vector< Pointer<ConnectionState> >::const_iterator iter = connectionStates.begin();
+
+        for( ; iter != connectionStates.end(); ++iter ) {
+            Pointer<ConnectionState> state = *iter;
+            transport->oneway( state->getInfo() );
+            doRestoreTempDestinations( transport, state );
+
+            if( restoreSessions ) {
+                doRestoreSessions( transport, state );
+            }
+
+            if( restoreTransaction ) {
+                doRestoreTransactions( transport, state );
+            }
+        }
+
+        // Now we flush messages
+        std::vector< Pointer<Message> > messages = messageCache.values();
+        std::vector< Pointer<Message> >::const_iterator messageIter = messages.begin();
+
+        for( ; messageIter != messages.end(); ++messageIter ) {
+            transport->oneway( *messageIter );
+        }
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionStateTracker::doRestoreTransactions( const Pointer<transport::Transport>& transport,
+                                                    const Pointer<ConnectionState>& connectionState )
+    throw( decaf::io::IOException ) {
+
+    try{
+
+        // Restore the session's transaction state
+        std::vector< Pointer<TransactionState> > transactionStates =
+            connectionState->getTransactionStates();
+
+        std::vector< Pointer<TransactionState> >::const_iterator iter = transactionStates.begin();
+
+        for( ; iter != transactionStates.end(); ++iter ) {
+            Pointer<TransactionState> state = *iter;
+
+            std::auto_ptr< Iterator< Pointer<Command> > > commands(
+                state->getCommands().iterator() );
+
+            while( commands->hasNext() ) {
+                transport->oneway( commands->next() );
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionStateTracker::doRestoreSessions( const Pointer<transport::Transport>& transport,
+                                                const Pointer<ConnectionState>& connectionState )
+    throw( decaf::io::IOException ) {
+
+    try{
+
+        std::vector< Pointer<SessionState> > sessionStates = connectionState->getSessionStates();
+
+        std::vector< Pointer<SessionState> >::const_iterator iter = sessionStates.begin();
+
+        // Restore the Session State
+        for( ; iter != sessionStates.end(); ++iter ) {
+            Pointer<SessionState> state = *iter;
+            transport->oneway( state->getInfo() );
+
+            if( restoreProducers ) {
+                doRestoreProducers( transport, state );
+            }
+
+            if( restoreConsumers ) {
+                doRestoreConsumers( transport, state );
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionStateTracker::doRestoreConsumers( const Pointer<transport::Transport>& transport,
+                                                 const Pointer<SessionState>& sessionState )
+    throw( decaf::io::IOException ) {
+
+    try{
+
+        // Restore the session's consumers
+        std::vector< Pointer<ConsumerState> > consumerStates = sessionState->getConsumerStates();
+
+        std::vector< Pointer<ConsumerState> >::const_iterator iter = consumerStates.begin();
+
+        for( ; iter != consumerStates.end(); ++iter ) {
+            Pointer<ConsumerState> state = *iter;
+            transport->oneway( state->getInfo() );
+        }
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionStateTracker::doRestoreProducers( const Pointer<transport::Transport>& transport,
+                                                 const Pointer<SessionState>& sessionState )
+    throw( decaf::io::IOException ) {
+
+    try{
+
+        // Restore the session's producers
+        std::vector< Pointer<ProducerState> > producerStates = sessionState->getProducerStates();
+
+        std::vector< Pointer<ProducerState> >::const_iterator iter = producerStates.begin();
+
+        for( ; iter != producerStates.end(); ++iter ) {
+            Pointer<ProducerState> state = *iter;
+            transport->oneway( state->getInfo() );
+        }
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionStateTracker::doRestoreTempDestinations( const Pointer<transport::Transport>& transport,
+                                                        const Pointer<ConnectionState>& connectionState )
+    throw( decaf::io::IOException ) {
+
+    try{
+
+        std::auto_ptr< Iterator< Pointer<DestinationInfo> > > iter(
+            connectionState->getTempDesinations().iterator() );
+
+        while( iter->hasNext() ) {
+            transport->oneway( iter->next() );
+        }
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Command> ConnectionStateTracker::processAddDestination( DestinationInfo* info )
+    throw ( exceptions::ActiveMQException ) {
+
+    try{
+        if( info != NULL ) {
+            Pointer<ConnectionState> cs = connectionStates.get( info->getConnectionId() );
+            if( cs != NULL && info->getDestination()->isTemporary() ) {
+                cs->addTempDestination( Pointer<DestinationInfo>( info->cloneDataStructure() ) );
+            }
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Command> ConnectionStateTracker::processRemoveDestination( DestinationInfo* info )
+    throw ( exceptions::ActiveMQException ) {
+
+    try{
+        if( info != NULL ) {
+            Pointer<ConnectionState> cs = connectionStates.get( info->getConnectionId() );
+            if( cs != NULL && info->getDestination()->isTemporary() ) {
+                cs->removeTempDestination( info->getDestination() );
+            }
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Command> ConnectionStateTracker::processAddProducer( ProducerInfo* info )
+    throw ( exceptions::ActiveMQException ) {
+
+    try{
+
+        if( info != NULL && info->getProducerId() != NULL ) {
+            Pointer<SessionId> sessionId = info->getProducerId()->getParentId();
+            if( sessionId != NULL ) {
+                Pointer<ConnectionId> connectionId = sessionId->getParentId();
+                if( connectionId != NULL ) {
+                    Pointer<ConnectionState> cs = connectionStates.get( connectionId );
+                    if( cs != NULL ) {
+                        Pointer<SessionState> ss = cs->getSessionState( sessionId );
+                        if( ss != NULL ) {
+                            ss->addProducer(
+                                Pointer<ProducerInfo>( info->cloneDataStructure() ) );
+                        }
+                    }
+                }
+            }
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Command> ConnectionStateTracker::processRemoveProducer( ProducerId* id )
+    throw ( exceptions::ActiveMQException ) {
+
+    try{
+
+        if( id != NULL ) {
+            Pointer<SessionId> sessionId = id->getParentId();
+            if( sessionId != NULL ) {
+                Pointer<ConnectionId> connectionId = sessionId->getParentId();
+                if( connectionId != NULL ) {
+                    Pointer<ConnectionState> cs = connectionStates.get( connectionId );
+                    if( cs != NULL ) {
+                        Pointer<SessionState> ss = cs->getSessionState( sessionId );
+                        if( ss != NULL ) {
+                            ss->removeProducer( Pointer<ProducerId>( id->cloneDataStructure() ) );
+                        }
+                    }
+                }
+            }
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Command> ConnectionStateTracker::processAddConsumer( ConsumerInfo* info )
+    throw ( exceptions::ActiveMQException ) {
+
+    try{
+
+        if( info != NULL ) {
+            Pointer<SessionId> sessionId = info->getConsumerId()->getParentId();
+            if( sessionId != NULL ) {
+                Pointer<ConnectionId> connectionId = sessionId->getParentId();
+                if( connectionId != NULL ) {
+                    Pointer<ConnectionState> cs = connectionStates.get( connectionId );
+                    if( cs != NULL ) {
+                        Pointer<SessionState> ss = cs->getSessionState( sessionId );
+                        if( ss != NULL ) {
+                            ss->addConsumer(
+                                Pointer<ConsumerInfo>( info->cloneDataStructure() ) );
+                        }
+                    }
+                }
+            }
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Command> ConnectionStateTracker::processRemoveConsumer( ConsumerId* id )
+    throw ( exceptions::ActiveMQException ) {
+
+    try{
+        if( id != NULL ) {
+            Pointer<SessionId> sessionId = id->getParentId();
+            if( sessionId != NULL ) {
+                Pointer<ConnectionId> connectionId = sessionId->getParentId();
+                if( connectionId != NULL ) {
+                    Pointer<ConnectionState> cs = connectionStates.get( connectionId );
+                    if( cs != NULL ) {
+                        Pointer<SessionState> ss = cs->getSessionState( sessionId );
+                        if( ss != NULL ) {
+                            ss->removeConsumer( Pointer<ConsumerId>( id->cloneDataStructure() ) );
+                        }
+                    }
+                }
+            }
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Command> ConnectionStateTracker::processAddSession( SessionInfo* info)
+    throw ( exceptions::ActiveMQException ) {
+
+    try{
+
+        if( info != NULL ) {
+            Pointer<ConnectionId> connectionId = info->getSessionId()->getParentId();
+            if( connectionId != NULL ) {
+                Pointer<ConnectionState> cs = connectionStates.get( connectionId );
+                if( cs != NULL ) {
+                    cs->addSession( Pointer<SessionInfo>( info->cloneDataStructure() ) );
+                }
+            }
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Command> ConnectionStateTracker::processRemoveSession( SessionId* id)
+    throw ( exceptions::ActiveMQException ) {
+
+    try{
+
+        if( id != NULL ) {
+            Pointer<ConnectionId> connectionId = id->getParentId();
+            if( connectionId != NULL ) {
+                Pointer<ConnectionState> cs = connectionStates.get( connectionId );
+                if( cs != NULL ) {
+                    cs->removeSession( Pointer<SessionId>( id->cloneDataStructure() ) );
+                }
+            }
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Command> ConnectionStateTracker::processAddConnection( ConnectionInfo* info )
+    throw ( exceptions::ActiveMQException ) {
+
+    try{
+
+        if( info != NULL ) {
+            Pointer<ConnectionInfo> infoCopy( info->cloneDataStructure() );
+            connectionStates.put( info->getConnectionId(),
+                                  Pointer<ConnectionState>( new ConnectionState( infoCopy ) ) );
+        }
+        return TRACKED_RESPONSE_MARKER;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Command> ConnectionStateTracker::processRemoveConnection( ConnectionId* id )
+    throw ( exceptions::ActiveMQException ) {
+
+    try{
+
+        if( id != NULL ) {
+            connectionStates.remove( Pointer<ConnectionId>( id->cloneDataStructure() ) );
+        }
+
+        return TRACKED_RESPONSE_MARKER;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Command> ConnectionStateTracker::processMessage( Message* message )
+    throw ( exceptions::ActiveMQException ) {
+
+    try{
+        if( message != NULL ) {
+            if( trackTransactions && message->getTransactionId() != NULL ) {
+                Pointer<ConnectionId> connectionId =
+                    message->getProducerId()->getParentId()->getParentId();
+                if( connectionId != NULL ) {
+                    Pointer<ConnectionState> cs = connectionStates.get( connectionId );
+                    if( cs != NULL ) {
+                        Pointer<TransactionState> transactionState =
+                            cs->getTransactionState( message->getTransactionId() );
+                        if( transactionState != NULL ) {
+                            transactionState->addCommand(
+                                Pointer<Command>( message->cloneDataStructure() ) );
+                        }
+                    }
+                }
+                return TRACKED_RESPONSE_MARKER;
+            }else if( trackMessages ) {
+                messageCache.put( message->getMessageId(),
+                                  Pointer<Message>( message->cloneDataStructure() ) );
+            }
+        }
+        return Pointer<Response>();
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
 Pointer<Command> ConnectionStateTracker::processMessageAck( MessageAck* ack )
     throw ( exceptions::ActiveMQException ) {
 
     try{
 
         if( trackTransactions && ack != NULL && ack->getTransactionId() != NULL) {
-            // TODO
             Pointer<ConnectionId> connectionId;// =
-                //ack->getConsumerId()->getParentId().getParentId();
+                ack->getConsumerId()->getParentId()->getParentId();
             if( connectionId != NULL ) {
                 Pointer<ConnectionState> cs = connectionStates.get( connectionId );
                 if( cs != NULL ) {

Modified: activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionStateTracker.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionStateTracker.h?rev=748196&r1=748195&r2=748196&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionStateTracker.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionStateTracker.h Thu Feb 26 16:24:42 2009
@@ -45,9 +45,15 @@
         static const Pointer<Tracked> TRACKED_RESPONSE_MARKER;
 
         // TODO - Create a Thread Safe impl of Map.
-        decaf::util::StlMap< Pointer<ConnectionId>,
-                             Pointer<ConnectionState>,
-                             commands::ConnectionId::COMPARATOR > connectionStates;
+        decaf::util::StlMap< Pointer<ConnectionId>, Pointer<ConnectionState>,
+                             ConnectionId::COMPARATOR > connectionStates;
+
+        // TODO - The Map doesn't have a way to automatically remove the eldest Entry
+        //        Either we need to implement something similar to LinkedHashMap or find
+        //        some other way of tracking the eldest entry into the map and removing it
+        //        if the cache size is exceeded.
+        decaf::util::StlMap< Pointer<MessageId>, Pointer<Message>,
+                             MessageId::COMPARATOR > messageCache;
 
         bool trackTransactions;
         bool restoreSessions;
@@ -66,7 +72,7 @@
 
         virtual ~ConnectionStateTracker();
 
-        Tracked track( const Pointer<Command>& command ) throw( decaf::io::IOException );
+        Pointer<Tracked> track( const Pointer<Command>& command ) throw( decaf::io::IOException );
 
         void trackBack( const Pointer<Command>& command );
 
@@ -103,7 +109,7 @@
         virtual Pointer<Command> processRemoveConnection( ConnectionId* id )
             throw ( exceptions::ActiveMQException );
 
-        virtual Pointer<Command> processMessage( Message* send )
+        virtual Pointer<Command> processMessage( Message* message )
             throw ( exceptions::ActiveMQException );
 
         virtual Pointer<Command> processMessageAck( MessageAck* ack )

Modified: activemq/activemq-cpp/trunk/src/main/activemq/state/SessionState.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/state/SessionState.h?rev=748196&r1=748195&r2=748196&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/state/SessionState.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/state/SessionState.h Thu Feb 26 16:24:42 2009
@@ -93,18 +93,18 @@
 //        Set<commands::ProducerId> getProducerIds() {
 //            return producers.keySet();
 //        }
-//
-//        Collection<ProducerState> getProducerStates() {
-//            return producers.values();
-//        }
+
+        std::vector< Pointer<ProducerState> > getProducerStates() const {
+            return producers.values();
+        }
 
         Pointer<ProducerState> getProducerState( const Pointer<ProducerId>& id ) {
             return producers.get( id );
         }
 
-//        Collection<ConsumerState> getConsumerStates() {
-//            return consumers.values();
-//        }
+        std::vector< Pointer<ConsumerState> > getConsumerStates() const {
+            return consumers.values();
+        }
 
         Pointer<ConsumerState> getConsumerState( const Pointer<ConsumerId>& id ) {
             return consumers.get( id );

Modified: activemq/activemq-cpp/trunk/src/main/activemq/state/TransactionState.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/state/TransactionState.h?rev=748196&r1=748195&r2=748196&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/state/TransactionState.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/state/TransactionState.h Thu Feb 26 16:24:42 2009
@@ -62,7 +62,7 @@
             this->disposed.set( true );
         }
 
-        const decaf::util::List< Pointer<Command> >& getCommands() {
+        const StlList< Pointer<Command> >& getCommands() const {
             return commands;
         }
 



Mime
View raw message