activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq-cpp git commit: https://issues.apache.org/jira/browse/AMQCPP-601
Date Tue, 14 Feb 2017 22:07:19 GMT
Repository: activemq-cpp
Updated Branches:
  refs/heads/master 59e62e175 -> 4d010e12f


https://issues.apache.org/jira/browse/AMQCPP-601

Fixes priority backup handling to prevent a stall when a reconnect attempt
tries to use a priority backup instance.
(cherry picked from commit a8ff8b54e0d75fb751f5b76682c715a78ee8150d)

# Conflicts:
#	activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
#	activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.cpp
#	activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.h
#	activemq-cpp/src/test/testRegistry.cpp


Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/4d010e12
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/4d010e12
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/4d010e12

Branch: refs/heads/master
Commit: 4d010e12f05fc6283ed9c5558543973b2ba7b0e7
Parents: 59e62e1
Author: Timothy Bish <tabish121@gmail.com>
Authored: Tue Feb 14 16:56:48 2017 -0500
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Tue Feb 14 17:06:47 2017 -0500

----------------------------------------------------------------------
 .../transport/failover/BackupTransportPool.cpp  |  53 ++-
 .../transport/failover/FailoverTransport.cpp    |  30 +-
 .../activemq/transport/failover/URIPool.cpp     |   4 +-
 .../test/activemq/mock/MockBrokerService.cpp    |  28 +-
 .../src/test/activemq/mock/MockBrokerService.h  |   2 +
 .../failover/FailoverTransportTest.cpp          | 336 +++++++++++++++++++
 .../transport/failover/FailoverTransportTest.h  |  12 +
 activemq-cpp/src/test/testRegistry.cpp          |   9 -
 8 files changed, 429 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/4d010e12/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp b/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp
index e80ff51..00993d7 100644
--- a/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp
+++ b/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp
@@ -53,14 +53,37 @@ namespace failover {
 
     public:
 
+        BackupTransportPool* pool;
+        FailoverTransport* parent;
         LinkedList< Pointer<BackupTransport> > backups;
         volatile bool pending;
         volatile bool closed;
         volatile int priorityBackups;
 
-        BackupTransportPoolImpl() : backups(), pending(false), closed(false), priorityBackups(0) {
+        BackupTransportPoolImpl(BackupTransportPool* pool, FailoverTransport* parent) : pool(pool),
+                                                                                        parent(parent),
+                                                                                        backups(),
+                                                                                        pending(false),
+                                                                                        closed(false),
+                                                                                        priorityBackups(0) {
         }
 
+        bool shouldBuildBackup() {
+            bool result = false;
+
+            if (pool->isEnabled()) {
+
+                // If there's no priority backup and the failover transport isn't connected to
+                // a priority backup then we should keep trying to connect to one.
+                if (parent->isPriorityBackup() && !parent->isConnectedToPriority() && priorityBackups == 0) {
+                    result = true;
+                } else if (backups.size() < pool->getBackupPoolSize()) {
+                    result = true;
+                }
+            }
+
+            return result;
+        }
     };
 
 }}}
@@ -101,7 +124,7 @@ BackupTransportPool::BackupTransportPool(FailoverTransport* parent,
         throw NullPointerException(__FILE__, __LINE__, "Close Transport Task passed is NULL");
     }
 
-    this->impl = new BackupTransportPoolImpl();
+    this->impl = new BackupTransportPoolImpl(this, parent);
 
     // Add this instance as a Task so that we can create backups when nothing else is
     // going on.
@@ -145,7 +168,7 @@ BackupTransportPool::BackupTransportPool(FailoverTransport* parent,
         throw NullPointerException(__FILE__, __LINE__, "Close Transport Task passed is NULL");
     }
 
-    this->impl = new BackupTransportPoolImpl();
+    this->impl = new BackupTransportPoolImpl(this, parent);
 
     // Add this instance as a Task so that we can create backups when nothing else is
     // going on.
