qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r725802 - in /incubator/qpid/trunk/qpid/cpp: src/qpid/cluster/Cluster.cpp src/qpid/cluster/Cluster.h src/qpid/sys/PollableQueue.h xml/cluster.xml
Date Thu, 11 Dec 2008 20:33:26 GMT
Author: aconway
Date: Thu Dec 11 12:33:26 2008
New Revision: 725802

URL: http://svn.apache.org/viewvc?rev=725802&view=rev
Log:
sys/PollableQueue: dispatch in batches, allow put-back.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h
    incubator/qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=725802&r1=725801&r2=725802&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Dec 11 12:33:26 2008
@@ -32,7 +32,6 @@
 #include "qpid/framing/ClusterReadyBody.h"
 #include "qpid/framing/ClusterConfigChangeBody.h"
 #include "qpid/framing/ClusterDumpOfferBody.h"
-#include "qpid/framing/ClusterDumpStartBody.h"
 #include "qpid/framing/ClusterShutdownBody.h"
 #include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
 #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
@@ -79,7 +78,6 @@
     void ready(const std::string& url) { cluster.ready(member, url, l); }
     void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); }
     void dumpOffer(uint64_t dumpee, const Uuid& id) { cluster.dumpOffer(member, dumpee, id, l); }
-    void dumpStart(uint64_t dumpee, const std::string& url) { cluster.dumpStart(member, dumpee, url, l); }
     void shutdown() { cluster.shutdown(member, l); }
 
     bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
@@ -167,14 +165,16 @@
 
 void Cluster::mcast(const Event& e) { mcastQueue.push(e); }
 
-bool Cluster::sendMcast(const Event& e) {
+void Cluster::sendMcast(PollableEventQueue::Queue& values) {
     try {
-        return e.mcast(cpg);
+        PollableEventQueue::Queue::iterator i = values.begin();
+        while (i != values.end() && i->mcast(cpg))
+            ++i;
+        values.erase(values.begin(), i);
     }
     catch (const std::exception& e) {
         QPID_LOG(critical, "Multicast failure: " << e.what());
         leave();
-        return false;
     }
 }
 
@@ -241,23 +241,23 @@
 }
 
 // Entry point: called when deliverQueue has events to process.
