qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r674855 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/cluster/Cluster.cpp qpid/cluster/Cpg.cpp qpid/cluster/Cpg.h tests/ForkedBroker.h tests/cluster_test.cpp
Date Tue, 08 Jul 2008 15:22:37 GMT
Author: aconway
Date: Tue Jul  8 08:22:37 2008
New Revision: 674855

URL: http://svn.apache.org/viewvc?rev=674855&view=rev
Log:
Removed static Cpg::handlers, fixed ForkedBroker shutdown.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
    incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=674855&r1=674854&r2=674855&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Jul  8 08:22:37 2008
@@ -114,6 +114,7 @@
     QPID_LOG(trace, *this << " Leaving cluster.");
     try {
         cpg.leave(name);
+        cpg.shutdown();
         dispatcher.join();
     }
     catch (const std::exception& e) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=674855&r1=674854&r2=674855&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Tue Jul  8 08:22:37 2008
@@ -32,36 +32,14 @@
 
 using namespace std;
 
-// Global vector of Cpg pointers by handle.
-// TODO aconway 2007-06-12: Replace this with cpg_get/set_context,
-// coming in in RHEL 5.1.
-class Cpg::Handles
-{
-  public:
-    void put(cpg_handle_t handle, Cpg::Handler* handler) {
-        sys::Mutex::ScopedLock l(lock);
-        uint32_t index=uint32_t(handle); // Lower 32 bits is an array index.
-        if (index >= handles.size())
-            handles.resize(index+1, 0);
-        handles[index] = handler;
-    }
-    
-    Cpg::Handler* get(cpg_handle_t handle) {
-        sys::Mutex::ScopedLock l(lock);
-        uint32_t index=uint32_t(handle); // Lower 32 bits is an array index.
-        assert(index < handles.size());
-        assert(handles[index]);
-        return handles[index];
-    }
-    
-  private:
-    sys::Mutex lock;
-    vector<Cpg::Handler*>  handles;
-};
-
-Cpg::Handles Cpg::handles;
+Cpg* Cpg::cpgFromHandle(cpg_handle_t handle) {
+    void* cpg=0;
+    check(cpg_context_get(handle, &cpg), "Cannot get CPG instance.");
+    if (!cpg) throw Exception("Cannot get CPG instance.");
+    return reinterpret_cast<Cpg*>(cpg);
+}
 
-// Global callback functions call per-object callbacks via handles vector.
+// Global callback functions.
 void Cpg::globalDeliver (
     cpg_handle_t handle,
     struct cpg_name *group,
@@ -70,9 +48,7 @@
     void* msg,
     int msg_len)
 {
-    Cpg::Handler* handler=handles.get(handle);
-    if (handler)
-        handler->deliver(handle, group, nodeid, pid, msg, msg_len);
+    cpgFromHandle(handle)->handler.deliver(handle, group, nodeid, pid, msg, msg_len);
 }
 
 void Cpg::globalConfigChange(
@@ -83,15 +59,13 @@
     struct cpg_address *joined, int nJoined
 )
 {
-    Cpg::Handler* handler=handles.get(handle);
-    if (handler)
-        handler->configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined);
+    cpgFromHandle(handle)->handler.configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined);
 }
 
 Cpg::Cpg(Handler& h) : handler(h) {
     cpg_callbacks_t callbacks = { &globalDeliver, &globalConfigChange };
     check(cpg_initialize(&handle, &callbacks), "Cannot initialize CPG");
-    handles.put(handle, &handler);
+    check(cpg_context_set(handle, this), "Cannot set CPG context");
     QPID_LOG(debug, "Initialize CPG handle 0x" << std::hex << handle);
 }
 
