apr-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mt...@apache.org
Subject svn commit: r649830 - in /apr/apr/trunk: CHANGES include/apr_poll.h poll/unix/epoll.c poll/unix/kqueue.c poll/unix/poll.c poll/unix/port.c poll/unix/select.c
Date Sat, 19 Apr 2008 16:26:45 GMT
Author: mturk
Date: Sat Apr 19 09:26:39 2008
New Revision: 649830

URL: http://svn.apache.org/viewvc?rev=649830&view=rev
Log:
Introduce (again) apr_pollset_wakeup API

Modified:
    apr/apr/trunk/CHANGES
    apr/apr/trunk/include/apr_poll.h
    apr/apr/trunk/poll/unix/epoll.c
    apr/apr/trunk/poll/unix/kqueue.c
    apr/apr/trunk/poll/unix/poll.c
    apr/apr/trunk/poll/unix/port.c
    apr/apr/trunk/poll/unix/select.c

Modified: apr/apr/trunk/CHANGES
URL: http://svn.apache.org/viewvc/apr/apr/trunk/CHANGES?rev=649830&r1=649829&r2=649830&view=diff
==============================================================================
--- apr/apr/trunk/CHANGES [utf-8] (original)
+++ apr/apr/trunk/CHANGES [utf-8] Sat Apr 19 09:26:39 2008
@@ -1,6 +1,10 @@
                                                      -*- coding: utf-8 -*-
 Changes for APR 1.4.0
 
+  *) Introduce apr_pollset_wakeup() for interrupting
+     the blocking apr_pollset_poll call.
+     [Mladen Turk]
+
   *) Implement apr_proc_wait_all_procs for windows.
      [Mladen Turk]
 

Modified: apr/apr/trunk/include/apr_poll.h
URL: http://svn.apache.org/viewvc/apr/apr/trunk/include/apr_poll.h?rev=649830&r1=649829&r2=649830&view=diff
==============================================================================
--- apr/apr/trunk/include/apr_poll.h (original)
+++ apr/apr/trunk/include/apr_poll.h Sat Apr 19 09:26:39 2008
@@ -56,6 +56,7 @@
  */
 #define APR_POLLSET_THREADSAFE 0x001 /**< Adding or Removing a Descriptor is thread safe
*/
 #define APR_POLLSET_NOCOPY     0x002 /**< Descriptors passed to apr_pollset_create() are
not copied */
+#define APR_POLLSET_WAKEABLE   0x004 /**< Pollset poll operation is interruptable */
 
 /** Used in apr_pollfd_t to determine what the apr_descriptor is */
 typedef enum { 
@@ -100,11 +101,17 @@
  * @param flags Optional flags to modify the operation of the pollset.
  *
  * @remark If flags equals APR_POLLSET_THREADSAFE, then a pollset is
- * created on which it is safe to make concurrent calls to
- * apr_pollset_add(), apr_pollset_remove() and apr_pollset_poll() from
- * separate threads.  This feature is only supported on some
- * platforms; the apr_pollset_create() call will fail with
- * APR_ENOTIMPL on platforms where it is not supported.
+ *         created on which it is safe to make concurrent calls to
+ *         apr_pollset_add(), apr_pollset_remove() and apr_pollset_poll()
+ *         from separate threads.  This feature is only supported on some
+ *         platforms; the apr_pollset_create() call will fail with
+ *         APR_ENOTIMPL on platforms where it is not supported.
+ * @remark If flags contains APR_POLLSET_WAKEABLE, then a pollset is
+ *         created with an additional internal pipe object used by the
+ *         apr_pollset_wakeup() call. The actual size of the pollset is
+ *         in that case size + 1. This feature is only supported on some
+ *         platforms; the apr_pollset_create() call will fail with
+ *         APR_ENOTIMPL on platforms where it is not supported.
  */
 APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
                                              apr_uint32_t size,
@@ -160,12 +167,23 @@
  * @param timeout Timeout in microseconds
  * @param num Number of signalled descriptors (output parameter)
  * @param descriptors Array of signalled descriptors (output parameter)
+ * @remark If the pollset has been created with APR_POLLSET_WAKEABLE
+ *         and apr_pollset_wakeup() is called while waiting for activity,
+ *         the return value is APR_EINTR and num is set to the number of
+ *         signalled descriptors at the time of the wakeup call.
  */
 APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
                                            apr_interval_time_t timeout,
                                            apr_int32_t *num,
                                            const apr_pollfd_t **descriptors);
 
+/**
+ * Interrupt the blocked apr_pollset_poll call.
+ * @param pollset The pollset to use
+ * @remark If the pollset was not created with APR_POLLSET_WAKEABLE the
+ *         return value is APR_EINIT.
+ */
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset);
 
 /**
  * Poll the descriptors in the poll structure

Modified: apr/apr/trunk/poll/unix/epoll.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/poll/unix/epoll.c?rev=649830&r1=649829&r2=649830&view=diff
==============================================================================
--- apr/apr/trunk/poll/unix/epoll.c (original)
+++ apr/apr/trunk/poll/unix/epoll.c Sat Apr 19 09:26:39 2008
@@ -65,6 +65,8 @@
     struct epoll_event *pollset;
     apr_pollfd_t *result_set;
     apr_uint32_t flags;
+    /* Pipe descriptors used for wakeup */
+    apr_file_t *wakeup_pipe[2];
 #if APR_HAS_THREADS
     /* A thread mutex to protect operations on the rings */
     apr_thread_mutex_t *ring_lock;
