apr-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mt...@apache.org
Subject svn commit: r647540 - in /apr/apr/trunk: CHANGES include/apr_poll.h poll/unix/epoll.c poll/unix/kqueue.c poll/unix/poll.c poll/unix/port.c poll/unix/select.c test/testpoll.c
Date Sun, 13 Apr 2008 08:31:10 GMT
Author: mturk
Date: Sun Apr 13 01:31:03 2008
New Revision: 647540

URL: http://svn.apache.org/viewvc?rev=647540&view=rev
Log:
Introduce apr_pollset_wakeup()

Modified:
    apr/apr/trunk/CHANGES
    apr/apr/trunk/include/apr_poll.h
    apr/apr/trunk/poll/unix/epoll.c
    apr/apr/trunk/poll/unix/kqueue.c
    apr/apr/trunk/poll/unix/poll.c
    apr/apr/trunk/poll/unix/port.c
    apr/apr/trunk/poll/unix/select.c
    apr/apr/trunk/test/testpoll.c

Modified: apr/apr/trunk/CHANGES
URL: http://svn.apache.org/viewvc/apr/apr/trunk/CHANGES?rev=647540&r1=647539&r2=647540&view=diff
==============================================================================
--- apr/apr/trunk/CHANGES [utf-8] (original)
+++ apr/apr/trunk/CHANGES [utf-8] Sun Apr 13 01:31:03 2008
@@ -1,6 +1,10 @@
                                                      -*- coding: utf-8 -*-
 Changes for APR 1.3.0
 
+  *) Introduce apr_pollset_wakeup() for interrupting
+     the blocking apr_pollset_poll call.
+     [Mladen Turk]
+
   *) Introduce apr_pool_sys_allocator_set() for registering
      application provided memory allocation functions that
      will APR use internally whenever malloc/free is needed.

Modified: apr/apr/trunk/include/apr_poll.h
URL: http://svn.apache.org/viewvc/apr/apr/trunk/include/apr_poll.h?rev=647540&r1=647539&r2=647540&view=diff
==============================================================================
--- apr/apr/trunk/include/apr_poll.h (original)
+++ apr/apr/trunk/include/apr_poll.h Sun Apr 13 01:31:03 2008
@@ -56,6 +56,7 @@
  */
 #define APR_POLLSET_THREADSAFE 0x001 /**< Adding or Removing a Descriptor is thread safe */
 #define APR_POLLSET_NOCOPY     0x002 /**< Descriptors passed to apr_pollset_create() are not copied */
+#define APR_POLLSET_WAKEABLE   0x004 /**< Pollset poll operation is interruptable */
 
 /** Used in apr_pollfd_t to determine what the apr_descriptor is */
 typedef enum { 
@@ -166,6 +167,11 @@
                                            apr_int32_t *num,
                                            const apr_pollfd_t **descriptors);
 
+/**
+ * Interrupt the blocked apr_pollset_poll call.
+ * @param pollset The pollset to use
+ */
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset);
 
 /**
  * Poll the descriptors in the poll structure

Modified: apr/apr/trunk/poll/unix/epoll.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/poll/unix/epoll.c?rev=647540&r1=647539&r2=647540&view=diff
==============================================================================
--- apr/apr/trunk/poll/unix/epoll.c (original)
+++ apr/apr/trunk/poll/unix/epoll.c Sun Apr 13 01:31:03 2008
@@ -68,6 +68,8 @@
 #if APR_HAS_THREADS
     /* A thread mutex to protect operations on the rings */
     apr_thread_mutex_t *ring_lock;
+    /* Pipe descriptors used for wakeup */
+    apr_file_t *wakeup_pipe[2];    
 #endif
     /* A ring containing all of the pollfd_t that are active */
     APR_RING_HEAD(pfd_query_ring_t, pfd_elem_t) query_ring;
