Author: mturk
Date: Sun Apr 13 01:31:03 2008
New Revision: 647540
URL: http://svn.apache.org/viewvc?rev=647540&view=rev
Log:
Introduce apr_pollset_wakeup()
Modified:
apr/apr/trunk/CHANGES
apr/apr/trunk/include/apr_poll.h
apr/apr/trunk/poll/unix/epoll.c
apr/apr/trunk/poll/unix/kqueue.c
apr/apr/trunk/poll/unix/poll.c
apr/apr/trunk/poll/unix/port.c
apr/apr/trunk/poll/unix/select.c
apr/apr/trunk/test/testpoll.c
Modified: apr/apr/trunk/CHANGES
URL: http://svn.apache.org/viewvc/apr/apr/trunk/CHANGES?rev=647540&r1=647539&r2=647540&view=diff
==============================================================================
--- apr/apr/trunk/CHANGES [utf-8] (original)
+++ apr/apr/trunk/CHANGES [utf-8] Sun Apr 13 01:31:03 2008
@@ -1,6 +1,10 @@
-*- coding: utf-8 -*-
Changes for APR 1.3.0
+ *) Introduce apr_pollset_wakeup() for interrupting
+ the blocking apr_pollset_poll call.
+ [Mladen Turk]
+
*) Introduce apr_pool_sys_allocator_set() for registering
application provided memory allocation functions that
will APR use internally whenever malloc/free is needed.
Modified: apr/apr/trunk/include/apr_poll.h
URL: http://svn.apache.org/viewvc/apr/apr/trunk/include/apr_poll.h?rev=647540&r1=647539&r2=647540&view=diff
==============================================================================
--- apr/apr/trunk/include/apr_poll.h (original)
+++ apr/apr/trunk/include/apr_poll.h Sun Apr 13 01:31:03 2008
@@ -56,6 +56,7 @@
*/
#define APR_POLLSET_THREADSAFE 0x001 /**< Adding or Removing a Descriptor is thread safe
*/
#define APR_POLLSET_NOCOPY 0x002 /**< Descriptors passed to apr_pollset_create() are
not copied */
+#define APR_POLLSET_WAKEABLE 0x004 /**< Pollset poll operation is interruptable */
/** Used in apr_pollfd_t to determine what the apr_descriptor is */
typedef enum {
@@ -166,6 +167,11 @@
apr_int32_t *num,
const apr_pollfd_t **descriptors);
+/**
+ * Interrupt the blocked apr_pollset_poll call.
+ * @param pollset The pollset to use
+ * @return APR_EINIT if the pollset was not created with
+ *         APR_POLLSET_WAKEABLE; APR_ENOTIMPL if the backend, or an APR
+ *         build without thread support, cannot perform a wakeup.
+ * @remark In the epoll and poll backends the interrupted
+ *         apr_pollset_poll call reports APR_EINTR.
+ */
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset);
/**
* Poll the descriptors in the poll structure
Modified: apr/apr/trunk/poll/unix/epoll.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/poll/unix/epoll.c?rev=647540&r1=647539&r2=647540&view=diff
==============================================================================
--- apr/apr/trunk/poll/unix/epoll.c (original)
+++ apr/apr/trunk/poll/unix/epoll.c Sun Apr 13 01:31:03 2008
@@ -68,6 +68,8 @@
#if APR_HAS_THREADS
/* A thread mutex to protect operations on the rings */
apr_thread_mutex_t *ring_lock;
+ /* Pipe descriptors used for wakeup */
+ apr_file_t *wakeup_pipe[2];
#endif
/* A ring containing all of the pollfd_t that are active */
APR_RING_HEAD(pfd_query_ring_t, pfd_elem_t) query_ring;
@@ -80,11 +82,61 @@
+/* Cleanup callback for an epoll-backed pollset: closes the epoll
+ * descriptor and, for wakeable pollsets in threaded builds, both ends
+ * of the wakeup pipe. The result of close() is not checked; the return
+ * value is the OR of the two apr_file_close() results (APR_SUCCESS when
+ * there is no wakeup pipe).
+ */
static apr_status_t backend_cleanup(void *p_)
{
+ apr_status_t rv = APR_SUCCESS;
apr_pollset_t *pollset = (apr_pollset_t *) p_;
+
close(pollset->epoll_fd);
- return APR_SUCCESS;
+#if APR_HAS_THREADS
+ if (pollset->flags & APR_POLLSET_WAKEABLE) {
+ /* Close both sides of the wakeup pipe */
+ rv |= apr_file_close(pollset->wakeup_pipe[0]);
+ rv |= apr_file_close(pollset->wakeup_pipe[1]);
+ }
+#endif
+ return rv;
+}
+
+#if APR_HAS_THREADS
+
+/* Create the wakeup pipe and register its read end with the pollset.
+ *
+ * A wakeup is delivered by writing to wakeup_pipe[1]; the poller then
+ * sees wakeup_pipe[0] become readable.
+ *
+ * The descriptor handed to apr_pollset_add() is allocated from the
+ * pollset's pool rather than on the stack: with APR_POLLSET_NOCOPY the
+ * pollset keeps the caller's pointer and dereferences it when reporting
+ * events, so the descriptor has to live as long as the pollset does.
+ *
+ * Returns APR_SUCCESS, or the error from apr_file_pipe_create() or
+ * apr_pollset_add().
+ */
+static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset)
+{
+    apr_status_t rv;
+    apr_pollfd_t *fd;
+
+    if ((rv = apr_file_pipe_create(&pollset->wakeup_pipe[0],
+                                   &pollset->wakeup_pipe[1],
+                                   pollset->pool)) != APR_SUCCESS)
+        return rv;
+
+    /* Zero-fill so the fields not set below are not left indeterminate */
+    fd = apr_pcalloc(pollset->pool, sizeof(*fd));
+    fd->reqevents = APR_POLLIN;
+    fd->desc_type = APR_POLL_FILE;
+    fd->desc.f = pollset->wakeup_pipe[0];
+
+    /* Add the pipe to the pollset */
+    return apr_pollset_add(pollset, fd);
+}
+
+/* Empty the read end of the wakeup pipe, discarding whatever is in it.
+ *
+ * Each wakeup writes a single byte, but several threads may have
+ * requested a wakeup, so keep reading while the scratch buffer comes
+ * back completely full. Stop on the first short or failed read.
+ */
+static void drain_wakeup_pipe(apr_pollset_t *pollset)
+{
+    char scratch[512];
+    apr_size_t got = sizeof(scratch);
+
+    for (;;) {
+        if (apr_file_read(pollset->wakeup_pipe[0], scratch, &got)
+                != APR_SUCCESS)
+            break;
+        if (got != sizeof(scratch))
+            break;
+    }
}
+#endif
+
APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
apr_uint32_t size,
apr_pool_t *p,
@@ -93,6 +145,13 @@
apr_status_t rv;
int fd;
+#if APR_HAS_THREADS
+ if (flags & APR_POLLSET_WAKEABLE) {
+ /* Add room for wakeup descriptor */
+ size++;
+ }
+#endif
+
fd = epoll_create(size);
if (fd < 0) {
*pollset = NULL;
@@ -110,7 +169,8 @@
return rv;
}
#else
- if (flags & APR_POLLSET_THREADSAFE) {
+ if (flags & APR_POLLSET_THREADSAFE ||
+ flags & APR_POLLSET_WAKEABLE) {
*pollset = NULL;
return APR_ENOTIMPL;
}
@@ -121,7 +181,6 @@
(*pollset)->pool = p;
(*pollset)->epoll_fd = fd;
(*pollset)->pollset = apr_palloc(p, size * sizeof(struct epoll_event));
- apr_pool_cleanup_register(p, *pollset, backend_cleanup, backend_cleanup);
(*pollset)->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
if (!(flags & APR_POLLSET_NOCOPY)) {
@@ -129,6 +188,18 @@
APR_RING_INIT(&(*pollset)->free_ring, pfd_elem_t, link);
APR_RING_INIT(&(*pollset)->dead_ring, pfd_elem_t, link);
}
+#if APR_HAS_THREADS
+ if (flags & APR_POLLSET_WAKEABLE) {
+ /* Create wakeup pipe */
+ if ((rv = create_wakeup_pipe(*pollset)) != APR_SUCCESS) {
+ close(fd);
+ *pollset = NULL;
+ return rv;
+ }
+ }
+#endif
+ apr_pool_cleanup_register(p, *pollset, backend_cleanup, backend_cleanup);
+
return APR_SUCCESS;
}
@@ -244,8 +315,9 @@
apr_int32_t *num,
const apr_pollfd_t **descriptors)
{
- int ret, i;
+ int ret, i, j;
apr_status_t rv = APR_SUCCESS;
+ apr_pollfd_t fd;
if (timeout > 0) {
timeout /= 1000;
@@ -263,23 +335,59 @@
}
else {
if (pollset->flags & APR_POLLSET_NOCOPY) {
- for (i = 0; i < ret; i++) {
- pollset->result_set[i] =
- *((apr_pollfd_t *) (pollset->pollset[i].data.ptr));
- pollset->result_set[i].rtnevents =
- get_epoll_revent(pollset->pollset[i].events);
+ for (i = 0, j = 0; i < ret; i++) {
+ fd = *((apr_pollfd_t *) (pollset->pollset[i].data.ptr));
+#if APR_HAS_THREADS
+ /* Check if the polled descriptor is our
+ * wakeup pipe. In that case do not put it in the result set.
+ */
+ if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
+ fd.desc_type == APR_POLL_FILE &&
+ fd.desc.f == pollset->wakeup_pipe[0]) {
+ drain_wakeup_pipe(pollset);
+ /* XXX: Is this a correct return value ?
+ * We might simply return APR_SUCCESS.
+ */
+ rv = APR_EINTR;
+ }
+ else
+#endif
+ {
+ pollset->result_set[j] = fd;
+ pollset->result_set[j].rtnevents =
+ get_epoll_revent(pollset->pollset[i].events);
+
+ j++;
+ }
}
+ (*num) = j;
}
else {
- for (i = 0; i < ret; i++) {
- pollset->result_set[i] =
- (((pfd_elem_t *) (pollset->pollset[i].data.ptr))->pfd);
- pollset->result_set[i].rtnevents =
- get_epoll_revent(pollset->pollset[i].events);
+ for (i = 0, j = 0; i < ret; i++) {
+ fd = (((pfd_elem_t *) (pollset->pollset[i].data.ptr))->pfd);
+#if APR_HAS_THREADS
+ if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
+ fd.desc_type == APR_POLL_FILE &&
+ fd.desc.f == pollset->wakeup_pipe[0]) {
+ drain_wakeup_pipe(pollset);
+ /* XXX: Is this a correct return value ?
+ * We might simply return APR_SUCCESS.
+ */
+ rv = APR_EINTR;
+ }
+ else
+#endif
+ {
+ pollset->result_set[j] = fd;
+ pollset->result_set[j].rtnevents =
+ get_epoll_revent(pollset->pollset[i].events);
+ j++;
+ }
}
+ (*num) = j;
}
- if (descriptors) {
+ if (descriptors && (*num)) {
*descriptors = pollset->result_set;
}
}
@@ -294,6 +402,22 @@
}
return rv;
+}
+
+/* Interrupt a poller by writing one byte to the write end of the
+ * wakeup pipe. Returns APR_EINIT when the pollset was not created with
+ * APR_POLLSET_WAKEABLE, otherwise the result of apr_file_putc().
+ */
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
+{
+#if !APR_HAS_THREADS
+    /* A wakeup is only meaningful when another thread can be blocked
+     * in the poll call, so builds without thread support do not
+     * provide it.
+     */
+    return APR_ENOTIMPL;
+#else
+    if (!(pollset->flags & APR_POLLSET_WAKEABLE))
+        return APR_EINIT;
+
+    return apr_file_putc(1, pollset->wakeup_pipe[1]);
+#endif
}
struct apr_pollcb_t {
Modified: apr/apr/trunk/poll/unix/kqueue.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/poll/unix/kqueue.c?rev=647540&r1=647539&r2=647540&view=diff
==============================================================================
--- apr/apr/trunk/poll/unix/kqueue.c (original)
+++ apr/apr/trunk/poll/unix/kqueue.c Sun Apr 13 01:31:03 2008
@@ -281,6 +281,21 @@
return rv;
}
+/* Wakeup is not implemented for this backend: a wakeable pollset gets
+ * APR_ENOTIMPL, any other pollset gets APR_EINIT.
+ */
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
+{
+#if !APR_HAS_THREADS
+    /* A wakeup is only meaningful when another thread can be blocked
+     * in the poll call, so builds without thread support do not
+     * provide it.
+     */
+    return APR_ENOTIMPL;
+#else
+    return (pollset->flags & APR_POLLSET_WAKEABLE) ? APR_ENOTIMPL
+                                                   : APR_EINIT;
+#endif
+}
struct apr_pollcb_t {
apr_pool_t *pool;
Modified: apr/apr/trunk/poll/unix/poll.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/poll/unix/poll.c?rev=647540&r1=647539&r2=647540&view=diff
==============================================================================
--- apr/apr/trunk/poll/unix/poll.c (original)
+++ apr/apr/trunk/poll/unix/poll.c Sun Apr 13 01:31:03 2008
@@ -156,11 +156,70 @@
apr_pool_t *pool;
apr_uint32_t nelts;
apr_uint32_t nalloc;
+ apr_uint32_t flags;
+#if APR_HAS_THREADS
+ /* Pipe descriptors used for wakeup */
+ apr_file_t *wakeup_pipe[2];
+#endif
struct pollfd *pollset;
apr_pollfd_t *query_set;
apr_pollfd_t *result_set;
};
+#if APR_HAS_THREADS
+
+/* Cleanup callback: close both ends of the wakeup pipe of a wakeable
+ * pollset. The second end is closed even if closing the first fails;
+ * the return value is the OR of the two apr_file_close() results.
+ */
+static apr_status_t wakeup_pipe_cleanup(void *p)
+{
+    apr_pollset_t *ps = (apr_pollset_t *)p;
+    apr_status_t status = APR_SUCCESS;
+
+    if (!(ps->flags & APR_POLLSET_WAKEABLE))
+        return status;
+
+    status |= apr_file_close(ps->wakeup_pipe[0]);
+    status |= apr_file_close(ps->wakeup_pipe[1]);
+    return status;
+}
+
+/* Create the wakeup pipe and add its read end to the pollset, watching
+ * for APR_POLLIN. Returns the error from apr_file_pipe_create() or the
+ * result of apr_pollset_add().
+ */
+static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset)
+{
+    apr_pollfd_t wakeup_pfd;
+    apr_status_t status;
+
+    status = apr_file_pipe_create(&pollset->wakeup_pipe[0],
+                                  &pollset->wakeup_pipe[1],
+                                  pollset->pool);
+    if (status != APR_SUCCESS)
+        return status;
+
+    wakeup_pfd.reqevents = APR_POLLIN;
+    wakeup_pfd.desc_type = APR_POLL_FILE;
+    wakeup_pfd.desc.f = pollset->wakeup_pipe[0];
+
+    return apr_pollset_add(pollset, &wakeup_pfd);
+}
+
+/* Empty the read end of the wakeup pipe, discarding whatever is in it.
+ *
+ * Each wakeup writes a single byte, but several threads may have
+ * requested a wakeup, so keep reading while the scratch buffer comes
+ * back completely full. Stop on the first short or failed read.
+ */
+static void drain_wakeup_pipe(apr_pollset_t *pollset)
+{
+    char scratch[512];
+    apr_size_t got = sizeof(scratch);
+
+    for (;;) {
+        if (apr_file_read(pollset->wakeup_pipe[0], scratch, &got)
+                != APR_SUCCESS)
+            break;
+        if (got != sizeof(scratch))
+            break;
+    }
+}
+
+#endif
+
APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
apr_uint32_t size,
apr_pool_t *p,
@@ -170,19 +229,46 @@
*pollset = NULL;
return APR_ENOTIMPL;
}
+ if (flags & APR_POLLSET_WAKEABLE) {
+#if APR_HAS_THREADS
+ /* Add room for wakeup descriptor */
+ size++;
+#else
+ *pollset = NULL;
+ return APR_ENOTIMPL;
+#endif
+ }
*pollset = apr_palloc(p, sizeof(**pollset));
(*pollset)->nelts = 0;
(*pollset)->nalloc = size;
(*pollset)->pool = p;
+ (*pollset)->flags = flags;
(*pollset)->pollset = apr_palloc(p, size * sizeof(struct pollfd));
(*pollset)->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
(*pollset)->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
+#if APR_HAS_THREADS
+ if (flags & APR_POLLSET_WAKEABLE) {
+ apr_status_t rv;
+ /* Create wakeup pipe */
+ if ((rv = create_wakeup_pipe(*pollset)) != APR_SUCCESS) {
+ *pollset = NULL;
+ return rv;
+ }
+ apr_pool_cleanup_register(p, *pollset, wakeup_pipe_cleanup,
+ wakeup_pipe_cleanup);
+ }
+#endif
+
return APR_SUCCESS;
}
APR_DECLARE(apr_status_t) apr_pollset_destroy(apr_pollset_t *pollset)
{
+#if APR_HAS_THREADS
+ if (pollset->flags & APR_POLLSET_WAKEABLE)
+ return apr_pool_cleanup_run(pollset->pool, pollset, backend_cleanup);
+#endif
return APR_SUCCESS;
}
@@ -242,32 +328,69 @@
apr_int32_t *num,
const apr_pollfd_t **descriptors)
{
- int rv;
+    /* ret is the raw poll() result; rv is the APR status returned */
+    int ret;
+    apr_status_t rv = APR_SUCCESS;
    apr_uint32_t i, j;
    if (timeout > 0) {
        timeout /= 1000;
    }
- rv = poll(pollset->pollset, pollset->nelts, timeout);
- (*num) = rv;
- if (rv < 0) {
+    ret = poll(pollset->pollset, pollset->nelts, timeout);
+    (*num) = ret;
+    if (ret < 0) {
        return apr_get_netos_error();
    }
- if (rv == 0) {
+    else if (ret == 0) {
        return APR_TIMEUP;
    }
- j = 0;
- for (i = 0; i < pollset->nelts; i++) {
- if (pollset->pollset[i].revents != 0) {
- pollset->result_set[j] = pollset->query_set[i];
- pollset->result_set[j].rtnevents =
- get_revent(pollset->pollset[i].revents);
- j++;
+    else {
+        for (i = 0, j = 0; i < pollset->nelts; i++) {
+            if (pollset->pollset[i].revents != 0) {
+#if APR_HAS_THREADS
+                /* Check if the polled descriptor is our wakeup pipe.
+                 * In that case do not put it in the result set.
+                 * pollset->pollset[] holds plain struct pollfd entries,
+                 * so the APR descriptor type and file are read from
+                 * the query_set[] entry at the same index.
+                 */
+                if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
+                    pollset->query_set[i].desc_type == APR_POLL_FILE &&
+                    pollset->query_set[i].desc.f == pollset->wakeup_pipe[0]) {
+                    drain_wakeup_pipe(pollset);
+                    /* XXX: Is this a correct return value ?
+                     * We might simply return APR_SUCCESS.
+                     */
+                    rv = APR_EINTR;
+                }
+                else
+#endif
+                {
+                    pollset->result_set[j] = pollset->query_set[i];
+                    pollset->result_set[j].rtnevents =
+                        get_revent(pollset->pollset[i].revents);
+                    j++;
+                }
+            }
        }
+        /* Count only the descriptors copied into result_set; the
+         * wakeup pipe is not one of them.
+         */
+        (*num) = j;
    }
- if (descriptors)
+    if (descriptors && (*num))
        *descriptors = pollset->result_set;
- return APR_SUCCESS;
+    return rv;
+}
+
+/* Interrupt a poller by writing one byte to the write end of the
+ * wakeup pipe. Returns APR_EINIT when the pollset was not created with
+ * APR_POLLSET_WAKEABLE, otherwise the result of apr_file_putc().
+ */
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
+{
+#if !APR_HAS_THREADS
+    /* A wakeup is only meaningful when another thread can be blocked
+     * in the poll call, so builds without thread support do not
+     * provide it.
+     */
+    return APR_ENOTIMPL;
+#else
+    if (!(pollset->flags & APR_POLLSET_WAKEABLE))
+        return APR_EINIT;
+
+    return apr_file_putc(1, pollset->wakeup_pipe[1]);
+#endif
}
APR_DECLARE(apr_status_t) apr_pollcb_create(apr_pollcb_t **pollcb,
Modified: apr/apr/trunk/poll/unix/port.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/poll/unix/port.c?rev=647540&r1=647539&r2=647540&view=diff
==============================================================================
--- apr/apr/trunk/poll/unix/port.c (original)
+++ apr/apr/trunk/poll/unix/port.c Sun Apr 13 01:31:03 2008
@@ -335,6 +335,22 @@
return rv;
}
+/* Wakeup is not implemented for this backend: a wakeable pollset gets
+ * APR_ENOTIMPL, any other pollset gets APR_EINIT.
+ */
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
+{
+#if !APR_HAS_THREADS
+    /* A wakeup is only meaningful when another thread can be blocked
+     * in the poll call, so builds without thread support do not
+     * provide it.
+     */
+    return APR_ENOTIMPL;
+#else
+    return (pollset->flags & APR_POLLSET_WAKEABLE) ? APR_ENOTIMPL
+                                                   : APR_EINIT;
+#endif
+}
+
struct apr_pollcb_t {
apr_pool_t *pool;
apr_uint32_t nalloc;
Modified: apr/apr/trunk/poll/unix/select.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/poll/unix/select.c?rev=647540&r1=647539&r2=647540&view=diff
==============================================================================
--- apr/apr/trunk/poll/unix/select.c (original)
+++ apr/apr/trunk/poll/unix/select.c Sun Apr 13 01:31:03 2008
@@ -175,6 +175,7 @@
apr_uint32_t nelts;
apr_uint32_t nalloc;
+ apr_uint32_t flags;
fd_set readset, writeset, exceptset;
int maxfd;
apr_pollfd_t *query_set;
@@ -203,6 +204,7 @@
(*pollset)->nelts = 0;
(*pollset)->nalloc = size;
(*pollset)->pool = p;
+ (*pollset)->flags = flags;
FD_ZERO(&((*pollset)->readset));
FD_ZERO(&((*pollset)->writeset));
FD_ZERO(&((*pollset)->exceptset));
@@ -404,6 +406,22 @@
if (descriptors)
*descriptors = pollset->result_set;
return APR_SUCCESS;
+}
+
+/* Wakeup is not implemented for this backend: a wakeable pollset gets
+ * APR_ENOTIMPL, any other pollset gets APR_EINIT.
+ */
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
+{
+#if !APR_HAS_THREADS
+    /* A wakeup is only meaningful when another thread can be blocked
+     * in the poll call, so builds without thread support do not
+     * provide it.
+     */
+    return APR_ENOTIMPL;
+#else
+    return (pollset->flags & APR_POLLSET_WAKEABLE) ? APR_ENOTIMPL
+                                                   : APR_EINIT;
+#endif
}
APR_DECLARE(apr_status_t) apr_pollcb_create(apr_pollcb_t **pollcb,
Modified: apr/apr/trunk/test/testpoll.c
URL: http://svn.apache.org/viewvc/apr/apr/trunk/test/testpoll.c?rev=647540&r1=647539&r2=647540&view=diff
==============================================================================
--- apr/apr/trunk/test/testpoll.c (original)
+++ apr/apr/trunk/test/testpoll.c Sun Apr 13 01:31:03 2008
@@ -659,6 +659,22 @@
ABTS_INT_EQUAL(tc, APR_SUCCESS, rv);
}
+/* Issue 1000 wakeups, each expected to succeed, then check that one
+ * zero-timeout poll reports EINTR, zero ready descriptors, and leaves
+ * the descriptor pointer NULL.
+ */
+static void test_wakeup(abts_case *tc, void *data)
+{
+    const apr_pollfd_t *ready = NULL;
+    apr_status_t status;
+    int attempt = 0;
+    int nready;
+
+    while (attempt < 1000) {
+        status = apr_pollset_wakeup(pollset);
+        ABTS_INT_EQUAL(tc, APR_SUCCESS, status);
+        attempt++;
+    }
+
+    status = apr_pollset_poll(pollset, 0, &nready, &ready);
+    ABTS_INT_EQUAL(tc, 1, APR_STATUS_IS_EINTR(status));
+    ABTS_INT_EQUAL(tc, 0, nready);
+    ABTS_PTR_EQUAL(tc, NULL, ready);
+}
+
abts_suite *testpoll(abts_suite *suite)
{
suite = ADD_SUITE(suite)
@@ -688,6 +704,7 @@
abts_run_test(suite, clear_middle_pollset, NULL);
abts_run_test(suite, send_last_pollset, NULL);
abts_run_test(suite, clear_last_pollset, NULL);
+ abts_run_test(suite, test_wakeup, NULL);
abts_run_test(suite, pollset_remove, NULL);
|