@@ -104,10 +78,10 @@
 }
 
 void Cpg::shutdown() {
-    if (handles.get(handle)) {
-        QPID_LOG(debug, "Finalize CPG handle " << std::hex << handle);
-        handles.put(handle, 0);
+    if (handle) {
+        cpg_context_set(handle, 0);
         check(cpg_finalize(handle), "Error in shutdown of CPG");
+        handle = 0;
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h?rev=674855&r1=674854&r2=674855&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h Tue Jul  8 08:22:37 2008
@@ -38,6 +38,8 @@
  * Lightweight C++ interface to cpg.h operations. 
  * Manages a single CPG handle, initialized in ctor, finialzed in destructor.
  * On error all functions throw Cpg::Exception
+ *
+ * NOTE: only one at a time can exist per process.
  */
 class Cpg : public Dispatchable {
   public:
@@ -95,7 +97,7 @@
      */
     Cpg(Handler&);
     
-    /** Destructor calls shutdown. */
+    /** Destructor calls shutdown if not already called. */
     ~Cpg();
 
     /** Disconnect from CPG */
@@ -134,22 +136,17 @@
     Id self() const;
 
   private:
-    class Handles;
-    struct ClearHandleOnExit;
-  friend class Handles;
-  friend struct ClearHandleOnExit;
-    
     static std::string errorStr(cpg_error_t err, const std::string& msg);
     static std::string cantJoinMsg(const Name&);
     static std::string cantLeaveMsg(const Name&);
     static std::string cantMcastMsg(const Name&);
     
     static void check(cpg_error_t result, const std::string& msg) {
-        // TODO aconway 2007-06-01: Logging and exceptions.
-        if (result != CPG_OK) 
-            throw Exception(errorStr(result, msg));
+        if (result != CPG_OK) throw Exception(errorStr(result, msg));
     }
 
+    static Cpg* cpgFromHandle(cpg_handle_t);
+
     static void globalDeliver(
         cpg_handle_t handle,
         struct cpg_name *group,
@@ -166,7 +163,6 @@
         struct cpg_address *joined, int nJoined
     );
 
-    static Handles handles;
     cpg_handle_t handle;
     Handler& handler;
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h?rev=674855&r1=674854&r2=674855&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h Tue Jul  8 08:22:37 2008
@@ -22,6 +22,7 @@
  *
  */
 
+#include "qpid/Exception.h"
 #include "qpid/sys/Fork.h"
 #include "qpid/log/Logger.h"
 #include "qpid/broker/Broker.h"
@@ -48,7 +49,7 @@
  * 
  */
 class ForkedBroker : public qpid::sys::ForkWithMessage {
-    pid_t childPid;
+    pid_t pid;
     uint16_t port;
     qpid::broker::Broker::Options opts;
     std::string prefix;
@@ -56,22 +57,33 @@
   public:
     struct ChildExit {};   // Thrown in child processes.
 
-    ForkedBroker(const qpid::broker::Broker::Options& opts_, const std::string& prefix_=std::string())
-        : childPid(0), port(0), opts(opts_), prefix(prefix_) { fork(); } 
-
-    ~ForkedBroker() { stop(); }
+    ForkedBroker(const qpid::broker::Broker::Options& opts_=qpid::broker::Broker::Options(),
+                 const std::string& prefix_=std::string())
+        : pid(0), port(0), opts(opts_), prefix(prefix_) { fork(); } 
+
+    ~ForkedBroker() {
+        try { stop(); }
+        catch(const std::exception& e) {
+            QPID_LOG(error, e.what());
+        }
+    }
 
     void stop() {
-        if (childPid > 0) {
-            ::kill(childPid, SIGINT);
-            ::waitpid(childPid, 0, 0);
+        if (pid > 0) {     // I am the parent, clean up children.
+            if (::kill(pid, SIGINT) < 0)
+                throw qpid::Exception(QPID_MSG("Can't kill process " << pid << ": " << qpid::strError(errno)));
+            int status = 0;
+            if (::waitpid(pid, &status, 0) < 0)
+                throw qpid::Exception(QPID_MSG("Waiting for process " << pid << ": " << qpid::strError(errno)));
+            if (WEXITSTATUS(status) != 0)
+                throw qpid::Exception(QPID_MSG("Process " << pid << " exited with status: " << WEXITSTATUS(status)));
         }
     }
 
-    void parent(pid_t pid) {
-        childPid = pid;
+    void parent(pid_t pid_) {
+        pid = pid_;
         qpid::log::Logger::instance().setPrefix("parent");
-        std::string portStr = wait(2);
+        std::string portStr = wait(5);
         port = boost::lexical_cast<uint16_t>(portStr);
     }
 
@@ -88,6 +100,7 @@
 
         // Force exit in the child process, otherwise we will try to
         // carry with parent tests.
+        broker.reset();         // Run broker dtor before we exit.
         exit(0);
     }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=674855&r1=674854&r2=674855&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Tue Jul  8 08:22:37 2008
@@ -147,6 +147,16 @@
 }
 
 
+QPID_AUTO_TEST_CASE(testForkedBroker) {
+    // Verify the ForkedBroker works as expected.
+    Broker::Options opts;
+    opts.auth="no";
+    opts.noDataDir=true;
+    ForkedBroker broker(opts);
+    Client c(broker.getPort());
+    BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("amq.direct").getType()); 
+}
+
 QPID_AUTO_TEST_CASE(testWiringReplication) {
     ClusterFixture cluster(2);  // FIXME aconway 2008-07-02: 3 brokers
     Client c0(cluster[0].getPort());



Mime
View raw message