qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r489237 - in /incubator/qpid/branches/event-queue-2006-12-20/cpp: lib/common/sys/posix/EventChannel.cpp tests/run-system-tests tests/topictest
Date Thu, 21 Dec 2006 00:58:59 GMT
Author: aconway
Date: Wed Dec 20 16:58:59 2006
New Revision: 489237

URL: http://svn.apache.org/viewvc?view=rev&rev=489237
Log:

EventChannel.cpp: Simplified handling of wakeups for multiple events by
using epoll's level-triggered behavior to "put back" an event and let
it wake up later.

Modified:
    incubator/qpid/branches/event-queue-2006-12-20/cpp/lib/common/sys/posix/EventChannel.cpp
    incubator/qpid/branches/event-queue-2006-12-20/cpp/tests/run-system-tests
    incubator/qpid/branches/event-queue-2006-12-20/cpp/tests/topictest

Modified: incubator/qpid/branches/event-queue-2006-12-20/cpp/lib/common/sys/posix/EventChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/event-queue-2006-12-20/cpp/lib/common/sys/posix/EventChannel.cpp?view=diff&rev=489237&r1=489236&r2=489237
==============================================================================
--- incubator/qpid/branches/event-queue-2006-12-20/cpp/lib/common/sys/posix/EventChannel.cpp
(original)
+++ incubator/qpid/branches/event-queue-2006-12-20/cpp/lib/common/sys/posix/EventChannel.cpp
Wed Dec 20 16:58:59 2006
@@ -88,6 +88,10 @@
      */
     Event* wake(uint32_t epollFlags);
 
+    Event* pop() { Event* e = queue.front(); queue.pop_front(); return e; }
+
+    bool empty() { return queue.empty(); }
+
     void setBit(uint32_t &epollFlags);
 
     void shutdown();
@@ -117,7 +121,7 @@
     void activate(int epollFd_, int myFd_);
 
     /** Epoll woke up for this descriptor. */
