qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [2/2] qpid-dispatch git commit: DISPATCH-774: backport http changes to 0.8.x, use stock libwebsockets
Date Tue, 16 May 2017 18:54:37 GMT
DISPATCH-774: backport http changes to 0.8.x, use stock libwebsockets

The 0.8.x http-libwebsockets implementation now works with the standard
libwebsockets 2.1 release; it no longer requires a specially patched version.

The changes are based on master, with some differences because of other changes
to the IO code on master.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/b1e27480
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/b1e27480
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/b1e27480

Branch: refs/heads/0.8.x
Commit: b1e27480135ab4ff536a3b8d31a7c8ffb7753677
Parents: ced41ae
Author: Alan Conway <aconway@redhat.com>
Authored: Tue May 16 13:43:21 2017 -0400
Committer: Alan Conway <aconway@redhat.com>
Committed: Tue May 16 14:43:44 2017 -0400

----------------------------------------------------------------------
 CMakeLists.txt                |   2 +-
 cmake/FindLibWebSockets.cmake |  31 +-
 src/CMakeLists.txt            |   9 +-
 src/http-libwebsockets.c      | 838 ++++++++++++++++++++++---------------
 src/http-none.c               |  27 +-
 src/http.h                    |  20 +-
 src/server.c                  | 128 +++---
 src/server_private.h          |  31 +-
 8 files changed, 639 insertions(+), 447 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1e27480/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2187774..2bd4b24 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -103,7 +103,7 @@ find_library(rt_lib rt)
 find_package(Proton 0.15 REQUIRED)
 
 ## Optional dependencies
-include(FindLibWebSockets)
+find_package(LibWebSockets 2)
 option(USE_LIBWEBSOCKETS "Use libwebsockets for WebSocket support" ${LIBWEBSOCKETS_FOUND})
 
 ##

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1e27480/cmake/FindLibWebSockets.cmake
----------------------------------------------------------------------
diff --git a/cmake/FindLibWebSockets.cmake b/cmake/FindLibWebSockets.cmake
index 18ef18a..3723f5c 100644
--- a/cmake/FindLibWebSockets.cmake
+++ b/cmake/FindLibWebSockets.cmake
@@ -42,24 +42,19 @@ find_path(LIBWEBSOCKETS_INCLUDE_DIRS
   PATHS /usr/include
   )
 
-include(FindPackageHandleStandardArgs)
-
-find_package_handle_standard_args(LIBWEBSOCKETS DEFAULT_MSG LIBWEBSOCKETS_LIBRARIES LIBWEBSOCKETS_INCLUDE_DIRS)
-
-if(LIBWEBSOCKETS_FOUND)
-  # For the moment we need a patched version of LibWebSockets:
-  # https://github.com/alanconway/libwebsockets/tree/v2.1-stable-aconway-adopt-ssl
-  # This function check verifies we have it.
-  set(CMAKE_REQUIRED_INCLUDES ${LIBWEBSOCKETS_INCLUDE_DIRS})
-  set(CMAKE_REQUIRED_LIBRARIES ${LIBWEBSOCKETS_LIBRARIES})
-  check_function_exists(lws_adopt_socket_vhost LWS_ADOPT_SOCKET_VHOST_FOUND)
-  if (NOT LWS_ADOPT_SOCKET_VHOST_FOUND)
-    message("Cannot use LibWebSockets, no function lws_adopt_socket_vhost")
-    unset(LIBWEBSOCKETS_FOUND)
+# We need vhost support which appeared in v2.0 of libwebsockets
+set(CMAKE_REQUIRED_INCLUDES ${LIBWEBSOCKETS_INCLUDE_DIRS})
+set(CMAKE_REQUIRED_LIBRARIES ${LIBWEBSOCKETS_LIBRARIES})
+set(MSG DEFAULT_MSG)
+if (LIBWEBSOCKETS_LIBRARIES AND LIBWEBSOCKETS_INCLUDE_DIRS)
+  check_function_exists(lws_create_vhost LIBWEBSOCKETS_OK)
+  if (NOT LIBWEBSOCKETS_OK)
+    set(MSG "Cannot use LibWebSockets version < 2 in ${LIBWEBSOCKETS_LIBRARIES}")
+    set(LIBWEBSOCKETS_LIBRARIES "NOTFOUND")
+    set(LIBWEBSOCKETS_INCLUDE_DIRS "NOTFOUND")
   endif()
 endif()
 
