activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r906751 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq: commands/ core/ state/ transport/failover/
Date Fri, 05 Feb 2010 00:45:07 GMT
Author: tabish
Date: Fri Feb  5 00:45:07 2010
New Revision: 906751

URL: http://svn.apache.org/viewvc?rev=906751&view=rev
Log:
Updates to the State Tracker and Failover code to improve recovery.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ConsumerControl.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ConsumerControl.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ProducerState.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ProducerState.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/SessionState.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/SessionState.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/TransactionState.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/TransactionState.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ConsumerControl.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ConsumerControl.cpp?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ConsumerControl.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ConsumerControl.cpp Fri Feb  5 00:45:07 2010
@@ -80,6 +80,7 @@
     // Copy the data of the base class or classes
     BaseCommand::copyDataStructure( src );
 
+    this->setDestination( srcPtr->getDestination() );
     this->setClose( srcPtr->isClose() );
     this->setConsumerId( srcPtr->getConsumerId() );
     this->setPrefetch( srcPtr->getPrefetch() );
@@ -102,6 +103,13 @@
            << "commandId = " << this->getCommandId() << ", "
            << "responseRequired = " << boolalpha << this->isResponseRequired();
     stream << ", ";
+    stream << "Destination = ";
+    if( this->getDestination() != NULL ) {
+        stream << this->getDestination()->toString();
+    } else {
+        stream << "NULL";
+    }
+    stream << ", ";
     stream << "Close = " << this->isClose();
     stream << ", ";
     stream << "ConsumerId = ";
@@ -136,6 +144,13 @@
         return false;
     }
 
+    if( this->getDestination() != NULL ) {
+        if( !this->getDestination()->equals( valuePtr->getDestination().get() ) ) {
+            return false;
+        }
+    } else if( valuePtr->getDestination() != NULL ) {
+        return false;
+    }
     if( this->isClose() != valuePtr->isClose() ) {
         return false;
     }
