qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r960681 - in /qpid/trunk/qpid: cpp/src/qpid/broker/ cpp/src/qpid/cluster/ cpp/src/tests/ cpp/xml/ python/qpid/ tools/src/py/
Date Mon, 05 Jul 2010 20:12:09 GMT
Author: aconway
Date: Mon Jul  5 20:12:08 2010
New Revision: 960681

URL: http://svn.apache.org/viewvc?rev=960681&view=rev
Log:
Defer delivery of messages in cluster-unsafe context.

Messages enqueued in a cluster-safe context are synchronized across
the cluster.  However some messages are delivered in a cluster-unsafe
context, for example raising a link established event occurs the
connection thread of the establishing connection.

This fix deferrs such messages by multicasting them so they can be
re-delived in a cluster safe context.

See https://bugzilla.redhat.com/show_bug.cgi?id=611543

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/cpp/xml/cluster.xml
    qpid/trunk/qpid/python/qpid/brokertest.py
    qpid/trunk/qpid/tools/src/py/qpid-printevents

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=960681&r1=960680&r2=960681&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Mon Jul  5 20:12:08 2010
@@ -162,7 +162,8 @@ Broker::Broker(const Broker::Options& co
     clusterUpdatee(false),
     expiryPolicy(new ExpiryPolicy),
     connectionCounter(conf.maxConnections),
-    getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
+    getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)),
+    deferDelivery(boost::bind(&Broker::deferDeliveryImpl, this, _1, _2))
 {
     if (conf.enableMgmt) {
         QPID_LOG(info, "Management enabled");
@@ -492,6 +493,10 @@ Broker::getKnownBrokersImpl()
     return knownBrokers;
 }
 
+bool Broker::deferDeliveryImpl(const std::string& ,
+                               const boost::intrusive_ptr<Message>& )
+{ return false; }
+
 void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) {
     clusterTimer = t;
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=960681&r1=960680&r2=960681&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Mon Jul  5 20:12:08 2010
@@ -69,6 +69,7 @@ struct Url;
 namespace broker {
 
 class ExpiryPolicy;
+class Message;
 
 static const  uint16_t DEFAULT_PORT=5672;
 
@@ -168,6 +169,8 @@ public:
     QueueEvents queueEvents;
     std::vector<Url> knownBrokers;
     std::vector<Url> getKnownBrokersImpl();
+    bool deferDeliveryImpl(const std::string& queue,
+                           const boost::intrusive_ptr<Message>& msg);
     std::string federationTag;
     bool recovery;
     bool clusterUpdatee;
@@ -273,6 +276,16 @@ public:
     management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
     
     ConnectionCounter& getConnectionCounter() {return connectionCounter;}
+
+    /**
+     * Never true in a stand-alone broker. In a cluster, return true
+     * to defer delivery of messages deliveredg in a cluster-unsafe
+     * context.
+     *@return true if delivery of a message should be deferred.
+     */
+    boost::function<bool (const std::string& queue,
+                          const boost::intrusive_ptr<Message>& msg)> deferDelivery;
+
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=960681&r1=960680&r2=960681&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Jul  5 20:12:08 2010
@@ -142,6 +142,9 @@ bool Queue::isExcluded(boost::intrusive_
 }
 
 void Queue::deliver(boost::intrusive_ptr<Message> msg){
+    // Check for deferred delivery in a cluster.
+    if (broker && broker->deferDelivery(name, msg))
+        return;
     if (msg->isImmediate() && getConsumerCount() == 0) {
         if (alternateExchange) {
             DeliverableMessage deliverable(msg);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=960681&r1=960680&r2=960681&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Jul  5 20:12:08 2010
@@ -137,6 +137,8 @@
 #include "qpid/broker/Connection.h"
 #include "qpid/broker/NullMessageStore.h"
 #include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/Message.h"
 #include "qpid/broker/SessionState.h"
 #include "qpid/broker/SignalHandler.h"
 #include "qpid/framing/AMQFrame.h"
@@ -154,6 +156,7 @@
 #include "qpid/framing/ClusterConnectionAnnounceBody.h"
 #include "qpid/framing/ClusterErrorCheckBody.h"
 #include "qpid/framing/ClusterTimerWakeupBody.h"
+#include "qpid/framing/ClusterDeliverToQueueBody.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/log/Helpers.h"
 #include "qpid/log/Statement.h"
@@ -232,9 +235,10 @@ struct ClusterDispatcher : public framin
     }
     void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l);
}
     void timerDrop(const std::string& name) { cluster.timerWakeup(member, name, l); }
-
     void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); }
-
+    void deliverToQueue(const std::string& queue, const std::string& message) {
+        cluster.deliverToQueue(queue, message, l);
+    }
     bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
 };
 