@@ -82,9 +84,57 @@
 {
     apr_pollset_t *pollset = (apr_pollset_t *) p_;
     close(pollset->epoll_fd);
+    if (pollset->flags & APR_POLLSET_WAKEABLE) {
+        /* Close both sides of the wakeup pipe */
+        if (pollset->wakeup_pipe[0]) {
+            apr_file_close(pollset->wakeup_pipe[0]);
+            pollset->wakeup_pipe[0] = NULL;
+        }
+        if (pollset->wakeup_pipe[1]) {
+            apr_file_close(pollset->wakeup_pipe[1]);
+            pollset->wakeup_pipe[1] = NULL;
+        }
+    }
     return APR_SUCCESS;
 }
 
+/* Create a dummy wakeup pipe for interrupting the poller
+ */
+static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset)
+{
+    apr_status_t rv;
+    apr_pollfd_t fd;
+
+    if ((rv = apr_file_pipe_create(&pollset->wakeup_pipe[0],
+                                   &pollset->wakeup_pipe[1],
+                                   pollset->pool)) != APR_SUCCESS)
+        return rv;
+    fd.reqevents = APR_POLLIN;
+    fd.desc_type = APR_POLL_FILE;
+    fd.desc.f = pollset->wakeup_pipe[0];
+    /* Add the pipe to the pollset
+     */
+    return apr_pollset_add(pollset, &fd);
+}
+
+/* Read and discard whatever is in the wakeup pipe.
+ */
+static void drain_wakeup_pipe(apr_pollset_t *pollset)
+{
+    char rb[512];
+    apr_size_t nr = sizeof(rb);
+
+    while (apr_file_read(pollset->wakeup_pipe[0], rb, &nr) == APR_SUCCESS) {
+        /* Although we write just one byte to the other end of the pipe
+         * during wakeup, multiple threads could call the wakeup.
+         * So simply drain all of the data from the input side of
+         * the pipe.
+         */
+        if (nr != sizeof(rb))
+            break;
+    }
+}
+
 APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
                                              apr_uint32_t size,
                                              apr_pool_t *p,
@@ -93,6 +143,10 @@
     apr_status_t rv;
     int fd;
 
+    if (flags & APR_POLLSET_WAKEABLE) {
+        /* Add room for wakeup descriptor */
+        size++;
+    }
     fd = epoll_create(size);
     if (fd < 0) {
         *pollset = NULL;
@@ -121,7 +175,6 @@
     (*pollset)->pool = p;
     (*pollset)->epoll_fd = fd;
     (*pollset)->pollset = apr_palloc(p, size * sizeof(struct epoll_event));
-    apr_pool_cleanup_register(p, *pollset, backend_cleanup, backend_cleanup);
     (*pollset)->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
 
     if (!(flags & APR_POLLSET_NOCOPY)) {
@@ -129,6 +182,15 @@
         APR_RING_INIT(&(*pollset)->free_ring, pfd_elem_t, link);
         APR_RING_INIT(&(*pollset)->dead_ring, pfd_elem_t, link);
     }
+    if (flags & APR_POLLSET_WAKEABLE) {
+        /* Create wakeup pipe */
+        if ((rv = create_wakeup_pipe(*pollset)) != APR_SUCCESS) {
+            close(fd);
+            *pollset = NULL;
+            return rv;
+        }
+    }
+    apr_pool_cleanup_register(p, *pollset, backend_cleanup, backend_cleanup);
     return APR_SUCCESS;
 }
 
