activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1446974 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover: BackupTransportPool.cpp BackupTransportPool.h FailoverTransport.cpp FailoverTransport.h FailoverTransportFactory.cpp
Date Sat, 16 Feb 2013 23:04:56 GMT
Author: tabish
Date: Sat Feb 16 23:04:56 2013
New Revision: 1446974

URL: http://svn.apache.org/r1446974
Log:
https://issues.apache.org/jira/browse/AMQCPP-463

Support priority backups and better support broker led re-balancing via ConnectionControl messages

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp?rev=1446974&r1=1446973&r2=1446974&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp Sat Feb 16 23:04:56 2013
@@ -35,19 +35,48 @@ using namespace decaf;
 using namespace decaf::io;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
+using namespace decaf::net;
 using namespace decaf::util;
 using namespace decaf::util::concurrent;
 
 ////////////////////////////////////////////////////////////////////////////////
+namespace activemq {
+namespace transport {
+namespace failover {
+
+    class BackupTransportPoolImpl {
+    private:
+
+        BackupTransportPoolImpl(const BackupTransportPoolImpl&);
+        BackupTransportPoolImpl& operator= (const BackupTransportPoolImpl&);
+
+    public:
+
+        LinkedList< Pointer<BackupTransport> > backups;
+        volatile bool pending;
+        volatile bool closed;
+        volatile int priorityBackups;
+
+        BackupTransportPoolImpl() : backups(), pending(false), closed(false), priorityBackups(0) {
+        }
+
+    };
+
+}}}
+
+////////////////////////////////////////////////////////////////////////////////
 BackupTransportPool::BackupTransportPool(const Pointer<CompositeTaskRunner> taskRunner,
                                          const Pointer<CloseTransportsTask> closeTask,
-                                         const Pointer<URIPool> uriPool) : backups(),
-                                                                           taskRunner(taskRunner),
-                                                                           closeTask(closeTask),
-                                                                           uriPool(uriPool),
-                                                                           backupPoolSize(1),
-                                                                           enabled(false),
-                                                                           pending(false) {
+                                         const Pointer<URIPool> uriPool,
+                                         const Pointer<URIPool> updates,
+                                         const Pointer<URIPool> priorityUriPool) : impl(new BackupTransportPoolImpl),
+                                                                                   taskRunner(taskRunner),
+                                                                                   closeTask(closeTask),
+                                                                                   uriPool(uriPool),
+                                                                                   updates(updates),
+                                                                                   priorityUriPool(priorityUriPool),
+                                                                                   backupPoolSize(1),
+                                                                                   enabled(false) {
 
     if (taskRunner == NULL) {
         throw NullPointerException(__FILE__, __LINE__, "TaskRunner passed is NULL");
@@ -57,6 +86,10 @@ BackupTransportPool::BackupTransportPool
         throw NullPointerException(__FILE__, __LINE__, "URIPool passed is NULL");
     }
 
+    if (priorityUriPool == NULL) {
+        throw NullPointerException(__FILE__, __LINE__, "Piroirty URIPool passed is NULL");
+    }
+
     if (closeTask == NULL) {
         throw NullPointerException(__FILE__, __LINE__, "Close Transport Task passed is NULL");
     }
@@ -70,13 +103,16 @@ BackupTransportPool::BackupTransportPool
 BackupTransportPool::BackupTransportPool(int backupPoolSize,
                                          const Pointer<CompositeTaskRunner> taskRunner,
                                          const Pointer<CloseTransportsTask> closeTask,
-                                         const Pointer<URIPool> uriPool ) : backups(),
-                                                                            taskRunner(taskRunner),
-                                                                            closeTask(closeTask),
-                                                                            uriPool(uriPool),
-                                                                            backupPoolSize(backupPoolSize),
-                                                                            enabled(false),
-                                                                            pending(false) {
+                                         const Pointer<URIPool> uriPool,
+                                         const Pointer<URIPool> updates,
+                                         const Pointer<URIPool> priorityUriPool) : impl(new BackupTransportPoolImpl),
+                                                                                   taskRunner(taskRunner),
+                                                                                   closeTask(closeTask),
+                                                                                   uriPool(uriPool),
+                                                                                   updates(updates),
+                                                                                   priorityUriPool(priorityUriPool),
+                                                                                   backupPoolSize(backupPoolSize),
+                                                                                   enabled(false) {
 
     if (taskRunner == NULL) {
         throw NullPointerException(__FILE__, __LINE__, "TaskRunner passed is NULL");
@@ -86,6 +122,10 @@ BackupTransportPool::BackupTransportPool
         throw NullPointerException(__FILE__, __LINE__, "URIPool passed is NULL");
     }
 
+    if (priorityUriPool == NULL) {
+        throw NullPointerException(__FILE__, __LINE__, "Piroirty URIPool passed is NULL");
+    }
+
     if (closeTask == NULL) {
         throw NullPointerException(__FILE__, __LINE__, "Close Transport Task passed is NULL");
     }
@@ -98,17 +138,41 @@ BackupTransportPool::BackupTransportPool
 ////////////////////////////////////////////////////////////////////////////////
 BackupTransportPool::~BackupTransportPool() {
     this->taskRunner->removeTask(this);
+
+    try {
+        delete this->impl;
+    }
+    AMQ_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BackupTransportPool::close() {
+
+    if (this->impl->closed) {
+        return;
+    }
+
+    synchronized(&this->impl->backups) {
+        this->enabled = false;
+        this->impl->closed = true;
+        this->impl->backups.clear();
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void BackupTransportPool::setEnabled(bool value) {
+
+    if (this->impl->closed) {
+        return;
+    }
+
     this->enabled = value;
 
     if (enabled == true) {
         this->taskRunner->wakeup();
     } else {
-        synchronized(&backups) {
-            this->backups.clear();
+        synchronized(&this->impl->backups) {
+            this->impl->backups.clear();
         }
     }
 }
@@ -122,14 +186,14 @@ Pointer<BackupTransport> BackupTransport
 
     Pointer<BackupTransport> result;
 
-    synchronized(&backups) {
-        if (!backups.isEmpty()) {
-            result = backups.removeAt(0);
+    synchronized(&this->impl->backups) {
+        if (!this->impl->backups.isEmpty()) {
+            result = this->impl->backups.removeAt(0);
         }
     }
 
     // Flag as pending so the task gets run again and new backups are created.
-    this->pending = true;
+    this->impl->pending = true;
     this->taskRunner->wakeup();
 
     return result;
@@ -139,7 +203,7 @@ Pointer<BackupTransport> BackupTransport
 bool BackupTransportPool::isPending() const {
 
     if (this->isEnabled()) {
-        return this->pending;
+        return this->impl->pending;
     }
 
     return false;
@@ -150,9 +214,16 @@ bool BackupTransportPool::iterate() {
 
     LinkedList<URI> failures;
 
-    synchronized(&backups) {
+    synchronized(&this->impl->backups) {
 
-        while (isEnabled() && (int) backups.size() < backupPoolSize) {
+        Pointer<URIPool> uriPool = this->uriPool;
+
+        // We prefer the Broker updated URIs list if it has any URIs.
+        if (!updates->isEmpty()) {
+            uriPool = updates;
+        }
+
+        while (isEnabled() && (int) this->impl->backups.size() < backupPoolSize) {
 
             URI connectTo;
 
@@ -168,23 +239,37 @@ bool BackupTransportPool::iterate() {
             Pointer<BackupTransport> backup(new BackupTransport(this));
             backup->setUri(connectTo);
 
+            if (priorityUriPool->contains(connectTo)) {
+                backup->setPriority(true);
+            }
+
             try {
                 Pointer<Transport> transport = createTransport(connectTo);
+
                 transport->setTransportListener(backup.get());
                 transport->start();
                 backup->setTransport(transport);
-                backups.add(backup);
+
+                // Put any priority connections first so a reconnect picks them
+                // up automatically.
+                if (backup->isPriority()) {
+                    this->impl->priorityBackups++;
+                    this->impl->backups.addFirst(backup);
+                } else {
+                    this->impl->backups.addLast(backup);
+                }
             } catch (...) {
                 // Store it in the list of URIs that didn't work, once done we
                 // return those to the pool.
                 failures.add(connectTo);
             }
         }
+
+        // return all failures to the URI Pool, we can try again later.
+        uriPool->addURIs(failures);
     }
 
-    // return all failures to the URI Pool, we can try again later.
-    uriPool->addURIs(failures);
-    this->pending = false;
+    this->impl->pending = false;
 
     return false;
 }
@@ -192,15 +277,19 @@ bool BackupTransportPool::iterate() {
 ////////////////////////////////////////////////////////////////////////////////
 void BackupTransportPool::onBackupTransportFailure(BackupTransport* failedTransport) {
 
-    synchronized(&backups) {
+    synchronized(&this->impl->backups) {
 
-        std::auto_ptr<Iterator<Pointer<BackupTransport> > > iter(backups.iterator());
+        std::auto_ptr<Iterator<Pointer<BackupTransport> > > iter(this->impl->backups.iterator());
 
         while (iter->hasNext()) {
             if (iter->next() == failedTransport) {
                 iter->remove();
             }
 
+            if (failedTransport->isPriority() && this->impl->priorityBackups > 0) {
+                this->impl->priorityBackups--;
+            }
+
             this->uriPool->addURI(failedTransport->getUri());
             this->closeTask->add(failedTransport->getTransport());
             this->taskRunner->wakeup();
@@ -209,6 +298,16 @@ void BackupTransportPool::onBackupTransp
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+bool BackupTransportPool::isPriorityBackupAvailable() const {
+    bool result = false;
+    synchronized(&this->impl->backups) {
+        result = this->impl->priorityBackups > 0;
+    }
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 Pointer<Transport> BackupTransportPool::createTransport(const URI& location) const {
 
     try {

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.h?rev=1446974&r1=1446973&r2=1446974&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.h Sat Feb 16 23:04:56 2013
@@ -37,33 +37,47 @@ namespace failover {
     using decaf::util::LinkedList;
     using activemq::threads::CompositeTaskRunner;
 
+    class BackupTransportPoolImpl;
+
     class AMQCPP_API BackupTransportPool : public activemq::threads::CompositeTask {
     private:
 
         friend class BackupTransport;
 
-        mutable LinkedList< Pointer<BackupTransport> > backups;
+        BackupTransportPoolImpl* impl;
+
         Pointer<CompositeTaskRunner> taskRunner;
         Pointer<CloseTransportsTask> closeTask;
         Pointer<URIPool> uriPool;
+        Pointer<URIPool> updates;
+        Pointer<URIPool> priorityUriPool;
         volatile int backupPoolSize;
         volatile bool enabled;
-        volatile bool pending;
+        volatile int maxReconnectDelay;
 
     public:
 
         BackupTransportPool(const Pointer<CompositeTaskRunner> taskRunner,
                             const Pointer<CloseTransportsTask> closeTask,
-                            const Pointer<URIPool> uriPool);
+                            const Pointer<URIPool> uriPool,
+                            const Pointer<URIPool> updates,
+                            const Pointer<URIPool> priorityUriPool);
 
         BackupTransportPool(int backupPoolSize,
                             const Pointer<CompositeTaskRunner> taskRunner,
                             const Pointer<CloseTransportsTask> closeTask,
-                            const Pointer<URIPool> uriPool);
+                            const Pointer<URIPool> uriPool,
+                            const Pointer<URIPool> updates,
+                            const Pointer<URIPool> priorityUriPool);
 
         virtual ~BackupTransportPool();
 
         /**
+         * Closes down the pool and destroys any Backups contained in the pool.
+         */
+        void close();
+
+        /**
          * Return true if we don't currently have enough Connected Transports
          */
         virtual bool isPending() const;
@@ -116,6 +130,14 @@ namespace failover {
          */
         void setEnabled(bool value);
 
+        /**
+         * Returns true if there is a Backup in the pool that's on the priority
+         * backups list.
+         *
+         * @returns true if there is a priority backup available.
+         */
+        bool isPriorityBackupAvailable() const;
+
     private:
 
         // The backups report their failure to the pool, the pool removes them
@@ -123,7 +145,7 @@ namespace failover {
         // the internal transport to the close transport's task for cleanup.
         void onBackupTransportFailure(BackupTransport* failedTransport);
 
-        Pointer<Transport> createTransport(const URI& location) const;
+        Pointer<Transport> createTransport(const decaf::net::URI& location) const;
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=1446974&r1=1446973&r2=1446974&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp Sat Feb 16 23:04:56 2013
@@ -23,8 +23,17 @@
 #include <activemq/transport/TransportRegistry.h>
 #include <activemq/threads/DedicatedTaskRunner.h>
 #include <activemq/threads/CompositeTaskRunner.h>
+#include <activemq/transport/failover/BackupTransportPool.h>
+#include <activemq/transport/failover/URIPool.h>
+#include <activemq/transport/failover/FailoverTransportListener.h>
+#include <activemq/transport/failover/CloseTransportsTask.h>
+#include <activemq/transport/failover/URIPool.h>
 #include <decaf/util/Random.h>
 #include <decaf/util/StringTokenizer.h>
+#include <decaf/util/LinkedList.h>
+#include <decaf/util/StlMap.h>
+#include <decaf/util/concurrent/TimeUnit.h>
+#include <decaf/util/concurrent/Mutex.h>
 #include <decaf/lang/System.h>
 #include <decaf/lang/Integer.h>
 
@@ -33,12 +42,14 @@ using namespace activemq;
 using namespace activemq::state;
 using namespace activemq::commands;
 using namespace activemq::exceptions;
+using namespace activemq::threads;
 using namespace activemq::transport;
 using namespace activemq::transport::failover;
 using namespace decaf;
 using namespace decaf::io;
 using namespace decaf::net;
 using namespace decaf::util;
+using namespace decaf::util::concurrent;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 
@@ -53,6 +64,9 @@ namespace failover {
         FailoverTransportImpl(const FailoverTransportImpl&);
         FailoverTransportImpl& operator= (const FailoverTransportImpl&);
 
+        static const int DEFAULT_INITIAL_RECONNECT_DELAY;
+        static const int INFINITE;
+
     public:
 
         bool closed;
@@ -78,16 +92,20 @@ namespace failover {
         bool reconnectSupported;
         bool rebalanceUpdateURIs;
         bool priorityBackup;
+        bool backupsEnabled;
+
+        bool doRebalance;
+        bool connectedToPrioirty;
 
-        mutable decaf::util::concurrent::Mutex reconnectMutex;
-        mutable decaf::util::concurrent::Mutex sleepMutex;
-        mutable decaf::util::concurrent::Mutex listenerMutex;
+        mutable Mutex reconnectMutex;
+        mutable Mutex sleepMutex;
+        mutable Mutex listenerMutex;
 
-        decaf::util::StlMap<int, Pointer<Command> > requestMap;
+        StlMap<int, Pointer<Command> > requestMap;
 
         Pointer<URIPool> uris;
         Pointer<URIPool> priorityUris;
-        decaf::util::LinkedList<URI> updated;
+        Pointer<URIPool> updated;
         Pointer<URI> connectedTransportURI;
         Pointer<Transport> connectedTransport;
         Pointer<Exception> connectionFailure;
@@ -97,20 +115,22 @@ namespace failover {
         Pointer<TransportListener> disposedListener;
         Pointer<TransportListener> myTransportListener;
 
+        TransportListener* transportListener;
+
         FailoverTransportImpl(FailoverTransport* parent) :
             closed(false),
             connected(false),
             started(false),
-            timeout(-1),
-            initialReconnectDelay(10),
+            timeout(INFINITE),
+            initialReconnectDelay(DEFAULT_INITIAL_RECONNECT_DELAY),
             maxReconnectDelay(1000*30),
             backOffMultiplier(2),
             useExponentialBackOff(true),
             initialized(false),
-            maxReconnectAttempts(0),
-            startupMaxReconnectAttempts(0),
+            maxReconnectAttempts(INFINITE),
+            startupMaxReconnectAttempts(INFINITE),
             connectFailures(0),
-            reconnectDelay(10),
+            reconnectDelay(DEFAULT_INITIAL_RECONNECT_DELAY),
             trackMessages(false),
             trackTransactionProducers(true),
             maxCacheSize(128*1024),
@@ -120,13 +140,16 @@ namespace failover {
             reconnectSupported(true),
             rebalanceUpdateURIs(true),
             priorityBackup(false),
+            backupsEnabled(false),
+            doRebalance(false),
+            connectedToPrioirty(false),
             reconnectMutex(),
             sleepMutex(),
             listenerMutex(),
             requestMap(),
             uris(new URIPool()),
             priorityUris(new URIPool()),
-            updated(),
+            updated(new URIPool()),
             connectedTransportURI(),
             connectedTransport(),
             connectionFailure(),
@@ -134,9 +157,10 @@ namespace failover {
             closeTask(new CloseTransportsTask()),
             taskRunner(new CompositeTaskRunner()),
             disposedListener(),
-            myTransportListener(new FailoverTransportListener(parent)) {
+            myTransportListener(new FailoverTransportListener(parent)),
+            transportListener(NULL) {
 
-            this->backups.reset(new BackupTransportPool(taskRunner, closeTask, uris));
+            this->backups.reset(new BackupTransportPool(taskRunner, closeTask, uris, updated, priorityUris));
 
             this->taskRunner->addTask(parent);
             this->taskRunner->addTask(this->closeTask.get());
@@ -146,12 +170,114 @@ namespace failover {
             return priorityUris->contains(uri) || uris->isPriority(uri);
         }
 
+        Pointer<URIPool> getConnectList() {
+            // Pick an appropriate URI pool, updated is always preferred if updates are
+            // enabled and we have any, otherwise we fallback to our original list so that
+            // we ensure we always try something.
+            Pointer<URIPool> uris = this->uris;
+            if (this->updateURIsSupported && !this->updated->isEmpty()) {
+                uris = this->updated;
+            }
+            return uris;
+        }
+
+        void doDelay() {
+            if (reconnectDelay > 0) {
+                synchronized (&sleepMutex) {
+                    try {
+                        sleepMutex.wait(reconnectDelay);
+                    } catch (InterruptedException& e) {
+                        Thread::currentThread()->interrupt();
+                    }
+                }
+            }
+
+            if (useExponentialBackOff) {
+                // Exponential increment of reconnect delay.
+                reconnectDelay *= backOffMultiplier;
+                if (reconnectDelay > maxReconnectDelay) {
+                    reconnectDelay = maxReconnectDelay;
+                }
+            }
+        }
+
+        int calculateReconnectAttemptLimit() const {
+            int maxReconnectValue = maxReconnectAttempts;
+            if (firstConnection && startupMaxReconnectAttempts != INFINITE) {
+                maxReconnectValue = startupMaxReconnectAttempts;
+            }
+            return maxReconnectValue;
+        }
+
+        bool canReconnect() const {
+            return started && 0 != calculateReconnectAttemptLimit();
+        }
+
+        /**
+         * This must be called with the reconnect mutex locked.
+         */
+        void propagateFailureToExceptionListener() {
+            if (this->transportListener != NULL) {
+
+                Pointer<IOException> ioException;
+                try {
+                    ioException = this->connectionFailure.dynamicCast<IOException>();
+                }
+                AMQ_CATCH_NOTHROW( ClassCastException)
+
+                if (ioException != NULL) {
+                    transportListener->onException(*this->connectionFailure);
+                } else {
+                    transportListener->onException(IOException(*this->connectionFailure));
+                }
+            }
+
+            reconnectMutex.notifyAll();
+        }
+
+        void resetReconnectDelay() {
+            if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) {
+                reconnectDelay = initialReconnectDelay;
+            }
+        }
+
+        bool isClosedOrFailed() const {
+            return closed || connectionFailure != NULL;
+        }
+
+        bool isConnectionStateValid() const {
+            return connectedTransport != NULL && !doRebalance && !this->backups->isPriorityBackupAvailable();
+        }
+
+        void disconnect() {
+            Pointer<Transport> transport;
+            transport.swap(this->connectedTransport);
+
+            if (transport != NULL) {
+
+                if (this->disposedListener != NULL) {
+                    transport->setTransportListener(this->disposedListener.get());
+                }
+
+                // Hand off to the close task so it gets done in a different thread.
+                this->closeTask->add(transport);
+
+                if (this->connectedTransportURI != NULL) {
+                    this->uris->addURI(*this->connectedTransportURI);
+                    this->connectedTransportURI.reset(NULL);
+                }
+            }
+        }
+
     };
 
+    const int FailoverTransportImpl::DEFAULT_INITIAL_RECONNECT_DELAY = 10;
+    const int FailoverTransportImpl::INFINITE = -1;
+
 }}}
 
 ////////////////////////////////////////////////////////////////////////////////
-FailoverTransport::FailoverTransport() : stateTracker(), impl(NULL), transportListener(NULL) {
+FailoverTransport::FailoverTransport() : stateTracker(), impl(NULL) {
     this->impl = new FailoverTransportImpl(this);
     this->stateTracker.setTrackTransactions(true);
 }
@@ -170,11 +296,12 @@ FailoverTransport::~FailoverTransport() 
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::add(const std::string& uri) {
+void FailoverTransport::add(bool rebalance, const std::string& uri) {
 
     try {
-        this->impl->uris->addURI(URI(uri));
-        reconnect(false);
+        if (this->impl->uris->addURI(URI(uri))) {
+            reconnect(rebalance);
+        }
     }
     AMQ_CATCHALL_NOTHROW()
 }
@@ -182,50 +309,56 @@ void FailoverTransport::add(const std::s
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::addURI(bool rebalance, const List<URI>& uris) {
 
-    std::auto_ptr<Iterator<URI> > iter(uris.iterator());
+    bool newUri = false;
 
+    std::auto_ptr<Iterator<URI> > iter(uris.iterator());
     while (iter->hasNext()) {
-        this->impl->uris->addURI(iter->next());
+        if (this->impl->uris->addURI(iter->next())) {
+            newUri = true;
+        }
     }
 
-    reconnect(rebalance);
+    if (newUri) {
+        reconnect(rebalance);
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::removeURI(bool rebalance, const List<URI>& uris) {
 
-    std::auto_ptr<Iterator<URI> > iter(uris.iterator());
+    bool changed = false;
 
+    std::auto_ptr<Iterator<URI> > iter(uris.iterator());
     synchronized( &this->impl->reconnectMutex ) {
-
-        // We need to lock so that the reconnect doesn't get kicked off until
-        // we have a chance to remove the URIs in case one of them was the one
-        // we had a connection to and it gets reinserted into the URI pool.
-
-        reconnect(rebalance);
-
         while (iter->hasNext()) {
-            this->impl->uris->removeURI(iter->next());
+            if (this->impl->uris->removeURI(iter->next())) {
+                changed = true;
+            }
         }
     }
+
+    if (changed) {
+        reconnect(rebalance);
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::reconnect(const decaf::net::URI& uri) {
 
     try {
-        this->impl->uris->addURI(uri);
-        reconnect(true);
+        if (this->impl->uris->addURI(uri)) {
+            reconnect(true);
+        }
     }
-    AMQ_CATCH_RETHROW( IOException)
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException)
-    AMQ_CATCHALL_THROW( IOException)
+    AMQ_CATCH_RETHROW(IOException)
+    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+    AMQ_CATCHALL_THROW(IOException)
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::setTransportListener(TransportListener* listener) {
     synchronized( &this->impl->listenerMutex ) {
-        this->transportListener = listener;
+        this->impl->transportListener = listener;
         this->impl->listenerMutex.notifyAll();
     }
 }
@@ -233,7 +366,7 @@ void FailoverTransport::setTransportList
 ////////////////////////////////////////////////////////////////////////////////
 TransportListener* FailoverTransport::getTransportListener() const {
     synchronized( &this->impl->listenerMutex ) {
-        return this->transportListener;
+        return this->impl->transportListener;
     }
 
     return NULL;
@@ -276,6 +409,17 @@ void FailoverTransport::oneway(const Poi
                     }
 
                     return;
+                } else if (command->isMessagePull()) {
+                    // Simulate response to MessagePull if timed as we can't honor that now.
+                    Pointer<MessagePull> pullRequest = command.dynamicCast<MessagePull>();
+                    if (pullRequest->getTimeout() != 0) {
+                        Pointer<MessageDispatch> dispatch(new MessageDispatch());
+                        dispatch->setConsumerId(pullRequest->getConsumerId());
+                        dispatch->setDestination(pullRequest->getDestination());
+                        this->impl->myTransportListener->onCommand(dispatch);
+                    }
+
+                    return;
                 }
             }
 
@@ -352,9 +496,8 @@ void FailoverTransport::oneway(const Poi
                                 this->impl->requestMap.remove(command->getCommandId());
                             }
 
-                            // Rethrow the exception so it will handled by
-                            // the outer catch
-                            throw e;
+                            // re-throw the exception so it will handled by the outer catch
+                            throw;
                         } else {
                             // Trigger the reconnect since we can't count on inactivity or
                             // other socket events to trip the failover condition.
@@ -369,8 +512,10 @@ void FailoverTransport::oneway(const Poi
                 }
             }
         }
+    } catch (InterruptedException& ex) {
+        Thread::currentThread()->interrupt();
+        throw InterruptedIOException(__FILE__, __LINE__, "FailoverTransport oneway() interrupted");
     }
-    AMQ_CATCH_NOTHROW( Exception)
     AMQ_CATCHALL_NOTHROW()
 
     if (!this->impl->closed) {
@@ -409,6 +554,9 @@ void FailoverTransport::start() {
 
             this->impl->started = true;
 
+            if (this->impl->backupsEnabled || this->impl->priorityBackup) {
+                this->impl->backups->setEnabled(true);
+            }
             this->impl->taskRunner->start();
 
             stateTracker.setMaxCacheSize(this->getMaxCacheSize());
@@ -431,10 +579,14 @@ void FailoverTransport::start() {
 void FailoverTransport::stop() {
 
     try {
+        synchronized(&this->impl->reconnectMutex) {
+            this->impl->started = false;
+            this->impl->backups->setEnabled(false);
+        }
     }
-    AMQ_CATCH_RETHROW( IOException)
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException)
-    AMQ_CATCHALL_THROW( IOException)
+    AMQ_CATCH_RETHROW(IOException)
+    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+    AMQ_CATCHALL_THROW(IOException)
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -444,8 +596,9 @@ void FailoverTransport::close() {
 
         Pointer<Transport> transportToStop;
 
-        synchronized( &this->impl->reconnectMutex ) {
-            if (!this->impl->started) {
+        synchronized(&this->impl->reconnectMutex) {
+
+            if (this->impl->closed) {
                 return;
             }
 
@@ -463,11 +616,13 @@ void FailoverTransport::close() {
             this->impl->reconnectMutex.notifyAll();
         }
 
+        this->impl->backups->close();
+
         synchronized( &this->impl->sleepMutex ) {
             this->impl->sleepMutex.notifyAll();
         }
 
-        this->impl->taskRunner->shutdown(2000);
+        this->impl->taskRunner->shutdown(TimeUnit::MINUTES.toMillis(5));
 
         if (transportToStop != NULL) {
             transportToStop->close();
@@ -487,26 +642,14 @@ void FailoverTransport::reconnect(bool r
         if (this->impl->started) {
 
             if (rebalance) {
-
-                transport.swap(this->impl->connectedTransport);
-
-                if (transport != NULL) {
-
-                    if (this->impl->disposedListener != NULL) {
-                        transport->setTransportListener(this->impl->disposedListener.get());
-                    }
-
-                    // Hand off to the close task so it gets done in a different thread.
-                    this->impl->closeTask->add(transport);
-
-                    if (this->impl->connectedTransportURI != NULL) {
-                        this->impl->uris->addURI(*this->impl->connectedTransportURI);
-                        this->impl->connectedTransportURI.reset(NULL);
-                    }
-                }
+                this->impl->doRebalance = true;
             }
 
-            this->impl->taskRunner->wakeup();
+            try {
+                this->impl->taskRunner->wakeup();
+            } catch (InterruptedException& ex) {
+                Thread::currentThread()->interrupt();
+            }
         }
     }
 }
@@ -541,7 +684,7 @@ void FailoverTransport::restoreTransport
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::handleTransportFailure(const decaf::lang::Exception& error AMQCPP_UNUSED) {
+void FailoverTransport::handleTransportFailure(const decaf::lang::Exception& error) {
 
     Pointer<Transport> transport;
     synchronized(&this->impl->reconnectMutex) {
@@ -557,10 +700,15 @@ void FailoverTransport::handleTransportF
         // Hand off to the close task so it gets done in a different thread.
         this->impl->closeTask->add(transport);
 
+        bool reconnectOk = false;
+
         synchronized(&this->impl->reconnectMutex) {
 
+            reconnectOk = this->impl->canReconnect();
+            URI failedUri = *this->impl->connectedTransportURI;
+
             this->impl->initialized = false;
-            this->impl->uris->addURI(*this->impl->connectedTransportURI);
+            this->impl->uris->addURI(failedUri);
             this->impl->connectedTransportURI.reset(NULL);
             this->impl->connected = false;
 
@@ -569,12 +717,16 @@ void FailoverTransport::handleTransportF
 
             // Notify before we attempt to reconnect so that the consumers have a chance
             // to cleanup their state.
-            if (transportListener != NULL) {
-                transportListener->transportInterrupted();
+            if (this->impl->transportListener != NULL) {
+                this->impl->transportListener->transportInterrupted();
             }
 
-            if (this->impl->started) {
+            if (reconnectOk) {
+                this->impl->updated->removeURI(failedUri);
                 this->impl->taskRunner->wakeup();
+            } else if (!this->impl->closed) {
+                this->impl->connectionFailure.reset(error.clone());
+                this->impl->propagateFailureToExceptionListener();
             }
         }
     }
@@ -644,8 +796,8 @@ void FailoverTransport::updateURIs(bool 
 
     if (isUpdateURIsSupported()) {
 
-        LinkedList<URI> copy(this->impl->updated);
-        LinkedList<URI> add;
+        Pointer<URIPool> copy(new URIPool(*this->impl->updated));
+        this->impl->updated->clear();
 
         if (!updatedURIs.isEmpty()) {
 
@@ -658,25 +810,15 @@ void FailoverTransport::updateURIs(bool 
             Pointer<Iterator<URI> > setIter(set.iterator());
             while (setIter->hasNext()) {
                 URI value = setIter->next();
-                if (copy.remove(value) == false) {
-                    add.add(value);
-                }
+                this->impl->updated->addURI(value);
             }
 
-            synchronized( &this->impl->reconnectMutex ) {
-
-                this->impl->updated.clear();
-                Pointer<Iterator<URI> > listIter1(add.iterator());
-                while (listIter1->hasNext()) {
-                    this->impl->updated.add(listIter1->next());
-                }
+            if (!(copy->isEmpty() && this->impl->updated->isEmpty()) &&
+                !(copy->equals(*this->impl->updated))) {
 
-                Pointer<Iterator<URI> > listIter2(copy.iterator());
-                while (listIter2->hasNext()) {
-                    this->impl->uris->removeURI(listIter2->next());
+                synchronized(&this->impl->reconnectMutex) {
+                    reconnect(rebalance);
                 }
-
-                this->addURI(rebalance, add);
             }
         }
     }
@@ -687,19 +829,11 @@ bool FailoverTransport::isPending() cons
     bool result = false;
 
     synchronized(&this->impl->reconnectMutex) {
-        if (this->impl->connectedTransport == NULL && !this->impl->closed && this->impl->started) {
+        if (!this->impl->isConnectionStateValid() && this->impl->started && !this->impl->isClosedOrFailed()) {
 
-            int reconnectAttempts = 0;
-            if (this->impl->firstConnection) {
-                if (this->impl->startupMaxReconnectAttempts != 0) {
-                    reconnectAttempts = this->impl->startupMaxReconnectAttempts;
-                }
-            }
-            if (reconnectAttempts == 0) {
-                reconnectAttempts = this->impl->maxReconnectAttempts;
-            }
+            int maxReconnectAttempts = this->impl->calculateReconnectAttemptLimit();
 
-            if (reconnectAttempts > 0 && this->impl->connectFailures >= reconnectAttempts) {
+            if (maxReconnectAttempts != -1 && this->impl->connectFailures >= maxReconnectAttempts) {
                 result = false;
             } else {
                 result = true;
@@ -717,45 +851,124 @@ bool FailoverTransport::iterate() {
 
     synchronized( &this->impl->reconnectMutex ) {
 
-        if (this->impl->closed || this->impl->connectionFailure != NULL) {
+        if (this->impl->isClosedOrFailed()) {
             this->impl->reconnectMutex.notifyAll();
         }
 
-        if (this->impl->connectedTransport != NULL || this->impl->closed || this->impl->connectionFailure != NULL) {
+        if (this->impl->isConnectionStateValid() || this->impl->isClosedOrFailed()) {
             return false;
         } else {
 
-            LinkedList<URI> failures;
-            Pointer<Transport> transport;
-            URI uri;
+            Pointer<URIPool> connectList = this->impl->getConnectList();
 
-            if (!this->impl->useExponentialBackOff) {
-                this->impl->reconnectDelay = this->impl->initialReconnectDelay;
-            }
+            if (connectList->isEmpty()) {
+                failure.reset(new IOException(__FILE__, __LINE__, "No URIs available for reconnect."));
+            } else {
 
-            if (this->impl->backups->isEnabled()) {
+                if (this->impl->doRebalance) {
+                    if (connectList->getPriorityURI().equals(*this->impl->connectedTransportURI)) {
+                        // already connected to first in the list, no need to rebalance
+                        this->impl->doRebalance = false;
+                        return false;
+                    } else {
+                        // break any existing connect for rebalance.
+                        this->impl->disconnect();
+                    }
 
-                Pointer<BackupTransport> backupTransport = this->impl->backups->getBackup();
+                    this->impl->doRebalance = false;
+                }
 
-                if (backupTransport != NULL) {
+                this->impl->resetReconnectDelay();
 
-                    transport = backupTransport->getTransport();
-                    uri = backupTransport->getUri();
-                    transport->setTransportListener(this->impl->myTransportListener.get());
+                LinkedList<URI> failures;
+                Pointer<Transport> transport;
+                URI uri;
+
+                if (this->impl->backups->isEnabled()) {
+                    Pointer<BackupTransport> backupTransport = this->impl->backups->getBackup();
+                    if (backupTransport != NULL) {
+                        transport = backupTransport->getTransport();
+                        uri = backupTransport->getUri();
+                        if (this->impl->priorityBackup && this->impl->backups->isPriorityBackupAvailable()) {
+                            // A priority connection is available and we aren't connected to
+                            // any other priority transports so disconnect and use the backup.
+                            this->impl->disconnect();
+                        }
+                    }
+                }
 
+                // Sleep for the reconnectDelay if there's no backup and we aren't trying
+                // for the first time, or we were disposed for some reason.
+                if (transport == NULL && !this->impl->firstConnection &&
+                    (this->impl->reconnectDelay > 0) && !this->impl->closed) {
+                    synchronized (&this->impl->sleepMutex) {
+                        try {
+                            this->impl->sleepMutex.wait(this->impl->reconnectDelay);
+                        } catch (InterruptedException& e) {
+                            Thread::currentThread()->interrupt();
+                        }
+                    }
+                }
+
+                while (transport == NULL && this->impl->connectedTransport == NULL && !this->impl->closed) {
                     try {
+                        // We could be starting the loop with a backup already.
+                        if (transport == NULL) {
+                            try {
+                                uri = connectList->getURI();
+                            } catch (NoSuchElementException& ex) {
+                                break;
+                            }
+
+                            transport = createTransport(uri);
+                        }
+
+                        transport->setTransportListener(this->impl->myTransportListener.get());
+                        transport->start();
 
-                        if (this->impl->started) {
+                        if (this->impl->started && !this->impl->firstConnection) {
                             restoreTransport(transport);
                         }
 
+                        this->impl->reconnectDelay = this->impl->initialReconnectDelay;
+                        this->impl->connectedTransportURI.reset(new URI(uri));
+                        this->impl->connectedTransport = transport;
+                        this->impl->reconnectMutex.notifyAll();
+                        this->impl->connectFailures = 0;
+
+                        // Make sure on initial startup, that the transportListener
+                        // has been initialized for this instance.
+                        synchronized(&this->impl->listenerMutex) {
+                            if (this->impl->transportListener == NULL) {
+                                // if it isn't set after 2secs - it probably never will be
+                                this->impl->listenerMutex.wait(2000);
+                            }
+                        }
+
+                        if (this->impl->transportListener != NULL) {
+                            this->impl->transportListener->transportResumed();
+                        }
+
+                        if (this->impl->firstConnection) {
+                            this->impl->firstConnection = false;
+                        }
+
+                        this->impl->connected = true;
+                        return false;
+
                     } catch (Exception& e) {
+                        e.setMark(__FILE__, __LINE__);
 
                         if (transport != NULL) {
                             if (this->impl->disposedListener != NULL) {
                                 transport->setTransportListener(this->impl->disposedListener.get());
                             }
 
+                            try {
+                                transport->stop();
+                            } catch (...) {
+                            }
+
                             // Hand off to the close task so it gets done in a different thread
                             // this prevents a deadlock from occurring if the Transport happens
                             // to call back through our onException method or locks in some other
@@ -765,142 +978,36 @@ bool FailoverTransport::iterate() {
                             transport.reset(NULL);
                         }
 
-                        this->impl->uris->addURI(uri);
+                        failures.add(uri);
+                        failure.reset(e.clone());
                     }
                 }
-            }
-
-            while (transport == NULL && !this->impl->closed) {
-
-                try {
-                    uri = this->impl->uris->getURI();
-                } catch (NoSuchElementException& ex) {
-                    break;
-                }
-
-                try {
 
-                    transport = createTransport(uri);
-                    transport->setTransportListener(this->impl->myTransportListener.get());
-                    transport->start();
-
-                    if (this->impl->started) {
-                        restoreTransport(transport);
-                    }
-
-                } catch (Exception& e) {
-                    e.setMark(__FILE__, __LINE__);
-
-                    if (transport != NULL) {
-                        if (this->impl->disposedListener != NULL) {
-                            transport->setTransportListener(this->impl->disposedListener.get());
-                        }
-
-                        try {
-                            transport->stop();
-                        } catch (...) {
-                        }
-
-                        // Hand off to the close task so it gets done in a different thread
-                        // this prevents a deadlock from occurring if the Transport happens
-                        // to call back through our onException method or locks in some other
-                        // way.
-                        this->impl->closeTask->add(transport);
-                        this->impl->taskRunner->wakeup();
-                        transport.reset(NULL);
-                    }
-
-                    failures.add(uri);
-                    failure.reset(e.clone());
-                }
-            }
-
-            // Return the failures to the pool, we will try again on the next iteration.
-            this->impl->uris->addURIs(failures);
-
-            if (transport != NULL) {
-                this->impl->reconnectDelay = this->impl->initialReconnectDelay;
-                this->impl->connectedTransportURI.reset(new URI(uri));
-                this->impl->connectedTransport = transport;
-                this->impl->reconnectMutex.notifyAll();
-                this->impl->connectFailures = 0;
-                this->impl->connected = true;
-
-                // Make sure on initial startup, that the transportListener
-                // has been initialized for this instance.
-                synchronized(&this->impl->listenerMutex) {
-                    if (transportListener == NULL) {
-                        // if it isn't set after 2secs - it probably never will be
-                        this->impl->listenerMutex.wait(2000);
-                    }
-                }
-
-                if (transportListener != NULL) {
-                    transportListener->transportResumed();
-                }
-
-                if (this->impl->firstConnection) {
-                    this->impl->firstConnection = false;
-                }
-
-                return false;
+                // Return the failures to the pool, we will try again on the next iteration.
+                connectList->addURIs(failures);
             }
         }
 
-        int reconnectAttempts = 0;
-        if (this->impl->firstConnection) {
-            if (this->impl->startupMaxReconnectAttempts != 0) {
-                reconnectAttempts = this->impl->startupMaxReconnectAttempts;
-            }
-        }
-        if (reconnectAttempts == 0) {
-            reconnectAttempts = this->impl->maxReconnectAttempts;
-        }
+        int reconnectAttempts = this->impl->calculateReconnectAttemptLimit();
 
-        if (reconnectAttempts > 0 && ++this->impl->connectFailures >= reconnectAttempts) {
+        if (reconnectAttempts >= 0 && ++this->impl->connectFailures >= reconnectAttempts) {
             this->impl->connectionFailure = failure;
 
             // Make sure on initial startup, that the transportListener has been initialized
             // for this instance.
             synchronized(&this->impl->listenerMutex) {
-                if (transportListener == NULL) {
+                if (this->impl->transportListener == NULL) {
                     this->impl->listenerMutex.wait(2000);
                 }
             }
 
-            if (transportListener != NULL) {
-
-                Pointer<IOException> ioException;
-                try {
-                    ioException = this->impl->connectionFailure.dynamicCast<IOException>();
-                }
-                AMQ_CATCH_NOTHROW( ClassCastException)
-
-                if (ioException != NULL) {
-                    transportListener->onException(*this->impl->connectionFailure);
-                } else {
-                    transportListener->onException(IOException(*this->impl->connectionFailure));
-                }
-            }
-
-            this->impl->reconnectMutex.notifyAll();
+            this->impl->propagateFailureToExceptionListener();
             return false;
         }
     }
 
     if (!this->impl->closed) {
-
-        synchronized(&this->impl->sleepMutex) {
-            this->impl->sleepMutex.wait((unsigned int) this->impl->reconnectDelay);
-        }
-
-        if (this->impl->useExponentialBackOff) {
-            // Exponential increment of reconnect delay.
-            this->impl->reconnectDelay *= this->impl->backOffMultiplier;
-            if (this->impl->reconnectDelay > this->impl->maxReconnectDelay) {
-                this->impl->reconnectDelay = this->impl->maxReconnectDelay;
-            }
-        }
+        this->impl->doDelay();
     }
 
     return !this->impl->closed;
@@ -1095,12 +1202,12 @@ void FailoverTransport::setReconnectDela
 
 ////////////////////////////////////////////////////////////////////////////////
 bool FailoverTransport::isBackup() const {
-    return this->impl->backups->isEnabled();
+    return this->impl->backupsEnabled;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::setBackup(bool value) {
-    this->impl->backups->setEnabled(value);
+    this->impl->backupsEnabled = value;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1185,14 +1292,12 @@ void FailoverTransport::setPriorityBacku
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::setPriorityURIs(const std::string& priorityURIs AMQCPP_UNUSED) {
-//    StringTokenizer tokenizer = new StringTokenizer(priorityURIs, ",");
-//    while (tokenizer.hasMoreTokens()) {
-//        String str = tokenizer.nextToken();
-//        try {
-//            URI uri = new URI(str);
-//            priorityList.add(uri);
-//        } catch (Exception e) {
-//            LOG.error("Failed to parse broker address: " + str, e);
-//        }
-//    }
+    StringTokenizer tokenizer(priorityURIs, ",");
+    while (tokenizer.hasMoreTokens()) {
+        std::string str = tokenizer.nextToken();
+        try {
+            this->impl->priorityUris->addURI(URI(str));
+        } catch (Exception& e) {
+        }
+    }
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h?rev=1446974&r1=1446973&r2=1446974&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h Sat Feb 16 23:04:56 2013
@@ -26,17 +26,9 @@
 #include <activemq/threads/CompositeTaskRunner.h>
 #include <activemq/state/ConnectionStateTracker.h>
 #include <activemq/transport/CompositeTransport.h>
-#include <activemq/transport/failover/BackupTransportPool.h>
-#include <activemq/transport/failover/CloseTransportsTask.h>
-#include <activemq/transport/failover/FailoverTransportListener.h>
-#include <activemq/transport/failover/URIPool.h>
 #include <activemq/wireformat/WireFormat.h>
 
-#include <decaf/util/LinkedList.h>
-#include <decaf/util/StlMap.h>
 #include <decaf/util/Properties.h>
-#include <decaf/util/concurrent/Mutex.h>
-#include <decaf/util/concurrent/atomic/AtomicReference.h>
 #include <decaf/net/URI.h>
 #include <decaf/io/IOException.h>
 
@@ -48,6 +40,8 @@ namespace failover {
     using activemq::commands::Command;
     using activemq::commands::Response;
 
+    class FailoverTransportListener;
+    class BackupTransportPool;
     class FailoverTransportImpl;
 
     class AMQCPP_API FailoverTransport : public CompositeTransport,
@@ -55,13 +49,12 @@ namespace failover {
     private:
 
         friend class FailoverTransportListener;
+        friend class BackupTransportPool;
 
         state::ConnectionStateTracker stateTracker;
 
         FailoverTransportImpl* impl;
 
-        TransportListener* transportListener;
-
     private:
 
         FailoverTransport(const FailoverTransport&);
@@ -84,10 +77,13 @@ namespace failover {
 
         /**
          * Adds a New URI to the List of URIs this transport can Connect to.
+         *
+         * @param rebalance
+         *      Should the transport reconnect to a different broker to balance load.
          * @param uri
-         *        A String version of a URI to add to the URIs to failover to.
+         *      A String version of a URI to add to the URIs to failover to.
          */
-        void add(const std::string& uri);
+        void add(bool rebalance, const std::string& uri);
 
     public: // CompositeTransport methods
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp?rev=1446974&r1=1446973&r2=1446974&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp Sat Feb 16 23:04:56 2013
@@ -87,9 +87,9 @@ Pointer<Transport> FailoverTransportFact
         transport->setUseExponentialBackOff(
             Boolean::parseBoolean(topLvlProperties.getProperty("useExponentialBackOff", "true")));
         transport->setMaxReconnectAttempts(
-            Integer::parseInt(topLvlProperties.getProperty("maxReconnectAttempts", "0")));
+            Integer::parseInt(topLvlProperties.getProperty("maxReconnectAttempts", "-1")));
         transport->setStartupMaxReconnectAttempts(
-            Integer::parseInt(topLvlProperties.getProperty("startupMaxReconnectAttempts", "0")));
+            Integer::parseInt(topLvlProperties.getProperty("startupMaxReconnectAttempts", "-1")));
         transport->setRandomize(
             Boolean::parseBoolean(topLvlProperties.getProperty("randomize", "true")));
         transport->setBackup(
@@ -104,6 +104,9 @@ Pointer<Transport> FailoverTransportFact
             Integer::parseInt(topLvlProperties.getProperty("maxCacheSize", "131072")));
         transport->setUpdateURIsSupported(
             Boolean::parseBoolean(topLvlProperties.getProperty("updateURIsSupported", "true")));
+        transport->setPriorityBackup(
+            Boolean::parseBoolean(topLvlProperties.getProperty("priorityBackup", "false")));
+        transport->setPriorityURIs(topLvlProperties.getProperty("priorityURIs", ""));
 
         transport->addURI(false, data.getComponents());
 



Mime
View raw message