qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mgoul...@apache.org
Subject svn commit: r889657 - in /qpid/trunk/qpid/cpp/src/qpid/cluster: Cpg.cpp Cpg.h
Date Fri, 11 Dec 2009 15:29:01 GMT
Author: mgoulish
Date: Fri Dec 11 15:29:00 2009
New Revision: 889657

URL: http://svn.apache.org/viewvc?rev=889657&view=rev
Log:
Add retry capability to several cpg calls.
The first attempt is immediate; if cpg says "try again",
the next follows after a 10 usec pause, then 100 usec, etc.,
for up to 5 attempts. The retry pause maxes out at 0.1 second.
If all attempts fail, give up and report the error.

The lack of retry on one of these calls must have 
been responsible for several hard-to-reproduce 
failures seen over the last month or so.


Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=889657&r1=889656&r2=889657&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Fri Dec 11 15:29:00 2009
@@ -39,6 +39,8 @@
 
 using namespace std;
 
+
+
 Cpg* Cpg::cpgFromHandle(cpg_handle_t handle) {
     void* cpg=0;
     CPG_CHECK(cpg_context_get(handle, &cpg), "Cannot get CPG instance.");
@@ -46,6 +48,28 @@
     return reinterpret_cast<Cpg*>(cpg);
 }
 
+// Applies the same retry-logic to all cpg calls that need it.
+void Cpg::callCpg ( CpgOp & c ) {
+    cpg_error_t result;
+    unsigned int snooze = 10;
+    for ( unsigned int nth_try = 0; nth_try < cpgRetries; ++ nth_try ) {
+        if ( CPG_OK == (result = c.op(handle, & group))) {
+            QPID_LOG(info, c.opName << " successful.");
+            break;
+        }
+        else if ( result == CPG_ERR_TRY_AGAIN ) {
+            QPID_LOG(info, "Retrying " << c.opName );
+            sys::usleep ( snooze );
+            snooze *= 10;
+            snooze = (snooze <= maxCpgRetrySleep) ? snooze : maxCpgRetrySleep;
+        }
+        else break;  // Don't retry unless CPG tells us to.
+    }
+
+    if ( result != CPG_OK )
+        CPG_CHECK(result, c.msg(group));
+}
+
 // Global callback functions.
 void Cpg::globalDeliver (
     cpg_handle_t handle,
@@ -129,11 +153,11 @@
 
 void Cpg::join(const std::string& name) {
     group = name;
-    CPG_CHECK(cpg_join(handle, &group), cantJoinMsg(group));
+    callCpg ( cpgJoinOp );
 }
     
 void Cpg::leave() {
-    CPG_CHECK(cpg_leave(handle, &group), cantLeaveMsg(group));
+    callCpg ( cpgLeaveOp );
 }
 
 
@@ -158,7 +182,8 @@
     if (!isShutdown) {
         QPID_LOG(debug,"Shutting down CPG");
         isShutdown=true;
-        CPG_CHECK(cpg_finalize(handle), "Error in shutdown of CPG");
+
+        callCpg ( cpgFinalizeOp );
     }
 }
 
@@ -201,6 +226,10 @@
     return "Cannot join CPG group "+group.str();
 }
 
+std::string Cpg::cantFinalizeMsg(const Name& group) {
+    return "Cannot finalize CPG group "+group.str();
+}
+
 std::string Cpg::cantLeaveMsg(const Name& group) {
     return "Cannot leave CPG group "+group.str();
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h?rev=889657&r1=889656&r2=889657&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h Fri Dec 11 15:29:00 2009
@@ -39,6 +39,7 @@
  * On error all functions throw Cpg::Exception.
  *
  */
+
 class Cpg : public sys::IOHandle {
   public:
     struct Exception : public ::qpid::Exception {
@@ -114,10 +115,73 @@
     int getFd();
     
   private:
+
+    // Maximum number of retries for cpg functions that can tell
+    // us to "try again later".
+    static const unsigned int cpgRetries = 5;
+
+    // Don't let the sleep time between cpg retries go above 0.1 second.
+    static const unsigned int maxCpgRetrySleep = 100000;
+
+
+    // Base class for the Cpg operations that need retry capability.
+    struct CpgOp { 
+        std::string opName;
+
+        CpgOp ( std::string opName ) 
+          : opName(opName) { }
+
+        virtual cpg_error_t op ( cpg_handle_t handle, struct cpg_name * ) = 0; 
+        virtual std::string msg(const Name&) = 0;
+        virtual ~CpgOp ( ) { }
+    };
+
+
+    struct CpgJoinOp : public CpgOp { 
+        CpgJoinOp ( )
+          : CpgOp ( std::string("cpg_join") ) { }
+
+        cpg_error_t op(cpg_handle_t handle, struct cpg_name * group) { 
+            return cpg_join ( handle, group ); 
+        }
+
+        std::string msg(const Name& name) { return cantJoinMsg(name); }
+    };
+
+    struct CpgLeaveOp : public CpgOp { 
+        CpgLeaveOp ( )
+          : CpgOp ( std::string("cpg_leave") ) { }
+
+        cpg_error_t op(cpg_handle_t handle, struct cpg_name * group) { 
+            return cpg_leave ( handle, group ); 
+        }
+
+        std::string msg(const Name& name) { return cantLeaveMsg(name); }
+    };
+
+    struct CpgFinalizeOp : public CpgOp { 
+        CpgFinalizeOp ( )
+          : CpgOp ( std::string("cpg_finalize") ) { }
+
+        cpg_error_t op(cpg_handle_t handle, struct cpg_name *) { 
+            return cpg_finalize ( handle ); 
+        }
+
+        std::string msg(const Name& name) { return cantFinalizeMsg(name); }
+    };
+
+    // This fn standardizes retry policy across all Cpg ops that need it.
+    void callCpg ( CpgOp & );
+
+    CpgJoinOp     cpgJoinOp;
+    CpgLeaveOp    cpgLeaveOp;
+    CpgFinalizeOp cpgFinalizeOp;
+
     static std::string errorStr(cpg_error_t err, const std::string& msg);
     static std::string cantJoinMsg(const Name&);
     static std::string cantLeaveMsg(const Name&);
     static std::string cantMcastMsg(const Name&);
+    static std::string cantFinalizeMsg(const Name&);
 
     static Cpg* cpgFromHandle(cpg_handle_t);
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message