Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 821E7200C6F for ; Tue, 9 May 2017 22:12:16 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 80B35160BB6; Tue, 9 May 2017 20:12:16 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C8800160B9A for ; Tue, 9 May 2017 22:12:15 +0200 (CEST) Received: (qmail 9051 invoked by uid 500); 9 May 2017 20:12:15 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 9040 invoked by uid 99); 9 May 2017 20:12:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 May 2017 20:12:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CCBD9DFD70; Tue, 9 May 2017 20:12:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cliffjansen@apache.org To: commits@qpid.apache.org Message-Id: <33cb6e353cf341c492f9a3e4955e73cf@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: qpid-proton git commit: PROTON-1460: epoll - thread safe use of driver on connection disconnect Date: Tue, 9 May 2017 20:12:14 +0000 (UTC) archived-at: Tue, 09 May 2017 20:12:16 -0000 Repository: qpid-proton Updated Branches: refs/heads/master 9c69b7d7e -> 19f345acc PROTON-1460: epoll - thread safe use of driver on connection disconnect Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/19f345ac Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/19f345ac Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/19f345ac Branch: refs/heads/master Commit: 19f345acce3748020c7b2f13b2f0937ed35170ec Parents: 9c69b7d Author: Clifford Jansen Authored: Tue May 9 13:06:41 2017 -0700 Committer: Clifford Jansen Committed: Tue May 9 13:07:56 2017 -0700 ---------------------------------------------------------------------- proton-c/src/proactor/epoll.c | 45 ++++++++++++++++++++++++++++++-------- 1 file changed, 36 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/19f345ac/proton-c/src/proactor/epoll.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c index cea9a68..fae68e9 100644 --- a/proton-c/src/proactor/epoll.c +++ b/proton-c/src/proactor/epoll.c @@ -425,6 +425,8 @@ typedef struct pconnection_t { bool server; /* accept, not connect */ bool tick_pending; bool timer_armed; + bool queued_disconnect; /* deferred from pn_proactor_disconnect() */ + pn_condition_t *disconnect_condition; ptimer_t timer; // TODO: review one timerfd per connectoin // Following values only changed by (sole) working context: uint32_t current_arm; // active epoll io events @@ -580,6 +582,8 @@ static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bo pc->wake_count = 0; pc->tick_pending = false; pc->timer_armed = false; + pc->queued_disconnect = false; + pc->disconnect_condition = NULL; pc->current_arm = 0; pc->connected = false; @@ -609,6 +613,7 @@ static void pconnection_final_free(pconnection_t *pc) { if (pc->addrinfo) { freeaddrinfo(pc->addrinfo); } + pn_condition_free(pc->disconnect_condition); pn_incref(pc); /* Make sure we don't do a circular free */ pn_connection_driver_destroy(&pc->driver); pn_decref(pc); @@ -707,7 +712,7 @@ static inline void pconnection_rearm(pconnection_t *pc) { } static inline bool pconnection_work_pending(pconnection_t *pc) { - if (pc->new_events || pc->wake_count || pc->tick_pending) + if (pc->new_events || pc->wake_count || pc->tick_pending || pc->queued_disconnect) return true; if (!pc->read_blocked && !pconnection_rclosed(pc)) return true; @@ -823,6 +828,16 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, retry: + if (pc->queued_disconnect) { // From pn_proactor_disconnect() + pc->queued_disconnect = false; + if (!pc->context.closing) { + if (pc->disconnect_condition) { + pn_condition_copy(pn_transport_condition(pc->driver.transport), pc->disconnect_condition); + } + pn_connection_driver_close(&pc->driver); + } + } + if (pconnection_has_event(pc)) { unlock(&pc->context.mutex); return &pc->batch; @@ -1731,16 +1746,29 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) { bool notify = false; for (ctx = disconnecting_pcontexts; ctx; ctx = ctx ? ctx->next : NULL) { bool do_free = false; + bool ctx_notify = true; pmutex *ctx_mutex = NULL; pconnection_t *pc = pcontext_pconnection(ctx); if (pc) { ctx_mutex = &pc->context.mutex; lock(ctx_mutex); if (!ctx->closing) { - if (cond) { - pn_condition_copy(pn_transport_condition(pc->driver.transport), cond); + if (ctx->working) { + // Must defer + pc->queued_disconnect = true; + if (cond) { + if (!pc->disconnect_condition) + pc->disconnect_condition = pn_condition(); + pn_condition_copy(pc->disconnect_condition, cond); + } + } + else { + // No conflicting working context. + if (cond) { + pn_condition_copy(pn_transport_condition(pc->driver.transport), cond); + } + pn_connection_driver_close(&pc->driver); } - pn_connection_driver_close(&pc->driver); } } else { pn_listener_t *l = pcontext_listener(ctx); @@ -1758,16 +1786,15 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) { lock(&p->context.mutex); if (--ctx->disconnect_ops == 0) { do_free = true; - ctx = NULL; + ctx_notify = false; if (--p->disconnects_pending == 0 && !p->contexts) { p->inactive = true; notify = wake(&p->context); } } else { // If initiating the close, wake the pcontext to do the free. - if (ctx) - if (!wake(ctx)) - ctx = NULL; // Wake already pending. + if (ctx_notify) + ctx_notify = wake(ctx); } unlock(&p->context.mutex); unlock(ctx_mutex); @@ -1776,7 +1803,7 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) { if (pc) pconnection_final_free(pc); else listener_final_free(pcontext_listener(ctx)); } else { - if (ctx) + if (ctx_notify) wake_notify(ctx); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org