qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r884226 - in /qpid/trunk/qpid: cpp/src/qpid/cluster/ cpp/src/tests/ cpp/xml/ python/qpid/
Date Wed, 25 Nov 2009 18:36:09 GMT
Author: aconway
Date: Wed Nov 25 18:36:09 2009
New Revision: 884226

URL: http://svn.apache.org/viewvc?rev=884226&view=rev
Log:
Consistency checks for persistent cluster startup.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/cpp/xml/cluster.xml
    qpid/trunk/qpid/python/qpid/brokertest.py

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=884226&r1=884225&r2=884226&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Nov 25 18:36:09 2009
@@ -175,7 +175,7 @@
  * Currently use SVN revision to avoid clashes with versions from
  * different branches.
  */
-const uint32_t Cluster::CLUSTER_VERSION = 835547;
+const uint32_t Cluster::CLUSTER_VERSION = 884125;
 
 struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
     qpid::cluster::Cluster& cluster;
@@ -202,7 +202,7 @@
         cluster.errorCheck(member, type, frameSeq, l);
     }
 
-    void shutdown() { cluster.shutdown(member, l); }
+    void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); }
 
     bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
 };
@@ -287,7 +287,7 @@
       default:
         assert(0);
     }
-    QPID_LOG(notice, *this << (state == READY ? "joined" : "joining") << " cluster " << name << " with url=" << myUrl);
+    QPID_LOG(notice, *this << (state == READY ? " joined" : " joining") << " cluster " << name);
     broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
     broker.setExpiryPolicy(expiryPolicy);
     dispatcher.start();
@@ -601,6 +601,7 @@
     // Called on completion of the initial status map. 
     if (state == INIT) {
         // We have status for all members so we can make join descisions.
+        initMap.checkConsistent();
         elders = initMap.getElders();
         QPID_LOG(debug, *this << " elders: " << elders);
         if (!elders.empty()) { // I'm not the elder, I don't handle links & replication.
@@ -611,17 +612,8 @@
         else {
             QPID_LOG(info, this << " active for links.");
         }
-        // Check that cluster ID matches persistent store.
-        Uuid agreedId = initMap.getClusterId();
-        if (store.hasStore()) {
-            Uuid storeId = store.getClusterId();
-            if (storeId && storeId != agreedId)
-                throw Exception(
-                    QPID_MSG("Persistent cluster-id " << storeId 
-                             << " doesn't match cluster " << agreedId));
-            store.dirty(agreedId);
-        }
-        setClusterId(agreedId, l);
+        setClusterId(initMap.getClusterId(), l);
+        if (store.hasStore()) store.dirty(clusterId);
 
         if (initMap.isUpdateNeeded())  { // Joining established cluster.
             broker.setRecovery(false); // Ditch my current store.
@@ -822,13 +814,13 @@
         mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
         state = CATCHUP;
         discarding = false;     // ok to set, we're stalled for update.
-        QPID_LOG(notice, *this << " update complete, starting catch-up, members: " << map);
+        QPID_LOG(notice, *this << " update complete, starting catch-up.");
         deliverEventQueue.start();
     }
     else if (updateRetracted) { // Update was retracted, request another update
         updateRetracted = false;
         state = JOINER;
-        QPID_LOG(notice, *this << " update retracted, sending new update request");
+        QPID_LOG(notice, *this << " update retracted, sending new update request.");
         mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
         deliverEventQueue.start();
     }
@@ -853,10 +845,9 @@
     updateOutDone(l);
 }
 
-void Cluster ::shutdown(const MemberId& , Lock& l) {
+void Cluster ::shutdown(const MemberId& , const Uuid& id, Lock& l) {
     QPID_LOG(notice, *this << " cluster shut down by administrator.");
-    // FIXME aconway 2009-11-20: incorrect! Need to pass UUID on shutdown command.
-    if (store.hasStore()) store.clean(Uuid(true));
+    if (store.hasStore()) store.clean(Uuid(id));
     leave(l);
 }
 
@@ -885,13 +876,13 @@
 }
 
 void Cluster::stopClusterNode(Lock& l) {
-    QPID_LOG(notice, *this << " stopped by admin");
+    QPID_LOG(notice, *this << " cluster member stopped by administrator.");
     leave(l);
 }
 
 void Cluster::stopFullCluster(Lock& ) {
     QPID_LOG(notice, *this << " shutting down cluster " << name);
-    mcast.mcastControl(ClusterShutdownBody(), self);
+    mcast.mcastControl(ClusterShutdownBody(ProtocolVersion(), Uuid(true)), self);
 }
 
 void Cluster::memberUpdate(Lock& l) {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=884226&r1=884225&r2=884226&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Wed Nov 25 18:36:09 2009
@@ -160,7 +160,7 @@
     void messageExpired(const MemberId&, uint64_t, Lock& l);
     void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
 
-    void shutdown(const MemberId&, Lock&);
+    void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&);
 
     // Helper functions
     ConnectionPtr getConnection(const EventFrame&, Lock&);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp?rev=884226&r1=884225&r2=884226&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp Wed Nov 25 18:36:09 2009
@@ -29,6 +29,7 @@
 using namespace std;
 using namespace boost;
 using namespace framing::cluster;
+using namespace framing;
 
 InitialStatusMap::InitialStatusMap(const MemberId& self_, size_t size_)
     : self(self_), completed(), resendNeeded(), size(size_)
@@ -106,7 +107,6 @@
 }
 
 bool InitialStatusMap::isUpdateNeeded() {
-    // FIXME aconway 2009-11-20: consistency checks isComplete or here?
     assert(isComplete());
     // We need an update if there are any active members.
     if (find_if(map.begin(), map.end(), &isActive) != map.end()) return true;
@@ -145,7 +145,43 @@
     if (i != map.end())
         return i->second->getClusterId(); // An active member
     else
-        return map.begin()->second->getClusterId();
+        return map.begin()->second->getClusterId(); // Youngest newcomer in node-id order
 }
 
