qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r722614 - in /incubator/qpid/trunk/qpid/cpp/src/qpid: broker/ cluster/ sys/
Date Tue, 02 Dec 2008 20:41:49 GMT
Author: aconway
Date: Tue Dec  2 12:41:49 2008
New Revision: 722614

URL: http://svn.apache.org/viewvc?rev=722614&view=rev
Log:
Cluster: handle CPG flow-control conditions.
PollableQueue: allow dispatch functions to refuse dispatch.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp?rev=722614&r1=722613&r2=722614&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp Tue Dec  2 12:41:49 2008
@@ -118,10 +118,12 @@
 
 std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connection& c)
 {
+    static bool needWarning = true;
     if (c.getBroker().getOptions().auth) {
         return std::auto_ptr<SaslAuthenticator>(new CyrusAuthenticator(c));
     } else {
         QPID_LOG(warning, "SASL: No Authentication Performed");
+        needWarning = false;
         return std::auto_ptr<SaslAuthenticator>(new NullAuthenticator(c));
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=722614&r1=722613&r2=722614&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Dec  2 12:41:49 2008
@@ -99,7 +99,7 @@
         boost::bind(&Cluster::disconnect, this, _1) // disconnect
     ),
     deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
-    mcastQueue(boost::bind(&Event::mcast, _1, boost::cref(name), boost::ref(cpg)), poller),
+    mcastQueue(boost::bind(&Cluster::sendMcast, this, _1), poller),
     mcastId(0),
     mgmtObject(0),
     state(INIT),
@@ -109,7 +109,7 @@
     ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
     if (agent != 0){
         qmf::Package  packageInit(agent);
-        mgmtObject = new qmf::Cluster (agent, this, &broker,name.str(),myUrl.str());
+        mgmtObject = new qmf::Cluster (agent, this, &broker,name,myUrl.str());
         agent->addObject (mgmtObject);
         mgmtObject->set_status("JOINING");
     }
@@ -118,7 +118,7 @@
     cpgDispatchHandle.startWatch(poller);
     deliverQueue.start();
     mcastQueue.start();
-    QPID_LOG(notice, *this << " joining cluster " << name.str() << " with url=" << myUrl);
+    QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
     if (useQuorum) quorum.init();
     cpg.join(name);
     broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); // Must be last for exception safety.
@@ -184,6 +184,17 @@
         mcastQueue.push(e);
 }
 
