activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq-cpp git commit: https://issues.apache.org/jira/browse/AMQCPP-609
Date Wed, 08 Feb 2017 22:35:52 GMT
Repository: activemq-cpp
Updated Branches:
  refs/heads/3.9.x fe3b22163 -> f93fa9ea1


https://issues.apache.org/jira/browse/AMQCPP-609

Watch for shutdown and don't reconnect if the remote drops the
connection before the close on the transport is registered.  

Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/f93fa9ea
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/f93fa9ea
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/f93fa9ea

Branch: refs/heads/3.9.x
Commit: f93fa9ea187693c26614cee17e7a7855cea3e7c0
Parents: fe3b221
Author: Timothy Bish <tabish121@gmail.com>
Authored: Wed Feb 8 17:35:45 2017 -0500
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Wed Feb 8 17:35:45 2017 -0500

----------------------------------------------------------------------
 .../transport/failover/FailoverTransport.cpp    | 34 ++++++++++++++++----
 1 file changed, 28 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/f93fa9ea/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp b/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
index 3e1059a..b5ea33b 100644
--- a/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
+++ b/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
@@ -94,6 +94,7 @@ namespace failover {
         bool rebalanceUpdateURIs;
         bool priorityBackup;
         bool backupsEnabled;
+        volatile bool shutdown;
 
         bool doRebalance;
         bool connectedToPrioirty;
@@ -143,6 +144,7 @@ namespace failover {
             rebalanceUpdateURIs(true),
             priorityBackup(false),
             backupsEnabled(false),
+            shutdown(false),
             doRebalance(false),
             connectedToPrioirty(false),
             reconnectMutex(),
@@ -226,7 +228,7 @@ namespace failover {
                 try {
                     ioException = this->connectionFailure.dynamicCast<IOException>();
                 }
-                AMQ_CATCH_NOTHROW( ClassCastException)
+                AMQ_CATCH_NOTHROW(ClassCastException)
 
                 if (ioException != NULL) {
                     transportListener->onException(*this->connectionFailure);
@@ -272,6 +274,9 @@ namespace failover {
             }
         }
 
+        bool willReconnect() {
+            return firstConnection || 0 != calculateReconnectAttemptLimit();
+        }
     };
 
     const int FailoverTransportImpl::DEFAULT_INITIAL_RECONNECT_DELAY = 10;
@@ -435,7 +440,9 @@ void FailoverTransport::oneway(const Pointer<Command> command) {
                     long long start = System::currentTimeMillis();
                     bool timedout = false;
 
-                    while (transport == NULL && !this->impl->closed && this->impl->connectionFailure == NULL) {
+                    while (transport == NULL && !this->impl->closed &&
+                           this->impl->connectionFailure == NULL && this->impl->willReconnect()) {
+
                         long long end = System::currentTimeMillis();
                         if (command->isMessage() && this->impl->timeout > 0 && (end - start > this->impl->timeout)) {
                             timedout = true;
@@ -455,6 +462,9 @@ void FailoverTransport::oneway(const Pointer<Command> command) {
                         } else if (timedout == true) {
                             error.reset(new IOException(__FILE__, __LINE__,
                                 "Failover timeout of %d ms reached.", this->impl->timeout));
+                        } else if (!this->impl->willReconnect()) {
+                            error.reset(new IOException(__FILE__, __LINE__,
+                                "Maximum reconnection attempts exceeded"));
                         } else {
                             error.reset(new IOException(__FILE__, __LINE__, "Unexpected failure."));
                         }
@@ -468,7 +478,7 @@ void FailoverTransport::oneway(const Pointer<Command> command) {
                     Pointer<Tracked> tracked;
                     try {
                         tracked = stateTracker.track(command);
-                        synchronized( &this->impl->requestMap ) {
+                        synchronized(&this->impl->requestMap) {
                             if (tracked != NULL && tracked->isWaitingForResponse()) {
                                 this->impl->requestMap.put(command->getCommandId(), tracked);
                             } else if (tracked == NULL && command->isResponseRequired()) {
@@ -485,13 +495,15 @@ void FailoverTransport::oneway(const Pointer<Command> command) {
                     try {
                         transport->oneway(command);
                         stateTracker.trackBack(command);
+                        if (command->isShutdownInfo()) {
+                            this->impl->shutdown = true;
+                        }
                     } catch (IOException& e) {
 
                         e.setMark(__FILE__, __LINE__);
 
-                        // If the command was not tracked.. we will retry in
-                        // this method
-                        if (tracked == NULL) {
+                        // If the command was not tracked.. we will retry in this method
+                        if (tracked == NULL && this->impl->canReconnect()) {
 
                             // since we will retry in this method.. take it out of the
                             // request map so that it is not sent 2 times on recovery
@@ -690,8 +702,18 @@ void FailoverTransport::restoreTransport(const Pointer<Transport> transport) {
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::handleTransportFailure(const decaf::lang::Exception& error) {
 
+    if (this->impl->shutdown) {
+        // shutdown info sent and remote socket closed and we see that before a local close
+        // let the close do the work
+        return;
+    }
+
     synchronized(&this->impl->reconnectMutex) {
 
+        if (this->impl->shutdown) {
+            return;
+        }
+
         Pointer<Transport> transport;
         this->impl->connectedTransport.swap(transport);
 


Mime
View raw message