qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1060568 - in /qpid/trunk/qpid/cpp: src/qpid/broker/ src/qpid/cluster/ src/tests/ xml/
Date Tue, 18 Jan 2011 20:43:41 GMT
Author: aconway
Date: Tue Jan 18 20:43:41 2011
New Revision: 1060568

URL: http://svn.apache.org/viewvc?rev=1060568&view=rev
Log:
QPID-2982 Bug 669452 - Creating a route and using management tools can crash cluster members.

Cluster update did not include federation link and bridge
objects. Fixed update to include them.

Management linkUp and linkDown events were generated only on the
broker receiving the link. Suppressed these events in a cluster.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
    qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1060568&r1=1060567&r2=1060568&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Tue Jan 18 20:43:41 2011
@@ -60,8 +60,7 @@ Bridge::Bridge(Link* _link, framing::Cha
             (agent, this, link, id, args.i_durable, args.i_src, args.i_dest,
              args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
              args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync);
-        if (!args.i_durable)
-            agent->addObject(mgmtObject);
+        agent->addObject(mgmtObject);
     }
     QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << args.i_dest);
 }
@@ -167,10 +166,6 @@ void Bridge::destroy()
 
 void Bridge::setPersistenceId(uint64_t pId) const
 {
-    if (mgmtObject != 0 && persistenceId == 0) {
-        ManagementAgent* agent = link->getBroker()->getManagementAgent();
-        agent->addObject (mgmtObject, pId);
-    }
     persistenceId = pId;
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=1060568&r1=1060567&r2=1060568&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Tue Jan 18 20:43:41 2011
@@ -30,6 +30,7 @@
 #include "qpid/framing/enum.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/broker/AclModule.h"
+#include "qpid/sys/ClusterSafe.h"
 
 using namespace qpid::broker;
 using qpid::framing::Buffer;
@@ -130,9 +131,12 @@ void Link::established ()
 {
     stringstream addr;
     addr << host << ":" << port;
-
     QPID_LOG (info, "Inter-broker link established to " << addr.str());
-    agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
+
+    // Don't raise the management event in a cluster, other members wont't get this call.
+    if (!sys::isCluster()) 
+        agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
+
     {
         Mutex::ScopedLock mutex(lock);
         setStateLH(STATE_OPERATIONAL);
@@ -150,11 +154,13 @@ void Link::closed (int, std::string text
 
     connection = 0;
 
+    // Don't raise the management event in a cluster, other members wont't get this call.
     if (state == STATE_OPERATIONAL) {
         stringstream addr;
         addr << host << ":" << port;
         QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str());
-        agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
+        if (!sys::isCluster())
+            agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
     }
 
     for (Bridges::iterator i = active.begin(); i != active.end(); i++) {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1060568&r1=1060567&r2=1060568&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Tue Jan 18 20:43:41 2011
@@ -379,3 +379,12 @@ void LinkRegistry::setPassive(bool p) 
     passive = p;
     //will activate or passivate links on maintenance visit
 }
+
+void LinkRegistry::eachLink(boost::function<void(boost::shared_ptr<Link>)> f)
{
+    for (LinkMap::iterator i = links.begin(); i != links.end(); ++i) f(i->second);
+}
+
+void LinkRegistry::eachBridge(boost::function<void(boost::shared_ptr<Bridge>)>
f) {
+    for (BridgeMap::iterator i = bridges.begin(); i != bridges.end(); ++i) f(i->second);
+}
+

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=1060568&r1=1060567&r2=1060568&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h Tue Jan 18 20:43:41 2011
@@ -31,6 +31,7 @@
 #include "qpid/management/Manageable.h"
 #include <boost/shared_ptr.hpp>
 #include <boost/intrusive_ptr.hpp>
+#include <boost/function.hpp>
 
 namespace qpid {
 namespace broker {
@@ -148,6 +149,12 @@ namespace broker {
          * bridges won't therefore pull or push any messages.
          */
         void setPassive(bool);
+
+        
+        /** Iterate over each link in the registry. Used for cluster updates. */
+        void eachLink(boost::function<void(boost::shared_ptr<Link>)> f);
+        /** Iterate over each bridge in the registry. Used for cluster updates. */
+        void eachBridge(boost::function<void(boost::shared_ptr< Bridge>)> f);
     };
 }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=1060568&r1=1060567&r2=1060568&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Jan 18 20:43:41 2011
@@ -198,7 +198,7 @@ namespace _qmf = ::qmf::org::apache::qpi
  * Currently use SVN revision to avoid clashes with versions from
  * different branches.
  */
-const uint32_t Cluster::CLUSTER_VERSION = 1045272;
+const uint32_t Cluster::CLUSTER_VERSION = 1058747;
 
 struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
     qpid::cluster::Cluster& cluster;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=1060568&r1=1060567&r2=1060568&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Jan 18 20:43:41 2011
@@ -32,6 +32,8 @@
 #include "qpid/broker/RecoveredEnqueue.h"
 #include "qpid/broker/RecoveredDequeue.h"
 #include "qpid/broker/Exchange.h"
+#include "qpid/broker/Link.h"
+#include "qpid/broker/Bridge.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/framing/enum.h"
 #include "qpid/framing/AMQFrame.h"
@@ -346,13 +348,12 @@ size_t Connection::decode(const char* da
 // returns true if the header is complete or already read.
 bool Connection::checkProtocolHeader(const char*& data, size_t size) {
     if (expectProtocolHeader) {
-        //If this is an outgoing link, we will receive a protocol
-        //header which needs to be decoded first
+        // This is an outgoing link connection, we will receive a protocol
+        // header which needs to be decoded first
         framing::ProtocolInitiation pi;
         Buffer buf(const_cast<char*&>(data), size);
         if (pi.decode(buf)) {
             //TODO: check the version is correct
-            QPID_LOG(debug, "Outgoing clustered link connection received INIT(" <<
pi << ")");
             expectProtocolHeader = false;
             data += pi.encodedSize();
         } else {
@@ -650,5 +651,25 @@ void Connection::managementSetupState(
     agent->setUuid(id);
     agent->setName(vendor, product, instance);
 }
+
+void Connection::config(const std::string& encoded) {
+    Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+    string kind;
+    buf.getShortString (kind);
+    if (kind == "link") {
+        broker::Link::shared_ptr link =
+            broker::Link::decode(cluster.getBroker().getLinks(), buf);
+        QPID_LOG(debug, cluster << " updated link "
+                 << link->getHost() << ":" << link->getPort());
+    }
+    else if (kind == "bridge") {
+        broker::Bridge::shared_ptr bridge =
+            broker::Bridge::decode(cluster.getBroker().getLinks(), buf);
+        QPID_LOG(debug, cluster << " updated bridge " << bridge->getName());
+    }
+    else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind));
+}
+
+
 }} // Namespace qpid::cluster
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=1060568&r1=1060567&r2=1060568&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue Jan 18 20:43:41 2011
@@ -183,7 +183,9 @@ class Connection :
                               const std::string& vendor,
                               const std::string& product,
                               const std::string& instance);
-    
+
+    void config(const std::string& encoded);
+
     void setSecureConnection ( broker::SecureConnection * sc );
 
   private:

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=1060568&r1=1060567&r2=1060568&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Tue Jan 18 20:43:41 2011
@@ -34,6 +34,9 @@
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/LinkRegistry.h"
+#include "qpid/broker/Bridge.h"
+#include "qpid/broker/Link.h"
 #include "qpid/broker/Message.h"
 #include "qpid/broker/Exchange.h"
 #include "qpid/broker/ExchangeRegistry.h"
@@ -167,7 +170,7 @@ void UpdateClient::update() {
     b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1));
 
     ClusterConnectionProxy(session).expiryId(expiry.getId());
-
+    updateLinks();
     updateManagementAgent();
 
     session.close();
@@ -199,6 +202,14 @@ template <class T> std::string encode(co
     t.encode(buf);
     return encoded;
 }
+
+template <class T> std::string encode(const T& t, bool encodeKind) {
+    std::string encoded;
+    encoded.resize(t.encodedSize());
+    framing::Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+    t.encode(buf, encodeKind);
+    return encoded;
+}
 } // namespace
 
 
@@ -583,4 +594,21 @@ void UpdateClient::updateQueueListener(s
     ClusterConnectionProxy(session).addQueueListener(q, n);
 }
 
+void UpdateClient::updateLinks() {
+    broker::LinkRegistry& links = updaterBroker.getLinks();
+    links.eachLink(boost::bind(&UpdateClient::updateLink, this, _1));
+    links.eachBridge(boost::bind(&UpdateClient::updateBridge, this, _1));
+}
+
+void UpdateClient::updateLink(const boost::shared_ptr<broker::Link>& link) {
+    QPID_LOG(debug, *this << " updating link "
+             << link->getHost() << ":" << link->getPort());
+    ClusterConnectionProxy(session).config(encode(*link));
+}
+
+void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& bridge)
{
+    QPID_LOG(debug, *this << " updating bridge " << bridge->getName());
+    ClusterConnectionProxy(session).config(encode(*bridge));
+}
+
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=1060568&r1=1060567&r2=1060568&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h Tue Jan 18 20:43:41 2011
@@ -49,6 +49,8 @@ class DeliveryRecord;
 class SessionState;
 class SemanticState;
 class Decoder;