-if(NOT LIBWEBSOCKETS_FOUND)
-  set(LIBWEBSOCKETS_LIBRARIES "")
-  set(LIBWEBSOCKETS_INCLUDE_DIRS "")
-endif()
+include(FindPackageHandleStandardArgs)
+find_package_handle_standard_args(LIBWEBSOCKETS ${MSG} LIBWEBSOCKETS_LIBRARIES LIBWEBSOCKETS_INCLUDE_DIRS)
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1e27480/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 4c00206..5edea1d 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -39,8 +39,10 @@ add_custom_command (
 include_directories(
   ${CMAKE_CURRENT_SOURCE_DIR}
   ${CMAKE_CURRENT_BINARY_DIR}
-  ${LIBWEBSOCKETS_INCLUDE_DIRS}
   )
+if (USE_LIBWEBSOCKETS)
+  include_directories(${LIBWEBSOCKETS_INCLUDE_DIRS})
+endif()
 
 # Build the qpid-dispatch library.
 set(qpid_dispatch_SOURCES
@@ -112,7 +114,10 @@ if (CMAKE_C_COMPILER_ID STREQUAL "GNU")
 endif (CMAKE_C_COMPILER_ID STREQUAL "GNU")
 
 add_library(qpid-dispatch SHARED ${qpid_dispatch_SOURCES})
-target_link_libraries(qpid-dispatch ${Proton_LIBRARIES} ${pthread_lib} ${rt_lib} ${dl_lib} ${PYTHON_LIBRARIES} ${LIBWEBSOCKETS_LIBRARIES})
+target_link_libraries(qpid-dispatch ${Proton_LIBRARIES} ${pthread_lib} ${rt_lib} ${dl_lib} ${PYTHON_LIBRARIES})
+if (USE_LIBWEBSOCKETS)
+  target_link_libraries(qpid-dispatch ${LIBWEBSOCKETS_LIBRARIES})
+endif()
 set_target_properties(qpid-dispatch PROPERTIES
   LINK_FLAGS "${CATCH_UNDEFINED}"
   )

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1e27480/src/http-libwebsockets.c
----------------------------------------------------------------------
diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c
index 745b090..dc8ff58 100644
--- a/src/http-libwebsockets.c
+++ b/src/http-libwebsockets.c
@@ -19,446 +19,620 @@
 
 #include <qpid/dispatch/atomic.h>
 #include <qpid/dispatch/amqp.h>
-#include <qpid/dispatch/driver.h>
 #include <qpid/dispatch/threading.h>
 #include <qpid/dispatch/timer.h>
 
+#include <proton/connection_driver.h>
+
 #include <libwebsockets.h>
 
 #include <assert.h>
+#include <ctype.h>
 #include <errno.h>
 #include <inttypes.h>
+#include <time.h>
 
 #include "http.h"
 #include "server_private.h"
 #include "config.h"
 
+static const char *CIPHER_LIST = "ALL:aNULL:!eNULL:@STRENGTH"; /* Default */
+
+/* Log for LWS messages. For dispatch server messages use qd_http_server_t::log */
 static qd_log_source_t* http_log;
 
-static const char *CIPHER_LIST = "ALL:aNULL:!eNULL:@STRENGTH";
+static qd_log_level_t qd_level(int lll) {
+    switch (lll) {
+    case LLL_ERR: return QD_LOG_ERROR;
+    case LLL_WARN: return QD_LOG_WARNING;
+    /* LWS is noisy compared to dispatch on the informative levels, downgrade */
+    case LLL_NOTICE: return QD_LOG_DEBUG;
+    default: return QD_LOG_TRACE; /* Everything else to trace  */
+    }
+}
+
+static void logger(int lll, const char *line)  { /* LWS log sink: relay one LWS message to http_log */
+    size_t  len = strlen(line);
+    while (len > 1 && isspace((unsigned char)line[len-1])) { /* Strip trailing newline; isspace() needs an unsigned char value */
+        --len;
+    }
+    qd_log(http_log, qd_level(lll), "%.*s", (int)len, line); /* "%.*s" precision must be an int, not size_t */
+}
+
+static void log_init() {
+    http_log = qd_log_source("HTTP");
+    int levels = 0;
+    for (int i = 0; i < LLL_COUNT; ++i) {
+        int lll = 1<<i;
+        levels |= qd_log_enabled(http_log, qd_level(lll)) ? lll : 0;
+    }
+    lws_set_log_level(levels, logger);
+}
+
+/* Intermediate write buffer: LWS needs extra header space on write.  */
+typedef struct buffer_t {
+    char *start;
+    size_t size, cap;
+} buffer_t;
+
+/* Ensure size bytes in buffer, make buf empty if alloc fails */
+static void buffer_set_size(buffer_t *buf, size_t size) {
+    if (size > buf->cap) {
+        size_t cap = (size > buf->cap * 2) ? size : buf->cap * 2; /* Grow to at least double */
+        char *grown = realloc(buf->start, cap);
+        if (!grown) free(buf->start); /* Failed realloc leaves the old block allocated: release it */
+        buf->start = grown;
+        buf->cap = cap;
+    }
+    buf->size = buf->start ? size : 0; /* No storage means an empty buffer */
+    if (!buf->start) buf->cap = 0;
+}
 
-/* Associate file-descriptors, LWS instances and qdpn_connectors */
-typedef struct fd_data_t {
-    qdpn_connector_t *connector;
+/* AMQPWS connection: set as lws user data and qd_conn->context */
+struct qd_http_connection_t {
+    pn_connection_driver_t driver;
+    qd_connection_t* qd_conn;
+    buffer_t wbuf;   /* LWS requires allocated header space at start of buffer */
     struct lws *wsi;
-} fd_data_t;
+    char name[NI_MAXHOST+NI_MAXSERV]; /* Remote host:port */
+    char hostip[NI_MAXHOST];          /* Remote host IP address */
+    bool closed;
+};
+
+/* Navigating from WSI pointer to qd objects */
+static qd_http_server_t *wsi_server(struct lws *wsi);
+static qd_http_listener_t *wsi_listener(struct lws *wsi);
+static qd_log_source_t *wsi_log(struct lws *wsi);
 
-/* HTTP server state shared by all listeners  */
+
+/* Declare LWS callbacks and protocol list */
+static int callback_http(struct lws *wsi, enum lws_callback_reasons reason,
+                         void *user, void *in, size_t len);
+static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
+                           void *user, void *in, size_t len);
+
+static struct lws_protocols protocols[] = {
+    /* HTTP only protocol comes first */
+    {
+        "http-only",
+        callback_http,
+        0,
+    },
+    /* "amqp" is the official oasis AMQP over WebSocket protocol name */
+    {
+        "amqp",
+        callback_amqpws,
+        sizeof(qd_http_connection_t),
+    },
+    /* "binary" is an alias for "amqp", for compatibility with clients designed
+     * to work with a WebSocket proxy
+     */
+    {
+        "binary",
+        callback_amqpws,
+        sizeof(qd_http_connection_t),
+    },
+    { NULL, NULL, 0, 0 } /* terminator */
+};
+
+
+static inline int unexpected_close(struct lws *wsi, const char *msg) {
+    lws_close_reason(wsi, LWS_CLOSE_STATUS_UNEXPECTED_CONDITION,
+                     (unsigned char*)msg, strlen(msg));
+    char peer[64] = "";
+    lws_get_peer_simple(wsi, peer, sizeof(peer));
+    qd_log(wsi_log(wsi), QD_LOG_ERROR, "Error on HTTP connection from %s: %s", peer, msg);
+    return -1;
+}
+
+static int handle_events(qd_http_connection_t* c) {
+    if (!c->qd_conn) {
+        return unexpected_close(c->wsi, "not-established");
+    }
+    qd_connection_process(c->qd_conn);
+    if (pn_connection_driver_write_buffer(&c->driver).size) {
+        lws_callback_on_writable(c->wsi);
+    }
+    if (pn_connection_driver_finished(&c->driver)) {
+        lws_close_reason(c->wsi, LWS_CLOSE_STATUS_NORMAL, NULL, 0);
+        c->closed = true;
+        qd_connection_process(c->qd_conn);
+        return -1;
+    }
+    return 0;
+}
+
+/* The server has a bounded, thread-safe queue for external work */
+typedef struct work_t {
+    enum { W_NONE, W_LISTEN, W_CLOSE, W_WAKE, W_STOP } type;
+    void *value;
+} work_t;
+
+#define WORK_MAX 8              /* Just decouple threads, not a big buffer */
+
+typedef struct work_queue_t {
+    sys_mutex_t *lock;
+    sys_cond_t *cond;
+    work_t work[WORK_MAX];
+    size_t head, len;          /* Ring buffer */
+} work_queue_t;
+
+/* HTTP Server runs in a single thread, communication from other threads via work_queue */
 struct qd_http_server_t {
-    qd_dispatch_t *dispatch;
+    qd_server_t *server;
+    sys_thread_t *thread;
+    work_queue_t work;
     qd_log_source_t *log;
-    sys_mutex_t *lock;         /* For now use LWS as a thread-unsafe library. */
     struct lws_context *context;
-    qd_timer_t *timer;
-    int vhost_id;               /* unique identifier for vhost name */
-    fd_data_t *fd;              /* indexed by file descriptor */
-    size_t fd_len;
+    pn_timestamp_t now;         /* Cache current time in thread_run */
+    pn_timestamp_t next_tick;   /* Next requested tick service */
 };
 
-/* Per-HTTP-listener */
+static void work_queue_destroy(work_queue_t *wq) {
+    if (wq->lock) sys_mutex_free(wq->lock);
+    if (wq->cond) sys_cond_free(wq->cond);
+}
+
+static void work_queue_init(work_queue_t *wq) {
+    wq->lock = sys_mutex();
+    wq->cond = sys_cond();
+}
+
+ /* Block till there is space */
+static void work_push(qd_http_server_t *hs, work_t w) {
+    work_queue_t *wq = &hs->work;
+    sys_mutex_lock(wq->lock);
+    while (wq->len == WORK_MAX) {
+        lws_cancel_service(hs->context); /* Wake up the run thread to clear space */
+        sys_cond_wait(wq->cond, wq->lock);
+    }
+    wq->work[(wq->head + wq->len) % WORK_MAX] = w;
+    ++wq->len;
+    sys_mutex_unlock(wq->lock);
+    lws_cancel_service(hs->context); /* Wake up the run thread to handle my work */
+}
+
+/* Non-blocking, return { W_NONE, NULL } if empty */
+static work_t work_pop(qd_http_server_t *hs) {
+    work_t w = { W_NONE, NULL };
+    work_queue_t *wq = &hs->work;
+    sys_mutex_lock(wq->lock);
+    if (wq->len > 0) {
+        w = wq->work[wq->head];
+        wq->head = (wq->head + 1) % WORK_MAX;
+        --wq->len;
+        sys_cond_signal(wq->cond);
+    }
+    sys_mutex_unlock(wq->lock);
+    return w;
+}
+
+/* Each qd_http_listener_t is associated with an lws_vhost */
 struct qd_http_listener_t {
+    qd_listener_t *listener;
     qd_http_server_t *server;
     struct lws_vhost *vhost;
+    char host_port[NI_MAXHOST + NI_MAXSERV];
     struct lws_http_mount mount;
-    char name[256];             /* vhost name */
 };
 
-/* Get wsi/connector associated with fd or NULL if nothing on record. */
-static inline fd_data_t *fd_data(qd_http_server_t *s, int fd) {
-    fd_data_t *d = (fd < s->fd_len) ? &s->fd[fd] : NULL;
-    return (d && (d->connector || d->wsi)) ? d : NULL;
+void qd_http_listener_free(qd_http_listener_t *hl) {
+    if (!hl) return;
+    if (hl->listener) {
+        hl->listener->http = NULL;
+    }
+    free(hl);
 }
 
-static inline qd_http_server_t *wsi_http_server(struct lws *wsi) {
-    return (qd_http_server_t*)lws_context_user(lws_get_context(wsi));
+static qd_http_listener_t *qd_http_listener(qd_http_server_t *hs, qd_listener_t *li) {
+    qd_http_listener_t *hl = calloc(1, sizeof(*hl)); /* Zeroed, so host_port stays empty until listener_start() fills it */
+    if (hl) {
+        hl->server = hs;
+        hl->listener = li;
+        li->http = hl;
+    } else {
+        qd_log(hs->log, QD_LOG_CRITICAL, "No memory for HTTP listen on %s:%s", li->config->host, li->config->port); /* hl is NULL here: report the configured address */
+    }
+    return hl;
 }
 
-static inline qdpn_connector_t *wsi_connector(struct lws *wsi) {
-    fd_data_t *d = fd_data(wsi_http_server(wsi), lws_get_socket_fd(wsi));
-    return d ? d->connector : NULL;
+static int qd_port_int(const char* port_str) {
+    if (!strcmp(port_str, "amqp")) return 5672;
+    if (!strcmp(port_str, "amqps")) return 5671;
+    errno = 0;
+    unsigned long n = strtoul(port_str, NULL, 10);
+    if (errno || n > 0xFFFF) return -1;
+    return n;
 }
 
-static inline fd_data_t *set_fd(qd_http_server_t *s, int fd, qdpn_connector_t *c, struct lws *wsi) {
-    if (!s->fd || fd >= s->fd_len) {
-        size_t oldlen = s->fd_len;
-        s->fd_len = fd + 16;    /* Don't double, low-range FDs will be re-used first. */
-        void *newfds = realloc(s->fd, s->fd_len*sizeof(*s->fd));
-        if (!newfds) return NULL;
-        s->fd = newfds;
-        memset(s->fd + oldlen, 0, sizeof(*s->fd)*(s->fd_len - oldlen));
-    }
-    fd_data_t *d = &s->fd[fd];
-    d->connector = c;
-    d->wsi = wsi;
-    return d;
-}
+static void listener_start(qd_http_listener_t *hl, qd_http_server_t *hs) {
+    log_init();                 /* Update log flags at each listener */
 
-/* Push read data into the transport.
- * Return 0 on success, number of bytes un-pushed on failure.
- */
-static int transport_push(pn_transport_t *t, pn_bytes_t buf) {
-    ssize_t cap;
-    while (buf.size > 0 && (cap = pn_transport_capacity(t)) > 0) {
-        if (buf.size > cap) {
-            pn_transport_push(t, buf.start, cap);
-            buf.start += cap;
-            buf.size -= cap;
-        } else {
-            pn_transport_push(t, buf.start, buf.size);
-            buf.size = 0;
-        }
+    const qd_server_config_t *config = hl->listener->config;
+
+    int port = qd_port_int(config->port);
+    snprintf(hl->host_port, sizeof(hl->host_port), "%s:%s", config->host, config->port);
+    if (port <= 0) {
+        qd_log(hs->log, QD_LOG_ERROR, "HTTP listener %s invalid port", hl->host_port);
+        goto error;
     }
-    return buf.size;
-}
+    struct lws_http_mount *m = &hl->mount;
+    m->mountpoint = "/";    /* URL mount point */
+    m->mountpoint_len = strlen(m->mountpoint); /* length of the mountpoint */
+    m->origin = (config->http_root && *config->http_root) ? /* File system root */
+        config->http_root : QPID_CONSOLE_STAND_ALONE_INSTALL_DIR;
+    m->def = "index.html";  /* Default file name */
+    m->origin_protocol = LWSMPRO_FILE; /* mount type is a directory in a filesystem */
 
-static inline int normal_close(struct lws *wsi, const char *msg) {
-    lws_close_reason(wsi, LWS_CLOSE_STATUS_NORMAL, (unsigned char*)msg, strlen(msg));
-    return -1;
+    struct lws_context_creation_info info = {0};
+    info.mounts = m;
+    info.port = port;
+    info.protocols = protocols;
+    info.keepalive_timeout = 1;
+    info.ssl_cipher_list = CIPHER_LIST;
+    info.options |= LWS_SERVER_OPTION_VALIDATE_UTF8;
+    if (config->ssl_profile) {
+        info.ssl_cert_filepath = config->ssl_certificate_file;
+        info.ssl_private_key_filepath = config->ssl_private_key_file;
+        info.ssl_private_key_password = config->ssl_password;
+        info.ssl_ca_filepath = config->ssl_trusted_certificates;
+        info.options |=
+            LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT |
+            (config->ssl_required ? 0 : LWS_SERVER_OPTION_ALLOW_NON_SSL_ON_SSL_PORT) |
+            (config->requireAuthentication ? LWS_SERVER_OPTION_REQUIRE_VALID_OPENSSL_CLIENT_CERT : 0);
+    }
+    info.vhost_name = hl->host_port;
+    hl->vhost = lws_create_vhost(hs->context, &info);
+    if (hl->vhost) {
+        /* Store hl pointer in vhost */
+        void *vp = lws_protocol_vh_priv_zalloc(hl->vhost, &protocols[0], sizeof(hl));
+        memcpy(vp, &hl, sizeof(hl));
+        qd_log(hs->log, QD_LOG_NOTICE, "Listening for HTTP on %s", hl->host_port);
+        return;
+    } else {
+        qd_log(hs->log, QD_LOG_NOTICE, "Error listening for HTTP on %s", hl->host_port);
+        goto error;
+    }
+    return;
+
+  error:
+    qd_http_listener_free(hl);
 }
 
-static inline int unexpected_close(struct lws *wsi, const char *msg) {
-    lws_close_reason(wsi, LWS_CLOSE_STATUS_UNEXPECTED_CONDITION, (unsigned char*)msg, strlen(msg));
-    return -1;
+static void listener_close(qd_http_listener_t *hl, qd_http_server_t *hs) {
+    /* TODO aconway 2017-04-13: can't easily stop listeners under libwebsockets */
+    qd_log(hs->log, QD_LOG_ERROR, "Cannot close HTTP listener %s", hl->host_port);
 }
 
 /*
- * Callback for un-promoted HTTP connections, and low-level external poll operations.
+ * LWS callback for un-promoted HTTP connections.
  * Note main HTTP file serving is handled by the "mount" struct below.
- * Called with http lock held.
  */
 static int callback_http(struct lws *wsi, enum lws_callback_reasons reason,
                          void *user, void *in, size_t len)
 {
     switch (reason) {
 
-    case LWS_CALLBACK_HTTP:     /* Called if file mount can't find the file */
-        lws_return_http_status(wsi, HTTP_STATUS_NOT_FOUND, (char*)in);
+    case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: {
+        /* TODO aconway 2017-05-15: policy support */
+        char peer[64];
+        lws_get_peer_simple(wsi, peer, sizeof(peer));
+        qd_log(wsi_log(wsi), QD_LOG_DEBUG, "Incoming HTTP connection to %s from %s",
+               wsi_listener(wsi)->host_port, peer);
+        return 0;
+    }
+    case LWS_CALLBACK_PROTOCOL_DESTROY:
+        qd_http_listener_free(wsi_listener(wsi));
         return -1;
 
-    case LWS_CALLBACK_ADD_POLL_FD: {
-        /* Record WSI against FD here, the connector will be recorded when lws_service returns. */
-        set_fd(wsi_http_server(wsi), lws_get_socket_fd(wsi), 0, wsi);
-        break;
-    }
-    case LWS_CALLBACK_DEL_POLL_FD: {
-        fd_data_t *d = fd_data(wsi_http_server(wsi), lws_get_socket_fd(wsi));
-        if (d) {
-            /* Tell dispatch to forget this FD, but let LWS do the actual close() */
-            if (d->connector) qdpn_connector_mark_closed(d->connector);
-            memset(d, 0, sizeof(*d));
-        }
-        break;
-    }
-    case LWS_CALLBACK_CHANGE_MODE_POLL_FD: {
-        struct lws_pollargs *p = (struct lws_pollargs*)in;
-        qdpn_connector_t *c = wsi_connector(wsi);
-        if (c) {
-            if (p->events & POLLIN) qdpn_connector_activate(c, QDPN_CONNECTOR_READABLE);
-            if (p->events & POLLOUT) qdpn_connector_activate(c, QDPN_CONNECTOR_WRITABLE);
-        }
-        break;
+    case LWS_CALLBACK_HTTP: {
+        /* Called if file mount can't find the file */
+        lws_return_http_status(wsi, HTTP_STATUS_NOT_FOUND, (char*)in);
+        return -1;
     }
 
-    /* NOTE: Not using LWS_CALLBACK_LOCK/UNLOCK_POLL as we are serializing all HTTP work for now. */
-
     default:
-        break;
+        return 0;
     }
-
-    return 0;
 }
 
-/* Buffer to allocate extra header space required by LWS.  */
-typedef struct buffer_t { char *start; size_t size; size_t cap; } buffer_t;
+const char *qd_http_connection_name(qd_http_connection_t* hc) { return hc->name; }
+const char *qd_http_connection_hostip(qd_http_connection_t* hc) { return hc->hostip; }
+bool qd_http_connection_closed(qd_http_connection_t* hc) { return hc->closed; }
 
-/* Callbacks for promoted AMQP over WS connections.
- * Called with http lock held.
- */
+/* Wake up a connection managed by the http server thread */
+void qd_http_connection_wake(qd_http_connection_t *c)
+{
+    if (c && c->qd_conn->listener) {
+        qd_http_server_t *hs = wsi_server(c->wsi);
+        work_t w = { W_WAKE, c };
+        work_push(hs, w);
+    }
+}
+
+/* Callbacks for promoted AMQP over WS connections. */
 static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
                            void *user, void *in, size_t len)
 {
-    qdpn_connector_t *c = wsi_connector(wsi);
-    pn_transport_t *t = c ? qdpn_connector_transport(c) : NULL;
+    qd_http_server_t *hs = wsi_server(wsi);
+    qd_http_connection_t *c = (qd_http_connection_t*)user;
 
     switch (reason) {
 
     case LWS_CALLBACK_ESTABLISHED: {
-        qd_log(wsi_http_server(wsi)->log, QD_LOG_DEBUG,
-               "Upgraded incoming HTTP connection from  %s[%"PRIu64"] to AMQP over WebSocket",
-               qdpn_connector_name(c),
-               qd_connection_connection_id((qd_connection_t*)qdpn_connector_context(c)));
-        memset(user, 0, sizeof(buffer_t));
-        break;
+        /* Upgrade accepted HTTP connection to AMQPWS */
+        memset(c, 0, sizeof(*c));
+        c->wsi = wsi;
+        qd_http_listener_t *hl = wsi_listener(wsi);
+        if (hl == NULL) {
+            return unexpected_close(c->wsi, "cannot-upgrade");
+        }
+        qd_connection_t *ctx = c->qd_conn = qd_server_connection_allocate();
+        if (c->qd_conn == NULL) {
+            return unexpected_close(c->wsi, "out-of-memory");
+        }
+        c->qd_conn->context = c;
+        c->qd_conn->listener = hl->listener;
+        lws_get_peer_simple(wsi, c->hostip, sizeof(c->hostip));
+        strncpy(c->name, c->hostip, sizeof(c->name));
+        int err = pn_connection_driver_init(&c->driver, c->qd_conn->pn_conn, NULL);
+        if (err) {
+            return unexpected_close(c->wsi, pn_code(err));
+        }
+        c->qd_conn->http = c;
+        c->qd_conn->server        = hs->server;
+        c->qd_conn->connection_id = qd_server_connection_id(c->qd_conn->server);
+        /* TODO aconway 2017-05-15: no policy checks for HTTP connections */
+        c->qd_conn->policy_counted = false;
+        const qd_server_config_t *config = hl->listener->config;
+        c->qd_conn->role = strdup(config->role);
+        c->qd_conn->pn_conn = c->driver.connection;
+        pn_connection_set_context(c->qd_conn->pn_conn, ctx);
+        c->qd_conn->collector = c->driver.collector;
+        qd_server_decorate_connection(c->qd_conn->server, c->qd_conn->pn_conn, config);
+
+        qd_log(hs->log, QD_LOG_DEBUG,
+               "[%"PRIu64"] upgraded HTTP connection from %s to AMQPWS",
+               qd_connection_connection_id(c->qd_conn), c->hostip);
+        pn_connection_driver_bind(&c->driver);
+        return handle_events(c);
     }
 
     case LWS_CALLBACK_SERVER_WRITEABLE: {
-        ssize_t size;
-        if (!t || (size = pn_transport_pending(t)) < 0) {
-            return normal_close(wsi, "write-closed");
-        }
-        if (size > 0) {
-            const void *start = pn_transport_head(t);
-            /* lws_write() demands LWS_PRE bytes of free space before the data */
-            size_t tmpsize = size + LWS_PRE;
-            buffer_t *wtmp = (buffer_t*)user;
-            if (wtmp->start == NULL || wtmp->cap < tmpsize) {
-                wtmp->start = realloc(wtmp->start, tmpsize);
-                wtmp->size = wtmp->cap = tmpsize;
+        if (handle_events(c)) return -1;
+        pn_bytes_t dbuf = pn_connection_driver_write_buffer(&c->driver);
+        if (dbuf.size) {
+            /* lws_write() demands LWS_PRE bytes of free space before the data,
+             * so we must copy from the driver's buffer to larger temporary wbuf
+             */
+            buffer_set_size(&c->wbuf, LWS_PRE + dbuf.size);
+            if (c->wbuf.start == NULL) {
+                return unexpected_close(c->wsi, "out-of-memory");
             }
-            if (wtmp->start == NULL) {
-                return unexpected_close(wsi, "out-of-memory");
-            }
-            void *tmpstart = wtmp->start + LWS_PRE;
-            memcpy(tmpstart, start, size);
-            ssize_t wrote = lws_write(wsi, tmpstart, size, LWS_WRITE_BINARY);
+            unsigned char* buf = (unsigned char*)c->wbuf.start + LWS_PRE;
+            memcpy(buf, dbuf.start, dbuf.size);
+            ssize_t wrote = lws_write(wsi, buf, dbuf.size, LWS_WRITE_BINARY);
             if (wrote < 0) {
-                pn_transport_close_head(t);
-                return normal_close(wsi, "write-error");
+                pn_connection_driver_write_close(&c->driver);
+                return unexpected_close(c->wsi, "write-error");
             } else {
-                pn_transport_pop(t, (size_t)wrote);
+                pn_connection_driver_write_done(&c->driver, wrote);
             }
         }
-        break;
+        return handle_events(c);
     }
 
     case LWS_CALLBACK_RECEIVE: {
-        if (!t || pn_transport_capacity(t) < 0) {
-            return normal_close(wsi, "read-closed");
-        }
-        if (transport_push(t, pn_bytes(len, in))) {
-            return unexpected_close(wsi, "read-overflow");
+        while (len > 0) {
+            if (handle_events(c)) return -1;
+            pn_rwbytes_t dbuf = pn_connection_driver_read_buffer(&c->driver);
+            if (dbuf.size == 0) {
+                return unexpected_close(c->wsi, "unexpected-data");
+            }
+            size_t copy = (len < dbuf.size) ? len : dbuf.size;
+            memcpy(dbuf.start, in, copy);
+            pn_connection_driver_read_done(&c->driver, copy);
+            len -= copy;
+            in = (char*)in + copy;
         }
-        break;
+        return handle_events(c);
     }
 
-    case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE:
-        if (t) {
-            pn_transport_close_tail(t);
+    case LWS_CALLBACK_USER: {
+        pn_timestamp_t next_tick = pn_transport_tick(c->driver.transport, hs->now);
+        if (next_tick && next_tick > hs->now && next_tick < hs->next_tick) {
+            hs->next_tick = next_tick;
         }
+        return handle_events(c);
+    }
 
-    case LWS_CALLBACK_CLOSED:
-        break;
+    case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: {
+        pn_connection_driver_read_close(&c->driver);
+        return handle_events(c);
+    }
+
+    case LWS_CALLBACK_CLOSED: {
+        qd_log(wsi_log(wsi), QD_LOG_DEBUG, "HTTP connection closed to %s from %s",
+               wsi_listener(wsi)->host_port, c->name);
+        if (c->driver.transport) {
+            pn_connection_driver_close(&c->driver);
+            handle_events(c);
+        }
+        pn_connection_driver_destroy(&c->driver);
+        c->qd_conn->pn_conn = NULL;
+        c->qd_conn->collector = NULL;
+        qd_connection_free(c->qd_conn);
+        free(c->wbuf.start);
+        return -1;
+    }
 
     default:
-        break;
+        return 0;
     }
-    return 0;
 }
 
-static void check_timer(void *void_http_server) {
-    qd_http_server_t *s = (qd_http_server_t*)void_http_server;
-    /* Run LWS global timer and forced-service checks. */
-    sys_mutex_lock(s->lock);
-    lws_service_fd(s->context, NULL);
-    while (!lws_service_adjust_timeout(s->context, 1, 0)) {
-        /* -1 timeout means just do forced service */
-        lws_plat_service_tsi(s->context, -1, 0);
-    }
-    if (!s->timer) {
-        s->timer = qd_timer(s->dispatch, check_timer, s);
-    }
-    sys_mutex_unlock(s->lock);
-    /* Timer is locked using server lock. */
-    qd_timer_cancel(s->timer);
-    qd_timer_schedule(s->timer, 1000); /* LWS wants per-second wakeups */
-}
+#define DEFAULT_TICK 1000
 
-static qd_http_listener_t * qdpn_connector_http_listener(qdpn_connector_t* c) {
-    qd_listener_t* ql = (qd_listener_t*)qdpn_listener_context(qdpn_connector_listener(c));
-    return qd_listener_http(ql);
+static pn_timestamp_t now(void)
+{
+    struct timespec now;
+#ifdef CLOCK_MONOTONIC_COARSE
+    int cid = CLOCK_MONOTONIC_COARSE;
+#else
+    int cid = CLOCK_MONOTONIC;
+#endif
+    if (clock_gettime(cid, &now)) {
+        qd_error_errno(errno, "clock_gettime");
+        exit(1);
+    }
+    return ((pn_timestamp_t)now.tv_sec) * 1000 + (now.tv_nsec / 1000000);
 }
 
-static void http_connector_process(qdpn_connector_t *c) {
-    qd_http_listener_t *hl = qdpn_connector_http_listener(c);
-    qd_http_server_t *s = hl->server;
-    sys_mutex_lock(s->lock);
-    int fd = qdpn_connector_get_fd(c);
-    fd_data_t *d = fd_data(s, fd);
-    /* Make sure we are still tracking this fd, could have been closed by timer */
-    if (d) {
-        pn_transport_t *t = qdpn_connector_transport(c);
-        int flags =
-            (qdpn_connector_hangup(c) ? POLLHUP : 0) |
-            (qdpn_connector_activated(c, QDPN_CONNECTOR_READABLE) ? POLLIN : 0) |
-            (qdpn_connector_activated(c, QDPN_CONNECTOR_WRITABLE) ? POLLOUT : 0);
-        struct lws_pollfd pfd = { fd, flags, flags };
-        if (pn_transport_pending(t) > 0) {
-            lws_callback_on_writable(d->wsi);
+static void* http_thread_run(void* v) {
+    qd_http_server_t *hs = v;
+    qd_log(hs->log, QD_LOG_INFO, "HTTP server thread running");
+    int result = 0;
+    while(result >= 0) {
+        /* Send a USER event to run transport ticks, may decrease hs->next_tick. */
+        hs->now = now();
+        hs->next_tick = hs->now + DEFAULT_TICK;
+        lws_callback_all_protocol(hs->context, &protocols[1], LWS_CALLBACK_USER);
+        lws_callback_all_protocol(hs->context, &protocols[2], LWS_CALLBACK_USER);
+        pn_millis_t timeout = (hs->next_tick > hs->now) ? hs->next_tick - hs->now : 1;
+
+        /* Run LWS event loop*/
+        result = lws_service(hs->context, timeout);
+
+        /* Process any work items on the queue */
+        for (work_t w = work_pop(hs); w.type != W_NONE; w = work_pop(hs)) {
+            switch (w.type) {
+            case W_NONE:
+                break;
+            case W_STOP:
+                result = -1;
+                break;
+            case W_LISTEN:
+                listener_start((qd_http_listener_t*)w.value, hs);
+                break;
+            case W_CLOSE:
+                listener_close((qd_http_listener_t*)w.value, hs);
+                break;
+            case W_WAKE: {
+                qd_http_connection_t *c = w.value;
+                pn_collector_put(c->driver.collector, PN_OBJECT, c->driver.connection,
+                                 PN_CONNECTION_WAKE);
+                handle_events(c);
+                break;
+            }
+            }
         }
-        lws_service_fd(s->context, &pfd);
-        d = fd_data(s, fd);    /* We may have stopped tracking during service */
-        if (pn_transport_capacity(t) > 0)
-            qdpn_connector_activate(c, QDPN_CONNECTOR_READABLE);
-        if (pn_transport_pending(t) > 0 || (d && lws_partial_buffered(d->wsi)))
-            qdpn_connector_activate(c, QDPN_CONNECTOR_WRITABLE);
-        pn_timestamp_t wake = pn_transport_tick(t, qdpn_now(NULL));
-        if (wake) qdpn_connector_wakeup(c, wake);
-    }
-    sys_mutex_unlock(s->lock);
-    check_timer(s);             /* Make sure the timer is running */
+    }
+    qd_log(hs->log, QD_LOG_INFO, "HTTP server thread exit");
+    return NULL;
 }
 
-/* Dispatch closes a connector because it is HUP, socket_error or transport_closed()  */
-static void http_connector_close(qdpn_connector_t *c) {
-    int fd = qdpn_connector_get_fd(c);
-    qd_http_server_t *s = qdpn_connector_http_listener(c)->server;
-    sys_mutex_lock(s->lock);
-    fd_data_t *d = fd_data(s, fd);
-    if (d) {                    /* Only if we are still tracking fd */
-        /* Shutdown but let LWS do the close(),  possibly in later timer */
-        shutdown(qdpn_connector_get_fd(c), SHUT_RDWR);
-        short flags = POLLIN|POLLOUT|POLLHUP;
-        struct lws_pollfd pfd = { qdpn_connector_get_fd(c), flags, flags };
-        lws_service_fd(s->context, &pfd);
-        qdpn_connector_mark_closed(c);
-        memset(d, 0 , sizeof(*d));
-    }
-    sys_mutex_unlock(s->lock);
+void qd_http_server_free(qd_http_server_t *hs) {
+    if (!hs) return;
+    if (hs->thread) {
+        /* Thread safe, stop via work queue then clean up */
+        work_t work = { W_STOP, NULL };
+        work_push(hs, work);
+        sys_thread_join(hs->thread);
+        sys_thread_free(hs->thread);
+        hs->thread = NULL;
+    }
+    work_queue_destroy(&hs->work);
+    if (hs->context) lws_context_destroy(hs->context);
+    free(hs);
 }
 
-static struct qdpn_connector_methods_t http_methods = {
-    http_connector_process,
-    http_connector_close
-};
-
-void qd_http_listener_accept(qd_http_listener_t *hl, qdpn_connector_t *c) {
-    qd_http_server_t *s = hl->server;
-    sys_mutex_lock(s->lock);
-    int fd = qdpn_connector_get_fd(c);
-    struct lws *wsi = lws_adopt_socket_vhost(hl->vhost, fd);
-    fd_data_t *d = fd_data(s, fd);
-    if (d) {          /* FD was adopted by LWS, so dispatch must not close it */
-        qdpn_connector_set_methods(c, &http_methods);
-        if (wsi) d->connector = c;
-    }
-    sys_mutex_unlock(s->lock);
-    if (!wsi) {       /* accept failed, dispatch should forget the FD. */
-        qdpn_connector_mark_closed(c);
+qd_http_server_t *qd_http_server(qd_server_t *s, qd_log_source_t *log) {
+    log_init();
+    qd_http_server_t *hs = calloc(1, sizeof(*hs));
+    if (hs) {
+        work_queue_init(&hs->work);
+        struct lws_context_creation_info info = {0};
+        info.gid = info.uid = -1;
+        info.user = hs;
+        info.server_string = QD_CONNECTION_PROPERTY_PRODUCT_VALUE;
+        info.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS |
+            LWS_SERVER_OPTION_SKIP_SERVER_CANONICAL_NAME |
+            LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
+        info.max_http_header_pool = 32;
+        info.timeout_secs = 1;
+
+        hs->context = lws_create_context(&info);
+        hs->server = s;
+        hs->log = log;              /* For messages from this file */
+        if (!hs->context) {
+            qd_log(hs->log, QD_LOG_CRITICAL, "No memory starting HTTP server");
+            qd_http_server_free(hs);
+            hs = NULL;
+        }
     }
+    return hs;
 }
 
-static struct lws_protocols protocols[] = {
-    /* HTTP only protocol comes first */
-    {
-        "http-only",
-        callback_http,
-        0,
-    },
-    /* "amqp" is the official oasis AMQP over WebSocket protocol name */
-    {
-        "amqp",
-        callback_amqpws,
-        sizeof(buffer_t),
-    },
-    /* "binary" is an alias for "amqp", for compatibility with clients designed
-     * to work with a WebSocket proxy
-     */
-    {
-        "binary",
-        callback_amqpws,
-        sizeof(buffer_t),
-    },
-    { NULL, NULL, 0, 0 } /* terminator */
-};
+/* Thread safe calls that put items on work queue */
 
-static qd_log_level_t qd_level(int lll) {
-    switch (lll) {
-    case LLL_ERR: return QD_LOG_ERROR;
-    case LLL_WARN: return QD_LOG_WARNING;
-    case LLL_NOTICE: return QD_LOG_INFO;
-    case LLL_INFO:return QD_LOG_DEBUG;
-    case LLL_DEBUG: return QD_LOG_TRACE;
-    default: return QD_LOG_NONE;
+qd_http_listener_t *qd_http_server_listen(qd_http_server_t *hs, qd_listener_t *li)
+{
+    sys_mutex_lock(hs->work.lock);
+    if (!hs->thread) {
+        hs->thread = sys_thread(http_thread_run, hs);
     }
+    bool ok = hs->thread;
+    sys_mutex_unlock(hs->work.lock);
+    if (!ok) return NULL;
+
+    qd_http_listener_t *hl = qd_http_listener(hs, li);
+    if (hl) {
+        work_t w = { W_LISTEN, hl };
+        work_push(hs, w);
+    }
+    return hl;
 }
 
-static void emit_lws_log(int lll, const char *line)  {
-    size_t  len = strlen(line);
-    while (len > 1 && isspace(line[len-1]))
-        --len;
-    qd_log(http_log, qd_level(lll), "%.*s", len, line);
-}
-
-qd_http_server_t *qd_http_server(qd_dispatch_t *d, qd_log_source_t *log) {
-    if (!http_log) http_log = qd_log_source("HTTP");
-    qd_http_server_t *s = calloc(1, sizeof(*s));
-    if (!s) return NULL;
-    s->log = log;
-    s->lock = sys_mutex();
-    s->dispatch = d;
-    int levels =
-        (qd_log_enabled(log, QD_LOG_ERROR) ? LLL_ERR : 0) |
-        (qd_log_enabled(log, QD_LOG_WARNING) ? LLL_WARN : 0) |
-        (qd_log_enabled(log, QD_LOG_INFO) ? LLL_NOTICE : 0) |
-        (qd_log_enabled(log, QD_LOG_DEBUG) ? LLL_INFO : 0) |
-        (qd_log_enabled(log, QD_LOG_TRACE) ? LLL_DEBUG : 0);
-    lws_set_log_level(levels, emit_lws_log);
-
-    struct lws_context_creation_info info = {0};
-    info.gid = info.uid = -1;
-    info.user = s;
-    info.server_string = QD_CONNECTION_PROPERTY_PRODUCT_VALUE;
-    info.options = LWS_SERVER_OPTION_EXPLICIT_VHOSTS |
-        LWS_SERVER_OPTION_SKIP_SERVER_CANONICAL_NAME |
-        LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
-    info.max_http_header_pool = 32;
-    info.timeout_secs = 1;
-    s->context = lws_create_context(&info);
-    if (!s->context) {
-        free(s);
-        return NULL;
-    }
-    return s;
+void qd_http_listener_close(qd_http_listener_t *hl)
+{
+    work_t w = { W_CLOSE, hl };
+    work_push(hl->server, w);
 }
 
-void qd_http_server_free(qd_http_server_t *s) {
-    sys_mutex_free(s->lock);
-    lws_context_destroy(s->context);
-    if (s->timer) qd_timer_free(s->timer);
-    if (s->fd) free(s->fd);
-    free(s);
+static qd_http_server_t *wsi_server(struct lws *wsi) {
+    return (qd_http_server_t*)lws_context_user(lws_get_context(wsi));
 }
 
-qd_http_listener_t *qd_http_listener(qd_http_server_t *s, const qd_server_config_t *config) {
-    qd_http_listener_t *hl = calloc(1, sizeof(*hl));
-    if (!hl) return NULL;
-    hl->server = s;
-
-    struct lws_context_creation_info info = {0};
-
-    struct lws_http_mount *m = &hl->mount;
-    m->mountpoint = "/";    /* URL mount point */
-    m->mountpoint_len = strlen(m->mountpoint); /* length of the mountpoint */
-    m->origin = (config->http_root && *config->http_root) ? /* File system root */
-        config->http_root : QPID_CONSOLE_STAND_ALONE_INSTALL_DIR;
-    m->def = "index.html";  /* Default file name */
-    m->origin_protocol = LWSMPRO_FILE; /* mount type is a directory in a filesystem */
-    info.mounts = m;
-    info.port = CONTEXT_PORT_NO_LISTEN_SERVER; /* Don't use LWS listener */
-    info.protocols = protocols;
-    info.keepalive_timeout = 1;
-    info.ssl_cipher_list = CIPHER_LIST;
-    info.options |= LWS_SERVER_OPTION_VALIDATE_UTF8;
-    if (config->ssl_profile) {
-        info.ssl_cert_filepath = config->ssl_certificate_file;
-        info.ssl_private_key_filepath = config->ssl_private_key_file;
-        info.ssl_private_key_password = config->ssl_password;
-        info.ssl_ca_filepath = config->ssl_trusted_certificates;
-        info.options |=
-            LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT |
-            (config->ssl_required ? 0 : LWS_SERVER_OPTION_ALLOW_NON_SSL_ON_SSL_PORT) |
-            (config->requireAuthentication ? LWS_SERVER_OPTION_REQUIRE_VALID_OPENSSL_CLIENT_CERT : 0);
-    }
-    snprintf(hl->name, sizeof(hl->name), "vhost%x", s->vhost_id++);
-    info.vhost_name = hl->name;
-    hl->vhost = lws_create_vhost(s->context, &info);
-    if (!hl->vhost) {
-        free(hl);
-        return NULL;
+static qd_http_listener_t *wsi_listener(struct lws *wsi) {
+    qd_http_listener_t *hl = NULL;
+    struct lws_vhost *vhost = lws_get_vhost(wsi);
+    if (vhost) {                /* Get qd_http_listener from vhost data */
+        void *vp = lws_protocol_vh_priv_get(vhost, &protocols[0]);
+        if (vp) memcpy(&hl, vp, sizeof(hl)); /* No per-vhost data: hl stays NULL */
     }
     return hl;
 }
 
-void qd_http_listener_free(qd_http_listener_t *hl) {
-    free(hl);
+static qd_log_source_t *wsi_log(struct lws *wsi) {
+    return wsi_server(wsi)->log;
 }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1e27480/src/http-none.c
----------------------------------------------------------------------
diff --git a/src/http-none.c b/src/http-none.c
index b9af9e1..bdc32b0 100644
--- a/src/http-none.c
+++ b/src/http-none.c
@@ -18,32 +18,27 @@
  */
 
 #include <qpid/dispatch/log.h>
-#include <qpid/dispatch/driver.h>
 #include "http.h"
 
+struct qd_dispatch_t;
+
 /* No HTTP implementation available. */
 
-qd_http_server_t *qd_http_server(struct qd_dispatch_t *d, qd_log_source_t *log)
+qd_http_server_t *qd_http_server(struct qd_server_t *s, qd_log_source_t *log)
 {
     qd_log(log, QD_LOG_WARNING, "HTTP support is not available");
     return 0;
 }
 
-void qd_http_server_free(qd_http_server_t *h)
-{
-}
+void qd_http_server_free(qd_http_server_t *h) {}
 
-qd_http_listener_t *qd_http_listener(struct qd_http_server_t *s,
-                                     const struct qd_server_config_t *config)
-{
-    return 0;
-}
+void* qd_http_server_run(void* qd_http_server) { return 0; }
 
-void qd_http_listener_free(qd_http_listener_t *hl)
-{
-}
+qd_http_listener_t *qd_http_server_listen(qd_http_server_t *s, struct qd_listener_t *li) { return 0; }
 
-void qd_http_listener_accept(qd_http_listener_t *hl, struct qdpn_connector_t *c)
-{
-}
+struct qd_http_connection_t { int dummy; };
 
+const char *qd_http_connection_name(qd_http_connection_t* hc) { return ""; }
+const char *qd_http_connection_hostip(qd_http_connection_t* hc) { return ""; }
+bool qd_http_connection_closed(qd_http_connection_t* hc) { return false; }
+void qd_http_connection_wake(qd_http_connection_t *qd_conn) {}

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1e27480/src/http.h
----------------------------------------------------------------------
diff --git a/src/http.h b/src/http.h
index fae3f1d..8c064d2 100644
--- a/src/http.h
+++ b/src/http.h
@@ -23,17 +23,21 @@
 typedef struct qd_http_listener_t qd_http_listener_t;
 typedef struct qd_http_server_t qd_http_server_t;
 
-struct qd_dispatch_t;
+struct qd_listener_t;
 struct qd_log_source_t;
 struct qd_server_config_t;
-struct qdpn_connector_t;
+struct qd_server_t;
+
+qd_http_server_t *qd_http_server(struct qd_server_t *server, struct qd_log_source_t *log);
 
-qd_http_server_t *qd_http_server(struct qd_dispatch_t *dispatch, struct qd_log_source_t *log);
 void qd_http_server_free(qd_http_server_t*);
-qd_http_listener_t *qd_http_listener(struct qd_http_server_t *s,
-                                     const struct qd_server_config_t *config);
-void qd_http_listener_free(qd_http_listener_t *hl);
-/* On error, qdpn_connector_closed(c) is true. */
-void qd_http_listener_accept(qd_http_listener_t *hl, struct qdpn_connector_t *c);
+qd_http_listener_t *qd_http_server_listen(qd_http_server_t *s, struct qd_listener_t *li);
+
+typedef struct qd_http_connection_t qd_http_connection_t;
+
+const char *qd_http_connection_name(qd_http_connection_t* hc);
+const char *qd_http_connection_hostip(qd_http_connection_t* hc);
+bool qd_http_connection_closed(qd_http_connection_t* hc);
+void qd_http_connection_wake(qd_http_connection_t* hc);
 
 #endif // QD_HTTP_H

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1e27480/src/server.c
----------------------------------------------------------------------
diff --git a/src/server.c b/src/server.c
index 42fed58..8d069e2 100644
--- a/src/server.c
+++ b/src/server.c
@@ -92,18 +92,6 @@ struct qd_server_t {
 };
 
 /**
- * Listener objects represent the desire to accept incoming transport connections.
- */
-struct qd_listener_t {
-    qd_server_t              *server;
-    const qd_server_config_t *config;
-    void                     *context;
-    qdpn_listener_t          *pn_listener;
-    qd_http_listener_t       *http;
-};
-
-
-/**
  * Connector objects represent the desire to create and maintain an outgoing transport connection.
  */
 struct qd_connector_t {
@@ -160,7 +148,9 @@ static qd_thread_t *thread(qd_server_t *qd_server, int id)
     return thread;
 }
 
-static void free_qd_connection(qd_connection_t *ctx)
+static void invoke_deferred_calls(qd_connection_t *conn, bool discard);
+
+void qd_connection_free(qd_connection_t *ctx)
 {
     if (ctx->policy_settings) {
         if (ctx->policy_settings->sources)
@@ -185,6 +175,10 @@ static void free_qd_connection(qd_connection_t *ctx)
 
     free(ctx->role);
 
+    if (ctx->deferred_call_lock) {
+        invoke_deferred_calls(ctx, true);  // Discard any pending deferred calls
+        sys_mutex_free(ctx->deferred_call_lock);
+    }
     free_qd_connection_t(ctx);
 }
 
@@ -423,7 +417,7 @@ static const char *transport_get_user(qd_connection_t *conn, pn_transport_t *tpo
  * Allocate a new qd_connection
  *  with DEQ items initialized, call lock allocated, and all other fields cleared.
  */
-static qd_connection_t *connection_allocate()
+qd_connection_t *qd_server_connection_allocate()
 {
     qd_connection_t *ctx = new_qd_connection_t();
     ZERO(ctx);
@@ -434,6 +428,14 @@ static qd_connection_t *connection_allocate()
     return ctx;
 }
 
+/* Get the next connection ID, thread safe */
+uint64_t qd_server_connection_id(qd_server_t *server)
+{
+    sys_mutex_lock(server->lock);
+    uint64_t id = server->next_connection_id++;
+    sys_mutex_unlock(server->lock);
+    return id;
+}
 
 void qd_connection_set_user(qd_connection_t *conn)
 {
@@ -525,7 +527,7 @@ static const char *log_incoming(char *buf, size_t size, qdpn_connector_t *cxtr)
 }
 
 
-static void decorate_connection(qd_server_t *qd_server, pn_connection_t *conn, const qd_server_config_t *config)
+void qd_server_decorate_connection(qd_server_t *qd_server, pn_connection_t *conn, const qd_server_config_t *config)
 {
     size_t clen = strlen(QD_CAPABILITY_ANONYMOUS_RELAY);
 
@@ -621,7 +623,7 @@ static void thread_process_listeners_LH(qd_server_t *qd_server)
 
         char logbuf[qd_log_max_len()];
 
-        ctx = connection_allocate();
+        ctx = qd_server_connection_allocate();
         ctx->server        = qd_server;
         ctx->owner_thread  = CONTEXT_UNSPECIFIED_OWNER;
         ctx->pn_cxtr       = cxtr;
@@ -638,7 +640,7 @@ static void thread_process_listeners_LH(qd_server_t *qd_server)
         pn_connection_t *conn = pn_connection();
         ctx->collector = pn_collector();
         pn_connection_collect(conn, ctx->collector);
-        decorate_connection(qd_server, conn, ctx->listener->config);
+        qd_server_decorate_connection(qd_server, conn, ctx->listener->config);
         qdpn_connector_set_connection(cxtr, conn);
         pn_connection_set_context(conn, ctx);
         ctx->pn_conn = conn;
@@ -672,13 +674,7 @@ static void thread_process_listeners_LH(qd_server_t *qd_server)
             pn_transport_set_tracer(tport, transport_tracer);
         }
 
-        if (li->http) {
-            // Set up HTTP if configured, HTTP provides its own SSL.
-            qd_log(qd_server->log_source, QD_LOG_TRACE, "Configuring HTTP%s on %s",
-                   config->ssl_profile ? "S" : "",
-                   log_incoming(logbuf, sizeof(logbuf), cxtr));
-            qd_http_listener_accept(li->http, cxtr);
-        } else if (config->ssl_profile)  {
+        if (!li->http && config->ssl_profile)  {
             // Set up SSL if configured and HTTP is not providing SSL.
             qd_log(qd_server->log_source, QD_LOG_TRACE, "Configuring SSL on %s",
                    log_incoming(logbuf, sizeof(logbuf), cxtr));
@@ -766,31 +762,32 @@ static void invoke_deferred_calls(qd_connection_t *conn, bool discard)
 }
 
 
-static int process_connector(qd_server_t *qd_server, qdpn_connector_t *cxtr)
+int qd_connection_process(qd_connection_t *ctx)
 {
-    qd_connection_t *ctx = qdpn_connector_context(cxtr);
     int events = 0;
     int passes = 0;
+    qdpn_connector_t *cxtr = ctx->pn_cxtr;
+    qd_http_connection_t *hc = ctx->http;
 
     if (ctx->closed)
         return 0;
-
+    qd_server_t *qd_server = ctx->server;
     do {
         passes++;
 
         //
         // Step the engine for pre-handler processing
         //
-        qdpn_connector_process(cxtr);
-
+        if (cxtr) {
+            qdpn_connector_process(cxtr);
+        }
         //
         // If the connector has closed, notify the client via callback.
         //
-        if (qdpn_connector_closed(cxtr)) {
+        if ((cxtr && qdpn_connector_closed(cxtr)) || (hc && qd_http_connection_closed(hc))) {
             if (ctx->opened)
                 qd_server->conn_handler(qd_server->conn_handler_context, ctx->context,
-                                        QD_CONN_EVENT_CLOSE,
-                                        (qd_connection_t*) qdpn_connector_context(cxtr));
+                                        QD_CONN_EVENT_CLOSE, ctx);
             ctx->closed = true;
             events = 0;
             break;
@@ -798,7 +795,7 @@ static int process_connector(qd_server_t *qd_server, qdpn_connector_t *cxtr)
 
         invoke_deferred_calls(ctx, false);
 
-        qd_connection_t *qd_conn   = (qd_connection_t*) qdpn_connector_context(cxtr);
+        qd_connection_t *qd_conn   = ctx;
         pn_collector_t  *collector = qd_connection_collector(qd_conn);
         pn_event_t      *event;
 
@@ -1046,7 +1043,7 @@ static void *thread_run(void *arg)
             // Even if the connector has failed there are still events that 
             // must be processed so that associated links will be cleaned up.
             //
-            work_done = process_connector(qd_server, cxtr);
+            work_done = qd_connection_process(ctx);
 
             //
             // Check to see if the connector was closed during processing
@@ -1070,9 +1067,7 @@ static void *thread_run(void *arg)
                 }
 
                 qdpn_connector_free(cxtr);
-                invoke_deferred_calls(ctx, true);  // Discard any pending deferred calls
-                sys_mutex_free(ctx->deferred_call_lock);
-                free_qd_connection(ctx);
+                qd_connection_free(ctx);
                 qd_server->threads_active--;
                 sys_mutex_unlock(qd_server->lock);
             } else {
@@ -1145,7 +1140,7 @@ static void cxtr_try_open(void *context)
     if (ct->state != CXTR_STATE_CONNECTING)
         return;
 
-    qd_connection_t *ctx = connection_allocate();
+    qd_connection_t *ctx = qd_server_connection_allocate();
     ctx->server       = ct->server;
     ctx->owner_thread = CONTEXT_UNSPECIFIED_OWNER;
     ctx->pn_conn      = pn_connection();
@@ -1161,7 +1156,7 @@ static void cxtr_try_open(void *context)
     qd_log(ct->server->log_source, QD_LOG_INFO, "Connecting to %s:%s", ct->config->host, ct->config->port);
 
     pn_connection_collect(ctx->pn_conn, ctx->collector);
-    decorate_connection(ctx->server, ctx->pn_conn, ct->config);
+    qd_server_decorate_connection(ctx->server, ctx->pn_conn, ct->config);
 
     //
     // qdpn_connector is not thread safe
@@ -1175,8 +1170,7 @@ static void cxtr_try_open(void *context)
     const qd_server_config_t *config = ct->config;
 
     if (ctx->pn_cxtr == 0) {
-        sys_mutex_free(ctx->deferred_call_lock);
-        free_qd_connection(ctx);
+        qd_connection_free(ctx);
         ct->delay = 10000;
         qd_timer_schedule(ct->timer, ct->delay);
         return;
@@ -1354,7 +1348,7 @@ qd_server_t *qd_server(qd_dispatch_t *qd, int thread_count, const char *containe
     qd_server->heartbeat_timer        = 0;
     qd_server->next_connection_id     = 1;
     qd_server->py_displayname_obj     = 0;
-    qd_server->http                   = qd_http_server(qd, qd_server->log_source);
+    qd_server->http                   = qd_http_server(qd_server, qd_server->log_source);
     qd_log(qd_server->log_source, QD_LOG_INFO, "Container Name: %s", qd_server->container_name);
 
     return qd_server;
@@ -1534,17 +1528,16 @@ void qd_server_resume(qd_dispatch_t *qd)
 
 void qd_server_activate(qd_connection_t *ctx, bool awaken)
 {
-    if (!ctx)
-        return;
-
     qdpn_connector_t *ctor = ctx->pn_cxtr;
-    if (!ctor)
-        return;
-
-    if (!qdpn_connector_closed(ctor)) {
-        qdpn_connector_activate(ctor, QDPN_CONNECTOR_WRITABLE);
-        if (awaken)
-            qdpn_driver_wakeup(ctx->server->driver);
+    qd_http_connection_t *hc = ctx->http;
+    if (ctor) {
+        if (!qdpn_connector_closed(ctor)) {
+            qdpn_connector_activate(ctor, QDPN_CONNECTOR_WRITABLE);
+            if (awaken)
+                qdpn_driver_wakeup(ctx->server->driver);
+        }
+    } else if (hc) {
+        qd_http_connection_wake(hc);
     }
 }
 
@@ -1636,6 +1629,7 @@ qd_listener_t *qd_server_listen(qd_dispatch_t *qd, const qd_server_config_t *con
 {
     qd_server_t   *qd_server = qd->server;
     qd_listener_t *li        = new_qd_listener_t();
+    ZERO(li);
 
     if (!li)
         return 0;
@@ -1646,23 +1640,23 @@ qd_listener_t *qd_server_listen(qd_dispatch_t *qd, const qd_server_config_t *con
     li->http = NULL;
 
     if (config->http) {
-        li->http = qd_http_listener(qd_server->http, config);
+        li->http = qd_http_server_listen(qd_server->http, li);
         if (!li->http) {
             free_qd_listener_t(li);
             qd_log(qd_server->log_source, QD_LOG_ERROR, "Cannot start HTTP listener on %s:%s",
                    config->host, config->port);
             return NULL;
         }
-    }
+    } else {
+        li->pn_listener = qdpn_listener(
+            qd_server->driver, config->host, config->port, config->protocol_family, li);
 
-    li->pn_listener = qdpn_listener(
-        qd_server->driver, config->host, config->port, config->protocol_family, li);
-
-    if (!li->pn_listener) {
-        free_qd_listener_t(li);
-        qd_log(qd_server->log_source, QD_LOG_ERROR, "Cannot start listener on %s:%s",
-               config->host, config->port);
-        return NULL;
+        if (!li->pn_listener) {
+            free_qd_listener_t(li);
+            qd_log(qd_server->log_source, QD_LOG_ERROR, "Cannot start listener on %s:%s",
+                   config->host, config->port);
+            return NULL;
+        }
     }
     qd_log(qd_server->log_source, QD_LOG_TRACE, "Listening on %s:%s%s", config->host, config->port,
            config->http ? (config->ssl_profile ? "(HTTPS)":"(HTTP)") : "");
@@ -1675,15 +1669,15 @@ void qd_server_listener_free(qd_listener_t* li)
 {
     if (!li)
         return;
-    if (li->http) qd_http_listener_free(li->http);
-    qdpn_listener_free(li->pn_listener);
+    /* The http listener is freed by the http server */
+    if (li->pn_listener) qdpn_listener_free(li->pn_listener);
     free_qd_listener_t(li);
 }
 
 
 void qd_server_listener_close(qd_listener_t* li)
 {
-    if (li)
+    if (li && li->pn_listener)
         qdpn_listener_close(li->pn_listener);
 }
 
@@ -1741,11 +1735,11 @@ void qd_server_timer_cancel_LH(qd_timer_t *timer)
 qd_dispatch_t* qd_server_dispatch(qd_server_t *server) { return server->qd; }
 
 const char* qd_connection_name(const qd_connection_t *c) {
-    return qdpn_connector_name(c->pn_cxtr);
+    return c->pn_cxtr ? qdpn_connector_name(c->pn_cxtr) : qd_http_connection_name(c->http);
 }
 
 const char* qd_connection_hostip(const qd_connection_t *c) {
-    return qdpn_connector_hostip(c->pn_cxtr);
+    return c->pn_cxtr ? qdpn_connector_hostip(c->pn_cxtr) : qd_http_connection_hostip(c->http);
 }
 
 qd_connector_t* qd_connection_connector(const qd_connection_t *c) {

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/b1e27480/src/server_private.h
----------------------------------------------------------------------
diff --git a/src/server_private.h b/src/server_private.h
index 576b5ec..b613133 100644
--- a/src/server_private.h
+++ b/src/server_private.h
@@ -35,17 +35,22 @@
 void qd_server_timer_pending_LH(qd_timer_t *timer);
 void qd_server_timer_cancel_LH(qd_timer_t *timer);
 
-/* FIXME aconway 2017-01-19: to include/server.h? */
-
 struct qd_dispatch_t* qd_server_dispatch(qd_server_t *server);
 
 const char* qd_connection_name(const qd_connection_t *c);
 const char* qd_connection_hostip(const qd_connection_t *c);
 qd_connector_t* qd_connection_connector(const qd_connection_t *c);
+void qd_connection_free(qd_connection_t *c);
+int qd_connection_process(qd_connection_t *ctx);
 
 const qd_server_config_t *qd_connector_config(const qd_connector_t *c);
 
-qd_http_listener_t *qd_listener_http(qd_listener_t *l);
+uint64_t qd_server_connection_id(qd_server_t *server);
+qd_connection_t *qd_server_connection_allocate();
+
+void qd_server_decorate_connection(
+    qd_server_t *qd_server, pn_connection_t *conn, const qd_server_config_t *config);
+
 
 #define CONTEXT_NO_OWNER -1
 #define CONTEXT_UNSPECIFIED_OWNER -2
@@ -81,6 +86,25 @@ typedef struct qd_pn_free_link_session_t {
 
 DEQ_DECLARE(qd_pn_free_link_session_t, qd_pn_free_link_session_list_t);
 
+#ifndef NI_MAXHOST
+# define NI_MAXHOST 1025
+#endif
+
+#ifndef NI_MAXSERV
+# define NI_MAXSERV 32
+#endif
+
+/**
+ * Listener objects represent the desire to accept incoming transport connections.
+ */
+struct qd_listener_t {
+    qd_server_t              *server;
+    const qd_server_config_t *config;
+    void                     *context;
+    qdpn_listener_t          *pn_listener;
+    qd_http_listener_t       *http;
+};
+
 /**
  * Connection objects wrap Proton connection objects.
  */
@@ -92,6 +116,7 @@ struct qd_connection_t {
     int                       owner_thread;
     int                       enqueued;
     qdpn_connector_t         *pn_cxtr;
+    qd_http_connection_t     *http;
     pn_connection_t          *pn_conn;
     pn_collector_t           *collector;
     pn_ssl_t                 *ssl;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message