@@ -244,8 +306,9 @@
                                            apr_int32_t *num,
                                            const apr_pollfd_t **descriptors)
 {
-    int ret, i;
+    int ret, i, j;
     apr_status_t rv = APR_SUCCESS;
+    apr_pollfd_t fd;
 
     if (timeout > 0) {
         timeout /= 1000;
@@ -263,20 +326,49 @@
     }
     else {
         if (pollset->flags & APR_POLLSET_NOCOPY) {
-            for (i = 0; i < ret; i++) {
-                pollset->result_set[i] =
-                    *((apr_pollfd_t *) (pollset->pollset[i].data.ptr));
-                pollset->result_set[i].rtnevents =
-                    get_epoll_revent(pollset->pollset[i].events);
+            for (i = 0, j = 0; i < ret; i++) {
+                fd = *((apr_pollfd_t *) (pollset->pollset[i].data.ptr));
+                /* Check if the polled descriptor is our
+                 * wakeup pipe. In that case do not put it in the result set.
+                 */
+                if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
+                    fd.desc_type == APR_POLL_FILE &&
+                    fd.desc.f == pollset->wakeup_pipe[0]) {
+                        drain_wakeup_pipe(pollset);
+                        /* XXX: Is this the correct return value?
+                         * We might simply return APR_SUCCESS.
+                         */
+                        rv = APR_EINTR;
+                }
+                else {
+                    pollset->result_set[j] = fd;
+                    pollset->result_set[j].rtnevents =
+                        get_epoll_revent(pollset->pollset[i].events);
+                    j++;
+                }
             }
+            (*num) = j;
         }
         else {
-            for (i = 0; i < ret; i++) {
-                pollset->result_set[i] =
-                    (((pfd_elem_t *) (pollset->pollset[i].data.ptr))->pfd);
-                pollset->result_set[i].rtnevents =
-                    get_epoll_revent(pollset->pollset[i].events);
+            for (i = 0, j = 0; i < ret; i++) {
+                fd = (((pfd_elem_t *) (pollset->pollset[i].data.ptr))->pfd);
+                if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
+                    fd.desc_type == APR_POLL_FILE &&
+                    fd.desc.f == pollset->wakeup_pipe[0]) {
+                        drain_wakeup_pipe(pollset);
+                        /* XXX: Is this the correct return value?
+                         * We might simply return APR_SUCCESS.
+                         */
+                        rv = APR_EINTR;
+                }
+                else {
+                    pollset->result_set[j] = fd;
+                    pollset->result_set[j].rtnevents =
+                        get_epoll_revent(pollset->pollset[i].events);
+                    j++;
+                }
             }
+            (*num) = j;
         }
 
         if (descriptors) {
@@ -294,6 +386,14 @@
     }
 
     return rv;
+}
+
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
+{
+    if (pollset->flags & APR_POLLSET_WAKEABLE)
+        return apr_file_putc(1, pollset->wakeup_pipe[1]);
+    else
+        return APR_EINIT;
 }
 
 struct apr_pollcb_t {

Modified: apr/apr/trunk/poll/unix/kqueue.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/poll/unix/kqueue.c?rev=649830&r1=649829&r2=649830&view=diff
==============================================================================
--- apr/apr/trunk/poll/unix/kqueue.c (original)
+++ apr/apr/trunk/poll/unix/kqueue.c Sat Apr 19 09:26:39 2008
@@ -44,6 +44,8 @@
     struct kevent *ke_set;
     apr_pollfd_t *result_set;
     apr_uint32_t flags;
+    /* Pipe descriptors used for wakeup */
+    apr_file_t *wakeup_pipe[2];
 #if APR_HAS_THREADS
     /* A thread mutex to protect operations on the rings */
     apr_thread_mutex_t *ring_lock;
@@ -61,9 +63,57 @@
 {
     apr_pollset_t *pollset = (apr_pollset_t *) p_;
     close(pollset->kqueue_fd);
+    if (pollset->flags & APR_POLLSET_WAKEABLE) {
+        /* Close both sides of the wakeup pipe */
+        if (pollset->wakeup_pipe[0]) {
+            apr_file_close(pollset->wakeup_pipe[0]);
+            pollset->wakeup_pipe[0] = NULL;
+        }
+        if (pollset->wakeup_pipe[1]) {
+            apr_file_close(pollset->wakeup_pipe[1]);
+            pollset->wakeup_pipe[1] = NULL;
+        }
+    }
     return APR_SUCCESS;
 }
 
+/* Create a dummy wakeup pipe for interrupting the poller
+ */
+static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset)
+{
+    apr_status_t rv;
+    apr_pollfd_t fd;
+
+    if ((rv = apr_file_pipe_create(&pollset->wakeup_pipe[0],
+                                   &pollset->wakeup_pipe[1],
+                                   pollset->pool)) != APR_SUCCESS)
+        return rv;
+    fd.reqevents = APR_POLLIN;
+    fd.desc_type = APR_POLL_FILE;
+    fd.desc.f = pollset->wakeup_pipe[0];
+    /* Add the pipe to the pollset
+     */
+    return apr_pollset_add(pollset, &fd);
+}
+
+/* Read and discard whatever is in the wakeup pipe.
+ */
+static void drain_wakeup_pipe(apr_pollset_t *pollset)
+{
+    char rb[512];
+    apr_size_t nr = sizeof(rb);
+
+    while (apr_file_read(pollset->wakeup_pipe[0], rb, &nr) == APR_SUCCESS) {
+        /* Although we write just one byte to the other end of the pipe
+         * during wakeup, multiple threads could call the wakeup.
+         * So simply drain all of the data from the input side of
+         * the pipe.
+         */
+        if (nr != sizeof(rb))
+            break;
+    }
+}
+
 APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
                                              apr_uint32_t size,
                                              apr_pool_t *p,
@@ -85,6 +135,11 @@
         return APR_ENOTIMPL;
     }
 #endif
+    if (flags & APR_POLLSET_WAKEABLE) {
+        /* Add room for wakeup descriptor */
+        size++;
+    }
+
     (*pollset)->nelts = 0;
     (*pollset)->nalloc = size;
     (*pollset)->flags = flags;