+void InitialStatusMap::checkConsistent() {
+    assert(isComplete());
+    bool persistent = (map.begin()->second->getStoreState() != STORE_STATE_NO_STORE);
+    Uuid clusterId;
+    for (Map::iterator i = map.begin(); i != map.end(); ++i) {
+        // Must not mix transient and persistent members.
+        if (persistent != (i->second->getStoreState() != STORE_STATE_NO_STORE))
+            throw Exception("Mixing transient and persistent brokers in a cluster");
+        // Members with non-empty stores must have same cluster-id
+        switch (i->second->getStoreState()) {
+          case STORE_STATE_NO_STORE:
+          case STORE_STATE_EMPTY_STORE:
+            break;               
+          case STORE_STATE_DIRTY_STORE:
+          case STORE_STATE_CLEAN_STORE:
+            if (!clusterId) clusterId = i->second->getClusterId();
+            assert(clusterId);
+            if (clusterId != i->second->getClusterId())
+                throw Exception("Cluster-id mismatch, brokers belonged to different clusters.");
+        }
+    }
+    // If this is a newly forming cluster, clean stores must have same shutdown-id
+    if (find_if(map.begin(), map.end(), &isActive) == map.end()) {
+        Uuid shutdownId;
+        for (Map::iterator i = map.begin(); i != map.end(); ++i) {
+            if (i->second->getStoreState() == STORE_STATE_CLEAN_STORE) {
+                if (!shutdownId) shutdownId = i->second->getShutdownId();
+                assert(shutdownId);
+                if (shutdownId != i->second->getShutdownId())
+                    throw Exception("Shutdown-id mismatch, brokers were not shut down together.");
+            }
+        }
+    }
+}
+
+
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h?rev=884226&r1=884225&r2=884226&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/InitialStatusMap.h Wed Nov 25 18:36:09 2009
@@ -56,13 +56,14 @@
     bool isUpdateNeeded();
     /**@pre isComplete(). @return Cluster-wide cluster ID. */
     framing::Uuid getClusterId();
+    /**@pre isComplete(). @throw Exception if there are any inconsistencies. */
+    void checkConsistent();
 
   private:
     typedef std::map<MemberId, boost::optional<Status> > Map;
     static bool notInitialized(const Map::value_type&);
     static bool isActive(const Map::value_type&);
     static bool hasStore(const Map::value_type&);
