qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r722622 - in /incubator/qpid/trunk/qpid/cpp/src/qpid: cluster/Cpg.cpp cluster/Event.cpp sys/PollableQueue.h
Date Tue, 02 Dec 2008 21:43:25 GMT
Author: aconway
Date: Tue Dec  2 13:43:24 2008
New Revision: 722622

URL: http://svn.apache.org/viewvc?rev=722622&view=rev
Log:
PollableQueue: fix unsafe use of deque

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=722622&r1=722621&r2=722622&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Tue Dec  2 13:43:24 2008
@@ -103,11 +103,6 @@
 }
 
 bool Cpg::mcast(const iovec* iov, int iovLen) {
-    // Thread-safety note : the cpg_ calls are thread safe, but there
-    // is a race below between calling cpg_flow_control_state_get()
-    // and calling mcast_joined() where N threads could see the state
-    // as disabled and call mcast, but only M < N messages can be sent
-    // without exceeding flow control limits.
     if (isFlowControlEnabled()) {
         QPID_LOG(warning, "CPG flow control enabled")
         return false;
@@ -135,13 +130,13 @@
 string Cpg::errorStr(cpg_error_t err, const std::string& msg) {
     switch (err) {
       case CPG_OK: return msg+": ok";
-      case CPG_ERR_LIBRARY: return msg+": library";
+      case CPG_ERR_LIBRARY: return msg+": library error";
       case CPG_ERR_TIMEOUT: return msg+": timeout";
       case CPG_ERR_TRY_AGAIN: return msg+": timeout. The aisexec daemon may not be running";
       case CPG_ERR_INVALID_PARAM: return msg+": invalid param";
       case CPG_ERR_NO_MEMORY: return msg+": no memory";
       case CPG_ERR_BAD_HANDLE: return msg+": bad handle";
-      case CPG_ERR_ACCESS: return msg+": access denied. You may need to set your group ID to 'ais'";
+      case CPG_ERR_ACCESS: return msg+": access denied.";
       case CPG_ERR_NOT_EXIST: return msg+": not exist";
       case CPG_ERR_EXIST: return msg+": exist";
       case CPG_ERR_NOT_SUPPORTED: return msg+": not supported";

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=722622&r1=722621&r2=722622&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Tue Dec  2 13:43:24 2008
@@ -39,7 +39,8 @@
 
 Event Event::delivered(const MemberId& m, void* d, size_t s) {
     Buffer buf(static_cast<char*>(d), s);
-    EventType type((EventType)buf.getOctet()); 
+    EventType type((EventType)buf.getOctet());
+    assert(type == DATA || type == CONTROL);
     ConnectionId connection(m, reinterpret_cast<Connection*>(buf.getLongLong()));
     uint32_t id = buf.getLong();
     assert(buf.getPosition() == OVERHEAD);
@@ -62,6 +63,7 @@
     b.putOctet(type);
     b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer()));
     b.putLong(id);
+    assert(buf.getPosition() == OVERHEAD);
     iovec iov[] = { { header, OVERHEAD }, { const_cast<char*>(getData()), getSize() } };
     return cpg.mcast(iov, sizeof(iov)/sizeof(*iov));
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h?rev=722622&r1=722621&r2=722622&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h Tue Dec  2 13:43:24 2008
@@ -121,14 +121,18 @@
     assert(dispatcher.id() == 0 || dispatcher.id() == Thread::current().id());
     dispatcher = Thread::current();
     while (!stopped && !queue.empty()) {
+        T value = queue.front();
+        queue.pop_front();
         bool ok = false;
         {   // unlock to allow concurrent push or call to stop() in callback.
             ScopedUnlock u(lock);
-            // FIXME aconway 2008-12-02: exception-safe if callback throws.
-            ok = callback(queue.front());
+            // FIXME aconway 2008-12-02: not exception safe if callback throws.
+            ok = callback(value);
+        }
+        if (!ok) { // callback cannot process value,  put it back.
+            queue.push_front(value);
+            stopped=true;
         }
-        if (ok) queue.pop_front();
-        else stopped=true;
     }
     dispatcher = Thread();
     if (queue.empty()) condition.clear();



Mime
View raw message