@@ -80,11 +82,61 @@
 
 static apr_status_t backend_cleanup(void *p_)
 {
+    apr_status_t rv = APR_SUCCESS;
     apr_pollset_t *pollset = (apr_pollset_t *) p_;
+
     close(pollset->epoll_fd);
-    return APR_SUCCESS;
+#if APR_HAS_THREADS
+    if (pollset->flags & APR_POLLSET_WAKEABLE) {
+        /* Close both sides of the wakeup pipe */
+        rv |= apr_file_close(pollset->wakeup_pipe[0]);
+        rv |= apr_file_close(pollset->wakeup_pipe[1]);
+    }
+#endif
+    return rv;
+}
+
+#if APR_HAS_THREADS
+
+/* Create a dummy wakeup pipe for interrupting the poller
+ */
+static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset)
+{
+    apr_status_t rv;
+    apr_pollfd_t fd;
+
+    if ((rv = apr_file_pipe_create(&pollset->wakeup_pipe[0],
+                                   &pollset->wakeup_pipe[1],
+                                   pollset->pool)) != APR_SUCCESS)
+        return rv;
+    fd.reqevents = APR_POLLIN;
+    fd.desc_type = APR_POLL_FILE;
+    fd.desc.f = pollset->wakeup_pipe[0];
+    /* Add the pipe to the pollset
+     */
+    return apr_pollset_add(pollset, &fd);
+}
+
+/* Read and discard whatever is in the wakeup pipe.
+ */
+static void drain_wakeup_pipe(apr_pollset_t *pollset)
+{
+    char rb[512];
+    apr_size_t nr = sizeof(rb);
+
+    while (apr_file_read(pollset->wakeup_pipe[0], rb, &nr) == APR_SUCCESS) {
+        /* Although we write just one byte to the other end of the pipe
+         * during wakeup, multiple threads could call the wakeup.
+         * So simply drain out from the input side of the pipe all
+         * the data.
+         */
+        if (nr != sizeof(rb))
+            break;
+    }
 }
 
+#endif
+
 APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
                                              apr_uint32_t size,
                                              apr_pool_t *p,
@@ -93,6 +145,13 @@
     apr_status_t rv;
     int fd;
 
+#if APR_HAS_THREADS
+    if (flags & APR_POLLSET_WAKEABLE) {
+        /* Add room for wakeup descriptor */
+        size++;
+    }
+#endif
+
     fd = epoll_create(size);
     if (fd < 0) {
         *pollset = NULL;
@@ -110,7 +169,8 @@
         return rv;
     }
 #else
-    if (flags & APR_POLLSET_THREADSAFE) {
+    if (flags & APR_POLLSET_THREADSAFE ||
+        flags & APR_POLLSET_WAKEABLE) {
         *pollset = NULL;
         return APR_ENOTIMPL;
     }
@@ -121,7 +181,6 @@
     (*pollset)->pool = p;
     (*pollset)->epoll_fd = fd;
     (*pollset)->pollset = apr_palloc(p, size * sizeof(struct epoll_event));
-    apr_pool_cleanup_register(p, *pollset, backend_cleanup, backend_cleanup);
     (*pollset)->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
 
     if (!(flags & APR_POLLSET_NOCOPY)) {
@@ -129,6 +188,18 @@
         APR_RING_INIT(&(*pollset)->free_ring, pfd_elem_t, link);
         APR_RING_INIT(&(*pollset)->dead_ring, pfd_elem_t, link);
     }
+#if APR_HAS_THREADS
+    if (flags & APR_POLLSET_WAKEABLE) {
+        /* Create wakeup pipe */
+        if ((rv = create_wakeup_pipe(*pollset)) != APR_SUCCESS) {
+            close(fd);
+            *pollset = NULL;
+            return rv;
+        }
+    }
+#endif
+    apr_pool_cleanup_register(p, *pollset, backend_cleanup, backend_cleanup);
+
     return APR_SUCCESS;
 }
 
