Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0A845200C7F for ; Wed, 24 May 2017 16:59:42 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 095F8160BB4; Wed, 24 May 2017 14:59:42 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 25878160B9C for ; Wed, 24 May 2017 16:59:40 +0200 (CEST) Received: (qmail 94979 invoked by uid 500); 24 May 2017 14:59:40 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 94970 invoked by uid 99); 24 May 2017 14:59:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 May 2017 14:59:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3F92BDFAEB; Wed, 24 May 2017 14:59:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cliffjansen@apache.org To: commits@qpid.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: qpid-proton git commit: PROTON-1483: C epoll proactor, change timerfd accounting Date: Wed, 24 May 2017 14:59:40 +0000 (UTC) archived-at: Wed, 24 May 2017 14:59:42 -0000 Repository: qpid-proton Updated Branches: refs/heads/master d6524051f -> a4e5c84d8 PROTON-1483: C epoll proactor, change timerfd accounting Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a4e5c84d Tree: 
http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a4e5c84d Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a4e5c84d Branch: refs/heads/master Commit: a4e5c84d88eb32a1951e058836e5f509749a955b Parents: d652405 Author: Clifford Jansen Authored: Wed May 24 07:52:42 2017 -0700 Committer: Clifford Jansen Committed: Wed May 24 07:54:31 2017 -0700 ---------------------------------------------------------------------- proton-c/src/proactor/epoll.c | 137 +++++++++++++++++++++++++------------ 1 file changed, 93 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a4e5c84d/proton-c/src/proactor/epoll.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c index a1ec1af..2f99cd9 100644 --- a/proton-c/src/proactor/epoll.c +++ b/proton-c/src/proactor/epoll.c @@ -117,24 +117,39 @@ typedef struct epoll_extended_t { /* * This timerfd logic assumes EPOLLONESHOT and there never being two - * active timeout callbacks. There can be multiple unclaimed expiries - * processed in a single callback. + * active timeout callbacks. There can be multiple (or zero) + * unclaimed expiries processed in a single callback. + * + * timerfd_set() documentation implies a crisp relationship between + * timer expiry count and oldt's return value, but a return value of + * zero is ambiguous. It can lead to no EPOLLIN, EPOLLIN + expected + * read, or + * + * event expiry (in kernel) -> EPOLLIN + * cancel/settime(0) (thread A) (number of expiries resets to zero) + * read(timerfd) -> -1, EAGAIN (thread B servicing epoll event) + * + * The original implementation with counters to track expiry counts + * was abandoned in favor of "in doubt" transitions and resolution + * at shutdown. 
*/ typedef struct ptimer_t { pmutex mutex; int timerfd; epoll_extended_t epoll_io; - int pending_count; - int skip_count; + bool timer_active; + bool in_doubt; // 0 or 1 callbacks are possible + bool shutting_down; } ptimer_t; static bool ptimer_init(ptimer_t *pt, struct psocket_t *ps) { pt->timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); if (pt->timerfd < 0) return false; pmutex_init(&pt->mutex); - pt->pending_count = 0; - pt->skip_count = 0; + pt->timer_active = false; + pt->in_doubt = false; + pt->shutting_down = false; epoll_type_t type = ps ? PCONNECTION_TIMER : PROACTOR_TIMER; pt->epoll_io.psocket = ps; pt->epoll_io.fd = pt->timerfd; @@ -144,49 +159,74 @@ static bool ptimer_init(ptimer_t *pt, struct psocket_t *ps) { return true; } -static void ptimer_set(ptimer_t *pt, uint64_t t_millis) { - // t_millis == 0 -> cancel - lock(&pt->mutex); - if (t_millis == 0 && pt->pending_count == 0) { - unlock(&pt->mutex); - return; // nothing to cancel - } +// Call with ptimer lock held +static void ptimer_set_lh(ptimer_t *pt, uint64_t t_millis) { struct itimerspec newt, oldt; memset(&newt, 0, sizeof(newt)); newt.it_value.tv_sec = t_millis / 1000; newt.it_value.tv_nsec = (t_millis % 1000) * 1000000; timerfd_settime(pt->timerfd, 0, &newt, &oldt); - if (oldt.it_value.tv_sec || oldt.it_value.tv_nsec) { - // old value cancelled - assert (pt->pending_count > 0); - pt->pending_count--; - } else if (pt->pending_count) { - // cancel instance waiting on this lock - pt->skip_count++; - } - if (t_millis) - pt->pending_count++; - assert(pt->pending_count >= 0); + if (pt->timer_active && oldt.it_value.tv_nsec == 0 && oldt.it_value.tv_sec == 0) { + // EPOLLIN is possible but not assured + pt->in_doubt = true; + } + pt->timer_active = t_millis; +} + +static void ptimer_set(ptimer_t *pt, uint64_t t_millis) { + // t_millis == 0 -> cancel + lock(&pt->mutex); + if ((t_millis == 0 && !pt->timer_active) || pt->shutting_down) { + unlock(&pt->mutex); + return; // nothing to do + } + 
ptimer_set_lh(pt, t_millis); unlock(&pt->mutex); } -// Callback bookkeeping. Return number of uncancelled expiry events. -static int ptimer_callback(ptimer_t *pt) { +// Callback bookkeeping. Return true if there is an expired timer. +static bool ptimer_callback(ptimer_t *pt) { lock(&pt->mutex); - uint64_t u_exp_count; + struct itimerspec current; + if (timerfd_gettime(pt->timerfd, &current) == 0) { + if (current.it_value.tv_nsec == 0 && current.it_value.tv_sec == 0) + pt->timer_active = false; + } + uint64_t u_exp_count = 0; ssize_t l = read(pt->timerfd, &u_exp_count, sizeof(uint64_t)); - (void)l; /* Silence compiler complaints in release build */ - assert(l == sizeof(uint64_t)); - assert(u_exp_count < INT_MAX); // or test and log it? - int exp_count = (int) u_exp_count; - assert(exp_count >= pt->skip_count); - assert(exp_count <= pt->pending_count); - exp_count -= pt->skip_count; - pt->skip_count = 0; - pt->pending_count -= exp_count; + if (l != sizeof(uint64_t)) { + if (l == -1) { + if (errno != EAGAIN) { + EPOLL_FATAL("timer read", errno); + } + } + else + EPOLL_FATAL("timer internal error", 0); + } + if (!pt->timer_active) { + // Expiry counter just cleared, timer not set, timerfd not armed + pt->in_doubt = false; + } unlock(&pt->mutex); - return (int) exp_count; + return (l == sizeof(uint64_t)) && u_exp_count > 0; +} + +// Return true if timerfd has and will have no pollable expiries in the current armed state +static bool ptimer_shutdown(ptimer_t *pt, bool currently_armed) { + lock(&pt->mutex); + if (currently_armed) { + ptimer_set_lh(pt, 0); + pt->shutting_down = true; + if (pt->in_doubt) + // Force at least one callback. If two, second cannot proceed with unarmed timerfd. 
+ ptimer_set_lh(pt, 1); + } + else + pt->shutting_down = true; + bool rv = !pt->in_doubt; + unlock(&pt->mutex); + return rv; } static void ptimer_finalize(ptimer_t *pt) { @@ -626,7 +666,7 @@ static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bo // Call with lock held and closing == true (i.e. pn_connection_driver_finished() == true), timer cancelled. // Return true when all possible outstanding epoll events associated with this pconnection have been processed. static inline bool pconnection_is_final(pconnection_t *pc) { - return !pc->current_arm && !pc->timer.pending_count && !pc->context.wake_ops; + return !pc->current_arm && !pc->timer_armed && !pc->context.wake_ops; } static void pconnection_final_free(pconnection_t *pc) { @@ -661,7 +701,13 @@ static void pconnection_begin_close(pconnection_t *pc) { stop_polling(&pc->psocket.epoll_io, pc->psocket.proactor->epollfd); pc->current_arm = 0; pn_connection_driver_close(&pc->driver); - ptimer_set(&pc->timer, 0); + if (ptimer_shutdown(&pc->timer, pc->timer_armed)) + pc->timer_armed = false; // disarmed in the sense that the timer will never fire again + else if (!pc->timer_armed) { + // In doubt. One last callback to collect + rearm(pc->psocket.proactor, &pc->timer.epoll_io); + pc->timer_armed = true; + } } } @@ -669,7 +715,7 @@ static void pconnection_forced_shutdown(pconnection_t *pc) { // Called by proactor_free, no competing threads, no epoll activity. pconnection_begin_close(pc); // pconnection_process will never be called again. Zero everything. 
- pc->timer.pending_count = 0; + pc->timer_armed = false; pc->context.wake_ops = 0; pn_connection_t *c = pc->driver.connection; pn_collector_release(pn_connection_collector(c)); @@ -804,7 +850,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, if (timeout) { timer_unarmed = true; - timer_fired = (ptimer_callback(&pc->timer) != 0); + timer_fired = ptimer_callback(&pc->timer) != 0; } lock(&pc->context.mutex); @@ -889,11 +935,12 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, pc->timer_armed = true; // about to rearm outside the lock timer_unarmed = true; // so we remember } + bool timer_shutting_down = pc->timer.shutting_down; unlock(&pc->context.mutex); pc->hog_count++; // working context doing work - if (timer_unarmed) { + if (timer_unarmed && !timer_shutting_down) { rearm(pc->psocket.proactor, &pc->timer.epoll_io); timer_unarmed = false; } @@ -1552,7 +1599,8 @@ static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool timeout) { return &p->batch; } } - bool rearm_timer = !p->timer_armed; + bool rearm_timer = !p->timer_armed && !p->timer.shutting_down; + p->timer_armed = true; unlock(&p->context.mutex); if (rearm_timer) rearm(p, &p->timer.epoll_io); @@ -1701,7 +1749,8 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { if (bp == p) { bool notify = false; lock(&p->context.mutex); - bool rearm_timer = !p->timer_armed; + bool rearm_timer = !p->timer_armed && !p->shutting_down; + p->timer_armed = true; p->context.working = false; proactor_update_batch(p); if (proactor_has_event(p)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org