+class Link;
+class Bridge;
 
 } // namespace broker
 
@@ -99,6 +101,10 @@ class UpdateClient : public sys::Runnabl
     void updateQueueListener(std::string& q, const boost::shared_ptr<broker::Consumer>&
c);
     void updateManagementSetupState();
     void updateManagementAgent();
+    void updateLinks();
+    void updateLink(const boost::shared_ptr<broker::Link>&);
+    void updateBridge(const boost::shared_ptr<broker::Bridge>&);
+
 
     Numbering<broker::SemanticState::ConsumerImpl::shared_ptr> consumerNumbering;
     MemberId updaterId;

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py?rev=1060568&r1=1060567&r2=1060568&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py Tue Jan 18 20:43:41 2011
@@ -58,7 +58,8 @@ def filter_log(log):
             'warning Broker closed connection: 200, OK',
             'task late',
             'task overran',
-            'warning CLOSING .* unsent data'
+            'warning CLOSING .* unsent data',
+            'Inter-broker link '
             ])
         if re.compile(skip).search(l): continue
 

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1060568&r1=1060567&r2=1060568&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Tue Jan 18 20:43:41 2011
@@ -26,6 +26,7 @@ from qpid.messaging import Message, Empt
 from threading import Thread, Lock
 from logging import getLogger
 from itertools import chain