@@ -244,8 +315,9 @@
                                            apr_int32_t *num,
                                            const apr_pollfd_t **descriptors)
 {
-    int ret, i;
+    int ret, i, j;
     apr_status_t rv = APR_SUCCESS;
+    apr_pollfd_t fd;
 
     if (timeout > 0) {
         timeout /= 1000;
@@ -263,23 +335,59 @@
     }
     else {
         if (pollset->flags & APR_POLLSET_NOCOPY) {
-            for (i = 0; i < ret; i++) {
-                pollset->result_set[i] =
-                    *((apr_pollfd_t *) (pollset->pollset[i].data.ptr));
-                pollset->result_set[i].rtnevents =
-                    get_epoll_revent(pollset->pollset[i].events);
+            for (i = 0, j = 0; i < ret; i++) {
+                fd = *((apr_pollfd_t *) (pollset->pollset[i].data.ptr));
+#if APR_HAS_THREADS
+                /* Check if the polled descriptor is our
+                 * wakeup pipe. In that case do not put it in the result set.
+                 */
+                if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
+                    fd.desc_type == APR_POLL_FILE &&
+                    fd.desc.f == pollset->wakeup_pipe[0]) {
+                        drain_wakeup_pipe(pollset);
+                        /* XXX: Is this a correct return value ?
+                         * We might simply return APR_SUCCESS.
+                         */
+                        rv = APR_EINTR;
+                }
+                else
+#endif
+                {
+                    pollset->result_set[j] = fd;
+                    pollset->result_set[j].rtnevents =
+                        get_epoll_revent(pollset->pollset[i].events);
+
+                    j++;
+                }
             }
+            (*num) = j;
         }
         else {
-            for (i = 0; i < ret; i++) {
-                pollset->result_set[i] =
-                    (((pfd_elem_t *) (pollset->pollset[i].data.ptr))->pfd);
-                pollset->result_set[i].rtnevents =
-                    get_epoll_revent(pollset->pollset[i].events);
+            for (i = 0, j = 0; i < ret; i++) {
+                fd = (((pfd_elem_t *) (pollset->pollset[i].data.ptr))->pfd);
+#if APR_HAS_THREADS
+                if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
+                    fd.desc_type == APR_POLL_FILE &&
+                    fd.desc.f == pollset->wakeup_pipe[0]) {
+                        drain_wakeup_pipe(pollset);
+                        /* XXX: Is this a correct return value ?
+                         * We might simply return APR_SUCCESS.
+                         */
+                        rv = APR_EINTR;
+                }
+                else
+#endif
+                {
+                    pollset->result_set[j] = fd;
+                    pollset->result_set[j].rtnevents =
+                        get_epoll_revent(pollset->pollset[i].events);
+                    j++;
+                }
             }
+            (*num) = j;
         }
 
-        if (descriptors) {
+        if (descriptors && (*num)) {
             *descriptors = pollset->result_set;
         }
     }
@@ -294,6 +402,22 @@
     }
 
     return rv;
+}
+
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
+{
+#if APR_HAS_THREADS
+    if (pollset->flags & APR_POLLSET_WAKEABLE)
+        return apr_file_putc(1, pollset->wakeup_pipe[1]);
+    else
+        return APR_EINIT;
+#else
+    /* If APR was compiled without thread support, it
+     * makes no sense to provide a wakeup operation, which
+     * is usable only in a multithreaded environment.
+     */
+    return APR_ENOTIMPL;
+#endif
 }
 
 struct apr_pollcb_t {

Modified: apr/apr/trunk/poll/unix/kqueue.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/poll/unix/kqueue.c?rev=647540&r1=647539&r2=647540&view=diff
==============================================================================
--- apr/apr/trunk/poll/unix/kqueue.c (original)
+++ apr/apr/trunk/poll/unix/kqueue.c Sun Apr 13 01:31:03 2008
@@ -281,6 +281,21 @@
     return rv;
 }
 
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
+{
+#if APR_HAS_THREADS
+    if (pollset->flags & APR_POLLSET_WAKEABLE)
+        return APR_ENOTIMPL;
+    else
+        return APR_EINIT;
+#else
+    /* If APR was compiled without thread support, it
+     * makes no sense to provide a wakeup operation, which
+     * is usable only in a multithreaded environment.
+     */
+    return APR_ENOTIMPL;
+#endif
+}
 
 struct apr_pollcb_t {
     apr_pool_t *pool;

Modified: apr/apr/trunk/poll/unix/poll.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/poll/unix/poll.c?rev=647540&r1=647539&r2=647540&view=diff
==============================================================================
--- apr/apr/trunk/poll/unix/poll.c (original)
+++ apr/apr/trunk/poll/unix/poll.c Sun Apr 13 01:31:03 2008
@@ -156,11 +156,70 @@
     apr_pool_t *pool;
     apr_uint32_t nelts;
     apr_uint32_t nalloc;
+    apr_uint32_t flags;
+#if APR_HAS_THREADS
+    /* Pipe descriptors used for wakeup */
+    apr_file_t *wakeup_pipe[2];    
+#endif
     struct pollfd *pollset;
     apr_pollfd_t *query_set;
     apr_pollfd_t *result_set;
 };
 