-    void check();
     Map map;
     MemberSet firstConfig;
     MemberId self;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp?rev=884226&r1=884225&r2=884226&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.cpp Wed Nov 25 18:36:09 2009
@@ -85,8 +85,6 @@
 }
 
 void StoreStatus::clean(const Uuid& shutdownId_) {
-    assert(clusterId);              // FIXME aconway 2009-11-20: throw exception
-    assert(shutdownId_);
     state = STORE_STATE_CLEAN_STORE;
     shutdownId = shutdownId_;
     save();

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h?rev=884226&r1=884225&r2=884226&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/StoreStatus.h Wed Nov 25 18:36:09 2009
@@ -50,7 +50,6 @@
     void save();
 
     bool hasStore() { return state != framing::cluster::STORE_STATE_NO_STORE; }
-    bool isEmpty() { return state != framing::cluster::STORE_STATE_EMPTY_STORE; }
 
   private:
     framing::cluster::StoreState state;

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=884226&r1=884225&r2=884226&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Wed Nov 25 18:36:09 2009
@@ -122,9 +122,9 @@
     def test_persistent_restart(self):
         """Verify persistent cluster shutdown/restart scenarios"""
         cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"])
-        a = cluster.start("a", expect=EXPECT_EXIT_OK, wait_for_start=False)
-        b = cluster.start("b", expect=EXPECT_EXIT_OK, wait_for_start=False)
-        c = cluster.start("c", expect=EXPECT_EXIT_FAIL, wait_for_start=True)
+        a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
+        b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
+        c = cluster.start("c", expect=EXPECT_EXIT_FAIL, wait=True)
         a.send_message("q", Message("1", durable=True))
         # Kill & restart one member.
         c.kill()
@@ -135,30 +135,30 @@
         # Shut down the entire cluster cleanly and bring it back up
         a.send_message("q", Message("3", durable=True))
         qpid_cluster.main(["qpid-cluster", "-kf", a.host_port()])      
-        a = cluster.start("a", wait_for_start=False)
-        b = cluster.start("b", wait_for_start=False)
-        c = cluster.start("c", wait_for_start=True)
+        a = cluster.start("a", wait=False)
+        b = cluster.start("b", wait=False)
+        c = cluster.start("c", wait=True)
         self.assertEqual(a.get_message("q").content, "3")
 
     def test_persistent_partial_failure(self):
         # Kill 2 members, shut down the last cleanly then restart
         # Ensure we use the clean database
         cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"])
-        a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait_for_start=False)
-        b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait_for_start=False)
-        c = cluster.start("c", expect=EXPECT_EXIT_OK, wait_for_start=True)
+        a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
+        b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False)
+        c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=True)
         a.send_message("q", Message("4", durable=True))
         a.kill()
         b.kill()
         self.assertEqual(c.get_message("q").content, "4")
         c.send_message("q", Message("clean", durable=True))
         qpid_cluster.main(["qpid-cluster", "-kf", c.host_port()])              
-        a = cluster.start("a", wait_for_start=False)
-        b = cluster.start("b", wait_for_start=False)
-        c = cluster.start("c", wait_for_start=True)
+        a = cluster.start("a", wait=False)
+        b = cluster.start("b", wait=False)
+        c = cluster.start("c", wait=True)
         self.assertEqual(a.get_message("q").content, "clean")
         
-    def test_wrong_store_uuid(self):
+    def test_wrong_cluster_id(self):
         # Start a cluster1 broker, then try to restart in cluster2
         cluster1 = self.cluster(0, args=self.args())
         a = cluster1.start("a", expect=EXPECT_EXIT_OK)
@@ -168,4 +168,25 @@
             a = cluster2.start("a", expect=EXPECT_EXIT_FAIL)
             self.fail("Expected exception")
         except: pass