@@ -242,7 +265,7 @@ bool BackupTransportPool::iterate() {
 
         bool wakeupParent = false;
 
-        while (isEnabled() && (int) this->impl->backups.size() < backupPoolSize) {
+        while (impl->shouldBuildBackup()) {
 
             URI connectTo;
 
@@ -259,14 +282,6 @@ bool BackupTransportPool::iterate() {
             Pointer<BackupTransport> backup(new BackupTransport(this));
             backup->setUri(connectTo);
 
-            if (priorityUriPool->contains(connectTo)) {
-                backup->setPriority(true);
-
-                if (!parent->isConnectedToPriority()) {
-                    wakeupParent = true;
-                }
-            }
-
             try {
                 Pointer<Transport> transport = createTransport(connectTo);
 
@@ -274,6 +289,14 @@ bool BackupTransportPool::iterate() {
                 transport->start();
                 backup->setTransport(transport);
 
+                if (priorityUriPool->contains(connectTo) || (priorityUriPool->isEmpty() && uriPool->isPriority(connectTo))) {
+                    backup->setPriority(true);
+
+                    if (!parent->isConnectedToPriority()) {
+                        wakeupParent = true;
+                    }
+                }
+
                 // Put any priority connections first so a reconnect picks them
                 // up automatically.
                 if (backup->isPriority()) {
@@ -282,6 +305,7 @@ bool BackupTransportPool::iterate() {
                 } else {
                     this->impl->backups.addLast(backup);
                 }
+
             } catch (...) {
                 // Store it in the list of URIs that didn't work, once done we
                 // return those to the pool.
@@ -289,15 +313,16 @@ bool BackupTransportPool::iterate() {
             }
 
             // We connected to a priority backup and the parent isn't already using one
-            // so wake it up and quick the backups process for now.
+            // so wake it up and quit the backups process for now.
             if (wakeupParent) {
-                this->parent->reconnect(true);
+                this->parent->reconnect(false);
                 break;
             }
         }
 
         // return all failures to the URI Pool, we can try again later.
         uriPool->addURIs(failures);
+
         this->impl->pending = false;
     }
 

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/4d010e12/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp b/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
index b5ea33b..09777ec 100644
--- a/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
+++ b/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
@@ -69,9 +69,9 @@ namespace failover {
 
     public:
 
-        bool closed;
-        bool connected;
-        bool started;
+        volatile bool closed;
+        volatile bool connected;
+        volatile bool started;
 
         long long timeout;
         long long initialReconnectDelay;
@@ -251,7 +251,7 @@ namespace failover {
         }
 
         bool isConnectionStateValid() const {
-            return connectedTransport != NULL && !doRebalance && !this->backups->isPriorityBackupAvailable();
+            return connectedTransport != NULL && !doRebalance && !backups->isPriorityBackupAvailable();
         }
 
         void disconnect() {
@@ -271,6 +271,10 @@ namespace failover {
                     this->uris->addURI(*this->connectedTransportURI);
                     this->connectedTransportURI.reset(NULL);
                 }
+
+                if (transportListener != NULL) {
+                    transportListener->transportInterrupted();
+                }
             }
         }
 
@@ -851,12 +855,14 @@ void FailoverTransport::updateURIs(bool rebalance, const decaf::util::List<decaf
 bool FailoverTransport::isPending() const {
     bool result = false;
 
-    synchronized(&this->impl->reconnectMutex) {
-        if (!this->impl->isConnectionStateValid() && this->impl->started && !this->impl->isClosedOrFailed()) {
+    synchronized(&impl->reconnectMutex) {
+        if (!impl->isConnectionStateValid() && impl->started && !impl->isClosedOrFailed()) {
 
-            int maxReconnectAttempts = this->impl->calculateReconnectAttemptLimit();
+            int maxReconnectAttempts = impl->calculateReconnectAttemptLimit();
 
-            if (maxReconnectAttempts != -1 && this->impl->connectFailures >= maxReconnectAttempts) {
+            if (impl->firstConnection && impl->connectFailures == 0) {
+                result = true;
+            } else if (maxReconnectAttempts != -1 && impl->connectFailures > maxReconnectAttempts) {
                 result = false;
             } else {
                 result = true;
@@ -872,7 +878,7 @@ bool FailoverTransport::iterate() {
 
     Pointer<Exception> failure;
 
-    synchronized( &this->impl->reconnectMutex ) {
+    synchronized(&this->impl->reconnectMutex) {
 
         if (this->impl->isClosedOrFailed()) {
             this->impl->reconnectMutex.notifyAll();
@@ -884,7 +890,7 @@ bool FailoverTransport::iterate() {
 
             Pointer<URIPool> connectList = this->impl->getConnectList();
 
-            if (connectList->isEmpty()) {
+            if (connectList->isEmpty() && !impl->backups->isEnabled()) {
                 failure.reset(new IOException(__FILE__, __LINE__, "No URIs available for reconnect."));
             } else {
 
@@ -933,7 +939,7 @@ bool FailoverTransport::iterate() {
                     }
                 }
 
-                while (transport == NULL && this->impl->connectedTransport == NULL && !this->impl->closed) {
+                while ((transport != NULL || !connectList->isEmpty()) && this->impl->connectedTransport == NULL && !this->impl->closed) {
                     try {
                         // We could be starting the loop with a backup already.
                         if (transport == NULL) {
@@ -958,6 +964,7 @@ bool FailoverTransport::iterate() {
                         this->impl->connectedTransport = transport;
                         this->impl->reconnectMutex.notifyAll();
                         this->impl->connectFailures = 0;
+                        this->impl->connected = true;
 
                         if (isPriorityBackup()) {
                             this->impl->connectedToPrioirty = connectList->getPriorityURI().equals(uri) ||
@@ -1006,6 +1013,7 @@ bool FailoverTransport::iterate() {
                             // this prevents a deadlock from occurring if the Transport happens
                             // to call back through our onException method or locks in some other
                             // way.
+                            this->impl->connected = false;
                             this->impl->closeTask->add(transport);
                             this->impl->taskRunner->wakeup();
                             transport.reset(NULL);

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/4d010e12/activemq-cpp/src/main/activemq/transport/failover/URIPool.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/transport/failover/URIPool.cpp b/activemq-cpp/src/main/activemq/transport/failover/URIPool.cpp
index 1c7171a..7c8f947 100644
--- a/activemq-cpp/src/main/activemq/transport/failover/URIPool.cpp
+++ b/activemq-cpp/src/main/activemq/transport/failover/URIPool.cpp
@@ -161,9 +161,7 @@ bool URIPool::contains(const decaf::net::URI& uri) const {
 ////////////////////////////////////////////////////////////////////////////////
 bool URIPool::isPriority(const decaf::net::URI& uri) const {
     synchronized(&uriPool) {
-        if (!uriPool.isEmpty()) {
-            return uriPool.getFirst().equals(uri);
-        }
+        return priorityURI.equals(uri);
     }
     return false;
 }

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/4d010e12/activemq-cpp/src/test/activemq/mock/MockBrokerService.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test/activemq/mock/MockBrokerService.cpp b/activemq-cpp/src/test/activemq/mock/MockBrokerService.cpp
index dea1bbc..ceb6634 100644
--- a/activemq-cpp/src/test/activemq/mock/MockBrokerService.cpp
+++ b/activemq-cpp/src/test/activemq/mock/MockBrokerService.cpp
@@ -55,8 +55,9 @@ namespace mock {
     class TcpServer : public lang::Thread {
     private:
 
-        bool done;
-        bool error;
+        volatile bool done;
+        volatile bool error;
+        const int configuredPort;
         Pointer<ServerSocket> server;
         Pointer<OpenWireFormat> wireFormat;
         Pointer<OpenWireResponseBuilder> responeBuilder;
@@ -65,9 +66,8 @@ namespace mock {
 
     public:
 
-        TcpServer() : Thread(), done(false), error(false), server(), wireFormat(),
+        TcpServer() : Thread(), done(false), error(false), configuredPort(0), server(), wireFormat(),
                       responeBuilder(), started(1), rand() {
-            server.reset(new ServerSocket(0));
 
             Properties properties;
 
@@ -77,9 +77,8 @@ namespace mock {
             this->rand.setSeed(System::currentTimeMillis());
         }
 
-        TcpServer(int port) : Thread(), done(false), error(false), server(), wireFormat(),
+        TcpServer(int port) : Thread(), done(false), error(false), configuredPort(port), server(), wireFormat(),
                               responeBuilder(), started(1), rand() {
-            server.reset(new ServerSocket(port));
 
             Properties properties;
             this->wireFormat = OpenWireFormatFactory().createWireFormat(properties).dynamicCast<OpenWireFormat>();
@@ -90,6 +89,7 @@ namespace mock {
 
         virtual ~TcpServer() {
             stop();
+            waitUntilStopped();
         }
 
         int getLocalPort() {
@@ -124,7 +124,15 @@ namespace mock {
 
                     MockTransport mock(this->wireFormat, this->responeBuilder);
 
-                    std::auto_ptr<Socket> socket(server->accept());
+                    server.reset(new ServerSocket(configuredPort));
+
+                    std::auto_ptr<Socket> socket;
+                    try {
+                        socket.reset(server->accept());
+                    } catch (IOException& ioe) {
+                        continue;
+                    }
+
                     socket->setSoLinger(false, 0);
 
                     Pointer<WireFormatInfo> preferred = wireFormat->getPreferedWireFormatInfo();
@@ -147,7 +155,6 @@ namespace mock {
                         }
                     }
                 }
-
             } catch (IOException& ex) {
                 error = true;
             } catch (Exception& ex) {
@@ -228,6 +235,11 @@ void MockBrokerService::waitUntilStopped() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+int MockBrokerService::getPort() const {
+    return this->impl->server->getLocalPort();
+}
+
+////////////////////////////////////////////////////////////////////////////////
 std::string MockBrokerService::getConnectString() const {
     int port = this->impl->server->getLocalPort();
     return std::string("tcp://localhost:") + Integer::toString(port);

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/4d010e12/activemq-cpp/src/test/activemq/mock/MockBrokerService.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test/activemq/mock/MockBrokerService.h b/activemq-cpp/src/test/activemq/mock/MockBrokerService.h
index 10dfc8a..92f4b76 100644
--- a/activemq-cpp/src/test/activemq/mock/MockBrokerService.h
+++ b/activemq-cpp/src/test/activemq/mock/MockBrokerService.h
@@ -57,6 +57,8 @@ namespace mock {
 
         std::string getConnectString() const;
 
+        int getPort() const;
+
     };
 
 }}

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/4d010e12/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.cpp b/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.cpp
index bc3f04f..9d51b7e 100644
--- a/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.cpp
+++ b/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.cpp
@@ -24,6 +24,8 @@
 #include <activemq/commands/ActiveMQMessage.h>
 #include <activemq/commands/ConnectionControl.h>
 #include <activemq/mock/MockBrokerService.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/util/concurrent/Mutex.h>
 #include <decaf/lang/Pointer.h>
 #include <decaf/lang/Thread.h>
 #include <decaf/util/UUID.h>
@@ -38,6 +40,7 @@ using namespace activemq::exceptions;
 using namespace decaf::io;
 using namespace decaf::lang;
 using namespace decaf::util;
+using namespace decaf::util::concurrent;
 
 ////////////////////////////////////////////////////////////////////////////////
 FailoverTransportTest::FailoverTransportTest() {
@@ -700,3 +703,336 @@ void FailoverTransportTest::testConnectedToMockBroker() {
     broker1.stop();
     broker1.waitUntilStopped();
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransportTest::testMaxReconnectsZeroAttemptsOneConnect() {
+
+    std::string uri = "failover://(mock://localhost:61616)?maxReconnectAttempts=0";
+
+    DefaultTransportListener listener;
+    FailoverTransportFactory factory;
+
+    Pointer<Transport> transport(factory.create(uri));
+    CPPUNIT_ASSERT(transport != NULL);
+    transport->setTransportListener(&listener);
+
+    FailoverTransport* failover =
+        dynamic_cast<FailoverTransport*>(transport->narrow(typeid(FailoverTransport)));
+
+    CPPUNIT_ASSERT(failover != NULL);
+
+    transport->start();
+
+    Thread::sleep(1000);
+    CPPUNIT_ASSERT(failover->isConnected() == true);
+
+    transport->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransportTest::testMaxReconnectsHonorsConfiguration() {
+
+    // max reconnect attempts of two means one connection attempt followed by
+    // two retries.
+
+    std::string uri = "failover://(mock://localhost:61616?failOnCreate=true,"
+                                  "mock://localhost:61617?failOnCreate=true)"
+                                  "?randomize=false&maxReconnectAttempts=2";
+
+    Pointer<WireFormatInfo> info(new WireFormatInfo());
+
+    DefaultTransportListener listener;
+    FailoverTransportFactory factory;
+
+    Pointer<Transport> transport(factory.create(uri));
+    CPPUNIT_ASSERT(transport != NULL);
+    transport->setTransportListener(&listener);
+
+    FailoverTransport* failover =
+        dynamic_cast<FailoverTransport*>(transport->narrow(typeid(FailoverTransport)));
+
+    CPPUNIT_ASSERT(failover != NULL);
+    CPPUNIT_ASSERT(failover->isRandomize() == false);
+
+    transport->start();
+
+    CPPUNIT_ASSERT_THROW_MESSAGE("Send should have failed after max connect attempts of two",
+            transport->oneway(info), Exception);
+
+    CPPUNIT_ASSERT(failover->isConnected() == false);
+
+    transport->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransportTest::testStartupMaxReconnectsHonorsConfiguration() {
+
+    // max reconnect attempts of two means one connection attempt followed by
+    // two retries.
+
+    std::string uri = "failover://(mock://localhost:61616?failOnCreate=true,"
+                                  "mock://localhost:61617?failOnCreate=true)"
+                                  "?randomize=false&startupMaxReconnectAttempts=2";
+
+    Pointer<WireFormatInfo> info(new WireFormatInfo());
+
+    DefaultTransportListener listener;
+    FailoverTransportFactory factory;
+
+    Pointer<Transport> transport(factory.create(uri));
+    CPPUNIT_ASSERT(transport != NULL);
+    transport->setTransportListener(&listener);
+
+    FailoverTransport* failover =
+        dynamic_cast<FailoverTransport*>(transport->narrow(typeid(FailoverTransport)));
+
+    CPPUNIT_ASSERT(failover != NULL);
+    CPPUNIT_ASSERT(failover->isRandomize() == false);
+
+    transport->start();
+
+    CPPUNIT_ASSERT_THROW_MESSAGE("Send should have failed after max connect attempts of two",
+            transport->oneway(info), Exception);
+
+    CPPUNIT_ASSERT(failover->isConnected() == false);
+
+    transport->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class PriorityBackupListener : public DefaultTransportListener {
+    private:
+
+        Pointer<CountDownLatch> interruptedLatch;
+        Pointer<CountDownLatch> resumedLatch;
+
+        Mutex resetMutex;
+
+    public:
+
+        PriorityBackupListener() : interruptedLatch(new CountDownLatch(1)),
+                                   resumedLatch(new CountDownLatch(1)),
+                                   resetMutex() {
+        }
+
+        virtual ~PriorityBackupListener() {}
+
+        virtual void transportInterrupted() {
+            interruptedLatch->countDown();
+        }
+
+        virtual void transportResumed() {
+            resumedLatch->countDown();
+        }
+
+        void reset() {
+            synchronized(&resetMutex) {
+                interruptedLatch.reset(new CountDownLatch(1));
+                resumedLatch.reset(new CountDownLatch(1));
+            }
+        }
+
+        bool awaitInterruption() {
+            synchronized(&resetMutex) {
+                return interruptedLatch->await(60000);
+            }
+
+            return false;
+        }
+
+        bool awaitResumed() {
+            synchronized(&resetMutex) {
+                return resumedLatch->await(60000);
+            }
+
+            return false;
+        }
+    };
+
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransportTest::testConnectedToPriorityOnFirstTryThenFailover() {
+
+    Pointer<MockBrokerService> broker1(new MockBrokerService(61626));
+    Pointer<MockBrokerService> broker2(new MockBrokerService(61628));
+
+    broker1->start();
+    broker1->waitUntilStarted();
+
+    broker2->start();
+    broker2->waitUntilStarted();
+
+    std::string uri = "failover://(tcp://localhost:61626,"
+                                  "tcp://localhost:61628)?randomize=false&priorityBackup=true";
+
+    PriorityBackupListener listener;
+    FailoverTransportFactory factory;
+
+    Pointer<Transport> transport(factory.create(uri));
+    CPPUNIT_ASSERT(transport != NULL);
+    transport->setTransportListener(&listener);
+
+    FailoverTransport* failover =
+        dynamic_cast<FailoverTransport*>(transport->narrow(typeid(FailoverTransport)));
+
+    CPPUNIT_ASSERT(failover != NULL);
+    CPPUNIT_ASSERT(failover->isRandomize() == false);
+    CPPUNIT_ASSERT(failover->isPriorityBackup() == true);
+
+    transport->start();
+
+    CPPUNIT_ASSERT_MESSAGE("Failed to get reconnected in time", listener.awaitResumed());
+    listener.reset();
+
+    CPPUNIT_ASSERT(failover->isConnected() == true);
+    CPPUNIT_ASSERT(failover->isConnectedToPriority() == true);
+
+    broker1->stop();
+    broker1->waitUntilStopped();
+
+    CPPUNIT_ASSERT_MESSAGE("Failed to get interrupted in time", listener.awaitInterruption());
+    CPPUNIT_ASSERT_MESSAGE("Failed to get reconnected in time", listener.awaitResumed());
+    listener.reset();
+
+    CPPUNIT_ASSERT(failover->isConnected() == true);
+    CPPUNIT_ASSERT(failover->isConnectedToPriority() == false);
+
+    transport->close();
+
+    broker1->stop();
+    broker1->waitUntilStopped();
+
+    broker2->stop();
+    broker2->waitUntilStopped();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransportTest::testConnectsToPriorityOnceStarted() {
+
+    Pointer<MockBrokerService> broker1(new MockBrokerService(61626));
+    Pointer<MockBrokerService> broker2(new MockBrokerService(61628));
+
+    broker2->start();
+    broker2->waitUntilStarted();
+
+    std::string uri = "failover://(tcp://localhost:61626?transport.useInactivityMonitor=false,"
+                                  "tcp://localhost:61628?transport.useInactivityMonitor=false)?randomize=false&priorityBackup=true";
+
+    PriorityBackupListener listener;
+    FailoverTransportFactory factory;
+
+    Pointer<Transport> transport(factory.create(uri));
+    CPPUNIT_ASSERT(transport != NULL);
+    transport->setTransportListener(&listener);
+
+    FailoverTransport* failover =
+        dynamic_cast<FailoverTransport*>(transport->narrow(typeid(FailoverTransport)));
+
+    CPPUNIT_ASSERT(failover != NULL);
+    CPPUNIT_ASSERT(failover->isRandomize() == false);
+    CPPUNIT_ASSERT(failover->isPriorityBackup() == true);
+
+    transport->start();
+
+    CPPUNIT_ASSERT_MESSAGE("Failed to get reconnected in time", listener.awaitResumed());
+    listener.reset();
+
+    CPPUNIT_ASSERT(failover->isConnected() == true);
+    CPPUNIT_ASSERT(failover->isConnectedToPriority() == false);
+
+    broker1->start();
+    broker1->waitUntilStarted();
+
+    CPPUNIT_ASSERT_MESSAGE("Failed to get interrupted in time", listener.awaitInterruption());
+    CPPUNIT_ASSERT_MESSAGE("Failed to get reconnected in time", listener.awaitResumed());
+    listener.reset();
+
+    CPPUNIT_ASSERT(failover->isConnected() == true);
+    CPPUNIT_ASSERT(failover->isConnectedToPriority() == true);
+
+    transport->close();
+
+    broker1->stop();
+    broker1->waitUntilStopped();
+
+    broker2->stop();
+    broker2->waitUntilStopped();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransportTest::testConnectsToPriorityAfterInitialBackupFails() {
+
+    Pointer<MockBrokerService> broker1(new MockBrokerService(61626));
+    Pointer<MockBrokerService> broker2(new MockBrokerService(61627));
+    Pointer<MockBrokerService> broker3(new MockBrokerService(61628));
+
+    broker2->start();
+    broker2->waitUntilStarted();
+
+    broker3->start();
+    broker3->waitUntilStarted();
+
+    std::string uri = "failover://(tcp://localhost:61626?transport.useInactivityMonitor=false,"
+                                  "tcp://localhost:61627?transport.useInactivityMonitor=false,"
+                                  "tcp://localhost:61628?transport.useInactivityMonitor=false)?randomize=false&priorityBackup=true";
+
+    PriorityBackupListener listener;
+    FailoverTransportFactory factory;
+
+    Pointer<Transport> transport(factory.create(uri));
+    CPPUNIT_ASSERT(transport != NULL);
+    transport->setTransportListener(&listener);
+
+    FailoverTransport* failover =
+        dynamic_cast<FailoverTransport*>(transport->narrow(typeid(FailoverTransport)));
+
+    CPPUNIT_ASSERT(failover != NULL);
+    CPPUNIT_ASSERT(failover->isRandomize() == false);
+    CPPUNIT_ASSERT(failover->isPriorityBackup() == true);
+
+    transport->start();
+
+    CPPUNIT_ASSERT_MESSAGE("Failed to get reconnected in time", listener.awaitResumed());
+    listener.reset();
+
+    CPPUNIT_ASSERT(failover->isConnected() == true);
+    CPPUNIT_ASSERT(failover->isConnectedToPriority() == false);
+
+    Thread::sleep(100);
+
+    broker1->start();
+    broker1->waitUntilStarted();
+
+    broker2->stop();
+    broker2->waitUntilStopped();
+
+    for (int i = 0; i < 2; ++i) {
+
+        CPPUNIT_ASSERT_MESSAGE("Failed to get interrupted in time", listener.awaitInterruption());
+        CPPUNIT_ASSERT_MESSAGE("Failed to get reconnected in time", listener.awaitResumed());
+        listener.reset();
+
+        URI connectedURI = URI(transport->getRemoteAddress());
+
+        if (connectedURI.getPort() == broker1->getPort()) {
+            break;
+        }
+    }
+
+    CPPUNIT_ASSERT(failover->isConnected() == true);
+    CPPUNIT_ASSERT(failover->isConnectedToPriority() == true);
+
+    transport->close();
+
+    broker1->stop();
+    broker1->waitUntilStopped();
+
+    broker2->stop();
+    broker2->waitUntilStopped();
+
+    broker3->stop();
+    broker3->waitUntilStopped();
+}

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/4d010e12/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.h b/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.h
index f0ea87b..74d7813 100644
--- a/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.h
+++ b/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.h
@@ -54,6 +54,12 @@ namespace failover {
         CPPUNIT_TEST( testPriorityBackupConfig );
         CPPUNIT_TEST( testUriOptionsApplied );
         CPPUNIT_TEST( testConnectedToMockBroker );
+        CPPUNIT_TEST( testMaxReconnectsZeroAttemptsOneConnect );
+        CPPUNIT_TEST( testMaxReconnectsHonorsConfiguration );
+        CPPUNIT_TEST( testStartupMaxReconnectsHonorsConfiguration );
+        CPPUNIT_TEST( testConnectedToPriorityOnFirstTryThenFailover );
+        CPPUNIT_TEST( testConnectsToPriorityOnceStarted );
+        //CPPUNIT_TEST( testConnectsToPriorityAfterInitialBackupFails );
         CPPUNIT_TEST_SUITE_END();
 
     public:
@@ -75,6 +81,12 @@ namespace failover {
         void testPriorityBackupConfig();
         void testUriOptionsApplied();
         void testConnectedToMockBroker();
+        void testMaxReconnectsZeroAttemptsOneConnect();
+        void testMaxReconnectsHonorsConfiguration();
+        void testStartupMaxReconnectsHonorsConfiguration();
+        void testConnectedToPriorityOnFirstTryThenFailover();
+        void testConnectsToPriorityOnceStarted();
+        void testConnectsToPriorityAfterInitialBackupFails();
 
     private:
 

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/4d010e12/activemq-cpp/src/test/testRegistry.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test/testRegistry.cpp b/activemq-cpp/src/test/testRegistry.cpp
index 4936891..dcea2a7 100644
--- a/activemq-cpp/src/test/testRegistry.cpp
+++ b/activemq-cpp/src/test/testRegistry.cpp
@@ -121,15 +121,6 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activemq::transport::mock::MockTransportFactory
 #include <activemq/transport/inactivity/InactivityMonitorTest.h>
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::transport::inactivity::InactivityMonitorTest );
 
-//#include <activemq/transport/discovery/DiscoveryAgentRegistryTest.h>
-//CPPUNIT_TEST_SUITE_REGISTRATION( activemq::transport::discovery::DiscoveryAgentRegistryTest );
-//#include <activemq/transport/discovery/DiscoveryTransportFactoryTest.h>
-//CPPUNIT_TEST_SUITE_REGISTRATION( activemq::transport::discovery::DiscoveryTransportFactoryTest );
-//#include <activemq/transport/discovery/AbstractDiscoveryAgentTest.h>
-//CPPUNIT_TEST_SUITE_REGISTRATION( activemq::transport::discovery::AbstractDiscoveryAgentTest );
-//#include <activemq/transport/discovery/AbstractDiscoveryAgentFactoryTest.h>
-//CPPUNIT_TEST_SUITE_REGISTRATION( activemq::transport::discovery::AbstractDiscoveryAgentFactoryTest );
-
 #include <activemq/transport/TransportRegistryTest.h>
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::transport::TransportRegistryTest );
 #include <activemq/transport/IOTransportTest.h>


Mime
View raw message