+#if APR_HAS_THREADS
+
+static apr_status_t wakeup_pipe_cleanup(void *p)
+{
+    apr_status_t rv = APR_SUCCESS;
+    apr_pollset_t *pollset = (apr_pollset_t *)p;
+
+    if (pollset->flags & APR_POLLSET_WAKEABLE) {
+        /* Close both sides of the wakeup pipe */
+        rv |= apr_file_close(pollset->wakeup_pipe[0]);
+        rv |= apr_file_close(pollset->wakeup_pipe[1]);
+    }
+    return rv;
+}
+
+/* Create a dummy wakeup pipe for interrupting the poller
+ */
+static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset)
+{
+    apr_status_t rv;
+    apr_pollfd_t fd;
+
+    if ((rv = apr_file_pipe_create(&pollset->wakeup_pipe[0],
+                                   &pollset->wakeup_pipe[1],
+                                   pollset->pool)) != APR_SUCCESS)
+        return rv;
+    fd.reqevents = APR_POLLIN;
+    fd.desc_type = APR_POLL_FILE;
+    fd.desc.f = pollset->wakeup_pipe[0];
+    /* Add the pipe to the pollset
+     */
+    return apr_pollset_add(pollset, &fd);
+}
+
+/* Read and discard whatever is in the wakeup pipe.
+ */
+static void drain_wakeup_pipe(apr_pollset_t *pollset)
+{
+    char rb[512];
+    apr_size_t nr = sizeof(rb);
+
+    while (apr_file_read(pollset->wakeup_pipe[0], rb, &nr) == APR_SUCCESS) {
+        /* Although we write just one byte to the other end of the pipe
+         * during wakeup, multiple threads could call the wakeup.
+         * So simply drain out from the input side of the pipe all
+         * the data.
+         */
+        if (nr != sizeof(rb))
+            break;
+    }
+}
+
+#endif
+
 APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
                                              apr_uint32_t size,
                                              apr_pool_t *p,
@@ -170,19 +229,46 @@
         *pollset = NULL;
         return APR_ENOTIMPL;
     }
+    if (flags & APR_POLLSET_WAKEABLE) {
+#if APR_HAS_THREADS
+        /* Add room for wakeup descriptor */
+        size++;
+#else
+        *pollset = NULL;
+        return APR_ENOTIMPL;
+#endif
+    }
 
     *pollset = apr_palloc(p, sizeof(**pollset));
     (*pollset)->nelts = 0;
     (*pollset)->nalloc = size;
     (*pollset)->pool = p;
+    (*pollset)->flags = flags;
     (*pollset)->pollset = apr_palloc(p, size * sizeof(struct pollfd));
     (*pollset)->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
     (*pollset)->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