@@ -101,14 +156,21 @@
         return apr_get_netos_error();
     }
 
-    apr_pool_cleanup_register(p, (void *) (*pollset), backend_cleanup,
-                              apr_pool_cleanup_null);
-
     (*pollset)->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
 
     APR_RING_INIT(&(*pollset)->query_ring, pfd_elem_t, link);
     APR_RING_INIT(&(*pollset)->free_ring, pfd_elem_t, link);
     APR_RING_INIT(&(*pollset)->dead_ring, pfd_elem_t, link);
+    if (flags & APR_POLLSET_WAKEABLE) {
+        /* Create wakeup pipe */
+        if ((rv = create_wakeup_pipe(*pollset)) != APR_SUCCESS) {
+            close((*pollset)->kqueue_fd);
+            *pollset = NULL;
+            return rv;
+        }
+    }
+    apr_pool_cleanup_register(p, (void *) (*pollset), backend_cleanup,
+                              apr_pool_cleanup_null);
 
     return rv;
 }
@@ -234,9 +296,10 @@
                                            apr_int32_t *num,
                                            const apr_pollfd_t **descriptors)
 {
-    int ret, i;
+    int ret, i, j;
     struct timespec tv, *tvptr;
     apr_status_t rv = APR_SUCCESS;
+    apr_pollfd_t fd;
 
     if (timeout < 0) {
         tvptr = NULL;
@@ -257,14 +320,26 @@
         rv = APR_TIMEUP;
     }
     else {
-        for (i = 0; i < ret; i++) {
-            pollset->result_set[i] =
-                (((pfd_elem_t*)(pollset->ke_set[i].udata))->pfd);
-            pollset->result_set[i].rtnevents =
-                get_kqueue_revent(pollset->ke_set[i].filter,
-                              pollset->ke_set[i].flags);
+        for (i = 0, j = 0; i < ret; i++) {
+            fd = (((pfd_elem_t*)(pollset->ke_set[i].udata))->pfd);
+            if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
+                fd.desc_type == APR_POLL_FILE &&
+                fd.desc.f == pollset->wakeup_pipe[0]) {
+                drain_wakeup_pipe(pollset);
+                /* XXX: Is this the correct return value?
+                 * We might simply return APR_SUCCESS.
+                 */
+                rv = APR_EINTR;
+            }
+            else {
+                pollset->result_set[j] = fd;
+                pollset->result_set[j].rtnevents =
+                        get_kqueue_revent(pollset->ke_set[i].filter,
+                                          pollset->ke_set[i].flags);
+                j++;
+            }
         }
-
+        (*num) = j;
         if (descriptors) {
             *descriptors = pollset->result_set;
         }
@@ -281,6 +356,13 @@
     return rv;
 }
 
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
+{
+    if (pollset->flags & APR_POLLSET_WAKEABLE)
+        return apr_file_putc(1, pollset->wakeup_pipe[1]);
+    else
+        return APR_EINIT;
+}
 
 struct apr_pollcb_t {
     apr_pool_t *pool;

Modified: apr/apr/trunk/poll/unix/poll.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/poll/unix/poll.c?rev=649830&r1=649829&r2=649830&view=diff
==============================================================================
--- apr/apr/trunk/poll/unix/poll.c (original)
+++ apr/apr/trunk/poll/unix/poll.c Sat Apr 19 09:26:39 2008
@@ -156,11 +156,69 @@
     apr_pool_t *pool;
     apr_uint32_t nelts;
     apr_uint32_t nalloc;
+    apr_uint32_t flags;
+    /* Pipe descriptors used for wakeup */
+    apr_file_t *wakeup_pipe[2];    
     struct pollfd *pollset;
     apr_pollfd_t *query_set;
     apr_pollfd_t *result_set;
 };
 
+/* Create a dummy wakeup pipe for interrupting the poller
+ */
+static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset)
+{
+    apr_status_t rv;
+    apr_pollfd_t fd;
+
+    if ((rv = apr_file_pipe_create(&pollset->wakeup_pipe[0],
+                                   &pollset->wakeup_pipe[1],
+                                   pollset->pool)) != APR_SUCCESS)
+        return rv;
+    fd.reqevents = APR_POLLIN;
+    fd.desc_type = APR_POLL_FILE;
+    fd.desc.f = pollset->wakeup_pipe[0];
+    /* Add the pipe to the pollset
+     */
+    return apr_pollset_add(pollset, &fd);
+}
+
+/* Read and discard whatever is in the wakeup pipe.
+ */
+static void drain_wakeup_pipe(apr_pollset_t *pollset)
+{
+    char rb[512];
+    apr_size_t nr = sizeof(rb);
+
+    while (apr_file_read(pollset->wakeup_pipe[0], rb, &nr) == APR_SUCCESS) {
+        /* Although we write just one byte to the other end of the pipe
+         * during wakeup, multiple threads could call the wakeup.
+         * So simply drain all of the data from the input side of
+         * the pipe.
+         */
+        if (nr != sizeof(rb))
+            break;
+    }
+}
+
+static apr_status_t wakeup_pipe_cleanup(void *p)
+{
+    apr_pollset_t *pollset = (apr_pollset_t *) p;
+    if (pollset->flags & APR_POLLSET_WAKEABLE) {
+        /* Close both sides of the wakeup pipe */
+        if (pollset->wakeup_pipe[0]) {
+            apr_file_close(pollset->wakeup_pipe[0]);
+            pollset->wakeup_pipe[0] = NULL;
+        }
+        if (pollset->wakeup_pipe[1]) {
+            apr_file_close(pollset->wakeup_pipe[1]);
+            pollset->wakeup_pipe[1] = NULL;
+        }
+    }
+
+    return APR_SUCCESS;
+}
+
 APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
                                              apr_uint32_t size,
                                              apr_pool_t *p,