@@ -165,6 +180,21 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+const decaf::lang::Pointer<ActiveMQDestination>& ConsumerControl::getDestination() const {
+    return destination;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+decaf::lang::Pointer<ActiveMQDestination>& ConsumerControl::getDestination() {
+    return destination;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConsumerControl::setDestination( const decaf::lang::Pointer<ActiveMQDestination>& destination ) {
+    this->destination = destination;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 bool ConsumerControl::isClose() const {
     return close;
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ConsumerControl.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ConsumerControl.h?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ConsumerControl.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ConsumerControl.h Fri Feb  5 00:45:07 2010
@@ -23,6 +23,7 @@
 #pragma warning( disable : 4290 )
 #endif
 
+#include <activemq/commands/ActiveMQDestination.h>
 #include <activemq/commands/BaseCommand.h>
 #include <activemq/commands/ConsumerId.h>
 #include <activemq/util/Config.h>
@@ -47,6 +48,7 @@
     class AMQCPP_API ConsumerControl : public BaseCommand {
     protected:
 
+        Pointer<ActiveMQDestination> destination;
         bool close;
         Pointer<ConsumerId> consumerId;
         int prefetch;
@@ -105,6 +107,10 @@
          */
         virtual bool equals( const DataStructure* value ) const;
 
+        virtual const Pointer<ActiveMQDestination>& getDestination() const;
+        virtual Pointer<ActiveMQDestination>& getDestination();
+        virtual void setDestination( const Pointer<ActiveMQDestination>& destination );
+
         virtual bool isClose() const;
         virtual void setClose( bool close );
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Fri Feb  5 00:45:07 2010
@@ -25,10 +25,13 @@
 #include <activemq/exceptions/ActiveMQException.h>
 #include <activemq/exceptions/BrokerException.h>
 #include <activemq/util/CMSExceptionSupport.h>
+#include <activemq/transport/failover/FailoverTransport.h>
 
+#include <decaf/lang/Math.h>
 #include <decaf/lang/Boolean.h>
 #include <decaf/util/Iterator.h>
 #include <decaf/util/UUID.h>
+#include <decaf/util/concurrent/TimeUnit.h>
 
 #include <activemq/commands/Command.h>
 #include <activemq/commands/ActiveMQMessage.h>
@@ -56,9 +59,12 @@
 using namespace activemq::core;
 using namespace activemq::commands;
 using namespace activemq::exceptions;
+using namespace activemq::transport;
+using namespace activemq::transport::failover;
 using namespace decaf;
 using namespace decaf::io;
 using namespace decaf::util;
+using namespace decaf::util::concurrent;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 
@@ -224,7 +230,12 @@
             return;
         }
 
-        // Indicates we are on the way out to squelch any exceptions getting
+        // If we are running lets stop first.
+        if( !this->transportFailed.get() ) {
+            this->stop();
+        }
+
+        // Indicates we are on the way out to suppress any exceptions getting
         // passed on from the transport as it goes down.
         this->closing.set( true );
 
@@ -234,18 +245,23 @@
             allSessions = activeSessions.toArray();
         }
 
+        long long lastDeliveredSequenceId = 0;
+
         // Close all of the resources.
         for( unsigned int ix=0; ix<allSessions.size(); ++ix ){
-            cms::Session* session = allSessions[ix];
+            ActiveMQSession* session = allSessions[ix];
             try{
                 session->close();
+
+                lastDeliveredSequenceId =
+                    Math::max( lastDeliveredSequenceId, session->getLastDeliveredSequenceId() );
             } catch( cms::CMSException& ex ){
                 /* Absorb */
             }
         }
 
         // Now inform the Broker we are shutting down.
-        this->disconnect();
+        this->disconnect( lastDeliveredSequenceId );
 
         // Once current deliveries are done this stops the delivery
         // of any new messages.
@@ -261,15 +277,16 @@
 
         enforceConnected();
 
-        // This starts or restarts the delivery of all incomming messages
+        // This starts or restarts the delivery of all incoming messages
         // messages delivered while this connection is stopped are dropped
         // and not acknowledged.
-        this->started.set( true );
+        if( this->started.compareAndSet( false, true ) ) {
 
-        // Start all the sessions.
-        std::vector<ActiveMQSession*> sessions = activeSessions.toArray();
-        for( unsigned int ix=0; ix<sessions.size(); ++ix ) {
-            sessions[ix]->start();
+            // Start all the sessions.
+            std::vector<ActiveMQSession*> sessions = activeSessions.toArray();
+            for( unsigned int ix=0; ix<sessions.size(); ++ix ) {
+                sessions[ix]->start();
+            }
         }
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -284,12 +301,12 @@
 
         // Once current deliveries are done this stops the delivery of any
         // new messages.
-        this->started.set( false );
-
-        std::auto_ptr< Iterator<ActiveMQSession*> > iter( activeSessions.iterator() );
+        if( this->started.compareAndSet( true, false ) ) {
+            std::auto_ptr< Iterator<ActiveMQSession*> > iter( activeSessions.iterator() );
 
-        while( iter->hasNext() ){
-            iter->next()->stop();
+            while( iter->hasNext() ){
+                iter->next()->stop();
+            }
         }
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -329,13 +346,14 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::disconnect() throw ( activemq::exceptions::ActiveMQException ) {
+void ActiveMQConnection::disconnect( long long lastDeliveredSequenceId )
+    throw ( activemq::exceptions::ActiveMQException ) {
 
     try{
 
         // Remove our ConnectionId from the Broker
-        Pointer<RemoveInfo> command( new RemoveInfo() );
-        command->setObjectId( this->connectionInfo->getConnectionId() );
+        Pointer<RemoveInfo> command( this->connectionInfo->createRemoveCommand() );
+        command->setLastDeliveredSequenceId( lastDeliveredSequenceId );
         this->syncRequest( command, this->getCloseTimeout() );
 
         // Send the disconnect command to the broker.
@@ -440,6 +458,9 @@
 
             Pointer<MessageDispatch> dispatch = command.dynamicCast<MessageDispatch>();
 
+            // Check first to see if we are recovering.
+            waitForTransportInterruptionProcessingToComplete();
+
             // Check for an empty Message, shouldn't ever happen but who knows.
             if( dispatch->getMessage() == NULL ) {
                 throw ActiveMQException(
@@ -527,6 +548,9 @@
             return;
         }
 
+        // Mark this Connection as having a Failed transport.
+        this->transportFailed.set( true );
+
         // Inform the user of the error.
         fire( exceptions::ActiveMQException( ex ) );
 
@@ -548,6 +572,8 @@
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::transportInterrupted() {
 
+    transportInterruptionProcessingComplete.reset( new CountDownLatch( dispatchers.size() ) );
+
     synchronized( &activeSessions ) {
         std::auto_ptr< Iterator<ActiveMQSession*> > iter( this->activeSessions.iterator() );
 
@@ -694,3 +720,58 @@
         transportListeners.remove( transportListener );
     }
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::waitForTransportInterruptionProcessingToComplete()
+    throw( decaf::lang::exceptions::InterruptedException ) {
+
+    if( transportInterruptionProcessingComplete != NULL ) {
+
+        while( !closed.get() && !transportFailed.get() &&
+               !transportInterruptionProcessingComplete->await( 15, TimeUnit::SECONDS) ) {
+
+            //LOG.warn( "dispatch paused, waiting for outstanding dispatch interruption processing (" +
+            //          transportInterruptionProcessingComplete.getCount() + ") to complete..");
+        }
+
+        signalInterruptionProcessingComplete();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setTransportInterruptionProcessingComplete() {
+
+    synchronized( &mutex ) {
+
+        if( transportInterruptionProcessingComplete != NULL ) {
+            transportInterruptionProcessingComplete->countDown();
+
+            try {
+                signalInterruptionProcessingComplete();
+            } catch( InterruptedException& ignored ) {}
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::signalInterruptionProcessingComplete()
+    throw( decaf::lang::exceptions::InterruptedException ) {
+
+    if( transportInterruptionProcessingComplete->await( 0, TimeUnit::SECONDS ) ) {
+        synchronized( &mutex ) {
+
+            transportInterruptionProcessingComplete.reset( NULL );
+            FailoverTransport* failoverTransport =
+                dynamic_cast<FailoverTransport*>( this->getTransport().narrow( typeid( FailoverTransport ) ) );
+
+            if( failoverTransport != NULL ) {
+                failoverTransport->setConnectionInterruptProcessingComplete(
+                    this->connectionInfo->getConnectionId() );
+
+                //if( LOG.isDebugEnabled() ) {
+                //    LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
+                //}
+            }
+        }
+    }
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Fri Feb  5 00:45:07 2010
@@ -35,6 +35,7 @@
 #include <decaf/util/Properties.h>
 #include <decaf/util/StlMap.h>
 #include <decaf/util/StlSet.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/lang/exceptions/UnsupportedOperationException.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
@@ -48,6 +49,7 @@
 
     using decaf::lang::Pointer;
     using decaf::util::concurrent::atomic::AtomicBoolean;
+    using decaf::util::concurrent::CountDownLatch;
 
     class ActiveMQSession;
     class ActiveMQProducer;
@@ -104,6 +106,11 @@
         AtomicBoolean closing;
 
         /**
+         * Indicates that this connection's Transport has failed.
+         */
+        AtomicBoolean transportFailed;
+
+        /**
          * Map of message dispatchers indexed by consumer id.
          */
         DispatcherMap dispatchers;
@@ -138,6 +145,12 @@
          */
         Pointer<commands::WireFormatInfo> brokerWireFormatInfo;
 
+        /**
+         * Latch used to track completion of recovery of consumers
+         * after a Connection Interrupted event.
+         */
+        Pointer<CountDownLatch> transportInterruptionProcessingComplete;
+
     public:
 
         /**
@@ -217,6 +230,14 @@
         }
 
         /**
+         * Checks if the Connection's Transport has failed
+         * @return true if the Connection's Transport has failed.
+         */
+        bool isTransportFailed() const {
+            return this->transportFailed.get();
+        }
+
+        /**
          * Requests that the Broker removes the given Destination.  Calling this
          * method implies that the client is finished with the Destination and that
          * no other messages will be sent or received for the given Destination.  The
@@ -427,17 +448,31 @@
          */
         virtual void fire( const exceptions::ActiveMQException& ex );
 
+        /**
+         * Indicates that a Connection resource that is processing the transportInterrupted
+         * event has completed.
+         */
+        void setTransportInterruptionProcessingComplete();
+
     private:
 
         // Sends the connect message to the broker and waits for the response.
         void connect() throw ( activemq::exceptions::ActiveMQException );
 
         // Sends a oneway disconnect message to the broker.
-        void disconnect() throw ( activemq::exceptions::ActiveMQException );
+        void disconnect( long long lastDeliveredSequenceId ) throw ( activemq::exceptions::ActiveMQException );
 
         // Check for Connected State and Throw an exception if not.
         void enforceConnected() const throw ( activemq::exceptions::ActiveMQException );
 
+        // Waits for all Consumers to handle the Transport Interrupted event.
+        void waitForTransportInterruptionProcessingToComplete()
+            throw ( decaf::lang::exceptions::InterruptedException );
+
+        // Marks processing complete for a single caller when interruption processing completes.
+        void signalInterruptionProcessingComplete()
+            throw ( decaf::lang::exceptions::InterruptedException );
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.cpp?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.cpp Fri Feb  5 00:45:07 2010
@@ -25,6 +25,7 @@
 ////////////////////////////////////////////////////////////////////////////////
 ConnectionState::ConnectionState( const Pointer<ConnectionInfo>& info ) : disposed( false ) {
 
+    this->connectionInterruptProcessingComplete = true;
     this->info = info;
 }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.h?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.h Fri Feb  5 00:45:07 2010
@@ -30,6 +30,7 @@
 #include <activemq/state/SessionState.h>
 #include <activemq/state/TransactionState.h>
 
+#include <decaf/util/StlMap.h>
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/util/concurrent/ConcurrentStlMap.h>
 #include <decaf/util/StlList.h>
@@ -58,6 +59,11 @@
         StlList< Pointer<DestinationInfo> > tempDestinations;
         decaf::util::concurrent::atomic::AtomicBoolean disposed;
 
+        bool connectionInterruptProcessingComplete;
+        StlMap< Pointer<ConsumerId>,
+                Pointer<ConsumerInfo>,
+                ConsumerId::COMPARATOR > recoveringPullConsumers;
+
     public:
 
         ConnectionState( const Pointer<ConnectionInfo>& info );
@@ -133,6 +139,18 @@
             return sessions.values();
         }
 
+        StlMap< Pointer<ConsumerId>, Pointer<ConsumerInfo>, ConsumerId::COMPARATOR > getRecoveringPullConsumers() {
+            return recoveringPullConsumers;
+        }
+
+        void setConnectionInterruptProcessingComplete(bool connectionInterruptProcessingComplete) {
+            this->connectionInterruptProcessingComplete = connectionInterruptProcessingComplete;
+        }
+
+        bool isConnectionInterruptProcessingComplete() {
+            return this->connectionInterruptProcessingComplete;
+        }
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp Fri Feb  5 00:45:07 2010
@@ -20,7 +20,13 @@
 #include <decaf/lang/Runnable.h>
 #include <decaf/lang/exceptions/NoSuchElementException.h>
 
+#include <activemq/commands/ConsumerControl.h>
+#include <activemq/commands/RemoveInfo.h>
+#include <activemq/core/ActiveMQConstants.h>
+#include <activemq/transport/TransportListener.h>
+
 using namespace activemq;
+using namespace activemq::core;
 using namespace activemq::state;
 using namespace activemq::commands;
 using namespace activemq::exceptions;
@@ -30,8 +36,6 @@
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
-
-////////////////////////////////////////////////////////////////////////////////
 namespace activemq {
 namespace state {
 
@@ -67,6 +71,7 @@
     this->restoreProducers = true;
     this->restoreTransaction = true;
     this->trackMessages = true;
+    this->trackTransactionProducers = true;
     this->maxCacheSize = 128 * 1024;
     this->currentCacheSize = 0;
 }
@@ -153,6 +158,8 @@
 
     try{
 
+        std::vector< Pointer<Command> > toIgnore;
+
         // Restore the session's transaction state
         std::vector< Pointer<TransactionState> > transactionStates =
             connectionState->getTransactionStates();
@@ -160,14 +167,60 @@
         std::vector< Pointer<TransactionState> >::const_iterator iter = transactionStates.begin();
 
         for( ; iter != transactionStates.end(); ++iter ) {
-            Pointer<TransactionState> state = *iter;
+
+            //if( LOG.isDebugEnabled() ) {
+            //    LOG.debug("tx: " + transactionState.getId());
+            //}
+
+            // ignore any empty (ack) transaction
+            if( (*iter)->getCommands().size() == 2 ) {
+                Pointer<Command> lastCommand = (*iter)->getCommands().get(1);
+                if( lastCommand->isTransactionInfo() ) {
+                    Pointer<TransactionInfo> transactionInfo = lastCommand.dynamicCast<TransactionInfo>();
+
+                    if( transactionInfo->getType() == ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE ) {
+                        //if( LOG.isDebugEnabled() ) {
+                        //    LOG.debug("not replaying empty (ack) tx: " + transactionState.getId());
+                        //}
+                        toIgnore.push_back(lastCommand);
+                        continue;
+                    }
+                }
+            }
+
+            // replay short lived producers that may have been involved in the transaction
+            std::vector< Pointer<ProducerState> > producerStates = (*iter)->getProducerStates();
+            std::vector< Pointer<ProducerState> >::const_iterator state = producerStates.begin();
+
+            for( ; state != producerStates.end(); ++state ) {
+                //if( LOG.isDebugEnabled() ) {
+                //    LOG.debug("tx replay producer :" + producerState.getInfo());
+                //}
+                transport->oneway( (*state)->getInfo() );
+            }
 
             std::auto_ptr< Iterator< Pointer<Command> > > commands(
-                state->getCommands().iterator() );
+                (*iter)->getCommands().iterator() );
 
             while( commands->hasNext() ) {
                 transport->oneway( commands->next() );
             }
+
+            state = producerStates.begin();
+            for( ; state != producerStates.end(); ++state ) {
+                //if( LOG.isDebugEnabled() ) {
+                //    LOG.debug("tx remove replayed producer :" + producerState.getInfo());
+                //}
+                transport->oneway( (*state)->getInfo()->createRemoveCommand() );
+            }
+        }
+
+        std::vector< Pointer<Command> >::const_iterator command = toIgnore.begin();
+        for( ; command != toIgnore.end(); ++command ) {
+            // respond to the outstanding commit
+            Pointer<Response> response( new Response() );
+            response->setCorrelationId( (*command)->getCommandId() );
+            transport->getTransportListener()->onCommand( response );
         }
     }
     AMQ_CATCH_RETHROW( IOException )
@@ -212,14 +265,27 @@
 
     try{
 
-        // Restore the session's consumers
+        // Restore the session's consumers but possibly in pull only (prefetch 0 state) till recovery complete
+        Pointer<ConnectionState> connectionState =
+            connectionStates.get( sessionState->getInfo()->getSessionId()->getParentId() );
+        bool connectionInterruptionProcessingComplete =
+            connectionState->isConnectionInterruptProcessingComplete();
+
         std::vector< Pointer<ConsumerState> > consumerStates = sessionState->getConsumerStates();
+        std::vector< Pointer<ConsumerState> >::const_iterator state = consumerStates.begin();
 
-        std::vector< Pointer<ConsumerState> >::const_iterator iter = consumerStates.begin();
+        for( ; state != consumerStates.end(); ++state ) {
 
-        for( ; iter != consumerStates.end(); ++iter ) {
-            Pointer<ConsumerState> state = *iter;
-            transport->oneway( state->getInfo() );
+            Pointer<ConsumerInfo> infoToSend = (*state)->getInfo();
+
+            if( !connectionInterruptionProcessingComplete && infoToSend->getPrefetchSize() > 0) {
+
+                infoToSend.reset( (*state)->getInfo()->cloneDataStructure() );
+                connectionState->getRecoveringPullConsumers().put( infoToSend->getConsumerId(), (*state)->getInfo() );
+                infoToSend->setPrefetchSize(0);
+            }
+
+            transport->oneway( infoToSend );
         }
     }
     AMQ_CATCH_RETHROW( IOException )
@@ -501,10 +567,12 @@
     throw ( activemq::exceptions::ActiveMQException ) {
 
     try{
+
         if( message != NULL ) {
             if( trackTransactions && message->getTransactionId() != NULL ) {
-                Pointer<ConnectionId> connectionId =
-                    message->getProducerId()->getParentId()->getParentId();
+                Pointer<ProducerId> producerId = message->getProducerId();
+                Pointer<ConnectionId> connectionId = producerId->getParentId()->getParentId();
+
                 if( connectionId != NULL ) {
                     Pointer<ConnectionState> cs = connectionStates.get( connectionId );
                     if( cs != NULL ) {
@@ -513,6 +581,13 @@
                         if( transactionState != NULL ) {
                             transactionState->addCommand(
                                 Pointer<Command>( message->cloneDataStructure() ) );
+
+                            if( trackTransactionProducers ) {
+                                // Track the producer in case it is closed before a commit
+                                Pointer<SessionState> sessionState = cs->getSessionState( producerId->getParentId() );
+                                Pointer<ProducerState> producerState = sessionState->getProducerState(producerId);
+                                producerState->setTransactionState(transactionState);
+                            }
                         }
                     }
                 }
@@ -740,3 +815,55 @@
     AMQ_CATCHALL_THROW( ActiveMQException )
 }
 
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionStateTracker::connectionInterruptProcessingComplete(
+    transport::Transport* transport, const Pointer<ConnectionId>& connectionId ) {
+
+    Pointer<ConnectionState> connectionState = connectionStates.get( connectionId );
+
+    if( connectionState != NULL ) {
+
+        connectionState->setConnectionInterruptProcessingComplete( true );
+
+        StlMap< Pointer<ConsumerId>, Pointer<ConsumerInfo>, ConsumerId::COMPARATOR > stalledConsumers =
+            connectionState->getRecoveringPullConsumers();
+
+        std::vector< Pointer<ConsumerId> > keySet = stalledConsumers.keySet();
+        std::vector< Pointer<ConsumerId> >::const_iterator key = keySet.begin();
+
+        for( ; key != keySet.end(); ++key ) {
+            Pointer<ConsumerControl> control( new ConsumerControl() );
+
+            control->setConsumerId( *key );
+            control->setPrefetch( stalledConsumers.get( *key )->getPrefetchSize() );
+            control->setDestination( stalledConsumers.get( *key )->getDestination() );
+
+            try {
+
+                //if( LOG.isDebugEnabled() ) {
+                //    LOG.debug("restored recovering consumer: " + control.getConsumerId() +
+                //              " with: " + control.getPrefetch());
+                //}
+                transport->oneway( control );
+            } catch( Exception& ex ) {
+                //if( LOG.isDebugEnabled() ) {
+                //    LOG.debug("Failed to submit control for consumer: " + control.getConsumerId() +
+                //              " with: " + control.getPrefetch(), ex);
+                //}
+            }
+        }
+
+        stalledConsumers.clear();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionStateTracker::transportInterrupted() {
+
+    std::vector< Pointer<ConnectionState> > connectionStatesVec = this->connectionStates.values();
+    std::vector< Pointer<ConnectionState> >::const_iterator state = connectionStatesVec.begin();
+
+    for( ; state != connectionStatesVec.end(); ++state ) {
+        (*state)->setConnectionInterruptProcessingComplete( false );
+    }
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h Fri Feb  5 00:45:07 2010
@@ -63,6 +63,7 @@
         bool restoreProducers;
         bool restoreTransaction;
         bool trackMessages;
+        bool trackTransactionProducers;
         int maxCacheSize;
         int currentCacheSize;
 
@@ -81,6 +82,11 @@
         void restore( const Pointer<transport::Transport>& transport )
             throw( decaf::io::IOException );
 
+        void connectionInterruptProcessingComplete(
+            transport::Transport* transport, const Pointer<ConnectionId>& connectionId );
+
+        void transportInterrupted();
+
         virtual Pointer<Command> processDestinationInfo( DestinationInfo* info )
             throw ( exceptions::ActiveMQException );
 
@@ -191,6 +197,14 @@
             this->maxCacheSize = maxCacheSize;
         }
 
+        bool isTrackTransactionProducers() const {
+            return this->trackTransactionProducers;
+        }
+
+        void setTrackTransactionProducers( bool trackTransactionProducers ) {
+            this->trackTransactionProducers = trackTransactionProducers;
+        }
+
     private:
 
         void doRestoreTransactions( const Pointer<transport::Transport>& transport,

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ProducerState.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ProducerState.cpp?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ProducerState.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ProducerState.cpp Fri Feb  5 00:45:07 2010
@@ -17,6 +17,8 @@
 
 #include "ProducerState.h"
 
+#include <activemq/state/TransactionState.h>
+
 using namespace activemq;
 using namespace activemq::state;
 using namespace activemq::commands;
@@ -39,3 +41,13 @@
 
     return "NULL";
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ProducerState::setTransactionState( const Pointer<TransactionState>& transactionState ) {
+    this->transactionState = transactionState;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<TransactionState> ProducerState::getTransactionState() const {
+    return this->transactionState;
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ProducerState.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ProducerState.h?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ProducerState.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ProducerState.h Fri Feb  5 00:45:07 2010
@@ -32,10 +32,13 @@
     using namespace decaf::lang;
     using namespace activemq::commands;
 
+    class TransactionState;
+
     class AMQCPP_API ProducerState {
     private:
 
         Pointer<ProducerInfo> info;
+        Pointer<TransactionState> transactionState;
 
     public:
 
@@ -49,6 +52,10 @@
             return this->info;
         }
 
+        void setTransactionState( const Pointer<TransactionState>& transactionState );
+
+        Pointer<TransactionState> getTransactionState() const;
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/SessionState.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/SessionState.cpp?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/SessionState.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/SessionState.cpp Fri Feb  5 00:45:07 2010
@@ -17,6 +17,8 @@
 
 #include "SessionState.h"
 
+#include <activemq/state/TransactionState.h>
+
 #include <decaf/lang/exceptions/IllegalStateException.h>
 
 using namespace activemq;
@@ -52,3 +54,24 @@
             __FILE__, __LINE__, "Session already Disposed" );
     }
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void SessionState::addProducer( const Pointer<ProducerInfo>& info ) {
+    checkShutdown();
+    producers.put( info->getProducerId(),
+        Pointer<ProducerState>( new ProducerState( info ) ) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<ProducerState> SessionState::removeProducer( const Pointer<ProducerId>& id ) {
+
+    Pointer<ProducerState> producerState = producers.remove( id );
+    if( producerState != NULL ) {
+        if( producerState->getTransactionState() != NULL ) {
+            // allow the transaction to recreate dependent producer on recovery
+            producerState->getTransactionState()->addProducerState( producerState );
+        }
+    }
+
+    return producerState;
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/SessionState.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/SessionState.h?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/SessionState.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/SessionState.h Fri Feb  5 00:45:07 2010
@@ -66,15 +66,9 @@
             return this->info;
         }
 
-        void addProducer( const Pointer<ProducerInfo>& info ) {
-            checkShutdown();
-            producers.put( info->getProducerId(),
-                Pointer<ProducerState>( new ProducerState( info ) ) );
-        }
+        void addProducer( const Pointer<ProducerInfo>& info );
 
-        Pointer<ProducerState> removeProducer( const Pointer<ProducerId>& id) {
-            return producers.remove( id );
-        }
+        Pointer<ProducerState> removeProducer( const Pointer<ProducerId>& id );
 
         void addConsumer( const Pointer<ConsumerInfo>& info ) {
             checkShutdown();

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/TransactionState.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/TransactionState.cpp?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/TransactionState.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/TransactionState.cpp Fri Feb  5 00:45:07 2010
@@ -17,6 +17,8 @@
 
 #include "TransactionState.h"
 
+#include <activemq/state/ProducerState.h>
+
 #include <decaf/lang/exceptions/IllegalStateException.h>
 
 using namespace activemq;
@@ -62,3 +64,16 @@
             __FILE__, __LINE__, "Transaction already Disposed" );
     }
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void TransactionState::addProducerState( const Pointer<ProducerState>& producerState ) {
+
+    if( producerState != NULL ) {
+        producers.put( producerState->getInfo()->getProducerId(), producerState );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::vector< Pointer<ProducerState> > TransactionState::getProducerStates() {
+    return this->producers.values();
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/TransactionState.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/TransactionState.h?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/TransactionState.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/TransactionState.h Fri Feb  5 00:45:07 2010
@@ -20,11 +20,13 @@
 
 #include <activemq/util/Config.h>
 #include <activemq/commands/Command.h>
+#include <activemq/commands/ProducerId.h>
 #include <activemq/commands/TransactionId.h>
 
 #include <decaf/lang/Pointer.h>
 #include <decaf/util/StlList.h>
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
+#include <decaf/util/concurrent/ConcurrentStlMap.h>
 
 #include <string>
 #include <memory>
@@ -35,8 +37,11 @@
     using decaf::lang::Pointer;
     using decaf::util::StlList;
     using decaf::util::concurrent::atomic::AtomicBoolean;
+    using decaf::util::concurrent::ConcurrentStlMap;
     using namespace activemq::commands;
 
+    class ProducerState;
+
     class AMQCPP_API TransactionState {
     private:
 
@@ -45,6 +50,8 @@
         AtomicBoolean disposed;
         bool prepared;
         int preparedResult;
+        ConcurrentStlMap< Pointer<ProducerId>, Pointer<ProducerState>,
+                          ProducerId::COMPARATOR > producers;
 
     public:
 
@@ -86,6 +93,10 @@
             return this->preparedResult;
         }
 
+        void addProducerState( const Pointer<ProducerState>& producerState );
+
+        std::vector< Pointer<ProducerState> > getProducerStates();
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp Fri Feb  5 00:45:07 2010
@@ -51,14 +51,18 @@
     this->useExponentialBackOff = true;
     this->initialized = false;
     this->maxReconnectAttempts = 0;
+    this->startupMaxReconnectAttempts = 0;
     this->connectFailures = 0;
     this->reconnectDelay = this->initialReconnectDelay;
     this->trackMessages = false;
+    this->trackTransactionProducers = true;
     this->maxCacheSize = 128 * 1024;
 
     this->started = false;
     this->closed = false;
     this->connected = false;
+    this->connectionInterruptProcessingComplete = false;
+    this->firstConnection = true;
 
     this->transportListener = NULL;
     this->uris.reset( new URIPool() );
@@ -188,8 +192,8 @@
                     return;
                 }
 
-                if( command->isRemoveInfo() ) {
-                    // Simulate response to RemoveInfo command
+                if( command->isRemoveInfo() || command->isMessageAck() ) {
+                    // Simulate response to RemoveInfo command or Ack as they will be stale.
                     stateTracker.track( command );
                     Pointer<Response> response( new Response() );
                     response->setCorrelationId( command->getCommandId() );
@@ -332,6 +336,7 @@
 
             stateTracker.setMaxCacheSize( this->getMaxCacheSize() );
             stateTracker.setTrackMessages( this->isTrackMessages() );
+            stateTracker.setTrackTransactionProducers( this->isTrackTransactionProducers() );
 
             if( connectedTransport != NULL ) {
                 stateTracker.restore( connectedTransport );
@@ -454,7 +459,6 @@
 
         // Hand off to the close task so it gets done in a different thread.
         closeTask->add( transport );
-        taskRunner->wakeup();
 
         synchronized( &reconnectMutex ) {
 
@@ -463,14 +467,19 @@
             connectedTransportURI.reset( NULL );
             connected = false;
 
+            // Notify before we attempt to reconnect so that the consumers have a chance
+            // to cleanup their state.
+            if( transportListener != NULL ) {
+                transportListener->transportInterrupted();
+            }
+
+            // Place the State Tracker into a reconnection state.
+            this->stateTracker.transportInterrupted();
+
             if( started ) {
                 taskRunner->wakeup();
             }
         }
-
-        if( transportListener != NULL ) {
-            transportListener->transportInterrupted();
-        }
     }
 }
 
@@ -578,6 +587,10 @@
                             transport->setTransportListener( disposedListener.get() );
                         }
 
+                        try{
+                            transport->stop();
+                        } catch(...) {}
+
                         // Hand off to the close task so it gets done in a different thread
                         // this prevents a deadlock from occurring if the Transport happens
                         // to call back through our onException method or locks in some other
@@ -616,6 +629,10 @@
                     transportListener->transportResumed();
                 }
 
+                if( firstConnection ) {
+                    firstConnection = false;
+                }
+
                 //std::cout << "Failover: Successfully connected to Broker at: "
                 //          << connectedTransportURI->toString() << std::endl;
 
@@ -623,7 +640,17 @@
             }
         }
 
-        if( maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts ) {
+        int reconnectAttempts = 0;
+        if( firstConnection ) {
+            if( startupMaxReconnectAttempts != 0 ) {
+                reconnectAttempts = startupMaxReconnectAttempts;
+            }
+        }
+        if( reconnectAttempts == 0 ) {
+            reconnectAttempts = maxReconnectAttempts;
+        }
+
+        if( reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts ) {
             connectionFailure = failure;
 
             // Make sure on initial startup, that the transportListener has been initialized
@@ -696,3 +723,11 @@
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
     AMQ_CATCHALL_THROW( IOException )
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setConnectionInterruptProcessingComplete( const Pointer<commands::ConnectionId>& connectionId ) {
+
+    synchronized( &reconnectMutex ) {
+        stateTracker.connectionInterruptProcessingComplete( this, connectionId );
+    }
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h Fri Feb  5 00:45:07 2010
@@ -21,6 +21,7 @@
 #include <activemq/util/Config.h>
 
 #include <activemq/commands/Command.h>
+#include <activemq/commands/ConnectionId.h>
 #include <activemq/threads/TaskRunner.h>
 #include <activemq/threads/CompositeTaskRunner.h>
 #include <activemq/state/ConnectionStateTracker.h>
@@ -67,10 +68,14 @@
         bool useExponentialBackOff;
         bool initialized;
         int maxReconnectAttempts;
+        int startupMaxReconnectAttempts;
         int connectFailures;
         long long reconnectDelay;
         bool trackMessages;
+        bool trackTransactionProducers;
         int maxCacheSize;
+        bool connectionInterruptProcessingComplete;
+        bool firstConnection;
 
         mutable decaf::util::concurrent::Mutex reconnectMutex;
         mutable decaf::util::concurrent::Mutex sleepMutex;
@@ -368,6 +373,14 @@
             this->maxReconnectAttempts = value;
         }
 
+        int getStartupMaxReconnectAttempts() const {
+            return this->startupMaxReconnectAttempts;
+        }
+
+        void setStartupMaxReconnectAttempts( int value ) {
+            this->startupMaxReconnectAttempts = value;
+        }
+
         long long getReconnectDelay() const {
             return this->reconnectDelay;
         }
@@ -400,6 +413,14 @@
             this->trackMessages = value;
         }
 
+        bool isTrackTransactionProducers() const {
+            return this->trackTransactionProducers;
+        }
+
+        void setTrackTransactionProducers( bool value ) {
+            this->trackTransactionProducers = value;
+        }
+
         int getMaxCacheSize() const {
             return this->maxCacheSize;
         }
@@ -408,6 +429,8 @@
             this->maxCacheSize = value;
         }
 
+        void setConnectionInterruptProcessingComplete( const Pointer<commands::ConnectionId>& connectionId );
+
     protected:
 
         /**

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp Fri Feb  5 00:45:07 2010
@@ -95,6 +95,8 @@
             Boolean::parseBoolean( properties.getProperty( "useExponentialBackOff", "true" ) ) );
         transport->setMaxReconnectAttempts(
             Integer::parseInt( properties.getProperty( "maxReconnectAttempts", "0" ) ) );
+        transport->setStartupMaxReconnectAttempts(
+            Integer::parseInt( properties.getProperty( "startupMaxReconnectAttempts", "0" ) ) );
         transport->setRandomize(
             Boolean::parseBoolean( properties.getProperty( "randomize", "true" ) ) );
         transport->setBackup(



Mime
View raw message