-    EventPair wake(uint32_t epollEvents);
+    Event* wake(uint32_t epollEvents);
 
     /** Shut down: close and remove file descriptor.
      * May be re-activated if fd is reused.
@@ -134,11 +138,13 @@
   private:
     void update();
     void epollCtl(int op, uint32_t events);
+    Queue* pick();
 
     Mutex lock;
     int epollFd;
     int myFd;
     Queue inQueue, outQueue;
+    bool preferIn;
 
   friend class Queue;
 };
@@ -205,6 +211,7 @@
         epollFlags |= myEvent;
 }
 
+// TODO aconway 2006-12-20: REMOVE
 Event* EventChannel::Queue::wake(uint32_t epollFlags) {
     // Called with lock held.
     if (!queue.empty() && (isMyEvent(epollFlags))) {
@@ -259,6 +266,7 @@
     outQueue.shutdown();
 }
 
+// TODO aconway 2006-12-20: Inline into wake().
 void EventChannel::Descriptor::update() {
     // Caller holds lock.
     if (isShutdown())             // Nothing to do
@@ -276,12 +284,26 @@
     ee.data.ptr = this;
     ee.events = events;
     int status = ::epoll_ctl(epollFd, op, myFd, &ee);
-    if (status < 0)
-        throw QPID_POSIX_ERROR(errno);
+    if (status < 0) {
+        if (errno == EBADF)     // FD was closed externally.
+            shutdownUnsafe();
+        else
+            throw QPID_POSIX_ERROR(errno);
+    }
 }
     
 
-EventPair EventChannel::Descriptor::wake(uint32_t epollEvents) {
+EventChannel::Queue* EventChannel::Descriptor::pick() {
+    if (inQueue.empty() && outQueue.empty()) 
+        return 0;
+    if (inQueue.empty() || outQueue.empty())
+        return !inQueue.empty() ? &inQueue : &outQueue;
+    // Neither is empty, pick fairly.
+    preferIn = !preferIn;
+    return preferIn ? &inQueue : &outQueue;
+}
+
+Event* EventChannel::Descriptor::wake(uint32_t epollEvents) {
     Mutex::ScopedLock l(lock);
     // On error, shut down the Descriptor and both queues.
     if (epollEvents & (EPOLLERR | EPOLLHUP)) {
@@ -291,14 +313,24 @@
         // exception on the events. Can we get more accurate error
         // reporting somehow?
     }
-    // Check the queues even if shutdown - we want to drain the
-    // events that have been marked with exceptions.
-    EventPair ready(inQueue.wake(epollEvents), outQueue.wake(epollEvents));
+    Queue*q = 0;
+    bool in = (epollEvents & EPOLLIN);
+    bool out = (epollEvents & EPOLLOUT);
+    if ((in && out) || isShutdown()) 
+        q = pick();         // Choose fairly, either non-empty queue.
+    else if (in) 
+        q = &inQueue;
+    else if (out) 
+        q = &outQueue;
+    Event* e = (q && !q->empty()) ? q->pop() : 0;
     update();
-    return ready;
+    if (e)
+        e->complete(*this);
+    return e;
 }
 
 
+
 // ================================================================
 // EventChannel::Impl
 
@@ -405,11 +437,10 @@
         (timeoutNs == TIME_INFINITE) ? -1 : timeoutNs/TIME_MSEC;
     CleanStruct<epoll_event> ee;
     Event* event = 0;
-    bool doSwap = true;
 
     // Loop till we get a completed event. Some events may repost
     // themselves and return 0, e.g. incomplete read or write events.
-    //
+    //TODO aconway 2006-12-20: FIX THIS!
     while (!event) {
         int n = epoll_wait(epollFd, &ee, 1, timeoutMs); // Thread safe.
         if (n == 0)             // Timeout
@@ -426,21 +457,7 @@
         assert(ed != 0);
         // TODO aconway 2006-12-20: DEBUG
         cout << endl << epoll(ee.events) << endl;
-        EventPair ready = ed->wake(ee.events); 
-
-        // We can only return one event so if both completed push one
-        // onto the dispatch queue to be dispatched in another thread.
-        if (ready.first && ready.second) {
-            // Keep it fair: in & out take turns to be returned first.
-            if (doSwap)
-                swap(ready.first, ready.second);
-            doSwap = !doSwap;
-            event = ready.first;
-            dispatchQueue->push(ready.second);
-        }
-        else {
-            event = ready.first ? ready.first : ready.second;
-        }
+        event = ed->wake(ee.events);
     }
     return event;
 }

Modified: incubator/qpid/branches/event-queue-2006-12-20/cpp/tests/run-system-tests
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/event-queue-2006-12-20/cpp/tests/run-system-tests?view=diff&rev=489237&r1=489236&r2=489237
==============================================================================
--- incubator/qpid/branches/event-queue-2006-12-20/cpp/tests/run-system-tests (original)
+++ incubator/qpid/branches/event-queue-2006-12-20/cpp/tests/run-system-tests Wed Dec 20 16:58:59
2006
@@ -24,7 +24,7 @@
 }
 
 run_test ./client_test
-run_test ./topictest -l2 -m2 -b1
+run_test ./topictest -s2 -m2 -b1
 
 # Run the python tests.
 if test -d ../../python ;  then

Modified: incubator/qpid/branches/event-queue-2006-12-20/cpp/tests/topictest
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/event-queue-2006-12-20/cpp/tests/topictest?view=diff&rev=489237&r1=489236&r2=489237
==============================================================================
--- incubator/qpid/branches/event-queue-2006-12-20/cpp/tests/topictest (original)
+++ incubator/qpid/branches/event-queue-2006-12-20/cpp/tests/topictest Wed Dec 20 16:58:59
2006
@@ -1,5 +1,5 @@
 #!/bin/bash
-# Run the c++ or topic test
+# Run the C++ topic test
 
 # Defaults
 SUBSCRIBERS=10
@@ -12,7 +12,7 @@
 	m) MESSAGES=$OPTARG ;;
 	b) BATCHES=$OPTARG ;;
 	?)
-	    echo "Usage: %0 [-l <subscribers>] [-m <messages.] [-b <batches>]"
+	    echo "Usage: %0 [-s <subscribers>] [-m <messages.] [-b <batches>]"
 	    exit 1
 	    ;;
     esac



Mime
View raw message