@@ -310,6 +314,7 @@ void Cluster::initialize() {
     else
         myUrl = settings.url;
     broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
+    broker.deferDelivery = boost::bind(&Cluster::deferDeliveryImpl, this, _1, _2);
     broker.setExpiryPolicy(expiryPolicy);
     dispatcher.start();
     deliverEventQueue.bypassOff();
@@ -1099,4 +1104,30 @@ bool Cluster::isElder() const {
     return elder;
 }
 
+void Cluster::deliverToQueue(const std::string& queue, const std::string& message,
Lock& l)
+{
+    broker::Queue::shared_ptr q = broker.getQueues().find(queue);
+    if (!q) {
+        QPID_LOG(critical, *this << " cluster delivery to non-existent queue: " <<
queue);
+        leave(l);
+    }
+    framing::Buffer buf(const_cast<char*>(message.data()), message.size());
+    boost::intrusive_ptr<broker::Message> msg(new broker::Message);
+    msg->decodeHeader(buf);
+    msg->decodeContent(buf);
+    q->deliver(msg);
+}
+
+bool Cluster::deferDeliveryImpl(const std::string& queue,
+                                const boost::intrusive_ptr<broker::Message>& msg)
+{
+    if (isClusterSafe()) return false;
+    std::string message;
+    message.resize(msg->encodedSize());
+    framing::Buffer buf(const_cast<char*>(message.data()), message.size());
+    msg->encode(buf);
+    mcast.mcastControl(ClusterDeliverToQueueBody(ProtocolVersion(), queue, message), self);
+    return true;
+}
+
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=960681&r1=960680&r2=960681&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Mon Jul  5 20:12:08 2010
@@ -54,6 +54,10 @@
 
 namespace qpid {
 
+namespace broker {
+class Message;
+}
+
 namespace framing {
 class AMQBody;
 class Uuid;
@@ -124,6 +128,10 @@ class Cluster : private Cpg::Handler, pu
     // Generates a log message for debugging purposes.
     std::string debugSnapshot();
 
+    // Defer messages delivered in an unsafe context by multicasting.
+    bool deferDeliveryImpl(const std::string& queue,
+                           const boost::intrusive_ptr<broker::Message>& msg);
+
   private:
     typedef sys::Monitor::ScopedLock Lock;
 
@@ -173,8 +181,8 @@ class Cluster : private Cpg::Handler, pu
     void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
     void timerWakeup(const MemberId&, const std::string& name, Lock&);
     void timerDrop(const MemberId&, const std::string& name, Lock&);
-
     void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&);
+    void deliverToQueue(const std::string& queue, const std::string& message, Lock&);
 
     // Helper functions
     ConnectionPtr getConnection(const EventFrame&, Lock&);

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=960681&r1=960680&r2=960681&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Mon Jul  5 20:12:08 2010
@@ -157,7 +157,21 @@ acl allow all all
             self.fail("Expected exception")
         except messaging.exceptions.NotFound: pass
 
-        
+    def test_link_events(self):
+        """Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=611543"""
+        args = ["--mgmt-pub-interval", 1] # Publish management information every second.
+        broker1 = self.cluster(1, args)[0]
+        broker2 = self.cluster(1, args)[0]
+        qp = self.popen(["qpid-printevents", broker1.host_port()], EXPECT_RUNNING)
+        qr = self.popen(["qpid-route", "route", "add",
+                         broker1.host_port(), broker2.host_port(),
+                         "amq.fanout", "key"
+                         ], EXPECT_EXIT_OK)
+        # Look for link event in printevents output.
+        retry(lambda: find_in_file("brokerLinkUp", qp.outfile("out")))
+        broker1.ready()
+        broker2.ready()
+
 class LongTests(BrokerTest):
     """Tests that can run for a long time if -DDURATION=<minutes> is set"""
     def duration(self):

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=960681&r1=960680&r2=960681&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Mon Jul  5 20:12:08 2010
@@ -110,6 +110,12 @@
       <field name="shutdown-id" type="uuid"/>
     </control>
 
+    <!-- Deliver a message to a queue -->
+    <control name="deliver-to-queue" code="0x21">
+      <field name="queue" type="str16"/>
+      <field name="message" type="vbin32"/>
+    </control>
+
   </class>
 
   <!-- Controls associated with a specific connection. -->

Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=960681&r1=960680&r2=960681&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Mon Jul  5 20:12:08 2010
@@ -251,6 +251,12 @@ def checkenv(name):
     if not value: raise Exception("Environment variable %s is not set" % name)
     return value
 
+def find_in_file(str, filename):
+    if not os.path.exists(filename): return False
+    f = open(filename)
+    try: return str in f.read()
+    finally: f.close()
+
 class Broker(Popen):
     "A broker process. Takes care of start, stop and logging."
     _broker_count = 0
@@ -367,15 +373,7 @@ class Broker(Popen):
     def log_ready(self):
         """Return true if the log file exists and contains a broker ready message"""
         if self._log_ready: return True
-        if not os.path.exists(self.log): return False
-        f = open(self.log)
-        try:
-            for l in f:
-                if "notice Broker running" in l:
-                    self._log_ready = True
-                    return True
-            return False
-        finally: f.close()
+        self._log_ready = find_in_file("notice Broker running", self.log)
 
     def ready(self, **kwargs):
         """Wait till broker is ready to serve clients"""

Modified: qpid/trunk/qpid/tools/src/py/qpid-printevents
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-printevents?rev=960681&r1=960680&r2=960681&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-printevents (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-printevents Mon Jul  5 20:12:08 2010
@@ -29,13 +29,15 @@ from qmf.console import Console, Session
 class EventConsole(Console):
   def event(self, broker, event):
     print event
+    sys.stdout.flush()
 
   def brokerConnected(self, broker):
     print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerConnected broker=%s"
% broker.getUrl()
+    sys.stdout.flush()
 
   def brokerDisconnected(self, broker):
     print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerDisconnected broker=%s"
% broker.getUrl()
-
+    sys.stdout.flush()
 
 ##
 ## Main Program



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message