+from tempfile import NamedTemporaryFile
 
 log = getLogger("qpid.cluster_tests")
 
@@ -264,6 +265,45 @@ acl allow all all
         cluster.start()
         self.assertRaises(Empty, cluster[1].connect().session().receiver("q1").fetch,0)
 
+    def test_route_update(self):
+        """Regression test for https://issues.apache.org/jira/browse/QPID-2982
+        Links and bridges associated with routes were not replicated on update.
+        This meant extra management objects and caused an exit if a management
+        client was attached.
+        """
+        args=["--mgmt-pub-interval=1","--log-enable=trace+:management"]
+        cluster0 = self.cluster(1, args=args)
+        cluster1 = self.cluster(1, args=args)
+        assert 0 == subprocess.call(
+            ["qpid-route", "route", "add", cluster0[0].host_port(),
+             cluster1[0].host_port(), "dummy-exchange", "dummy-key", "-d"])
+        cluster0.start()
+
+        # Wait for qpid-tool:list on cluster0[0] to generate expected output.
+        pattern = re.compile("org.apache.qpid.broker.*link")
+        qpid_tool = subprocess.Popen(["qpid-tool", cluster0[0].host_port()],
+                                     stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+        class Scanner(Thread):
+            def __init__(self): self.found = False; Thread.__init__(self)
+            def run(self):
+                for l in qpid_tool.stdout:
+                    if pattern.search(l): self.found = True; return
+        scanner = Scanner()
+        scanner.start()
+        start = time.time()        
+        try:
+            # Wait up to 5 second timeout for scanner to find expected output
+            while not scanner.found and time.time() < start + 5:
+                qpid_tool.stdin.write("list\n") # Ask qpid-tool to list
+                for b in cluster0: b.ready() # Raise if any brokers are down
+        finally:
+            qpid_tool.stdin.write("quit\n")
+            qpid_tool.wait()
+            scanner.join()
+        assert scanner.found
+        # Verify logs are consistent
+        cluster_test_logs.verify_logs(glob.glob("*.log"))
+
 class LongTests(BrokerTest):
     """Tests that can run for a long time if -DDURATION=<minutes> is set"""
     def duration(self):

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=1060568&r1=1060567&r2=1060568&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Tue Jan 18 20:43:41 2011
@@ -272,5 +272,9 @@
       <field name="product" type="str32"/>
       <field name="instance" type="str32"/>
     </control>
+
+    <!-- Replicate encoded config objects - e.g. links and bridges. -->
+    <control name="config" code="0x37"><field name="encoded" type="str32"/></control>
   </class>
+
 </amqp>



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message