httpd-modules-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jacob Champion <jacob.champ...@ni.com>
Subject [PATCH 4/5] Read and write to the brigade from only one thread
Date Mon, 09 Mar 2015 18:43:55 GMT
The prior implementation allowed clients to write to a parallel bucket
brigade from separate threads, while the main thread read incoming
messages from the original brigade. This appears to cause thread safety
issues, especially when combined with TLS (OpenSSL has the ability to
read from the socket during a write, and vice-versa).

This patch, inspired by both mod_spdy and mod_proxy_wstunnel, causes
cross-thread invocations of mod_websocket_plugin_send() to place their
messages on a queue and signal the main thread that a write is pending.
The event loop is based on apr_pollset.

See http://mail-archives.apache.org/mod_mbox/httpd-modules-dev/201502.mbox/%3C54EE23EB.5040705@ni.com%3E
for history. Thanks to Alex Bligh and Eric Covener for their
suggestions.
---
 mod_websocket.c | 175 ++++++++++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 164 insertions(+), 11 deletions(-)

diff --git a/mod_websocket.c b/mod_websocket.c
index 1e88ef6..8e3bf26 100644
--- a/mod_websocket.c
+++ b/mod_websocket.c
@@ -25,8 +25,10 @@
  */
 
 #include "apr_base64.h"
+#include "apr_queue.h"
 #include "apr_sha1.h"
 #include "apr_strings.h"
+#include "apr_thread_cond.h"
 
 #include "httpd.h"
 #include "http_config.h"
@@ -58,6 +60,8 @@ typedef struct
 
 #define BLOCK_DATA_SIZE              4096
 
+#define QUEUE_CAPACITY                 16
+
 #define DATA_FRAMING_MASK               0
 #define DATA_FRAMING_START              1
 #define DATA_FRAMING_PAYLOAD_LENGTH     2
@@ -214,10 +218,14 @@ typedef struct _WebSocketState
 {
     request_rec *r;
     apr_bucket_brigade *obb;
+    apr_os_thread_t main_thread;
     apr_thread_mutex_t *mutex;
+    apr_thread_cond_t *cond;
     apr_array_header_t *protocols;
     int closing;
     apr_int64_t protocol_version;
+    apr_pollset_t *pollset;
+    apr_queue_t *queue;
 } WebSocketState;
 
 static request_rec *CALLBACK mod_websocket_request(const WebSocketServer *server)
@@ -367,6 +375,15 @@ static size_t mod_websocket_send_internal(WebSocketState *state,
     return written;
 }
 
