activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1087050 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover: FailoverTransport.cpp FailoverTransport.h FailoverTransportListener.cpp
Date Wed, 30 Mar 2011 19:24:23 GMT
Author: tabish
Date: Wed Mar 30 19:24:22 2011
New Revision: 1087050

URL: http://svn.apache.org/viewvc?rev=1087050&view=rev
Log:
fix for https://issues.apache.org/jira/browse/AMQCPP-358 

Adds some small refactoring to support future work needed.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=1087050&r1=1087049&r2=1087050&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
Wed Mar 30 19:24:22 2011
@@ -551,7 +551,6 @@ void FailoverTransport::processNewTransp
                     URI uri( str );
                     list.add( uri );
                 } catch( Exception e ) {
-                    //LOG.error( "Failed to parse broker address: " + str, e );
                 }
             }
 
@@ -559,7 +558,6 @@ void FailoverTransport::processNewTransp
                 try {
                     updateURIs( rebalance, list );
                 } catch( IOException e ) {
-                    //LOG.error( "Failed to update transport URI's from: " + newTransports,
e );
                 }
             }
         }
@@ -615,7 +613,22 @@ bool FailoverTransport::isPending() cons
 
     synchronized( &reconnectMutex ) {
         if( this->connectedTransport == NULL && !closed && started ) {
-            result = true;
+
+            int reconnectAttempts = 0;
+            if( firstConnection ) {
+                if( startupMaxReconnectAttempts != 0 ) {
+                    reconnectAttempts = startupMaxReconnectAttempts;
+                }
+            }
+            if( reconnectAttempts == 0 ) {
+                reconnectAttempts = maxReconnectAttempts;
+            }
+
+            if( reconnectAttempts > 0 && connectFailures >= reconnectAttempts
) {
+                result = false;
+            } else {
+                result = true;
+            }
         }
     }
 