-bool Cluster::delivered(const Event& e) {
+void Cluster::delivered(PollableEventQueue::Queue& events) {
     try {
-        Lock l(lock);
-        delivered(e,l);
+        for_each(events.begin(), events.end(), boost::bind(&Cluster::deliveredEvent, this, _1));
+        events.clear();
     } catch (const std::exception& e) {
         QPID_LOG(critical, *this << " error in cluster delivery: " << e.what());
         leave();
     }
-    return true;
 }
 
-void Cluster::delivered(const Event& e, Lock& l) {
+void Cluster::deliveredEvent(const Event& e) {
     Buffer buf(e);
     AMQFrame frame;
     if (e.isCluster())  {
         while (frame.decode(buf)) {
             QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
+            Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope is too big.
             ClusterDispatcher dispatch(*this, e.getMemberId(), l);
             if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
                 throw Exception(QPID_MSG("Invalid cluster control"));
@@ -428,7 +428,7 @@
     if (dumper == myId) {
         assert(state == OFFER);
         if (url) {              // My offer was first.
-            dumpStart(myId, dumpee, url->str(), l);
+            dumpStart(dumpee, *url, l);
         }
         else {                  // Another offer was first.
             state = READY;
@@ -448,13 +448,11 @@
 
 // FIXME aconway 2008-10-15: no longer need a separate control now
 // that the dump control is in the deliver queue.
-void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const std::string& urlStr, Lock&) {
+void Cluster::dumpStart(const MemberId& dumpee, const Url& url, Lock&) {
     if (state == LEFT) return;
-    MemberId dumpee(dumpeeInt);
-    Url url(urlStr);
     assert(state == OFFER);
     state = DUMPER;
-    QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << urlStr);
+    QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << url);
     deliverQueue.stop();
     if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
     dumpThread = Thread(

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=725802&r1=725801&r2=725802&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Dec 11 12:33:26 2008
@@ -115,7 +115,7 @@
     void leave(Lock&);
     std::vector<Url> getUrls(Lock&) const;
 
-    bool sendMcast(const Event& e);
+    void sendMcast(PollableEventQueue::Queue& );
     
     // Called via CPG, deliverQueue or DumpClient threads. 
     void tryMakeOffer(const MemberId&, Lock&);
@@ -128,12 +128,13 @@
     // 
     void dumpRequest(const MemberId&, const std::string&, Lock&);
     void dumpOffer(const MemberId& dumper, uint64_t dumpee, const framing::Uuid&, Lock&);
-    void dumpStart(const MemberId& dumper, uint64_t dumpeeInt, const std::string& urlStr, Lock&);
     void ready(const MemberId&, const std::string&, Lock&);
     void configChange(const MemberId&, const std::string& addresses, Lock& l);
     void shutdown(const MemberId&, Lock&);
-    bool delivered(const Event&); // deliverQueue callback
-    void delivered(const Event&, Lock&); // unlocked version
+    void delivered(PollableEventQueue::Queue&); // deliverQueue callback
+    void deliveredEvent(const Event&); 
+
+    void dumpStart(const MemberId& dumpee, const Url& url, Lock&);
 
     // CPG callbacks, called in CPG IO thread.
     void dispatch(sys::DispatchHandle&); // Dispatch CPG events.

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h?rev=725802&r1=725801&r2=725802&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h Thu Dec 11 12:33:26 2008
@@ -44,13 +44,13 @@
 template <class T>
 class PollableQueue {
   public:
+    typedef std::deque<T> Queue;
+
     /**
-     * Callback to process an item from the queue.
-     *
-     * @return If true the item is removed from the queue else it
-     * remains on the queue and the queue is stopped.
+     * Callback to process a batch of items from the queue.
+     * @param values to process, any items remaining after call are put back on the queue.
      */
-    typedef boost::function<bool (const T&)> Callback;
+    typedef boost::function<void (Queue& values)> Callback;
 
     /** When the queue is selected by the poller, values are passed to callback cb. */
     PollableQueue(const Callback& cb, const boost::shared_ptr<sys::Poller>& poller);
@@ -73,7 +73,6 @@
     bool empty() { ScopedLock l(lock); return queue.empty(); }
     
   private:
-    typedef std::deque<T> Queue;
     typedef sys::Monitor::ScopedLock ScopedLock;
     typedef sys::Monitor::ScopedUnlock ScopedUnlock;
 
@@ -84,7 +83,7 @@
     boost::shared_ptr<sys::Poller> poller;
     PollableCondition condition;
     DispatchHandle handle;
-    Queue queue;
+    Queue queue, batch;
     Thread dispatcher;
     bool stopped;
 };
@@ -117,21 +116,19 @@
 }
 
 template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) {
-    ScopedLock l(lock);     // Prevent concurrent push
-    assert(dispatcher.id() == 0 || dispatcher.id() == Thread::current().id());
+    ScopedLock l(lock);
+    assert(dispatcher.id() == 0);
     dispatcher = Thread::current();
     while (!stopped && !queue.empty()) {
-        T value = queue.front();
-        queue.pop_front();
-        bool ok = false;
-        {   // unlock to allow concurrent push or call to stop() in callback.
-            ScopedUnlock u(lock);
-            // FIXME aconway 2008-12-02: not exception safe if callback throws.
-            ok = callback(value);
+        assert(batch.empty());
+        batch.swap(queue);
+        {
+            ScopedUnlock u(lock);   // Allow concurrent push to queue.
+            callback(batch);
         }
-        if (!ok) { // callback cannot process value,  put it back.
-            queue.push_front(value);
-            stopped=true;
+        if (!batch.empty()) {
+            queue.insert(queue.begin(), batch.begin(), batch.end()); // put back unprocessed items.
+            batch.clear();
         }
     }
     dispatcher = Thread();

Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=725802&r1=725801&r2=725802&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Thu Dec 11 12:33:26 2008
@@ -36,13 +36,7 @@
       <field name="cluster-id" type="uuid"/>
     </control>
 
-    <control name = "dump-start" code="0x3" label="Used internally by dumper to mark stall point.">
-      <field name="dumpee" type="uint64"/>
-      <field name="url" type="str16"/>      
-    </control>
-
-
-    <control name="ready" code="0x10" label="New member is ready.">
+Min     <control name="ready" code="0x10" label="New member is ready.">
       <field name="url" type="str16"/>
     </control>
 



Mime
View raw message