@@ -170,20 +228,40 @@
         *pollset = NULL;
         return APR_ENOTIMPL;
     }
+    if (flags & APR_POLLSET_WAKEABLE) {
+        /* Add room for wakeup descriptor */
+        size++;
+    }
 
     *pollset = apr_palloc(p, sizeof(**pollset));
     (*pollset)->nelts = 0;
     (*pollset)->nalloc = size;
     (*pollset)->pool = p;
+    (*pollset)->flags = flags;
     (*pollset)->pollset = apr_palloc(p, size * sizeof(struct pollfd));
     (*pollset)->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
     (*pollset)->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
+
+    if (flags & APR_POLLSET_WAKEABLE) {
+        apr_status_t rv;
+        /* Create wakeup pipe */
+        if ((rv = create_wakeup_pipe(*pollset)) != APR_SUCCESS) {
+            *pollset = NULL;
+            return rv;
+        }
+        apr_pool_cleanup_register(p, *pollset, wakeup_pipe_cleanup,
+                                  apr_pool_cleanup_null);
+    }
     return APR_SUCCESS;
 }
 
 APR_DECLARE(apr_status_t) apr_pollset_destroy(apr_pollset_t *pollset)
 {
-    return APR_SUCCESS;
+    if (pollset->flags & APR_POLLSET_WAKEABLE) 
+        return apr_pool_cleanup_run(pollset->pool, pollset,
+                                    wakeup_pipe_cleanup);
+    else
+        return APR_SUCCESS;
 }
 
 APR_DECLARE(apr_status_t) apr_pollset_add(apr_pollset_t *pollset,
@@ -242,32 +320,57 @@
                                            apr_int32_t *num,
                                            const apr_pollfd_t **descriptors)
 {
-    int rv;
+    int ret;
+    apr_status_t rv = APR_SUCCESS;
     apr_uint32_t i, j;
 
     if (timeout > 0) {
         timeout /= 1000;
     }
-    rv = poll(pollset->pollset, pollset->nelts, timeout);
-    (*num) = rv;
-    if (rv < 0) {
+    ret = poll(pollset->pollset, pollset->nelts, timeout);
+    (*num) = ret;
+    if (ret < 0) {
         return apr_get_netos_error();
     }
-    if (rv == 0) {
+    else if (ret == 0) {
         return APR_TIMEUP;
     }
-    j = 0;
-    for (i = 0; i < pollset->nelts; i++) {
-        if (pollset->pollset[i].revents != 0) {
-            pollset->result_set[j] = pollset->query_set[i];
-            pollset->result_set[j].rtnevents =
-                get_revent(pollset->pollset[i].revents);
-            j++;
+    else {
+        for (i = 0, j = 0; i < pollset->nelts; i++) {
+            if (pollset->pollset[i].revents != 0) {
+                /* Check if the polled descriptor is our
+                 * wakeup pipe. In that case do not put it in the result set.
+                 */
+                if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
+                    pollset->query_set[i].desc_type == APR_POLL_FILE &&
+                    pollset->query_set[i].desc.f == pollset->wakeup_pipe[0]) {
+                        drain_wakeup_pipe(pollset);
+                        /* XXX: Is this the correct return value?
+                         * We might simply return APR_SUCCESS.
+                         */
+                        rv = APR_EINTR;
+                }
+                else {
+                    pollset->result_set[j] = pollset->query_set[i];
+                    pollset->result_set[j].rtnevents =
+                        get_revent(pollset->pollset[i].revents);
+                    j++;
+                }
+            }
         }
+        (*num) = j;
     }
-    if (descriptors)
+    if (descriptors && (*num))
         *descriptors = pollset->result_set;
-    return APR_SUCCESS;
+    return rv;
+}
+
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
+{
+    if (pollset->flags & APR_POLLSET_WAKEABLE)
+        return apr_file_putc(1, pollset->wakeup_pipe[1]);
+    else
+        return APR_EINIT;
 }
 
 APR_DECLARE(apr_status_t) apr_pollcb_create(apr_pollcb_t **pollcb,

Modified: apr/apr/trunk/poll/unix/port.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/poll/unix/port.c?rev=649830&r1=649829&r2=649830&view=diff
==============================================================================
--- apr/apr/trunk/poll/unix/port.c (original)
+++ apr/apr/trunk/poll/unix/port.c Sat Apr 19 09:26:39 2008
@@ -68,6 +68,8 @@
     port_event_t *port_set;
     apr_pollfd_t *result_set;
     apr_uint32_t flags;
+    /* Pipe descriptors used for wakeup */
+    apr_file_t *wakeup_pipe[2];
 #if APR_HAS_THREADS
     /* A thread mutex to protect operations on the rings */
     apr_thread_mutex_t *ring_lock;
@@ -86,9 +88,57 @@
 {
     apr_pollset_t *pollset = (apr_pollset_t *) p_;
     close(pollset->port_fd);
+    if (pollset->flags & APR_POLLSET_WAKEABLE) {
+        /* Close both sides of the wakeup pipe */
+        if (pollset->wakeup_pipe[0]) {
+            apr_file_close(pollset->wakeup_pipe[0]);
+            pollset->wakeup_pipe[0] = NULL;
+        }
+        if (pollset->wakeup_pipe[1]) {
+            apr_file_close(pollset->wakeup_pipe[1]);
+            pollset->wakeup_pipe[1] = NULL;
+        }
+    }
     return APR_SUCCESS;
 }
 
+/* Create a dummy wakeup pipe for interrupting the poller
+ */
+static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset)
+{
+    apr_status_t rv;
+    apr_pollfd_t fd;
+
+    if ((rv = apr_file_pipe_create(&pollset->wakeup_pipe[0],
+                                   &pollset->wakeup_pipe[1],
+                                   pollset->pool)) != APR_SUCCESS)
+        return rv;
+    fd.reqevents = APR_POLLIN;
+    fd.desc_type = APR_POLL_FILE;
+    fd.desc.f = pollset->wakeup_pipe[0];
+    /* Add the pipe to the pollset
+     */
+    return apr_pollset_add(pollset, &fd);
+}
+
+/* Read and discard whatever is in the wakeup pipe.
+ */
+static void drain_wakeup_pipe(apr_pollset_t *pollset)
+{
+    char rb[512];
+    apr_size_t nr = sizeof(rb);
+
+    while (apr_file_read(pollset->wakeup_pipe[0], rb, &nr) == APR_SUCCESS) {
+        /* Although we write just one byte to the other end of the pipe
+         * during wakeup, multiple threads could call the wakeup.
+         * So simply drain all of the data from the input side of
+         * the pipe.
+         */
+        if (nr != sizeof(rb))
+            break;
+    }
+}
+
 APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
                                              apr_uint32_t size,
                                              apr_pool_t *p,
@@ -110,6 +160,10 @@
         return APR_ENOTIMPL;
     }
 #endif
+    if (flags & APR_POLLSET_WAKEABLE) {
+        /* Add room for wakeup descriptor */
+        size++;
+    }
     (*pollset)->nelts = 0;
     (*pollset)->nalloc = size;
     (*pollset)->flags = flags;
@@ -123,9 +177,6 @@
         return APR_ENOMEM;
     }
 
-    apr_pool_cleanup_register(p, (void *) (*pollset), backend_cleanup,
-                              apr_pool_cleanup_null);
-
     (*pollset)->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
 
     APR_RING_INIT(&(*pollset)->query_ring, pfd_elem_t, link);
@@ -133,6 +184,17 @@
     APR_RING_INIT(&(*pollset)->free_ring, pfd_elem_t, link);
     APR_RING_INIT(&(*pollset)->dead_ring, pfd_elem_t, link);
 
+    if (flags & APR_POLLSET_WAKEABLE) {
+        /* Create wakeup pipe */
+        if ((rv = create_wakeup_pipe(*pollset)) != APR_SUCCESS) {
+            close((*pollset)->port_fd);
+            *pollset = NULL;
+            return rv;
+        }
+    }
+    apr_pool_cleanup_register(p, (void *) (*pollset), backend_cleanup,
+                              apr_pool_cleanup_null);
+
     return rv;
 }
 