@@ -802,8 +815,6 @@ bool FailoverTransport::iterate() {
     if( !closed ) {
 
         synchronized( &sleepMutex ) {
-            //std::cout << "Failover: Trying again in "
-            //          << reconnectDelay << "Milliseconds." << std::endl;
             sleepMutex.wait( (unsigned int)reconnectDelay );
         }
 
@@ -848,3 +859,219 @@ void FailoverTransport::setConnectionInt
         stateTracker.connectionInterruptProcessingComplete( this, connectionId );
     }
 }
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isConnected() const {
+    return this->connected;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isClosed() const {
+    return this->closed;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isInitialized() const {
+    return this->initialized;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setInitialized( bool value ) {
+    this->initialized = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Transport* FailoverTransport::narrow( const std::type_info& typeId ) {
+
+    if( typeid( *this ) == typeId ) {
+        return this;
+    }
+
+    if( this->connectedTransport != NULL ) {
+        return this->connectedTransport->narrow( typeId );
+    }
+
+    return NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::processResponse(const Pointer<Response>& response) {
+
+    Pointer<Command> object;
+
+    synchronized( &( this->requestMap ) ) {
+        try{
+            object = this->requestMap.remove( response->getCorrelationId() );
+        } catch( NoSuchElementException& ex ) {
+            // Not tracking this request in our map, not an error.
+        }
+    }
+
+    if( object != NULL ) {
+        try{
+            Pointer<Tracked> tracked = object.dynamicCast<Tracked>();
+            tracked->onResponse();
+        }
+        AMQ_CATCH_NOTHROW( ClassCastException )
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long FailoverTransport::getTimeout() const {
+    return this->timeout;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setTimeout( long long value ) {
+    this->timeout = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long FailoverTransport::getInitialReconnectDelay() const {
+    return this->initialReconnectDelay;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setInitialReconnectDelay( long long value ) {
+    this->initialReconnectDelay = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long FailoverTransport::getMaxReconnectDelay() const {
+    return this->maxReconnectDelay;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setMaxReconnectDelay( long long value ) {
+    this->maxReconnectDelay = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long FailoverTransport::getBackOffMultiplier() const {
+    return this->backOffMultiplier;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setBackOffMultiplier( long long value ) {
+    this->backOffMultiplier = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isUseExponentialBackOff() const {
+    return this->useExponentialBackOff;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setUseExponentialBackOff( bool value ) {
+    this->useExponentialBackOff = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isRandomize() const {
+    return this->uris->isRandomize();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setRandomize( bool value ) {
+    this->uris->setRandomize( value );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int FailoverTransport::getMaxReconnectAttempts() const {
+    return this->maxReconnectAttempts;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setMaxReconnectAttempts( int value ) {
+    this->maxReconnectAttempts = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int FailoverTransport::getStartupMaxReconnectAttempts() const {
+    return this->startupMaxReconnectAttempts;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setStartupMaxReconnectAttempts( int value ) {
+    this->startupMaxReconnectAttempts = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long FailoverTransport::getReconnectDelay() const {
+    return this->reconnectDelay;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setReconnectDelay( long long value ) {
+    this->reconnectDelay = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isBackup() const {
+    return this->backups->isEnabled();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setBackup( bool value ) {
+    this->backups->setEnabled( value );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int FailoverTransport::getBackupPoolSize() const {
+    return this->backups->getBackupPoolSize();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setBackupPoolSize( int value ) {
+    this->backups->setBackupPoolSize( value );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isTrackMessages() const {
+    return this->trackMessages;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setTrackMessages( bool value ) {
+    this->trackMessages = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isTrackTransactionProducers() const {
+    return this->trackTransactionProducers;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setTrackTransactionProducers( bool value ) {
+    this->trackTransactionProducers = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int FailoverTransport::getMaxCacheSize() const {
+    return this->maxCacheSize;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setMaxCacheSize( int value ) {
+    this->maxCacheSize = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isReconnectSupported() const {
+    return this->reconnectSupported;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setReconnectSupported( bool value ) {
+    this->reconnectSupported = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isUpdateURIsSupported() const {
+    return this->updateURIsSupported;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setUpdateURIsSupported( bool value ) {
+    this->updateURIsSupported = value;
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h?rev=1087050&r1=1087049&r2=1087050&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
Wed Mar 30 19:24:22 2011
@@ -155,33 +155,15 @@ namespace failover {
             return true;
         }
 
-        virtual bool isConnected() const {
-            return this->connected;
-        }
+        virtual bool isConnected() const;
 
-        virtual bool isClosed() const {
-            return this->closed;
-        }
+        virtual bool isClosed() const;
 
-        bool isInitialized() const {
-            return this->initialized;
-        }
-
-        void setInitialized( bool value ) {
-            this->initialized = value;
-        }
+        bool isInitialized() const;
 
-        virtual Transport* narrow( const std::type_info& typeId ) {
-            if( typeid( *this ) == typeId ) {
-                return this;
-            }
-
-            if( this->connectedTransport != NULL ) {
-                return this->connectedTransport->narrow( typeId );
-            }
+        void setInitialized( bool value );
 
-            return NULL;
-        }
+        virtual Transport* narrow( const std::type_info& typeId );
 
         virtual std::string getRemoteAddress() const;
 
@@ -208,133 +190,69 @@ namespace failover {
 
     public: // FailoverTransport Property Getters / Setters
 
-        long long getTimeout() const {
-            return this->timeout;
-        }
+        long long getTimeout() const;
 
-        void setTimeout( long long value ) {
-            this->timeout = value;
-        }
+        void setTimeout( long long value );
 
-        long long getInitialReconnectDelay() const {
-            return this->initialReconnectDelay;
-        }
+        long long getInitialReconnectDelay() const;
 
-        void setInitialReconnectDelay( long long value ) {
-            this->initialReconnectDelay = value;
-        }
+        void setInitialReconnectDelay( long long value );
 
-        long long getMaxReconnectDelay() const {
-            return this->maxReconnectDelay;
-        }
+        long long getMaxReconnectDelay() const;
 
-        void setMaxReconnectDelay( long long value ) {
-            this->maxReconnectDelay = value;
-        }
+        void setMaxReconnectDelay( long long value );
 
-        long long getBackOffMultiplier() const {
-            return this->backOffMultiplier;
-        }
+        long long getBackOffMultiplier() const;
 
-        void setBackOffMultiplier( long long value ) {
-            this->backOffMultiplier = value;
-        }
+        void setBackOffMultiplier( long long value );
 
-        bool isUseExponentialBackOff() const {
-            return this->useExponentialBackOff;
-        }
+        bool isUseExponentialBackOff() const;
 
-        void setUseExponentialBackOff( bool value ) {
-            this->useExponentialBackOff = value;
-        }
+        void setUseExponentialBackOff( bool value );
 
-        bool isRandomize() const {
-            return this->uris->isRandomize();
-        }
+        bool isRandomize() const;
 
-        void setRandomize( bool value ) {
-            this->uris->setRandomize( value );
-        }
+        void setRandomize( bool value );
 
-        int getMaxReconnectAttempts() const {
-            return this->maxReconnectAttempts;
-        }
+        int getMaxReconnectAttempts() const;
 
-        void setMaxReconnectAttempts( int value ) {
-            this->maxReconnectAttempts = value;
-        }
+        void setMaxReconnectAttempts( int value );
 
-        int getStartupMaxReconnectAttempts() const {
-            return this->startupMaxReconnectAttempts;
-        }
+        int getStartupMaxReconnectAttempts() const;
 
-        void setStartupMaxReconnectAttempts( int value ) {
-            this->startupMaxReconnectAttempts = value;
-        }
+        void setStartupMaxReconnectAttempts( int value );
 
-        long long getReconnectDelay() const {
-            return this->reconnectDelay;
-        }
+        long long getReconnectDelay() const;
 
-        void setReconnectDelay( long long value ) {
-            this->reconnectDelay = value;
-        }
+        void setReconnectDelay( long long value );
 
-        bool isBackup() const {
-            return this->backups->isEnabled();
-        }
+        bool isBackup() const;
 
-        void setBackup( bool value ) {
-            this->backups->setEnabled( value );
-        }
+        void setBackup( bool value );
 
-        int getBackupPoolSize() const {
-            return this->backups->getBackupPoolSize();
-        }
+        int getBackupPoolSize() const;
 
-        void setBackupPoolSize( int value ) {
-            this->backups->setBackupPoolSize( value );
-        }
+        void setBackupPoolSize( int value );
 
-        bool isTrackMessages() const {
-            return this->trackMessages;
-        }
+        bool isTrackMessages() const;
 
-        void setTrackMessages( bool value ) {
-            this->trackMessages = value;
-        }
+        void setTrackMessages( bool value );
 
-        bool isTrackTransactionProducers() const {
-            return this->trackTransactionProducers;
-        }
+        bool isTrackTransactionProducers() const;
 
-        void setTrackTransactionProducers( bool value ) {
-            this->trackTransactionProducers = value;
-        }
+        void setTrackTransactionProducers( bool value );
 
-        int getMaxCacheSize() const {
-            return this->maxCacheSize;
-        }
+        int getMaxCacheSize() const;
 
-        void setMaxCacheSize( int value ) {
-            this->maxCacheSize = value;
-        }
+        void setMaxCacheSize( int value );
 
-        bool isReconnectSupported() const {
-            return this->reconnectSupported;
-        }
+        bool isReconnectSupported() const;
 
-        void setReconnectSupported( bool value ) {
-            this->reconnectSupported = value;
-        }
+        void setReconnectSupported( bool value );
 
-        bool isUpdateURIsSupported() const {
-            return this->updateURIsSupported;
-        }
+        bool isUpdateURIsSupported() const;
 
-        void setUpdateURIsSupported( bool value ) {
-            this->updateURIsSupported = value;
-        }
+        void setUpdateURIsSupported( bool value );
 
         void setConnectionInterruptProcessingComplete( const Pointer<commands::ConnectionId>&
connectionId );
 
@@ -382,6 +300,8 @@ namespace failover {
 
         void processNewTransports( bool rebalance, std::string newTransports );
 
+        void processResponse(const Pointer<Response>& response);
+
     };
 
 }}}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.cpp?rev=1087050&r1=1087049&r2=1087050&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.cpp
Wed Mar 30 19:24:22 2011
@@ -53,26 +53,8 @@ void FailoverTransportListener::onComman
     }
 
     if( command->isResponse() ) {
-
-        Pointer<Response> response =
-            command.dynamicCast<Response>();
-        Pointer<Command> object;
-
-        synchronized( &( parent->requestMap ) ) {
-            try{
-                object = parent->requestMap.remove( response->getCorrelationId() );
-            } catch( NoSuchElementException& ex ) {
-                // Not tracking this request in our map, not an error.
-            }
-        }
-
-        if( object != NULL ) {
-            try{
-                Pointer<Tracked> tracked = object.dynamicCast<Tracked>();
-                tracked->onResponse();
-            }
-            AMQ_CATCH_NOTHROW( ClassCastException )
-        }
+        Pointer<Response> response = command.dynamicCast<Response>();
+        parent->processResponse(response);
     }
 
     if( !parent->isInitialized() ) {
@@ -83,8 +65,8 @@ void FailoverTransportListener::onComman
         parent->handleConnectionControl( command );
     }
 
-    if( parent->transportListener != NULL ) {
-        parent->transportListener->onCommand( command );
+    if( parent->getTransportListener() != NULL ) {
+        parent->getTransportListener()->onCommand( command );
     }
 }
 
@@ -93,22 +75,22 @@ void FailoverTransportListener::onExcept
     try {
         parent->handleTransportFailure( ex );
     } catch( Exception& e ) {
-        if( parent->transportListener != NULL ) {
-            parent->transportListener->onException( e );
+        if( parent->getTransportListener() != NULL ) {
+            parent->getTransportListener()->onException( e );
         }
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransportListener::transportInterrupted() {
-    if( parent->transportListener != NULL ) {
-        parent->transportListener->transportInterrupted();
+    if( parent->getTransportListener() != NULL ) {
+        parent->getTransportListener()->transportInterrupted();
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransportListener::transportResumed() {
-    if( parent->transportListener != NULL ) {
-        parent->transportListener->transportResumed();
+    if( parent->getTransportListener() != NULL ) {
+        parent->getTransportListener()->transportResumed();
     }
 }



Mime
View raw message