+bool Cluster::sendMcast(const Event& e) {
+    try {
+        return e.mcast(cpg);
+    }
+    catch (const std::exception& e) {
+        QPID_LOG(critical, "Multicast failure: " << e.what());
+        leave();
+        return false;
+    }
+}
+
 std::vector<Url> Cluster::getUrls() const {
     Lock l(lock);
     return getUrls(l);
@@ -201,10 +212,10 @@
 void Cluster::leave(Lock&) { 
     if (state != LEFT) {
         state = LEFT;
-        QPID_LOG(notice, *this << " leaving cluster " << name.str());
+        QPID_LOG(notice, *this << " leaving cluster " << name);
         if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
         if (!deliverQueue.isStopped()) deliverQueue.stop();
-        try { cpg.leave(name); }
+        try { cpg.leave(); }
         catch (const std::exception& e) {
             QPID_LOG(critical, *this << " error leaving process group: " << e.what());
         }
@@ -224,7 +235,7 @@
         }
         else {                  // New shadow connection
             std::ostringstream mgmtId;
-            mgmtId << name.str() << ":"  << connectionId;
+            mgmtId << name << ":"  << connectionId;
             ConnectionMap::value_type value(connectionId,
                                             new Connection(*this, shadowOut, mgmtId.str(), connectionId));
             i = connections.insert(value).first;
@@ -260,7 +271,7 @@
 }
 
 // Entry point: called when deliverQueue has events to process.
-void Cluster::delivered(const Event& e) {
+bool Cluster::delivered(const Event& e) {
     try {
         Lock l(lock);
         delivered(e,l);
@@ -268,7 +279,7 @@
         QPID_LOG(critical, *this << " error in cluster delivery: " << e.what());
         leave();
     }
-
+    return true;
 }
 
 void Cluster::delivered(const Event& e, Lock& l) {
@@ -334,6 +345,7 @@
 void Cluster::dispatch(sys::DispatchHandle& h) {
     try {
         cpg.dispatchAll();
+        mcastQueue.start();     // In case it was stopped by flow control.
         h.rewatch();
     } catch (const std::exception& e) {
         QPID_LOG(critical, *this << " error in cluster dispatch: " << e.what());
@@ -359,7 +371,7 @@
     cpg_address */*joined*/, int /*nJoined*/)
 {
     Mutex::ScopedLock l(lock);
-    QPID_LOG(debug, *this << " enqueue config change: " << AddrList(current, nCurrent) 
+    QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent)
              << AddrList(left, nLeft, "( ", ")"));
     std::string addresses;
     for (cpg_address* p = current; p < current+nCurrent; ++p) 
@@ -387,7 +399,7 @@
         }
         else {                  // Joining established group.
             state = NEWBIE;
-            QPID_LOG(info, *this << " joining established cluster");
+            QPID_LOG(info, *this << " joining cluster: " << map);
             mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), l);
         }
     }
@@ -542,12 +554,12 @@
 }
 
 void Cluster::stopFullCluster(Lock& l) {
-    QPID_LOG(notice, *this << " shutting down cluster " << name.str());
+    QPID_LOG(notice, *this << " shutting down cluster " << name);
     mcastControl(ClusterShutdownBody(), l);
 }
 
 void Cluster::memberUpdate(Lock& l) {
-    QPID_LOG(info, *this << map.memberCount() << " members: " << map);
+    QPID_LOG(info, *this << " member update: " << map);
     std::vector<Url> urls = getUrls(l);
     size_t size = urls.size();
     failoverExchange->setUrls(urls);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=722614&r1=722613&r2=722614&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Dec  2 12:41:49 2008
@@ -118,6 +118,8 @@
     void leave(Lock&);
     std::vector<Url> getUrls(Lock&) const;
 
+    bool sendMcast(const Event& e);
+    
     // Called via CPG, deliverQueue or DumpClient threads. 
     void tryMakeOffer(const MemberId&, Lock&);
 
@@ -133,7 +135,7 @@
     void ready(const MemberId&, const std::string&, Lock&);
     void configChange(const MemberId&, const std::string& addresses, Lock& l);
     void shutdown(const MemberId&, Lock&);
-    void delivered(const Event&); // deliverQueue callback
+    bool delivered(const Event&); // deliverQueue callback
     void delivered(const Event&, Lock&); // unlocked version
 
     // CPG callbacks, called in CPG IO thread.
@@ -183,7 +185,7 @@
     broker::Broker& broker;
     boost::shared_ptr<sys::Poller> poller;
     Cpg cpg;
-    const Cpg::Name name;
+    const std::string name;
     const Url myUrl;
     const MemberId myId;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=722614&r1=722613&r2=722614&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Tue Dec  2 12:41:49 2008
@@ -29,6 +29,7 @@
 #include "qpid/log/Statement.h"
 
 #include <boost/utility/in_place_factory.hpp>
+#include <boost/scoped_ptr.hpp>
 
 namespace qpid {
 namespace cluster {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=722614&r1=722613&r2=722614&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Tue Dec  2 12:41:49 2008
@@ -87,12 +87,13 @@
     }
 }
 
-void Cpg::join(const Name& group) {
-    check(cpg_join(handle, const_cast<Name*>(&group)),cantJoinMsg(group));
+void Cpg::join(const std::string& name) {
+    group = name;
+    check(cpg_join(handle, &group), cantJoinMsg(group));
 }
     
-void Cpg::leave(const Name& group) {
-    check(cpg_leave(handle,const_cast<Name*>(&group)),cantLeaveMsg(group));
+void Cpg::leave() {
+    check(cpg_leave(handle, &group), cantLeaveMsg(group));
 }
 
 bool Cpg::isFlowControlEnabled() {
@@ -101,29 +102,22 @@
     return flowState == CPG_FLOW_CONTROL_ENABLED;
 }
 
-// FIXME aconway 2008-08-07: better handling of cpg flow control, no sleeping.
-void Cpg::waitForFlowControl() {
-    int delayNs=1000;           // one millisecond
-    int tries=8;                // double the delay on each try.
-    while (isFlowControlEnabled() && tries > 0) {
-        QPID_LOG(warning, "CPG flow control enabled, retry in " << delayNs << "ns");
-        ::usleep(delayNs);
-        --tries;
-        delayNs *= 2;
-    };
-    if (tries == 0) {
-        // FIXME aconway 2008-08-07: this is a fatal leave-the-cluster condition.
-        throw Cpg::Exception("CPG flow control enabled, failed to send.");
+bool Cpg::mcast(const iovec* iov, int iovLen) {
+    // Thread-safety note : the cpg_ calls are thread safe, but there
+    // is a race below between calling cpg_flow_control_state_get()
+    // and calling mcast_joined() where N threads could see the state
+    // as disabled and call mcast, but only M < N messages can be sent
+    // without exceeding flow control limits.
+    if (isFlowControlEnabled()) {
+        QPID_LOG(warning, "CPG flow control enabled")
+        return false;
     }
-}
-
-void Cpg::mcast(const Name& group, const iovec* iov, int iovLen) {
-    waitForFlowControl();
     cpg_error_t result;
     do {
         result = cpg_mcast_joined(handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen);
         if (result != CPG_ERR_TRY_AGAIN) check(result, cantMcastMsg(group));
     } while(result == CPG_ERR_TRY_AGAIN);
+    return true;
 }
 
 void Cpg::shutdown() {
@@ -134,6 +128,10 @@
     }
 }
 
+void Cpg::dispatch(cpg_dispatch_t type) {
+    check(cpg_dispatch(handle,type), "Error in CPG dispatch");
+}
+
 string Cpg::errorStr(cpg_error_t err, const std::string& msg) {
     switch (err) {
       case CPG_OK: return msg+": ok";
@@ -173,8 +171,14 @@
     return MemberId(nodeid, getpid());
 }
 
+namespace { int byte(uint32_t value, int i) { return (value >> (i*8)) & 0xff; } }
+
 ostream& operator <<(ostream& out, const MemberId& id) {
-    return out << std::hex << id.first << ":" << std::dec << id.second;
+    out << byte(id.first, 0) << "."
+        << byte(id.first, 1) << "."
+        << byte(id.first, 2) << "."
+        << byte(id.first, 3);
+    return out << ":" << id.second;
 }
 
 ostream& operator<<(ostream& o, const ConnectionId& c) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h?rev=722614&r1=722613&r2=722614&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h Tue Dec  2 12:41:49 2008
@@ -19,16 +19,15 @@
  *
  */
 
-#include "qpid/cluster/types.h"
-#include "qpid/cluster/Dispatchable.h"
-
 #include "qpid/Exception.h"
+#include "qpid/cluster/Dispatchable.h"
+#include "qpid/cluster/types.h"
 #include "qpid/sys/IOHandle.h"
+#include "qpid/sys/Mutex.h"
 
 #include <boost/scoped_ptr.hpp>
 
 #include <cassert>
-
 #include <string.h>
 
 extern "C" {
@@ -38,7 +37,6 @@
 namespace qpid {
 namespace cluster {
 
-
 /**
  * Lightweight C++ interface to cpg.h operations.
  * 
@@ -53,6 +51,7 @@
     };
 
     struct Name : public cpg_name {
+        Name() { length = 0; }
         Name(const char* s) { copy(s, strlen(s)); }
         Name(const char* s, size_t n) { copy(s,n); }
         Name(const std::string& s) { copy(s.data(), s.size()); }
@@ -105,17 +104,21 @@
      * - CPG_DISPATCH_ALL - dispatch all available events, don't wait.
      * - CPG_DISPATCH_BLOCKING - blocking dispatch loop.
      */
-    void dispatch(cpg_dispatch_t type) {
-        check(cpg_dispatch(handle,type), "Error in CPG dispatch");
-    }
+    void dispatch(cpg_dispatch_t type);
 
     void dispatchOne() { dispatch(CPG_DISPATCH_ONE); }
     void dispatchAll() { dispatch(CPG_DISPATCH_ALL); }
     void dispatchBlocking() { dispatch(CPG_DISPATCH_BLOCKING); }
 
-    void join(const Name& group);    
-    void leave(const Name& group);
-    void mcast(const Name& group, const iovec* iov, int iovLen);
+    void join(const std::string& group);    
+    void leave();
+
+    /** Multicast to the group. NB: must not be called concurrently.
+     * 
+     *@return true if the message was multi-cast, false if
+     * it was not sent due to flow control.
+     */
+    bool mcast(const iovec* iov, int iovLen);
 
     cpg_handle_t getHandle() const { return handle; }
 
@@ -123,10 +126,13 @@
 
     int getFd();
     
+    bool isFlowControlEnabled();
+    
   private:
     static std::string errorStr(cpg_error_t err, const std::string& msg);
     static std::string cantJoinMsg(const Name&);
-    static std::string cantLeaveMsg(const Name&); std::string cantMcastMsg(const Name&);
+    static std::string cantLeaveMsg(const Name&);
+    static std::string cantMcastMsg(const Name&);
 
     static void check(cpg_error_t result, const std::string& msg) {
         if (result != CPG_OK) throw Exception(errorStr(result, msg));
@@ -150,12 +156,11 @@
         struct cpg_address *joined, int nJoined
     );
 
-    bool isFlowControlEnabled();
-    void waitForFlowControl();
-
     cpg_handle_t handle;
     Handler& handler;
     bool isShutdown;
+    Name group;
+    sys::Mutex dispatchLock;
 };
 
 inline bool operator==(const cpg_name& a, const cpg_name& b) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=722614&r1=722613&r2=722614&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Tue Dec  2 12:41:49 2008
@@ -85,11 +85,11 @@
 
 // TODO aconway 2008-09-24: optimization: dump connections/sessions in parallel.
 
-DumpClient::DumpClient(const MemberId& from, const MemberId& to, const Url& url,
+DumpClient::DumpClient(const MemberId& dumper, const MemberId& dumpee, const Url& url,
                        broker::Broker& broker, const ClusterMap& m, const Cluster::Connections& cons,
                        const boost::function<void()>& ok,
                        const boost::function<void(const std::exception&)>& fail)
-    : dumperId(to), dumpeeId(from), dumpeeUrl(url), dumperBroker(broker), map(m), connections(cons),
+    : dumperId(dumper), dumpeeId(dumpee), dumpeeUrl(url), dumperBroker(broker), map(m), connections(cons),
       connection(catchUpConnection()), shadowConnection(catchUpConnection()),
       done(ok), failed(fail)
 {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=722614&r1=722613&r2=722614&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Tue Dec  2 12:41:49 2008
@@ -56,14 +56,14 @@
     return e;
 }
     
-void Event::mcast (const Cpg::Name& name, Cpg& cpg) const {
+bool Event::mcast (Cpg& cpg) const {
     char header[OVERHEAD];
     Buffer b(header, OVERHEAD);
     b.putOctet(type);
     b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer()));
     b.putLong(id);
     iovec iov[] = { { header, OVERHEAD }, { const_cast<char*>(getData()), getSize() } };
-    cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov));
+    return cpg.mcast(iov, sizeof(iov)/sizeof(*iov));
 }
 
 Event::operator Buffer() const  {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=722614&r1=722613&r2=722614&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Tue Dec  2 12:41:49 2008
@@ -51,7 +51,7 @@
     /** Create an event containing a control */
     static Event control(const framing::AMQBody&, const ConnectionId&, uint32_t id=0);
     
-    void mcast(const Cpg::Name& name, Cpg& cpg) const;
+    bool mcast(Cpg& cpg) const;
     
     EventType getType() const { return type; }
     ConnectionId getConnectionId() const { return connectionId; }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h?rev=722614&r1=722613&r2=722614&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h Tue Dec  2 12:41:49 2008
@@ -44,8 +44,13 @@
 template <class T>
 class PollableQueue {
   public:
-    /** Callback to process a range of items. */
-    typedef boost::function<void (const T&)> Callback;
+    /**
+     * Callback to process an item from the queue.
+     *
+     * @return If true the item is removed from the queue else it
+     * remains on the queue and the queue is stopped.
+     */
+    typedef boost::function<bool (const T&)> Callback;
 
     /** When the queue is selected by the poller, values are passed to callback cb. */
     PollableQueue(const Callback& cb, const boost::shared_ptr<sys::Poller>& poller);
@@ -66,6 +71,7 @@
 
     size_t size() { ScopedLock l(lock); return queue.size(); }
     bool empty() { ScopedLock l(lock); return queue.empty(); }
+    
   private:
     typedef std::deque<T> Queue;
     typedef sys::Monitor::ScopedLock ScopedLock;
@@ -94,7 +100,7 @@
 
 template <class T> void PollableQueue<T>::start() {
     ScopedLock l(lock);
-    assert(stopped);
+    if (!stopped) return;
     stopped = false;
     if (!queue.empty()) condition.set();
     handle.rewatch();
@@ -115,25 +121,27 @@
     assert(dispatcher.id() == 0 || dispatcher.id() == Thread::current().id());
     dispatcher = Thread::current();
     while (!stopped && !queue.empty()) {
-        T value = queue.front();
-        queue.pop_front();
-        {   // callback outside the lock to allow concurrent push.
+        bool ok = false;
+        {   // unlock to allow concurrent push or call to stop() in callback.
             ScopedUnlock u(lock);
-            callback(value);
+            // FIXME aconway 2008-12-02: exception-safe if callback throws.
+            ok = callback(queue.front());
         }
+        if (ok) queue.pop_front();
+        else stopped=true;
     }
+    dispatcher = Thread();
     if (queue.empty()) condition.clear();
     if (stopped) lock.notifyAll();
-    dispatcher = Thread();
-    if (!stopped) h.rewatch();
+    else h.rewatch();
 }
 
 template <class T> void PollableQueue<T>::stop() {
     ScopedLock l(lock);
-    assert(!stopped);
+    if (stopped) return;
     handle.unwatch();
     stopped = true;
-    // No deadlock if stop is called from the dispatcher thread
+    // Avoid deadlock if stop is called from the dispatch thread
     while (dispatcher.id() && dispatcher.id() != Thread::current().id())
         lock.wait();
 }



Mime
View raw message