@@ -249,11 +311,12 @@
                                            const apr_pollfd_t **descriptors)
 {
     apr_os_sock_t fd;
-    int ret, i;
+    int ret, i, j;
     unsigned int nget;
     pfd_elem_t *ep;
     struct timespec tv, *tvptr;
     apr_status_t rv = APR_SUCCESS;
+    apr_pollfd_t fp;
 
     if (timeout < 0) {
         tvptr = NULL;
@@ -304,21 +367,32 @@
 
         pollset_lock_rings();
 
-        for (i = 0; i < nget; i++) {
-            pollset->result_set[i] =
-                (((pfd_elem_t*)(pollset->port_set[i].portev_user))->pfd);
-            pollset->result_set[i].rtnevents =
-                get_revent(pollset->port_set[i].portev_events);
-
-            APR_RING_REMOVE((pfd_elem_t*)pollset->port_set[i].portev_user, link);
-
-            APR_RING_INSERT_TAIL(&(pollset->add_ring), 
-                                 (pfd_elem_t*)pollset->port_set[i].portev_user,
-                                 pfd_elem_t, link);
+        for (i = 0, j = 0; i < nget; i++) {
+            fp = (((pfd_elem_t*)(pollset->port_set[i].portev_user))->pfd);
+            if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
+                fd.desc_type == APR_POLL_FILE &&
+                fd.desc.f == pollset->wakeup_pipe[0]) {
+                drain_wakeup_pipe(pollset);
+                /* XXX: Is this the correct return value?
+                 * We might simply return APR_SUCCESS.
+                 */
+                rv = APR_EINTR;
+            }
+            else {
+                pollset->result_set[j] = fp;            
+                pollset->result_set[j].rtnevents =
+                    get_revent(pollset->port_set[i].portev_events);
+
+                APR_RING_REMOVE((pfd_elem_t*)pollset->port_set[i].portev_user,
+                                link);
+                APR_RING_INSERT_TAIL(&(pollset->add_ring), 
+                                (pfd_elem_t*)pollset->port_set[i].portev_user,
+                                pfd_elem_t, link);
+                j++;
+            }
         }
-
         pollset_unlock_rings();
-
+        (*num) = j;
         if (descriptors) {
             *descriptors = pollset->result_set;
         }
@@ -333,6 +407,14 @@
     pollset_unlock_rings();
 
     return rv;
+}
+
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
+{
+    if (pollset->flags & APR_POLLSET_WAKEABLE)
+        return apr_file_putc(1, pollset->wakeup_pipe[1]);
+    else
+        return APR_EINIT;
 }
 
 struct apr_pollcb_t {

Modified: apr/apr/trunk/poll/unix/select.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/poll/unix/select.c?rev=649830&r1=649829&r2=649830&view=diff
==============================================================================
--- apr/apr/trunk/poll/unix/select.c (original)
+++ apr/apr/trunk/poll/unix/select.c Sat Apr 19 09:26:39 2008
@@ -179,11 +179,104 @@
     int maxfd;
     apr_pollfd_t *query_set;
     apr_pollfd_t *result_set;
+    apr_uint32_t flags;
+    /* Pipe descriptors used for wakeup */
+    apr_file_t *wakeup_pipe[2];
 #ifdef NETWARE
     int set_type;
 #endif
 };
 
+#if !APR_FILES_AS_SOCKETS
+#if defined (WIN32)
+
+extern apr_status_t
+apr_file_socket_pipe_create(apr_file_t **in,
+                            apr_file_t **out,
+                            apr_pool_t *p);
+
+/* Create a dummy wakeup socket pipe for interrupting the poller
+ */
+static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset)
+{
+    apr_status_t rv;
+    apr_pollfd_t fd;
+
+    if ((rv = apr_file_socket_pipe_create(&pollset->wakeup_pipe[0],
+                                          &pollset->wakeup_pipe[1],
+                                          pollset->pool)) != APR_SUCCESS)
+        return rv;
+    fd.reqevents = APR_POLLIN;
+    fd.desc_type = APR_POLL_FILE;
+    fd.desc.f = pollset->wakeup_pipe[0];
+    /* Add the pipe to the pollset
+     */
+    return apr_pollset_add(pollset, &fd);
+}
+#else  /* !WIN32 */
+static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset)
+{
+    return APR_ENOTIMPL;
+}
+#endif /* WIN32 */
+#else  /* APR_FILES_AS_SOCKETS */
+
+/* Create a dummy wakeup pipe for interrupting the poller
+ */
+static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset)
+{
+    apr_status_t rv;
+    apr_pollfd_t fd;
+
+    if ((rv = apr_file_pipe_create(&pollset->wakeup_pipe[0],
+                                   &pollset->wakeup_pipe[1],
+                                   pollset->pool)) != APR_SUCCESS)
+        return rv;
+    fd.reqevents = APR_POLLIN;
+    fd.desc_type = APR_POLL_FILE;
+    fd.desc.f = pollset->wakeup_pipe[0];
+    /* Add the pipe to the pollset
+     */
+    return apr_pollset_add(pollset, &fd);
+}
+#endif /* !APR_FILES_AS_SOCKETS */
+
+/* Read and discard whatever is in the wakeup pipe.
+ */
+static void drain_wakeup_pipe(apr_pollset_t *pollset)
+{
+    char rb[512];
+    apr_size_t nr = sizeof(rb);
+
+    while (apr_file_read(pollset->wakeup_pipe[0], rb, &nr) == APR_SUCCESS) {
+        /* Although we write just one byte to the other end of the pipe
+         * during wakeup, multiple threads could call the wakeup.
+         * So simply drain all of the data from the input side of
+         * the pipe.
+         */
+        if (nr != sizeof(rb))
+            break;
+    }
+}
+
+static apr_status_t wakeup_pipe_cleanup(void *p)
+{
+    apr_pollset_t *pollset = (apr_pollset_t *) p;
+    if (pollset->flags & APR_POLLSET_WAKEABLE) {
+        /* Close both sides of the wakeup pipe */
+        if (pollset->wakeup_pipe[0]) {
+            apr_file_close(pollset->wakeup_pipe[0]);
+            pollset->wakeup_pipe[0] = NULL;
+        }
+        if (pollset->wakeup_pipe[1]) {
+            apr_file_close(pollset->wakeup_pipe[1]);
+            pollset->wakeup_pipe[1] = NULL;
+        }
+    }
+
+    return APR_SUCCESS;
+}
+
 APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
                                              apr_uint32_t size,
                                              apr_pool_t *p,