+typedef struct
+{
+    int type;
+    const unsigned char * buffer;
+    size_t buffer_size;
+    int done;
+    size_t written;
+} WebSocketMessageData;
+
 static size_t CALLBACK mod_websocket_plugin_send(const WebSocketServer *server,
                                                  const int type,
                                                  const unsigned char *buffer,
@@ -375,11 +392,60 @@ static size_t CALLBACK mod_websocket_plugin_send(const WebSocketServer *server,
     size_t written = 0;
 
     /* Deal with size more that 63 bits - FIXME */
+    /* FIXME - if sending a zero-length message, the API cannot distinguish
+     * between success and failure */
     if ((server != NULL) && (server->state != NULL)) {
         WebSocketState *state = server->state;
 
         apr_thread_mutex_lock(state->mutex);
-        written = mod_websocket_send_internal(state, type, buffer, buffer_size);
+
+        if (apr_os_thread_equal(apr_os_thread_current(), state->main_thread)) {
+            /* This is the main thread. It's safe to write messages directly. */
+            written = mod_websocket_send_internal(state, type, buffer, buffer_size);
+        }
+        else if ((state->pollset != NULL) && (state->queue != NULL) &&
+                 !state->closing) {
+            /* Dispatch this message to the main thread. */
+            apr_status_t rv;
+            WebSocketMessageData msg = { 0 };
+
+            /* Populate the message data. */
+            msg.type = type;
+            msg.buffer = buffer;
+            msg.buffer_size = buffer_size;
+
+            /* Queue the message. */
+            do {
+                rv = apr_queue_push(state->queue, &msg);
+            } while (APR_STATUS_IS_EINTR(rv));
+
+            if (rv != APR_SUCCESS) {
+                /* Couldn't push the message onto the queue. */
+                goto send_unlock;
+            }
+
+            /* Interrupt the pollset. */
+            rv = apr_pollset_wakeup(state->pollset);
+
+            if (rv != APR_SUCCESS) {
+                /*
+                 * Couldn't wake up poll...? We can't return zero since we've
+                 * already pushed the message, and it might actually be sent...
+                 */
+                /* TODO: log. */
+            }
+
+            /* Wait for the message to be written. */
+            while (!msg.done && !state->closing) {
+                apr_thread_cond_wait(state->cond, state->mutex);
+            }
+
+            if (msg.done) {
+                written = msg.written;
+            }
+        }
+
+send_unlock:
         apr_thread_mutex_unlock(state->mutex);
     }
 
@@ -797,6 +863,26 @@ static void mod_websocket_handle_incoming(const WebSocketServer *server,
     }
 }
 
+static void mod_websocket_handle_outgoing(const WebSocketServer *server,
+                                          WebSocketMessageData *msg)
+{
+    apr_thread_mutex_lock(server->state->mutex);
+    msg->written = mod_websocket_send_internal(server->state, msg->type,
+                                               msg->buffer, msg->buffer_size);
+
+    /*
+     * Notify plugin_send() that the message has been sent.
+     *
+     * XXX Wake up _all_ the waiting threads, since we don't know which one owns
+     * this message. This is contentious if there are a lot of threads writing
+     * in parallel.
+     */
+    msg->done = 1;
+    apr_thread_cond_broadcast(server->state->cond);
+
+    apr_thread_mutex_unlock(server->state->mutex);
+}
+
 /*
  * The data framing handler requires that the server state mutex is locked by
  * the caller upon entering this function. It will be locked when leaving too.
@@ -814,6 +900,7 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
     apr_pollfd_t pollfd = { 0 };
     const apr_pollfd_t *signalled;
     apr_int32_t pollcnt;
+    apr_queue_t * queue;
 
     /* We cannot use the same bucket allocator for the ouput bucket brigade
      * obb as the one associated with the connection (r->connection->bucket_alloc)
@@ -827,7 +914,8 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
     if ((apr_pool_create(&pool, r->pool) == APR_SUCCESS) &&
         ((bucket_alloc = apr_bucket_alloc_create(pool)) != NULL) &&
         ((obb = apr_brigade_create(pool, bucket_alloc)) != NULL) &&
-        (apr_pollset_create(&pollset, 1, pool, 0) == APR_SUCCESS)) {
+        (apr_pollset_create(&pollset, 1, pool, APR_POLLSET_WAKEABLE) == APR_SUCCESS) &&
+        (apr_queue_create(&queue, QUEUE_CAPACITY, pool) == APR_SUCCESS)) {
         unsigned char block[BLOCK_DATA_SIZE];
         apr_int64_t block_size;
         unsigned char status_code_buffer[2];
@@ -844,6 +932,8 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
         read_state.frame = &read_state.control_frame;
         read_state.opcode = 0xFF;
 
+        state->queue = queue;
+
         /* Initialize the pollset */
         pollfd.p = pool;
         pollfd.desc_type = APR_POLL_SOCKET;
@@ -851,25 +941,74 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
         pollfd.desc.s = ap_get_conn_socket(state->r->connection);
         apr_pollset_add(pollset, &pollfd);
 
+        state->pollset = pollset;
+
         /* Allow the plugin to now write to the client */
         state->obb = obb;
         apr_thread_mutex_unlock(state->mutex);
 
+        /*
+         * Main loop, inspired by mod_spdy. Alternate between data coming from
+         * the client and data coming from the server. Only block in poll() if
+         * there is no work to be done for either side.
+         */
         while ((read_state.framing_state != DATA_FRAMING_CLOSE)) {
             apr_status_t rv;
+            apr_interval_time_t timeout;
+            WebSocketMessageData *msg;
+            int work_done = 0;
 
+            /* Check to see if there is any data to read. */
+            block_size = sizeof(block);
+            rv = mod_websocket_read_nonblock(r, (char *)block, &block_size);
+
+            if (rv == APR_SUCCESS) {
+                mod_websocket_handle_incoming(server, block, block_size,
+                                              &read_state, conf, plugin_private);
+                work_done = 1;
+            }
+            else if (!APR_STATUS_IS_EAGAIN(rv)) {
+                read_state.status_code = STATUS_CODE_INTERNAL_ERROR;
+                break;
+            }
+
+            /* Check to see if there is any data to write. */
             do {
-                block_size = sizeof(block);
-                rv = mod_websocket_read_nonblock(r, (char *)block, &block_size);
-            } while (APR_STATUS_IS_EAGAIN(rv) &&
-                     apr_pollset_poll(pollset, -1, &pollcnt, &signalled) == APR_SUCCESS);
+                rv = apr_queue_trypop(state->queue, &msg);
+            } while (APR_STATUS_IS_EINTR(rv));
 
-            if (rv != APR_SUCCESS) {
+            if (rv == APR_SUCCESS) {
+                mod_websocket_handle_outgoing(server, msg);
+                work_done = 1;
+            }
+            else if (!APR_STATUS_IS_EAGAIN(rv)) {
+                read_state.status_code = STATUS_CODE_INTERNAL_ERROR;
                 break;
             }
 
-            mod_websocket_handle_incoming(server, block, block_size,
-                                          &read_state, conf, plugin_private);
+            /*
+             * If there's nothing to do, wait for new work to come in.
+             *
+             * Because Windows cannot poll on both a file pipe and a socket,
+             * plugin_send() uses apr_pollset_wakeup() to signal that new data
+             * is available to write. This is lossy (multiple threads calling
+             * wakeup() will result in only one wakeup here) so it's important
+             * that we never block while state->queue is non-empty. Otherwise
+             * it's possible to lose messages in the queue.
+             *
+             * NOTE: The wakeup pipe is drained only during apr_pollset_poll(),
+             * so we call it each iteration to avoid filling it up. We only
+             * block in poll() (negative timeout) if there was no work done
+             * during the current iteration.
+             */
+            timeout = work_done ? 0 : -1;
+            rv = apr_pollset_poll(state->pollset, timeout, &pollcnt, &signalled);
+
+            if ((rv != APR_SUCCESS) && !APR_STATUS_IS_EINTR(rv) &&
+                    !APR_STATUS_IS_TIMEUP(rv)) {
+                read_state.status_code = STATUS_CODE_INTERNAL_ERROR;
+                break;
+            }
         }
         if (read_state.message_frame.application_data != NULL) {
             free(read_state.message_frame.application_data);
@@ -890,6 +1029,12 @@ static void mod_websocket_data_framing(const WebSocketServer *server,
         /* We are done with the bucket brigade */
         state->obb = NULL;
         apr_brigade_destroy(obb);
+
+        state->pollset = NULL;
+        apr_pollset_destroy(pollset);
+
+        state->queue = NULL;
+        apr_queue_term(queue);
     }
 }
 
@@ -957,8 +1102,10 @@ static int mod_websocket_method_handler(request_rec *r)
                                          &websocket_module);
 
                 if ((conf != NULL) && (conf->plugin != NULL)) {
-                    WebSocketState state =
-                        { r, NULL, NULL, NULL, 0, protocol_version };
+                    WebSocketState state = {
+                        r, NULL, apr_os_thread_current(), NULL, NULL, NULL, 0,
+                        protocol_version, NULL, NULL
+                    };
                     WebSocketServer server = {
                         sizeof(WebSocketServer), 1, &state,
                         mod_websocket_request, mod_websocket_header_get,
@@ -1015,6 +1162,8 @@ static int mod_websocket_method_handler(request_rec *r)
                     apr_thread_mutex_create(&state.mutex,
                                             APR_THREAD_MUTEX_DEFAULT,
                                             r->pool);
+                    apr_thread_cond_create(&state.cond, r->pool);
+
                     apr_thread_mutex_lock(state.mutex);
 
                     /*
@@ -1043,6 +1192,9 @@ static int mod_websocket_method_handler(request_rec *r)
                         mod_websocket_data_framing(&server, conf,
                                                    plugin_private);
 
+                        /* Wake up any waiting plugin_sends before closing */
+                        apr_thread_cond_broadcast(state.cond);
+
                         apr_thread_mutex_unlock(state.mutex);
 
                         /* Tell the plugin that we are disconnecting */
@@ -1068,6 +1220,7 @@ static int mod_websocket_method_handler(request_rec *r)
                     /* Close the connection */
                     ap_lingering_close(r->connection);
 
+                    apr_thread_cond_destroy(state.cond);
                     apr_thread_mutex_destroy(state.mutex);
 
                     return OK;
-- 
2.1.1


Mime
View raw message