+#if APR_HAS_THREADS
+    if (flags & APR_POLLSET_WAKEABLE) {
+        apr_status_t rv;
+        /* Create wakeup pipe */
+        if ((rv = create_wakeup_pipe(*pollset)) != APR_SUCCESS) {
+            *pollset = NULL;
+            return rv;
+        }
+        apr_pool_cleanup_register(p, *pollset, wakeup_pipe_cleanup,
+                                  wakeup_pipe_cleanup);
+    }
+#endif
+
     return APR_SUCCESS;
 }
 
 APR_DECLARE(apr_status_t) apr_pollset_destroy(apr_pollset_t *pollset)
 {
+#if APR_HAS_THREADS
+    if (pollset->flags & APR_POLLSET_WAKEABLE)
+         return apr_pool_cleanup_run(pollset->pool, pollset, backend_cleanup);
+#endif
     return APR_SUCCESS;
 }
 
@@ -242,32 +328,69 @@
                                            apr_int32_t *num,
                                            const apr_pollfd_t **descriptors)
 {
-    int rv;
+    int ret;
+    apr_status_t rv = APR_SUCCESS;
     apr_uint32_t i, j;
 
     if (timeout > 0) {
         timeout /= 1000;
     }
-    rv = poll(pollset->pollset, pollset->nelts, timeout);
-    (*num) = rv;
-    if (rv < 0) {
+    ret = poll(pollset->pollset, pollset->nelts, timeout);
+    (*num) = ret;
+    if (ret < 0) {
         return apr_get_netos_error();
     }
-    if (rv == 0) {
+    else if (res == 0) {
         return APR_TIMEUP;
     }
-    j = 0;
-    for (i = 0; i < pollset->nelts; i++) {
-        if (pollset->pollset[i].revents != 0) {
-            pollset->result_set[j] = pollset->query_set[i];
-            pollset->result_set[j].rtnevents =
-                get_revent(pollset->pollset[i].revents);
-            j++;
+    else {
+        for (i = 0, j = 0; i < pollset->nelts; i++) {
+            if (pollset->pollset[i].revents != 0) {
+#if APR_HAS_THREADS
+                /* Check if the polled descriptor is our
+                 * wakeup pipe. In that case do not put it in the result set.
+                 */
+                if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
+                    pollset->pollset[i].desc_type == APR_POLL_FILE &&
+                    pollset->pollset[i].desc.f == pollset->wakeup_pipe[0]) {
+                        drain_wakeup_pipe(pollset);
+                        /* XXX: Is this a correct return value ?
+                         * We might simply return APR_SUCCESS.
+                         */
+                        rv = APR_EINTR;
+                }
+                else
+#endif
+                {
+                    pollset->result_set[j] = pollset->query_set[i];
+                    pollset->result_set[j].rtnevents =
+                        get_revent(pollset->pollset[i].revents);
+                    j++;
+                }
+            }
         }
+        (*num) = j;
     }
-    if (descriptors)
+    if (descriptors && (*num))
         *descriptors = pollset->result_set;
-    return APR_SUCCESS;
+    return rv;
+}
+
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
+{
+#if APR_HAS_THREADS
+    if (pollset->flags & APR_POLLSET_WAKEABLE) {
+        return apr_file_putc(1, pollset->wakeup_pipe[1]);
+    }
+    else
+        return APR_EINIT;
+#else
+    /* If APR was compiled without thread support, it
+     * makes no sense to provide a wakeup operation, which
+     * is usable only in a multithreaded environment.
+     */
+    return APR_ENOTIMPL;
+#endif
 }
 
 APR_DECLARE(apr_status_t) apr_pollcb_create(apr_pollcb_t **pollcb,

Modified: apr/apr/trunk/poll/unix/port.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/poll/unix/port.c?rev=647540&r1=647539&r2=647540&view=diff
==============================================================================
--- apr/apr/trunk/poll/unix/port.c (original)
+++ apr/apr/trunk/poll/unix/port.c Sun Apr 13 01:31:03 2008
@@ -335,6 +335,22 @@
     return rv;
 }
 
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
+{
+#if APR_HAS_THREADS
+    if (pollset->flags & APR_POLLSET_WAKEABLE)
+        return APR_ENOTIMPL;
+    else
+        return APR_EINIT;
+#else
+    /* If APR was compiled without thread support, it
+     * makes no sense to provide a wakeup operation, which
+     * is usable only in a multithreaded environment.
+     */
+    return APR_ENOTIMPL;
+#endif
+}
+
 struct apr_pollcb_t {
     apr_pool_t *pool;
     apr_uint32_t nalloc;

Modified: apr/apr/trunk/poll/unix/select.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/poll/unix/select.c?rev=647540&r1=647539&r2=647540&view=diff
==============================================================================
--- apr/apr/trunk/poll/unix/select.c (original)
+++ apr/apr/trunk/poll/unix/select.c Sun Apr 13 01:31:03 2008
@@ -175,6 +175,7 @@
 
     apr_uint32_t nelts;
     apr_uint32_t nalloc;
+    apr_uint32_t flags;
     fd_set readset, writeset, exceptset;
     int maxfd;
     apr_pollfd_t *query_set;
@@ -203,6 +204,7 @@
     (*pollset)->nelts = 0;
     (*pollset)->nalloc = size;
     (*pollset)->pool = p;
+    (*pollset)->flags = flags;
     FD_ZERO(&((*pollset)->readset));
     FD_ZERO(&((*pollset)->writeset));
     FD_ZERO(&((*pollset)->exceptset));
@@ -404,6 +406,22 @@
     if (descriptors)
         *descriptors = pollset->result_set;
     return APR_SUCCESS;
+}
+
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
+{
+#if APR_HAS_THREADS
+    if (pollset->flags & APR_POLLSET_WAKEABLE)
+        return APR_ENOTIMPL;
+    else
+        return APR_EINIT;
+#else
+    /* If APR was compiled without thread support, it
+     * makes no sense to provide a wakeup operation, which
+     * is usable only in a multithreaded environment.
+     */
+    return APR_ENOTIMPL;
+#endif
 }
 
 APR_DECLARE(apr_status_t) apr_pollcb_create(apr_pollcb_t **pollcb,

Modified: apr/apr/trunk/test/testpoll.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/test/testpoll.c?rev=647540&r1=647539&r2=647540&view=diff
==============================================================================
--- apr/apr/trunk/test/testpoll.c (original)
+++ apr/apr/trunk/test/testpoll.c Sun Apr 13 01:31:03 2008
@@ -659,6 +659,22 @@
     ABTS_INT_EQUAL(tc, APR_SUCCESS, rv);
 }
 
+static void test_wakeup(abts_case *tc, void *data)
+{
+    apr_status_t rv;
+    int i, lrv;
+    const apr_pollfd_t *descs = NULL;
+
+    for (i = 0; i < 1000; i++) {
+        rv = apr_pollset_wakeup(pollset);
+        ABTS_INT_EQUAL(tc, APR_SUCCESS, rv);
+    }
+    rv = apr_pollset_poll(pollset, 0, &lrv, &descs);
+    ABTS_INT_EQUAL(tc, 1, APR_STATUS_IS_EINTR(rv));
+    ABTS_INT_EQUAL(tc, 0, lrv);
+    ABTS_PTR_EQUAL(tc, NULL, descs);
+}
+
 abts_suite *testpoll(abts_suite *suite)
 {
     suite = ADD_SUITE(suite)
@@ -688,6 +704,7 @@
     abts_run_test(suite, clear_middle_pollset, NULL);
     abts_run_test(suite, send_last_pollset, NULL);
     abts_run_test(suite, clear_last_pollset, NULL);
+    abts_run_test(suite, test_wakeup, NULL);
 
     abts_run_test(suite, pollset_remove, NULL);
     



Mime
View raw message