qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r883910 - in /qpid/trunk/qpid/cpp: src/qpid/cluster/ src/tests/ xml/
Date Tue, 24 Nov 2009 22:41:11 GMT
Author: aconway
Date: Tue Nov 24 22:41:10 2009
New Revision: 883910

URL: http://svn.apache.org/viewvc?rev=883910&view=rev
Log:
Verify stored cluster-id matches agreed cluster-id when joining a persistent cluster.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h
    qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp
    qpid/trunk/qpid/cpp/src/tests/StoreStatus.cpp
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=883910&r1=883909&r2=883910&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Nov 24 22:41:10 2009
@@ -103,6 +103,7 @@
  * done single-threaded, bypassing the normal PollableQueues because
  * the Poller is not active at this point to service them.
  */
+#include "qpid/Exception.h"
 #include "qpid/cluster/Cluster.h"
 #include "qpid/cluster/ClusterSettings.h"
 #include "qpid/cluster/Connection.h"
@@ -153,15 +154,16 @@
 
 namespace qpid {
 namespace cluster {
+using namespace qpid;
 using namespace qpid::framing;
 using namespace qpid::sys;
-using namespace std;
 using namespace qpid::cluster;
-using namespace qpid::framing::cluster;
-using qpid::management::ManagementAgent;
-using qpid::management::ManagementObject;
-using qpid::management::Manageable;
-using qpid::management::Args;
+using namespace framing::cluster;
+using namespace std;
+using management::ManagementAgent;
+using management::ManagementObject;
+using management::Manageable;
+using management::Args;
 namespace _qmf = ::qmf::org::apache::qpid::cluster;
 
 /**
@@ -184,10 +186,10 @@
     void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
 
     void initialStatus(uint32_t version, bool active, const Uuid& clusterId,
-                       uint8_t storeState, const Uuid& start, const Uuid& stop)
+                       uint8_t storeState, const Uuid& shutdownId)
     {
         cluster.initialStatus(member, version, active, clusterId, 
-                              framing::cluster::StoreState(storeState), start, stop, l);
+                              framing::cluster::StoreState(storeState), shutdownId, l);
     }
     void ready(const std::string& url) { cluster.ready(member, url, l); }
     void configChange(const std::string& current) { cluster.configChange(member, current, l); }
@@ -254,8 +256,11 @@
     broker.getExchanges().registerExchange(
         boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
     // Load my store status before we go into initialization
-    if (! broker::NullMessageStore::isNullStore(&broker.getStore())) 
+    if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
         store.load();
+        if (store.getClusterId())
+            clusterId = store.getClusterId(); // Use stored ID if there is one.
+    }
 
     cpg.join(name);
     // Pump the CPG dispatch manually till we get initialized. 
@@ -606,14 +611,18 @@
         else {
             QPID_LOG(info, this << " active for links.");
         }
+        // Check that cluster ID matches persistent store.
+        Uuid agreedId = initMap.getClusterId();
+        if (store.hasStore()) {
+            Uuid storeId = store.getClusterId();
+            if (storeId && storeId != agreedId)
+                throw Exception(
+                    QPID_MSG("Persistent cluster-id " << storeId 
+                             << " doesn't match cluster " << agreedId));
+            store.dirty(agreedId);
+        }
+        setClusterId(agreedId, l);
 
-        setClusterId(initMap.getClusterId(), l);
-        // FIXME aconway 2009-11-20: store id == cluster id.
-        // Clean up redundant copy of id in InitialStatus
-        // Use store ID as advertized cluster ID.
-        // Consistency check on cluster ID vs. locally stored ID.
-        // throw rathr than  assert in StoreStatus.
-        if (store.hasStore()) store.dirty(clusterId);
         if (initMap.isUpdateNeeded())  { // Joining established cluster.
             broker.setRecovery(false); // Ditch my current store.
             state = JOINER;
@@ -645,7 +654,7 @@
         mcast.mcastControl(
             ClusterInitialStatusBody(
                 ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, 
-                store.getState(), store.getStart(), store.getStop()
+                store.getState(), store.getShutdownId()
             ),
             self);
     }
@@ -690,7 +699,7 @@
 void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active,
                             const framing::Uuid& id, 
                             framing::cluster::StoreState store,
-                            const framing::Uuid& start, const framing::Uuid& end,
+                            const framing::Uuid& shutdownId,
                             Lock& l)
 {
     if (version != CLUSTER_VERSION) {
@@ -701,8 +710,7 @@
     }
     initMap.received(
         member,
-        ClusterInitialStatusBody(
-            ProtocolVersion(), version, active, id, store, start, end)
+        ClusterInitialStatusBody(ProtocolVersion(), version, active, id, store, shutdownId)
     );
     if (initMap.transitionToComplete()) {
         QPID_LOG(debug, *this << " initial status map complete. ");

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=883910&r1=883909&r2=883910&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Nov 24 22:41:10 2009
@@ -151,10 +151,9 @@
     void initialStatus(const MemberId&,
                        uint32_t version,
                        bool active,
-                       const framing::Uuid& id,
+                       const framing::Uuid& clusterId,
                        framing::cluster::StoreState,
-                       const framing::Uuid& start,
-                       const framing::Uuid& end,
+                       const framing::Uuid& shutdownId,
                        Lock&);
     void ready(const MemberId&, const std::string&, Lock&);
     void configChange(const MemberId&, const std::string& current, Lock& l);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp?rev=883910&r1=883909&r2=883910&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp Tue Nov 24 22:41:10 2009
@@ -39,8 +39,8 @@
 namespace {
 
 const char* SUBDIR="cluster";
-const char* START_FILE="start";
-const char* STOP_FILE="stop";
+const char* CLUSTER_ID_FILE="cluster.uuid";
+const char* SHUTDOWN_ID_FILE="shutdown.uuid";
 
 Uuid loadUuid(const path& path) {
     Uuid ret;
@@ -62,33 +62,33 @@
 void StoreStatus::load() {
     path dir = path(dataDir)/SUBDIR;
     create_directory(dir);
-    start = loadUuid(dir/START_FILE);
-    stop = loadUuid(dir/STOP_FILE);
+    clusterId = loadUuid(dir/CLUSTER_ID_FILE);
+    shutdownId = loadUuid(dir/SHUTDOWN_ID_FILE);
 
-    if (start && stop) state = STORE_STATE_CLEAN_STORE;
-    else if (start) state = STORE_STATE_DIRTY_STORE;
+    if (clusterId && shutdownId) state = STORE_STATE_CLEAN_STORE;
+    else if (clusterId) state = STORE_STATE_DIRTY_STORE;
     else state = STORE_STATE_EMPTY_STORE;
 }
 
 void StoreStatus::save() {
     path dir = path(dataDir)/SUBDIR;
     create_directory(dir);
-    saveUuid(dir/START_FILE, start);
-    saveUuid(dir/STOP_FILE, stop);
+    saveUuid(dir/CLUSTER_ID_FILE, clusterId);
+    saveUuid(dir/SHUTDOWN_ID_FILE, shutdownId);
 }
 
-void StoreStatus::dirty(const Uuid& start_) {
-    start = start_;
-    stop = Uuid();
+void StoreStatus::dirty(const Uuid& clusterId_) {
+    clusterId = clusterId_;
+    shutdownId = Uuid();
     state = STORE_STATE_DIRTY_STORE;
     save();
 }
 
-void StoreStatus::clean(const Uuid& stop_) {
-    assert(start);              // FIXME aconway 2009-11-20: exception?
-    assert(stop_);
+void StoreStatus::clean(const Uuid& shutdownId_) {
+    assert(clusterId);              // FIXME aconway 2009-11-20: throw exception
+    assert(shutdownId_);
     state = STORE_STATE_CLEAN_STORE;
-    stop = stop_;
+    shutdownId = shutdownId_;
     save();
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h?rev=883910&r1=883909&r2=883910&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h Tue Nov 24 22:41:10 2009
@@ -40,8 +40,8 @@
     StoreStatus(const std::string& dir);
 
     framing::cluster::StoreState getState() const { return state; }
-    Uuid getStart() const { return start; }
-    Uuid getStop() const { return stop; }
+    const Uuid& getClusterId() const { return clusterId; }
+    const Uuid& getShutdownId() const { return shutdownId; }
 
     void dirty(const Uuid& start); // Start using the store.
     void clean(const Uuid& stop); // Stop using the store.
@@ -51,9 +51,10 @@
 
     bool hasStore() { return state != framing::cluster::STORE_STATE_NO_STORE; }
     bool isEmpty() { return state != framing::cluster::STORE_STATE_EMPTY_STORE; }
+
   private:
     framing::cluster::StoreState state;
-    Uuid start, stop;
+    Uuid clusterId, shutdownId;
     std::string dataDir;
 };
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp?rev=883910&r1=883909&r2=883910&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/InitialStatusMap.cpp Tue Nov 24 22:41:10 2009
@@ -37,17 +37,15 @@
 typedef InitialStatusMap::Status Status;
 
 Status activeStatus(const Uuid& id=Uuid()) {
-    return Status(ProtocolVersion(), 0, true, id,
-                  STORE_STATE_NO_STORE, Uuid(), Uuid());
+    return Status(ProtocolVersion(), 0, true, id, STORE_STATE_NO_STORE, Uuid());
 }
 
 Status newcomerStatus(const Uuid& id=Uuid()) {
-    return Status(ProtocolVersion(), 0, false, id,
-                  STORE_STATE_NO_STORE, Uuid(), Uuid());
+    return Status(ProtocolVersion(), 0, false, id, STORE_STATE_NO_STORE, Uuid());
 }
 
 Status storeStatus(bool active, StoreState state, Uuid start=Uuid(), Uuid stop=Uuid()) {
-    return Status(ProtocolVersion(), 0, active, Uuid(), state, start, stop);
+    return Status(ProtocolVersion(), 0, active, start, state, stop);
 }
 
 QPID_AUTO_TEST_CASE(testFirstInCluster) {

Modified: qpid/trunk/qpid/cpp/src/tests/StoreStatus.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/StoreStatus.cpp?rev=883910&r1=883909&r2=883910&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/StoreStatus.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/StoreStatus.cpp Tue Nov 24 22:41:10 2009
@@ -43,64 +43,64 @@
     create_directory(TEST_DIR);
     StoreStatus ss(TEST_DIR);
     BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_NO_STORE);
-    BOOST_CHECK(!ss.getStart());
-    BOOST_CHECK(!ss.getStop());
+    BOOST_CHECK(!ss.getClusterId());
+    BOOST_CHECK(!ss.getShutdownId());
     ss.load();
     BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_EMPTY_STORE);
-    BOOST_CHECK(!ss.getStop());
+    BOOST_CHECK(!ss.getShutdownId());
     remove_all(TEST_DIR);
 }
 
 QPID_AUTO_TEST_CASE(testSaveLoadDirty) {
     create_directory(TEST_DIR);
-    Uuid start = Uuid(true);
+    Uuid clusterId = Uuid(true);
     StoreStatus ss(TEST_DIR);
     ss.load();
-    ss.dirty(start);
+    ss.dirty(clusterId);
     BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_DIRTY_STORE);
 
     StoreStatus ss2(TEST_DIR);
     ss2.load();
     BOOST_CHECK_EQUAL(ss2.getState(), STORE_STATE_DIRTY_STORE);
-    BOOST_CHECK_EQUAL(ss2.getStart(), start);
-    BOOST_CHECK(!ss2.getStop());
+    BOOST_CHECK_EQUAL(ss2.getClusterId(), clusterId);
+    BOOST_CHECK(!ss2.getShutdownId());
     remove_all(TEST_DIR);
 }
 
 QPID_AUTO_TEST_CASE(testSaveLoadClean) {
     create_directory(TEST_DIR);
-    Uuid start = Uuid(true);
-    Uuid stop = Uuid(true);
+    Uuid clusterId = Uuid(true);
+    Uuid shutdownId = Uuid(true);
     StoreStatus ss(TEST_DIR);
     ss.load();
-    ss.dirty(start);
-    ss.clean(stop);
+    ss.dirty(clusterId);
+    ss.clean(shutdownId);
     BOOST_CHECK_EQUAL(ss.getState(), STORE_STATE_CLEAN_STORE);
 
     StoreStatus ss2(TEST_DIR);
     ss2.load();
     BOOST_CHECK_EQUAL(ss2.getState(), STORE_STATE_CLEAN_STORE);
-    BOOST_CHECK_EQUAL(ss2.getStart(), start);
-    BOOST_CHECK_EQUAL(ss2.getStop(), stop);
+    BOOST_CHECK_EQUAL(ss2.getClusterId(), clusterId);
+    BOOST_CHECK_EQUAL(ss2.getShutdownId(), shutdownId);
     remove_all(TEST_DIR);
 }
 
 QPID_AUTO_TEST_CASE(testMarkDirty) {
     // Save clean then mark to dirty.
     create_directory(TEST_DIR);
-    Uuid start = Uuid(true);
-    Uuid stop = Uuid(true);
+    Uuid clusterId = Uuid(true);
+    Uuid shutdownId = Uuid(true);
     StoreStatus ss(TEST_DIR);
     ss.load();
-    ss.dirty(start);
-    ss.clean(stop);
-    ss.dirty(start);
+    ss.dirty(clusterId);
+    ss.clean(shutdownId);
+    ss.dirty(clusterId);
     
     StoreStatus ss2(TEST_DIR);
     ss2.load();
     BOOST_CHECK_EQUAL(ss2.getState(), STORE_STATE_DIRTY_STORE);
-    BOOST_CHECK_EQUAL(ss2.getStart(), start);
-    BOOST_CHECK(!ss2.getStop());
+    BOOST_CHECK_EQUAL(ss2.getClusterId(), clusterId);
+    BOOST_CHECK(!ss2.getShutdownId());
     remove_all(TEST_DIR);
 }
 

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=883910&r1=883909&r2=883910&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Tue Nov 24 22:41:10 2009
@@ -158,3 +158,14 @@
         c = cluster.start("c", wait_for_start=True)
         self.assertEqual(a.get_message("q").content, "clean")
         
+    def test_wrong_store_uuid(self):
+        # Start a cluster1 broker, then try to restart in cluster2
+        cluster1 = self.cluster(0, args=self.args())
+        a = cluster1.start("a", expect=EXPECT_EXIT_OK)
+        a.terminate()
+        cluster2 = self.cluster(1, args=self.args())
+        try:
+            a = cluster2.start("a", expect=EXPECT_EXIT_FAIL)
+            self.fail("Expected exception")
+        except: pass
+            

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=883910&r1=883909&r2=883910&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Tue Nov 24 22:41:10 2009
@@ -62,10 +62,8 @@
       <field name="version" type="uint32"/>
       <field name="active" type="bit"/>
       <field name="cluster-id" type="uuid"/>>
-      <!-- Related to persistent store -->
       <field name="store-state" type="store-state"/>
-      <field name="start-uuid" type="uuid"/>
-      <field name="stop-uuid" type="uuid"/>
+      <field name="shutdown-id" type="uuid"/>
     </control>
 
     <!-- New member or updater is ready as an active member. -->



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message