@@ -193,6 +286,10 @@
         *pollset = NULL;
         return APR_ENOTIMPL;
     }
+    if (flags & APR_POLLSET_WAKEABLE) {
+        /* Add room for wakeup descriptor */
+        size++;
+    }
 #ifdef FD_SETSIZE
     if (size > FD_SETSIZE) {
         *pollset = NULL;
@@ -203,6 +300,7 @@
     (*pollset)->nelts = 0;
     (*pollset)->nalloc = size;
     (*pollset)->pool = p;
+    (*pollset)->flags = flags;
     FD_ZERO(&((*pollset)->readset));
     FD_ZERO(&((*pollset)->writeset));
     FD_ZERO(&((*pollset)->exceptset));
@@ -213,12 +311,26 @@
     (*pollset)->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
     (*pollset)->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
 
+    if (flags & APR_POLLSET_WAKEABLE) {
+        apr_status_t rv;
+        /* Create wakeup pipe */
+        if ((rv = create_wakeup_pipe(*pollset)) != APR_SUCCESS) {
+            *pollset = NULL;
+            return rv;
+        }
+        apr_pool_cleanup_register(p, *pollset, wakeup_pipe_cleanup,
+                                  apr_pool_cleanup_null);
+    }
     return APR_SUCCESS;
 }
 
 APR_DECLARE(apr_status_t) apr_pollset_destroy(apr_pollset_t * pollset)
 {
-    return APR_SUCCESS;
+    /* Only a wakeable pollset has a wakeup pipe cleanup to run. */
+    if (!(pollset->flags & APR_POLLSET_WAKEABLE))
+        return APR_SUCCESS;
+    return apr_pool_cleanup_run(pollset->pool, pollset,
+                                wakeup_pipe_cleanup);
 }
 
 APR_DECLARE(apr_status_t) apr_pollset_add(apr_pollset_t *pollset,
@@ -335,10 +447,11 @@
                                            apr_int32_t *num,
                                            const apr_pollfd_t **descriptors)
 {
-    int rv;
+    int rs;
     apr_uint32_t i, j;
     struct timeval tv, *tvptr;
     fd_set readset, writeset, exceptset;
+    apr_status_t rv = APR_SUCCESS;
 
     if (timeout < 0) {
         tvptr = NULL;
@@ -355,19 +468,19 @@
 
 #ifdef NETWARE
     if (HAS_PIPES(pollset->set_type)) {
-        rv = pipe_select(pollset->maxfd + 1, &readset, &writeset, &exceptset,
+        rs = pipe_select(pollset->maxfd + 1, &readset, &writeset, &exceptset,
                          tvptr);
     }
     else
 #endif
-        rv = select(pollset->maxfd + 1, &readset, &writeset, &exceptset,
+        rs = select(pollset->maxfd + 1, &readset, &writeset, &exceptset,
                     tvptr);
 
-    (*num) = rv;
-    if (rv < 0) {
+    (*num) = rs;
+    if (rs < 0) {
         return apr_get_netos_error();
     }
-    if (rv == 0) {
+    if (rs == 0) {
         return APR_TIMEUP;
     }
     j = 0;
@@ -377,11 +490,22 @@
             fd = pollset->query_set[i].desc.s->socketdes;
         }
         else {
+            if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
+                pollset->query_set[i].desc.f == pollset->wakeup_pipe[0]) {
+                drain_wakeup_pipe(pollset);
+                /* XXX: Is this a correct return value ?
+                 * We might simply return APR_SUCCESS.
+                 */
+                rv = APR_EINTR;
+                continue;
+            }
+            else {
 #if !APR_FILES_AS_SOCKETS
-            return APR_EBADF;
+                return APR_EBADF;
 #else
-            fd = pollset->query_set[i].desc.f->filedes;
+                fd = pollset->query_set[i].desc.f->filedes;
 #endif
+            }
         }
         if (FD_ISSET(fd, &readset) || FD_ISSET(fd, &writeset) ||
             FD_ISSET(fd, &exceptset)) {
@@ -403,7 +527,15 @@
 
     if (descriptors)
         *descriptors = pollset->result_set;
-    return APR_SUCCESS;
+    return rv;
+}
+
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
+{
+    /* Only a pollset created with APR_POLLSET_WAKEABLE has a pipe. */
+    if (!(pollset->flags & APR_POLLSET_WAKEABLE))
+        return APR_EINIT;
+    return apr_file_putc(1, pollset->wakeup_pipe[1]);
 }
 
 APR_DECLARE(apr_status_t) apr_pollcb_create(apr_pollcb_t **pollcb,



Mime
View raw message