-            
+
+    def test_wrong_shutdown_id(self):
+        # Start 2 members and shut down.
+        cluster = self.cluster(0, args=self.args()+["--cluster-size=2"])
+        a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
+        b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
+        self.assertEqual(0, qpid_cluster.main(["qpid_cluster", "-kf", a.host_port()]))
+        self.assertEqual(a.wait(), 0)
+        self.assertEqual(b.wait(), 0)
+
+        # Restart with a different member and shut down.
+        a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
+        c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=False)
+        self.assertEqual(0, qpid_cluster.main(["qpid_cluster", "-kf", a.host_port()]))
+        self.assertEqual(a.wait(), 0)
+        self.assertEqual(c.wait(), 0)
+
+        # Mix members from both shutdown events, they should fail
+        a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
+        b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False)
+
+

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=884226&r1=884225&r2=884226&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Wed Nov 25 18:36:09 2009
@@ -92,9 +92,11 @@
       <field name="type" type="error-type"/>
       <field name="frame-seq" type="sequence-no"/>
     </control>
-    
 
-    <control name="shutdown" code="0x20" label="Shut down entire cluster"/>
+    <!-- Shut down the entire cluster -->
+    <control name="shutdown" code="0x20">
+      <field name="shutdown-id" type="uuid"/>
+    </control>
 
   </class>
 

Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=884226&r1=884225&r2=884226&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Wed Nov 25 18:36:09 2009
@@ -215,7 +215,7 @@
 
     _cluster_count = 0
 
-    def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait_for_start=True):
+    def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
         self.test = test
         self._brokers=[]
         self.name = "cluster%d" % Cluster._cluster_count
@@ -225,17 +225,17 @@
         self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ]
         assert BrokerTest.cluster_lib
         self.args += [ "--load-module", BrokerTest.cluster_lib ]
-        self.start_n(count, expect=expect, wait_for_start=wait_for_start)
+        self.start_n(count, expect=expect, wait=wait)
 
-    def start(self, name=None, expect=EXPECT_RUNNING, wait_for_start=True):
+    def start(self, name=None, expect=EXPECT_RUNNING, wait=True):
         """Add a broker to the cluster. Returns the index of the new broker."""
         if not name: name="%s-%d" % (self.name, len(self._brokers))
         log.debug("Cluster %s starting member %s" % (self.name, name))
-        self._brokers.append(self.test.broker(self.args, name, expect, wait_for_start))
+        self._brokers.append(self.test.broker(self.args, name, expect, wait))
         return self._brokers[-1]
 
-    def start_n(self, count, expect=EXPECT_RUNNING, wait_for_start=True):
-        for i in range(count): self.start(expect=expect, wait_for_start=wait_for_start)
+    def start_n(self, count, expect=EXPECT_RUNNING, wait=True):
+        for i in range(count): self.start(expect=expect, wait=wait)
 
     # Behave like a list of brokers.
     def __len__(self): return len(self._brokers)
@@ -275,8 +275,6 @@
             except Exception, e: err.append(str(e))
         if err: raise Exception("Unexpected process status:\n    "+"\n    ".join(err))
 
-    # FIXME aconway 2009-11-06: check for core files of exited processes.
-    
     def cleanup_stop(self, stopable):
         """Call thing.stop at end of test"""
         self.stopem.append(stopable)
@@ -288,17 +286,21 @@
         self.cleanup_stop(p)
         return p
 
-    def broker(self, args=[], name=None, expect=EXPECT_RUNNING,wait_for_start=True):
+    def broker(self, args=[], name=None, expect=EXPECT_RUNNING,wait=True):
         """Create and return a broker ready for use"""
         b = Broker(self, args=args, name=name, expect=expect)
-        if (wait_for_start): b.connect().close()
+        if (wait): b.connect().close()
         return b
 
-    def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait_for_start=True):
+    def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
         """Create and return a cluster ready for use"""
-        cluster = Cluster(self, count, args, expect=expect, wait_for_start=wait_for_start)
+        cluster = Cluster(self, count, args, expect=expect, wait=wait)
         return cluster
 
+    def wait():
+        """Wait for all brokers in the cluster to be ready"""
+        for b in _brokers: b.connect().close()
+        
 class RethrownException(Exception):
     """Captures the original stack trace to be thrown later""" 
     def __init__(self, e, msg=""):



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message