Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A847C200BD8 for ; Wed, 7 Dec 2016 18:16:45 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id A6DAE160AF9; Wed, 7 Dec 2016 17:16:45 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7EA69160B0C for ; Wed, 7 Dec 2016 18:16:44 +0100 (CET) Received: (qmail 77781 invoked by uid 500); 7 Dec 2016 17:16:43 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 77771 invoked by uid 99); 7 Dec 2016 17:16:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 07 Dec 2016 17:16:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9C0CBE0159; Wed, 7 Dec 2016 17:16:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aconway@apache.org To: commits@qpid.apache.org Message-Id: <692463ab6599468cbe11fcb1db6aa8b7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: qpid-dispatch git commit: DISPATCH-103: Router serves standalone console files, stability fixes. Date: Wed, 7 Dec 2016 17:16:43 +0000 (UTC) archived-at: Wed, 07 Dec 2016 17:16:45 -0000 Repository: qpid-dispatch Updated Branches: refs/heads/master 4ae3c161d -> 4fb102058 DISPATCH-103: Router serves standalone console files, stability fixes. The router now serves the standalone console files so you can connect a browser direct to a HTTP-enabled router port to use the console. - requires libwebsockets 2.1 - packaged on fedora 25, build from source elsewhere. - router must be installed `make install` to find the console files. Fixed some stability and performance issues, currently appears to be stable and reasonably fast. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/4fb10205 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/4fb10205 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/4fb10205 Branch: refs/heads/master Commit: 4fb102058f0695e357859f281ac9723b519ad1c3 Parents: 4ae3c16 Author: Alan Conway Authored: Mon Dec 5 17:04:38 2016 -0500 Committer: Alan Conway Committed: Wed Dec 7 12:12:53 2016 -0500 ---------------------------------------------------------------------- CMakeLists.txt | 5 + console/CMakeLists.txt | 5 +- src/config.h.in | 1 + src/http-libwebsockets.c | 297 ++++++++++++++++++++++++++---------------- src/posix/driver.c | 11 +- src/server.c | 3 +- 6 files changed, 205 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/4fb10205/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index b0c1f45..51bbc17 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -70,6 +70,11 @@ set(DOC_INSTALL_DIR ${SHARE_INSTALL_DIR}/doc CACHE PATH "Documentation directory set(QD_DOC_INSTALL_DIR ${SHARE_INSTALL_DIR}/doc/qpid-dispatch CACHE PATH "Qpid dispatch documentation directory") set(MAN_INSTALL_DIR share/man CACHE PATH "Manpage directory") set(QPID_DISPATCH_HOME_INSTALLED ${CMAKE_INSTALL_PREFIX}/${QPID_DISPATCH_HOME}) + +set(CONSOLE_BASE_INSTALL_DIR "share/qpid-dispatch/console") +set(CONSOLE_INSTALL_DIR "${CMAKE_INSTALL_PREFIX}/${CONSOLE_BASE_INSTALL_DIR}") +set(CONSOLE_STAND_ALONE_INSTALL_DIR "${CONSOLE_INSTALL_DIR}/stand-alone") + set(RUN ${PYTHON_EXECUTABLE} ${CMAKE_BINARY_DIR}/run.py) # define the configuration directory based on whether or not the install prefix is defined http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/4fb10205/console/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/console/CMakeLists.txt b/console/CMakeLists.txt index e64f518..7ff9333 100644 --- a/console/CMakeLists.txt +++ b/console/CMakeLists.txt @@ -17,9 +17,7 @@ ## under the License. ## -set(CONSOLE_BASE_INSTALL_DIR "share/qpid-dispatch/console") set(CONSOLE_BASE_SOURCE_DIR "${CMAKE_SOURCE_DIR}/console/stand-alone/") -set(CONSOLE_INSTALL_DIR "${CMAKE_INSTALL_PREFIX}/${CONSOLE_BASE_INSTALL_DIR}") ## ## Add option to not install the stand-alone console @@ -29,7 +27,8 @@ if(CONSOLE_INSTALL) # Static console files install( - DIRECTORY ${CONSOLE_BASE_SOURCE_DIR} DESTINATION ${CONSOLE_INSTALL_DIR}/stand-alone + DIRECTORY ${CONSOLE_BASE_SOURCE_DIR} + DESTINATION ${CONSOLE_STAND_ALONE_INSTALL_DIR} ) endif(CONSOLE_INSTALL) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/4fb10205/src/config.h.in ---------------------------------------------------------------------- diff --git a/src/config.h.in b/src/config.h.in index 0e0c23c..74a38fb 100644 --- a/src/config.h.in +++ b/src/config.h.in @@ -19,5 +19,6 @@ #define QPID_DISPATCH_VERSION "${QPID_DISPATCH_VERSION}" #define QPID_DISPATCH_LIB "${QPID_DISPATCH_LIB}" +#define QPID_CONSOLE_STAND_ALONE_INSTALL_DIR "${CONSOLE_STAND_ALONE_INSTALL_DIR}" #cmakedefine01 USE_MEMORY_POOL #cmakedefine01 QD_MEMORY_STATS http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/4fb10205/src/http-libwebsockets.c ---------------------------------------------------------------------- diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c index b33a0d5..c456b07 100644 --- a/src/http-libwebsockets.c +++ b/src/http-libwebsockets.c @@ -30,51 +30,59 @@ #include #include "http.h" +#include "config.h" +/* Shared context for all HTTP connections. */ struct qd_http_t { sys_mutex_t *lock; qd_dispatch_t *dispatch; qd_log_source_t *log; struct lws_context *context; qd_timer_t *timer; + qdpn_connector_t **connectors; /* Indexed by file descriptor */ + size_t connectors_len; }; -/* TODO aconway 2016-11-29: First cut serializes all access to libwebsockets. - * LWS does have multi-thread facilities but it segregates file descriptors into - * "serialization groups" which does not match well with dispatches current - * and planned future threading strategies. Review when we refactor dispatch - * to use the pn_proactor. At least 2 possibilities: - * - * - treat LWS as single-threaded IO code in the 'leader follower' model, - * analogous to how we handle libuv. - * - work with LWS upstream to abstract out IO code so each LWS WSI can operate - * as a thread-independent unit, like the proton connection_driver. - */ +static inline qdpn_connector_t *fd_connector(qd_http_t *h, int fd) { + return (fd < h->connectors_len) ? h->connectors[fd] : NULL; +} + +static inline qd_http_t *wsi_http(struct lws *wsi) { + return (qd_http_t *)lws_context_user(lws_get_context(wsi)); +} -static __thread struct { - qdpn_connector_t *connector; /* Set before each lws_service call */ -} per_thread = { NULL }; +static inline qdpn_connector_t *wsi_connector(struct lws *wsi) { + return fd_connector(wsi_http(wsi), lws_get_socket_fd(wsi)); +} -typedef struct buffer_t { void *start; size_t size; size_t cap; } buffer_t; +static inline int set_fd(qd_http_t *h, int fd, qdpn_connector_t *c) { + if (fd >= h->connectors_len) { + size_t len = h->connectors_len; + h->connectors_len = (fd+1)*2; + h->connectors = realloc(h->connectors, h->connectors_len*sizeof(qdpn_connector_t*)); + if (!h->connectors) return -1; + memset(h->connectors + len, 0, h->connectors_len - len); + } + h->connectors[fd] = c; + return 0; +} -/* Extra buffering per connection, stored in the lws_wsi_user() space. */ -typedef struct buffers_t { - buffer_t wtmp; /* Temp buffer with pre-data header space required by LWS */ - buffer_t over; /* Can't control LWS read size, buffer the overflow */ - char name[256]; /* Copy of connector name for use after connector detached */ -} buffers_t; - -static void resize(buffer_t *b, size_t size) { - /* FIXME aconway 2016-11-30: handle alloc failure */ - if (b->start == NULL || b->cap < size) { - b->start = realloc(b->start, size); - b->size = b->cap = size; +/* Mark the qd connector closed, but leave the FD for LWS to clean up */ +int mark_closed(struct lws *wsi) { + qd_http_t *h = wsi_http(wsi); + int fd = lws_get_socket_fd(wsi); + qdpn_connector_t *c = fd_connector(h, fd); + if (c) { + qdpn_connector_mark_closed(c); + return set_fd(h, fd, NULL); } - b->size = size; + return 0; } -/* Push as much as possible into the transport, store overflow in over. */ -static void transport_push_max(pn_transport_t *t, pn_bytes_t buf, buffer_t *over) { +/* Push read data into the transport. + * Return 0 on success, number of bytes un-pushed on failure. + */ +static int transport_push(pn_transport_t *t, pn_bytes_t buf) { ssize_t cap; while (buf.size > 0 && (cap = pn_transport_capacity(t)) > 0) { if (buf.size > cap) { @@ -86,57 +94,109 @@ static void transport_push_max(pn_transport_t *t, pn_bytes_t buf, buffer_t *over buf.size = 0; } } - if (buf.size > 0) { - if (buf.size > over->cap) { - resize(over, buf.size); - } - memmove(over->start, buf.start, buf.size); - } - over->size = buf.size; + return buf.size; } -static qd_http_t *qd_http_from_wsi(struct lws *wsi) { - return (qd_http_t *)lws_context_user(lws_get_context(wsi)); +static int normal_close(struct lws *wsi, qdpn_connector_t *c, const char *msg) { + lws_close_reason(wsi, LWS_CLOSE_STATUS_NORMAL, (unsigned char*)msg, strlen(msg)); + return -1; } -static int do_close(struct lws *wsi, qdpn_connector_t *c, const char *msg) { - if (c) qdpn_connector_mark_closed(c); - lws_close_reason(wsi, LWS_CLOSE_STATUS_NORMAL, (unsigned char*)msg, strlen(msg)); +static int unexpected_close(struct lws *wsi, qdpn_connector_t *c, const char *msg) { + lws_close_reason(wsi, LWS_CLOSE_STATUS_UNEXPECTED_CONDITION, (unsigned char*)msg, strlen(msg)); return -1; } +/* + * Callback for un-promoted HTTP connections, and low-level external poll operations. + * Note main HTTP file serving is handled by the "mount" struct below. + * Called with http lock held. + */ +static int callback_http(struct lws *wsi, enum lws_callback_reasons reason, + void *user, void *in, size_t len) +{ + switch (reason) { + + case LWS_CALLBACK_HTTP: { /* Called if file mount can't find the file */ + lws_return_http_status(wsi, HTTP_STATUS_NOT_FOUND, "file not found"); + return -1; + } + + case LWS_CALLBACK_CLOSED_HTTP: + mark_closed(wsi); + break; + + /* low-level 'protocol[0]' callbacks for all protocols */ + case LWS_CALLBACK_DEL_POLL_FD: { + if (mark_closed(wsi)) { + lws_return_http_status(wsi, HTTP_STATUS_INTERNAL_SERVER_ERROR, "out of memory"); + return -1; + } + break; + } + + case LWS_CALLBACK_CHANGE_MODE_POLL_FD: { + struct lws_pollargs *p = (struct lws_pollargs*)in; + qdpn_connector_t *c = wsi_connector(wsi); + if (c) { + if (p->events & POLLIN) qdpn_connector_activate(c, QDPN_CONNECTOR_READABLE); + if (p->events & POLLOUT) qdpn_connector_activate(c, QDPN_CONNECTOR_WRITABLE); + } + break; + } + + default: + break; + } + + return 0; +} + +/* Buffer to allocate extra header space required by LWS. */ +typedef struct buffer_t { void *start; size_t size; size_t cap; } buffer_t; + +/* Callbacks for promoted AMQP over WS connections. + * Called with http lock held. + */ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { - buffers_t *b = (buffers_t*)user; - qd_http_t *h = qd_http_from_wsi(wsi); - qdpn_connector_t *c = per_thread.connector; + qd_http_t *h = wsi_http(wsi); + qdpn_connector_t *c = wsi_connector(wsi); pn_transport_t *t = c ? qdpn_connector_transport(c) : NULL; + const char *name = c ? qdpn_connector_name(c) : ""; switch (reason) { case LWS_CALLBACK_ESTABLISHED: { - memset(b, 0, sizeof(*b)); - strncpy(b->name, qdpn_connector_name(c), sizeof(b->name)); - qd_log(h->log, QD_LOG_DEBUG, "HTTP from %s upgraded to AMQP/WebSocket", b->name); + memset(user, 0, sizeof(buffer_t)); + qd_log(h->log, QD_LOG_TRACE, "HTTP from %s upgraded to AMQP/WebSocket", name); break; } case LWS_CALLBACK_SERVER_WRITEABLE: { ssize_t size; if (!t || (size = pn_transport_pending(t)) < 0) { - return do_close(wsi, c, "write-closed"); + return normal_close(wsi, c, "write-closed"); } if (size > 0) { - pn_bytes_t wbuf = { size, pn_transport_head(t) }; + const void *start = pn_transport_head(t); /* lws_write() demands LWS_PRE bytes of free space before the data */ - resize(&b->wtmp, wbuf.size + LWS_PRE); - unsigned char *start = (unsigned char*)b->wtmp.start + LWS_PRE; - memcpy(start, wbuf.start, wbuf.size); - ssize_t wrote = lws_write(wsi, start, wbuf.size, LWS_WRITE_BINARY); + size_t tmpsize = size + LWS_PRE; + buffer_t *wtmp = (buffer_t*)user; + if (wtmp->start == NULL || wtmp->cap < tmpsize) { + wtmp->start = realloc(wtmp->start, tmpsize); + wtmp->size = wtmp->cap = tmpsize; + } + if (wtmp->start == NULL) { + return unexpected_close(wsi, c, "out-of-memory"); + } + void *tmpstart = wtmp->start + LWS_PRE; + memcpy(tmpstart, start, size); + ssize_t wrote = lws_write(wsi, tmpstart, size, LWS_WRITE_BINARY); if (wrote < 0) { pn_transport_close_head(t); - return do_close(wsi, c, "write-error"); + return normal_close(wsi, c, "write-error"); } else { pn_transport_pop(t, (size_t)wrote); } @@ -146,26 +206,23 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason, case LWS_CALLBACK_RECEIVE: { if (!t || pn_transport_capacity(t) < 0) { - do_close(wsi, c, "read-closed"); + return normal_close(wsi, c, "read-closed"); } - assert(b->over.size == 0); - transport_push_max(t, pn_bytes(len, in), &b->over); - if (b->over.size > 0) { - qd_log(h->log, QD_LOG_TRACE, "amqp/ws read buffered %z bytes on %s", b->name); + if (transport_push(t, pn_bytes(len, in))) { + return unexpected_close(wsi, c, "read-overflow"); } break; } - case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: { - qd_log(h->log, QD_LOG_DEBUG, "AMQP/WebSocket peer close from %s", b->name); - if (t) pn_transport_close_tail(t); - return do_close(wsi, c, "peer-close"); - } + case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: + mark_closed(wsi); + if (t) { + pn_transport_close_tail(t); + } - case LWS_CALLBACK_CLOSED: { - qd_log(h->log, QD_LOG_DEBUG, "AMQP/WebSocket from %s closed", b->name); + case LWS_CALLBACK_CLOSED: + mark_closed(wsi); break; - } default: break; @@ -173,78 +230,93 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason, return 0; } -static int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, - void *in, size_t len) -{ - return 0; -} +/* Mount the console directory into URL space at / */ +static const struct lws_http_mount console_mount = { + NULL, /* linked-list pointer to next*/ + "/", /* mountpoint in URL namespace on this vhost */ + QPID_CONSOLE_STAND_ALONE_INSTALL_DIR, /* where to go on the filesystem for that */ + "index.html", /* default filename if none given */ + NULL, + NULL, + NULL, + NULL, + 0, + 0, + 0, + 0, + 0, + 0, + LWSMPRO_FILE, /* mount type is a directory in a filesystem */ + 1, /* strlen("/"), ie length of the mountpoint */ +}; static void check_timer(void *void_http) { - per_thread.connector = NULL; /* Not servicing any connector */ qd_http_t *h = (qd_http_t*)void_http; sys_mutex_lock(h->lock); - /* Run LWS global timer checks. */ + /* Run LWS global timer and forced-service checks. */ lws_service_fd(h->context, NULL); + while (!lws_service_adjust_timeout(h->context, 1, 0)) { + /* -1 timeout means just do forced service */ + lws_plat_service_tsi(h->context, -1, 0); + } if (!h->timer) { h->timer = qd_timer(h->dispatch, check_timer, h); } qd_timer_cancel(h->timer); - qd_timer_schedule(h->timer, 1000); + qd_timer_schedule(h->timer, 1000); /* LWS wants per-second wakeups */ sys_mutex_unlock(h->lock); } void qd_http_connector_process(qdpn_connector_t *c) { - per_thread.connector = c; /* Pass to lws via thread-local storage */ - + qd_http_t * h = qdpn_listener_http(qdpn_connector_listener(c)); + sys_mutex_lock(h->lock); + int fd = qdpn_connector_get_fd(c); struct lws *wsi = (struct lws*)qdpn_connector_http(c); - buffers_t *b = (buffers_t*)lws_wsi_user(wsi); - qd_http_t * h = qd_http_from_wsi(wsi); - pn_transport_t *t = qdpn_connector_transport(c); - - int flags = - (qdpn_connector_activated(c, QDPN_CONNECTOR_READABLE) ? POLLIN : 0) | - (qdpn_connector_activated(c, QDPN_CONNECTOR_WRITABLE) ? POLLOUT : 0); - - if (b && b->over.size) { /* Consume last over-buffered read */ - transport_push_max(t, pn_bytes(b->over.size, b->over.start), &b->over); - if (b->over.size) { - flags &= ~POLLIN; /* Don't let LIBWS read if we still are over */ + /* Make sure we are still tracking this fd, could have been closed by timer */ + if (wsi) { + pn_transport_t *t = qdpn_connector_transport(c); + int flags = + (qdpn_connector_activated(c, QDPN_CONNECTOR_READABLE) ? POLLIN : 0) | + (qdpn_connector_activated(c, QDPN_CONNECTOR_WRITABLE) ? POLLOUT : 0); + struct lws_pollfd pfd = { fd, flags, flags }; + if (pn_transport_pending(t) > 0) { + lws_callback_on_writable(wsi); + } + lws_service_fd(h->context, &pfd); + if (pn_transport_closed(t)) { + mark_closed(wsi); /* Don't let the server close the FD. */ + } else { + if (pn_transport_capacity(t) > 0) + qdpn_connector_activate(c, QDPN_CONNECTOR_READABLE); + if (pn_transport_pending(t) > 0 || lws_partial_buffered(wsi)) + qdpn_connector_activate(c, QDPN_CONNECTOR_WRITABLE); + qdpn_connector_wakeup(c, pn_transport_tick(t, qdpn_now(NULL))); } } - sys_mutex_lock(h->lock); - struct lws_pollfd pfd = { qdpn_connector_get_fd(c), flags, flags }; - lws_service_fd(h->context, &pfd); sys_mutex_unlock(h->lock); - - if (pn_transport_capacity(t) > 0) - qdpn_connector_activate(c, QDPN_CONNECTOR_READABLE); - if (pn_transport_pending(t) > 0) - qdpn_connector_activate(c, QDPN_CONNECTOR_WRITABLE); - pn_timestamp_t now = qdpn_now(NULL); - pn_timestamp_t next = pn_transport_tick(t, now); - /* If we have overflow, re-process immediately after dispatch, otherwise at - * next proton tick. */ - qdpn_connector_wakeup(c, (b && b->over.size) ? now : next); - check_timer(h); + check_timer(h); /* Make sure the timer is running */ } qd_http_connector_t *qd_http_connector(qd_http_t *h, qdpn_connector_t *c) { + if (set_fd(h, qdpn_connector_get_fd(c), c)) { + return NULL; + } struct lws* wsi = lws_adopt_socket(h->context, qdpn_connector_get_fd(c)); return (qd_http_connector_t*)wsi; } static struct lws_protocols protocols[] = { - /* first protocol must always be HTTP handler */ + /* HTTP only protocol comes first */ { - "http-only", /* name */ - callback_http, /* callback */ - sizeof(buffers_t), /* user data size */ + "http-only", + callback_http, + 0, }, /* "amqp" is the official oasis AMQP over WebSocket protocol name */ { "amqp", callback_amqpws, - sizeof(buffers_t), + sizeof(buffer_t), }, /* "binary" is an alias for "amqp", for compatibility with clients designed * to work with a WebSocket proxy @@ -252,12 +324,13 @@ static struct lws_protocols protocols[] = { { "binary", callback_amqpws, - sizeof(buffers_t), + sizeof(buffer_t), }, }; qd_http_t *qd_http(qd_dispatch_t *d, qd_log_source_t *log) { qd_http_t *h = calloc(1, sizeof(qd_http_t)); + if (!h) return NULL; h->lock = sys_mutex(); h->dispatch = d; h->log = log; @@ -268,8 +341,10 @@ qd_http_t *qd_http(qd_dispatch_t *d, qd_log_source_t *log) { info.protocols = protocols; info.gid = info.uid = -1; info.user = h; + info.mounts = &console_mount; /* Serve the console files */ + info.server_string = QD_CONNECTION_PROPERTY_PRODUCT_VALUE; h->context = lws_create_context(&info); - h->timer = NULL; /* Initialized later. */ + h->timer = NULL; /* Can't init timer here, server not initialized. */ return h; } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/4fb10205/src/posix/driver.c ---------------------------------------------------------------------- diff --git a/src/posix/driver.c b/src/posix/driver.c index 53ea809..741f500 100644 --- a/src/posix/driver.c +++ b/src/posix/driver.c @@ -134,6 +134,7 @@ struct qdpn_connector_t { bool pending_read:1; bool pending_write:1; bool socket_error:1; + bool hangup:1; bool closed:1; }; @@ -528,6 +529,7 @@ qdpn_connector_t *qdpn_connector_fd(qdpn_driver_t *driver, int fd, void *context c->pending_read = false; c->pending_write = false; c->socket_error = false; + c->hangup = false; c->name[0] = '\0'; c->idx = 0; c->fd = fd; @@ -622,7 +624,9 @@ qdpn_listener_t *qdpn_connector_listener(qdpn_connector_t *ctor) return ctor ? ctor->listener : NULL; } -/* FD is already closed, update the connector state */ +/* Mark the connector as closed, but don't close the FD (already closed or + * will be closed elsewhere) + */ void qdpn_connector_mark_closed(qdpn_connector_t *ctor) { if (!ctor) return; @@ -632,6 +636,7 @@ void qdpn_connector_mark_closed(qdpn_connector_t *ctor) qd_log(ctor->driver->log, QD_LOG_TRACE, "closed %s", ctor->name); ctor->closed = true; ctor->driver->closed_count++; + ctor->http = NULL; } sys_mutex_unlock(ctor->driver->lock); } @@ -880,7 +885,7 @@ static void qdpn_driver_rebuild(qdpn_driver_t *d) qdpn_connector_t *c = DEQ_HEAD(d->connectors); while (c) { - if (!c->closed && !c->socket_error) { + if (!c->closed && !c->socket_error && !c->hangup) { d->wakeup = pn_timestamp_min(d->wakeup, c->wakeup); d->fds[d->nfds].fd = c->fd; d->fds[d->nfds].events = (c->status & PN_SEL_RD ? POLLIN : 0) | (c->status & PN_SEL_WR ? POLLOUT : 0); @@ -946,9 +951,11 @@ int qdpn_driver_wait_3(qdpn_driver_t *d) if (revents & ~(POLLIN|POLLOUT|POLLERR|POLLHUP)) { qd_log(c->driver->log, QD_LOG_ERROR, "unexpected poll events %04x on %s", revents, c->name); + c->socket_error = true; } if (revents & POLLHUP) { qd_log(c->driver->log, QD_LOG_TRACE, "hangup on %s", c->name); + c->hangup = true; /* poll() is signalling POLLHUP. To see what happened we need * to do an actual recv() to get the error code. But we might * be in a state where we're not interested in input, in that http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/4fb10205/src/server.c ---------------------------------------------------------------------- diff --git a/src/server.c b/src/server.c index 40b5699..1e813aa 100644 --- a/src/server.c +++ b/src/server.c @@ -573,7 +573,8 @@ static const char *log_incoming(char *buf, size_t size, qdpn_connector_t *cxtr) const char *cname = qdpn_connector_name(cxtr); const char *host = qd_listener->config->host; const char *port = qd_listener->config->port; - snprintf(buf, size, "incoming connection from %s to %s:%s", cname, host, port); + snprintf(buf, size, "incoming %s connection from %s to %s:%s", + qdpn_connector_http(cxtr) ? "HTTP" : "AMQP", cname, host, port); return buf; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org