Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4E446200BD4 for ; Thu, 1 Dec 2016 20:15:17 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 44814160B10; Thu, 1 Dec 2016 19:15:17 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 72328160B05 for ; Thu, 1 Dec 2016 20:15:16 +0100 (CET) Received: (qmail 95282 invoked by uid 500); 1 Dec 2016 19:15:15 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 95219 invoked by uid 99); 1 Dec 2016 19:15:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 01 Dec 2016 19:15:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 62367DFC47; Thu, 1 Dec 2016 19:15:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aconway@apache.org To: commits@qpid.apache.org Date: Thu, 01 Dec 2016 19:15:16 -0000 Message-Id: <398a93de580b48ee83636e1be38072bf@git.apache.org> In-Reply-To: <4bc29c5272b64147a981b9ad3289f8e6@git.apache.org> References: <4bc29c5272b64147a981b9ad3289f8e6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] qpid-dispatch git commit: DISPATCH-103: Fix Websocket Listeners archived-at: Thu, 01 Dec 2016 19:15:17 -0000 DISPATCH-103: Fix Websocket Listeners Fix shutdown problems; websocket listeners are now functioning properly. 
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/69f52f28 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/69f52f28 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/69f52f28 Branch: refs/heads/master Commit: 69f52f2836fd0324a4f49585d32e38dab72f9afc Parents: 978e809 Author: Alan Conway Authored: Thu Dec 1 13:34:44 2016 -0500 Committer: Alan Conway Committed: Thu Dec 1 14:14:40 2016 -0500 ---------------------------------------------------------------------- src/http-libwebsockets.c | 82 ++++++++++++++++++------------------------- 1 file changed, 34 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/69f52f28/src/http-libwebsockets.c ---------------------------------------------------------------------- diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c index c7c0043..9d2b3e5 100644 --- a/src/http-libwebsockets.c +++ b/src/http-libwebsockets.c @@ -59,8 +59,9 @@ typedef struct buffer_t { void *start; size_t size; size_t cap; } buffer_t; /* Extra buffering per connection, stored in the lws_wsi_user() space. 
*/ typedef struct buffers_t { - buffer_t wtmp; /* Temp buffer with pre-data header space required by LWS */ - buffer_t over; /* Can't control LWS read size, buffer the overflow */ + buffer_t wtmp; /* Temp buffer with pre-data header space required by LWS */ + buffer_t over; /* Can't control LWS read size, buffer the overflow */ + char name[256]; /* Copy of connector name for use after connector detached */ } buffers_t; static void resize(buffer_t *b, size_t size) { @@ -98,27 +99,33 @@ static qd_http_t *qd_http_from_wsi(struct lws *wsi) { return (qd_http_t *)lws_context_user(lws_get_context(wsi)); } +static int do_close(struct lws *wsi, qdpn_connector_t *c, const char *msg) { + if (c) qdpn_connector_mark_closed(c); + lws_close_reason(wsi, LWS_CLOSE_STATUS_NORMAL, (unsigned char*)msg, strlen(msg)); + return -1; +} + static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { buffers_t *b = (buffers_t*)user; qd_http_t *h = qd_http_from_wsi(wsi); qdpn_connector_t *c = per_thread.connector; - pn_transport_t *t = qdpn_connector_transport(c); - const char *name = qdpn_connector_name(c); + pn_transport_t *t = c ? 
qdpn_connector_transport(c) : NULL; switch (reason) { case LWS_CALLBACK_ESTABLISHED: { - qd_log(h->log, QD_LOG_DEBUG, "HTTP from %s upgraded to AMQP/WebSocket", name); memset(b, 0, sizeof(*b)); + strncpy(b->name, qdpn_connector_name(c), sizeof(b->name)); + qd_log(h->log, QD_LOG_DEBUG, "HTTP from %s upgraded to AMQP/WebSocket", b->name); break; } case LWS_CALLBACK_SERVER_WRITEABLE: { - ssize_t size = pn_transport_pending(t); - if (size < 0) { - return -1; + ssize_t size; + if (!t || (size = pn_transport_pending(t)) < 0) { + return do_close(wsi, c, "write-closed"); } if (size > 0) { pn_bytes_t wbuf = { size, pn_transport_head(t) }; @@ -129,7 +136,7 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason, ssize_t wrote = lws_write(wsi, start, wbuf.size, LWS_WRITE_BINARY); if (wrote < 0) { pn_transport_close_head(t); - return -1; + return do_close(wsi, c, "write-error"); } else { pn_transport_pop(t, (size_t)wrote); } @@ -138,26 +145,25 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason, } case LWS_CALLBACK_RECEIVE: { - if (pn_transport_capacity(t) < 0) { - return -1; + if (!t || pn_transport_capacity(t) < 0) { + do_close(wsi, c, "read-closed"); } assert(b->over.size == 0); transport_push_max(t, pn_bytes(len, in), &b->over); if (b->over.size > 0) { - qd_log(h->log, QD_LOG_TRACE, "amqp/ws read buffered %z bytes on %s", name); + qd_log(h->log, QD_LOG_TRACE, "amqp/ws read buffered %z bytes on %s", b->name); } break; } case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: { - qd_log(h->log, QD_LOG_DEBUG, "AMQP/WebSocket peer close from %s", name); - pn_transport_close_tail(t); - break; + qd_log(h->log, QD_LOG_DEBUG, "AMQP/WebSocket peer close from %s", b->name); + if (t) pn_transport_close_tail(t); + return do_close(wsi, c, "peer-close"); } case LWS_CALLBACK_CLOSED: { - qd_log(h->log, QD_LOG_DEBUG, "AMQP/WebSocket from %s closed", name); - qdpn_connector_mark_closed(c); + qd_log(h->log, QD_LOG_DEBUG, "AMQP/WebSocket from %s closed", 
b->name); break; } @@ -170,38 +176,21 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason, static int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { - qdpn_connector_t *c = per_thread.connector; - buffers_t *b = (buffers_t*)user; - - switch (reason) { - case LWS_CALLBACK_ESTABLISHED: { - memset(b, 0, sizeof(*b)); - break; - } - case LWS_CALLBACK_CLOSED: { - qdpn_connector_mark_closed(c); - } - default: - break; - } return 0; } -static void set_timer_lh(qd_http_t *h); - -static void fire_timer(void *void_http) { +static void check_timer(void *void_http) { + per_thread.connector = NULL; /* Not servicing any connector */ qd_http_t *h = (qd_http_t*)void_http; sys_mutex_lock(h->lock); + /* Run LWS global timer checks. */ lws_service_fd(h->context, NULL); - set_timer_lh(h); - sys_mutex_unlock(h->lock); -} - -static void set_timer_lh(qd_http_t *h) { if (!h->timer) { - h->timer = qd_timer(h->dispatch, fire_timer, h); + h->timer = qd_timer(h->dispatch, check_timer, h); } + qd_timer_cancel(h->timer); qd_timer_schedule(h->timer, 1000); + sys_mutex_unlock(h->lock); } void qd_http_connector_process(qdpn_connector_t *c) { @@ -218,28 +207,25 @@ void qd_http_connector_process(qdpn_connector_t *c) { if (b && b->over.size) { /* Consume last over-buffered read */ transport_push_max(t, pn_bytes(b->over.size, b->over.start), &b->over); - if (b->over.size) { /* Don't let LIBWS read if we still are over */ - flags &= ~POLLIN; + if (b->over.size) { + flags &= ~POLLIN; /* Don't let LIBWS read if we still are over */ } } - sys_mutex_lock(h->lock); struct lws_pollfd pfd = { qdpn_connector_get_fd(c), flags, flags }; lws_service_fd(h->context, &pfd); - set_timer_lh(h); sys_mutex_unlock(h->lock); if (pn_transport_capacity(t) > 0) qdpn_connector_activate(c, QDPN_CONNECTOR_READABLE); if (pn_transport_pending(t) > 0) qdpn_connector_activate(c, QDPN_CONNECTOR_WRITABLE); - pn_timestamp_t now = qdpn_now(NULL); 
pn_timestamp_t next = pn_transport_tick(t, now); /* If we have overflow, re-process immediately after dispatch, otherwise at - * next proton tick. - */ + * next proton tick. */ qdpn_connector_wakeup(c, (b && b->over.size) ? now : next); + check_timer(h); } qd_http_connector_t *qd_http_connector(qd_http_t *h, qdpn_connector_t *c) { @@ -268,7 +254,6 @@ static struct lws_protocols protocols[] = { callback_amqpws, sizeof(buffers_t), }, - }; qd_http_t *qd_http(qd_dispatch_t *d, qd_log_source_t *log) { @@ -277,6 +262,7 @@ qd_http_t *qd_http(qd_dispatch_t *d, qd_log_source_t *log) { h->dispatch = d; h->log = log; lws_set_log_level(0, NULL); + struct lws_context_creation_info info = {0}; info.port = CONTEXT_PORT_NO_LISTEN; info.protocols = protocols; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org