Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5555C200CE6 for ; Fri, 15 Sep 2017 08:29:28 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 537AE1609D1; Fri, 15 Sep 2017 06:29:28 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 75A911609CF for ; Fri, 15 Sep 2017 08:29:26 +0200 (CEST) Received: (qmail 83383 invoked by uid 500); 15 Sep 2017 06:29:23 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 83374 invoked by uid 99); 15 Sep 2017 06:29:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 15 Sep 2017 06:29:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0D403F5647; Fri, 15 Sep 2017 06:29:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cliffjansen@apache.org To: commits@qpid.apache.org Message-Id: <47797e0c18d14528adf6e7b60ccf2b38@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: qpid-proton git commit: PROTON-1349: completed and improved implementation, but still fails many tests Date: Fri, 15 Sep 2017 06:29:23 +0000 (UTC) archived-at: Fri, 15 Sep 2017 06:29:28 -0000 Repository: qpid-proton Updated Branches: refs/heads/master 83fa38808 -> 6a57a8c98 PROTON-1349: completed and improved implementation, but still fails many tests Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/6a57a8c9 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/6a57a8c9 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/6a57a8c9 Branch: refs/heads/master Commit: 6a57a8c986fbf86ad8ad109d673a89a5ae84c544 Parents: 83fa388 Author: Clifford Jansen Authored: Thu Sep 14 23:29:14 2017 -0700 Committer: Clifford Jansen Committed: Thu Sep 14 23:29:14 2017 -0700 ---------------------------------------------------------------------- proton-c/src/proactor/win_iocp.c | 659 +++++++++++++++++++++------------- 1 file changed, 410 insertions(+), 249 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6a57a8c9/proton-c/src/proactor/win_iocp.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/win_iocp.c b/proton-c/src/proactor/win_iocp.c index 1a79023..0ebaf90 100644 --- a/proton-c/src/proactor/win_iocp.c +++ b/proton-c/src/proactor/win_iocp.c @@ -134,8 +134,8 @@ struct iocpdesc_t { }; -// Max number of overlapped accepts per listener -#define IOCP_MAX_ACCEPTS 10 +// Max number of overlapped accepts per listener. TODO: configurable. +#define IOCP_MAX_ACCEPTS 4 // AcceptEx squishes the local and remote addresses and optional data // all together when accepting the connection. Reserve enough for @@ -715,11 +715,12 @@ static pni_acceptor_t *pni_acceptor(iocpdesc_t *iocpd) void begin_accept(pni_acceptor_t *acceptor, accept_result_t *result) { // flag to divide this routine's logic into locked/unlocked mp portions - bool mp = result && result->new_sock && result->new_sock->is_mp; + bool mp = acceptor->listen_sock->is_mp; + bool created = false; if (acceptor->listen_sock->closing) { if (result) { - if (mp && result->new_sock->socket != INVALID_SOCKET) + if (mp && result->new_sock && result->new_sock->socket != INVALID_SOCKET) closesocket(result->new_sock->socket); free(result); acceptor->accept_queue_size--; @@ -733,17 +734,18 @@ void begin_accept(pni_acceptor_t *acceptor, accept_result_t *result) if (!mp) reset_accept_result(result); } else { - if (acceptor->accept_queue_size < IOCP_MAX_ACCEPTS && - pn_list_size(acceptor->accepts) == acceptor->accept_queue_size ) { + if (acceptor->accept_queue_size < IOCP_MAX_ACCEPTS && (mp || + pn_list_size(acceptor->accepts) == acceptor->accept_queue_size )) { result = accept_result(acceptor->listen_sock); acceptor->accept_queue_size++; + created = true; } else { // an async accept is still pending or max concurrent accepts already hit return; } } - if (!mp) + if (created || !mp) result->new_sock = create_same_type_socket(acceptor->listen_sock); if (result->new_sock) { // Not yet connected. @@ -782,9 +784,11 @@ static void complete_accept(accept_result_t *result, HRESULT status) if (ld->read_closed) { if (!result->new_sock->closing) pni_iocp_begin_close(result->new_sock); + pn_decref(result->new_sock); free(result); // discard reap_check(ld); } else { + assert(!ld->is_mp); // Non mp only result->base.status = status; pn_list_add(ld->acceptor->accepts, result); pni_events_update(ld, ld->events | PN_READABLE); @@ -953,7 +957,7 @@ ssize_t pni_iocp_begin_write(iocpdesc_t *iocpd, const void *buf, size_t len, boo assert(result); result->base.iocpd = iocpd; ssize_t actual_len = len; - if (actual_len > result->buffer.size) actual_len = result->buffer.size; + if (len > result->buffer.size) actual_len = result->buffer.size; result->requested = actual_len; memmove((void *)result->buffer.start, outgoing, actual_len); outgoing += actual_len; @@ -1579,8 +1583,8 @@ std::string errno_str(const std::string& msg, bool is_wsa) { using namespace pn_experimental; -static void proactor_wakeup_stub() {} -ULONG_PTR proactor_wakeup_key = (ULONG_PTR) &proactor_wakeup_stub; +static void proactor_wake_stub() {} +ULONG_PTR proactor_wake_key = (ULONG_PTR) &proactor_wake_stub; static void psocket_wakeup_stub() {} ULONG_PTR psocket_wakeup_key = (ULONG_PTR) &psocket_wakeup_stub; @@ -1651,7 +1655,7 @@ static void pcontext_finalize(pcontext_t* ctx) { } typedef struct psocket_t { - iocpdesc_t *iocpd; // NULL if reaper, or socket open failure. Reconnect will change this value. + iocpdesc_t *iocpd; // NULL if reaper, or socket open failure. pn_listener_t *listener; /* NULL for a connection socket */ char addr_buf[PN_MAX_ADDR]; const char *host, *port; @@ -1668,6 +1672,8 @@ static void psocket_init(psocket_t* ps, pn_listener_t *listener, bool is_reaper, struct pn_proactor_t { pcontext_t context; CRITICAL_SECTION write_lock; + CRITICAL_SECTION timer_lock; + CRITICAL_SECTION bind_lock; HANDLE timer_queue; HANDLE timeout_timer; iocp_t *iocp; @@ -1676,10 +1682,14 @@ struct pn_proactor_t { pcontext_t *contexts; /* in-use contexts for PN_PROACTOR_INACTIVE and cleanup */ pn_event_batch_t batch; size_t disconnects_pending; /* unfinished proactor disconnects*/ - bool interrupt; - bool inactive; - bool timeout_request; - bool timeout_elapsed; + + // need_xxx flags indicate we should generate PN_PROACTOR_XXX on the next update_batch() + bool need_interrupt; + bool need_inactive; + bool need_timeout; + bool timeout_set; /* timeout has been set by user and not yet cancelled or generated event */ + bool timeout_processed; /* timout event dispatched in the most recent event batch */ + bool delayed_interrupt; bool shutting_down; }; @@ -1700,6 +1710,10 @@ typedef struct pconnection_t { bool server; /* accept, not connect */ bool connecting; bool tick_pending; + bool queued_disconnect; /* deferred from pn_proactor_disconnect() */ + bool bound; + bool stop_timer_required; + bool can_wake; HANDLE tick_timer; struct pn_netaddr_t local, remote; /* Actual addresses */ struct addrinfo *addrinfo; /* Resolved address list */ @@ -1710,7 +1724,7 @@ struct pn_listener_t { psocket_t *psockets; /* Array of listening sockets */ size_t psockets_size; pcontext_t context; - std::queue *accept_results; // completions awaiting a pn_listener_accept + std::queue *pending_accepts; // sockets awaiting a pn_listener_accept int pending_events; // number of PN_LISTENER_ACCEPT events to be delivered pn_condition_t *condition; pn_collector_t *collector; @@ -1718,9 +1732,7 @@ struct pn_listener_t { pn_record_t *attachments; void *listener_context; size_t backlog; - bool closing; bool close_dispatched; - bool running; }; @@ -1799,11 +1811,12 @@ class unique_socket { void do_complete(iocp_result_t *result) { iocpdesc_t *iocpd = result->iocpd; // connect result gets deleted switch (result->type) { -/* accept is now processed inline to do in parallel + case IOCP_ACCEPT: - complete_accept((accept_result_t *) result, result->status); + /* accept is now processed inline to do in parallel except on teardown */ + assert(iocpd->closing); + complete_accept((accept_result_t *) result, result->status); // free's result and retires new_sock break; -*/ case IOCP_CONNECT: complete_connect((connect_result_t *) result, result->status); break; @@ -1868,11 +1881,11 @@ static inline void proactor_wake_complete(pn_proactor_t *p) { } // Call wih lock held -static void proactor_wakeup(pn_proactor_t *p) { +static void proactor_wake(pn_proactor_t *p) { if (!p->context.working && !p->context.wake_pending) { p->context.wake_pending = true; p->context.completion_ops++; - post_completion(p->iocp, proactor_wakeup_key, p); + post_completion(p->iocp, proactor_wake_key, p); } } @@ -1900,10 +1913,20 @@ class reaper { if (iocpd->closing) return false; bool rval = !iocpd->ops_in_progress; pni_iocp_begin_close(iocpd); // sets iocpd->closing - start_timer(); + pn_decref(iocpd); // may still be ref counted on zombie list + reap_timer(); return rval; } + // For cases where the close will be immediate. I.E. after a failed + // connection attempt where there is no follow-on IO. + void fast_reap(iocpdesc_t *iocpd) { + assert(iocpd && iocpd->ops_in_progress == 0 && !iocpd->closing); + csguard g(&lock_); + pni_iocp_begin_close(iocpd); + pn_decref(iocpd); + } + bool process(iocp_result_t *result) { // No queue of completions for the reaper. Just process // serialized by the lock assuming all actions are "short". @@ -1911,13 +1934,15 @@ class reaper { // consumer/producer setup but just replace the reaper with a // multi threaded alternative. csguard g(&lock_); + iocpdesc_t *iocpd = result->iocpd; if (is_write_result(result)) { csguard wg(global_wlock_); do_complete(result); } else do_complete(result); - bool rval = (result->iocpd->ops_in_progress == 0); - pni_iocp_reap_check(result->iocpd); + // result may now be NULL + bool rval = (iocpd->ops_in_progress == 0); + pni_iocp_reap_check(iocpd); return rval; } @@ -1933,12 +1958,12 @@ class reaper { csguard g(&lock_); DeleteTimerQueueTimer(timer_queue_, timer_, NULL); timer_ = NULL; - start_timer(); + reap_timer(); } private: - void start_timer() { + void reap_timer() { // Call with lock if (timer_) return; @@ -2016,13 +2041,13 @@ static void psocket_error(psocket_t *ps, int err, const char* what) { pn_connection_driver_bind(driver); /* Bind so errors will be reported */ pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s", what, ps->host, ps->port, - errno_str2(err)); + errno_str2(err).c_str()); pn_connection_driver_close(driver); } else { pn_listener_t *l = as_listener(ps); pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s", what, ps->host, ps->port, - errno_str2(err)); + errno_str2(err).c_str()); listener_begin_close(l); } } @@ -2045,12 +2070,23 @@ static void pconnection_finalize(void *vp_pconnection) { static const pn_class_t pconnection_class = PN_CLASS(pconnection); -static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bool server, const char *addr) { - pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t)); - if (!pc) return NULL; +static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_connection_t *c, bool server, const char *addr) { + { + csguard g(&p->bind_lock); + pn_record_t *r = pn_connection_attachments(c); + if (pn_record_get(r, PN_PROACTOR)) { + free(pc); + return "pn_connection_t already in use"; + } + pn_record_def(r, PN_PROACTOR, &pconnection_class); + pn_record_set(r, PN_PROACTOR, pc); + pc->bound = true; + pc->can_wake = true; + } + if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) { free(pc); - return NULL; + return "pn_connection_driver_init failure"; } pc->completion_queue = new std::queue(); pc->work_queue = new std::queue(); @@ -2061,20 +2097,20 @@ static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bo if (server) { pn_transport_set_server(pc->driver.transport); } - pn_record_t *r = pn_connection_attachments(pc->driver.connection); - pn_record_def(r, PN_PROACTOR, &pconnection_class); - pn_record_set(r, PN_PROACTOR, pc); + pn_decref(pc); /* Will be deleted when the connection is */ - return pc; + return NULL; } +// Either stops a timer before firing or returns after the callback has +// completed (in the threadpool thread). Never "in doubt". static bool stop_timer(HANDLE tqueue, HANDLE *timer) { if (!*timer) return true; if (DeleteTimerQueueTimer(tqueue, *timer, INVALID_HANDLE_VALUE)) { *timer = NULL; return true; } - return false; + return false; // error } static bool start_timer(HANDLE tqueue, HANDLE *timer, WAITORTIMERCALLBACK cb, void *cb_arg, DWORD time) { @@ -2082,7 +2118,7 @@ static bool start_timer(HANDLE tqueue, HANDLE *timer, WAITORTIMERCALLBACK cb, vo // TODO: log err return false; } - return CreateTimerQueueTimer(timer, tqueue, reap_check_cb, cb_arg, time, 0, WT_EXECUTEONLYONCE); + return CreateTimerQueueTimer(timer, tqueue, cb, cb_arg, time, 0, WT_EXECUTEONLYONCE); } VOID CALLBACK tick_timer_cb(PVOID arg, BOOLEAN /* ignored*/ ) { @@ -2132,9 +2168,10 @@ pn_proactor_t *pn_event_proactor(pn_event_t *e) { return NULL; } +// Call with lock held when closing and transitioning away from working context static inline bool pconnection_can_free(pconnection_t *pc) { return pc->psocket.iocpd == NULL && pc->context.completion_ops == 0 - && pc->tick_timer == NULL && !pconnection_has_event(pc); + && !pc->stop_timer_required && !pconnection_has_event(pc) && !pc->queued_disconnect; } static void pconnection_final_free(pconnection_t *pc) { @@ -2148,6 +2185,18 @@ static void pconnection_final_free(pconnection_t *pc) { /* Now pc is freed iff the connection is, otherwise remains till the pn_connection_t is freed. */ } +// Call with lock held or from forced shutdown +static void pconnection_begin_close(pconnection_t *pc) { + if (!pc->context.closing) { + pc->context.closing = true; + pn_connection_driver_close(&pc->driver); + pc->stop_timer_required = true; + if (pc->context.proactor->reaper->add(pc->psocket.iocpd)) + pc->psocket.iocpd = NULL; + wakeup(&pc->psocket); + } +} + // call with lock held. return true if caller must call pconnection_final_free() static bool pconnection_cleanup(pconnection_t *pc) { delete pc->completion_queue; @@ -2156,7 +2205,7 @@ static bool pconnection_cleanup(pconnection_t *pc) { } static inline bool pconnection_work_pending(pconnection_t *pc) { - if (pc->completion_queue->size() || pc->wake_count || pc->tick_pending) + if (pc->completion_queue->size() || pc->wake_count || pc->tick_pending || pc->queued_disconnect) return true; pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver); return (wbuf.size > 0 && (pc->psocket.iocpd->events & PN_WRITABLE)); @@ -2207,33 +2256,45 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, iocp_result_t *r return NULL; pc->context.working = true; } - open = pc->psocket.iocpd && !pc->psocket.iocpd->closing; // && !reconnecting (TODO) + open = !pc->connecting && !pc->context.closing; } else { + // Just re-acquired lock after processing IO and engine work if (pconnection_has_event(pc)) return &pc->batch; - if (!open && pconnection_can_free(pc)) { - if (pconnection_cleanup(pc)) { - g.release(); - pconnection_final_free(pc); - } - return NULL; - } - if (!pconnection_work_pending(pc)) { pc->context.working = false; + if (pn_connection_driver_finished(&pc->driver)) { + pconnection_begin_close(pc); + } + if (pc->context.closing && pconnection_can_free(pc)) { + if (pconnection_cleanup(pc)) { + g.release(); + pconnection_final_free(pc); + return NULL; + } // else disconnect logic has the free obligation + } return NULL; } } + if (pc->queued_disconnect) { // From pn_proactor_disconnect() + pc->queued_disconnect = false; + if (!pc->context.closing) { + if (pc->disconnect_condition) { + pn_condition_copy(pn_transport_condition(pc->driver.transport), pc->disconnect_condition); + } + pn_connection_driver_close(&pc->driver); + } + } assert(pc->work_queue->empty()); if (pc->completion_queue->size()) std::swap(pc->work_queue, pc->completion_queue); if (pc->wake_count) { - waking = true; + waking = open && pc->can_wake && !pn_connection_driver_finished(&pc->driver); pc->wake_count = 0; } if (pc->tick_pending) { @@ -2250,8 +2311,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, iocp_result_t *r result = (iocp_result_t *) pc->work_queue->front(); pc->work_queue->pop(); if (result->iocpd->closing) { - bool is_current = result->iocpd == pc->psocket.iocpd; // in case of reconnect - if (pc->context.proactor->reaper->process(result) && is_current) { + if (pc->context.proactor->reaper->process(result)) { pc->psocket.iocpd = NULL; // reaped open = false; } @@ -2263,13 +2323,21 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, iocp_result_t *r } else if (is_connect_result(result)) { connect_step_done(pc, (connect_result_t *) result); - open = pc->psocket.iocpd && (pc->psocket.iocpd->events & PN_WRITABLE); // && !reconnecting (TODO) + open = pc->psocket.iocpd && (pc->psocket.iocpd->events & PN_WRITABLE); + if (open) + pc->connecting = false; } else do_complete(result); } } - if (open) { + if (!open) { + if (pc->stop_timer_required) { + pc->stop_timer_required = false; + // Do without context lock to avoid possible deadlock + stop_timer(pc->context.proactor->timer_queue, &pc->tick_timer); + } + } else { pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver); pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver); @@ -2292,6 +2360,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, iocp_result_t *r if (rbuf.size > 0 && !pc->psocket.iocpd->read_in_progress) { bool wouldblock; ssize_t n = pni_iocp_recv(pc->psocket.iocpd, rbuf.start, rbuf.size, &wouldblock, pc->psocket.iocpd->error); + if (n > 0) { pn_connection_driver_read_done(&pc->driver, n); pconnection_tick(pc); /* check for tick changes. */ @@ -2341,17 +2410,24 @@ static void pconnection_done(pconnection_t *pc) { if (pconnection_has_event(pc) || pconnection_work_pending(pc)) { wakeup(&pc->psocket); } else if (pn_connection_driver_finished(&pc->driver)) { - if (pc->context.proactor->reaper->add(pc->psocket.iocpd)) - pc->psocket.iocpd = NULL; - pn_connection_driver_close(&pc->driver); - g.release(); // possible deadlock - stop_timer(pc->context.proactor->timer_queue, &pc->tick_timer); - csguard g2(&pc->context.cslock); + pconnection_begin_close(pc); wakeup(&pc->psocket); } } } +static inline bool is_inactive(pn_proactor_t *p) { + return (!p->contexts && !p->disconnects_pending && !p->timeout_set && !p->need_timeout && !p->shutting_down); +} + +// Call whenever transitioning from "definitely active" to "maybe inactive" +static void wake_if_inactive(pn_proactor_t *p) { + if (is_inactive(p)) { + p->need_inactive = true; + proactor_wake(p); + } +} + void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { pconnection_t *pc = batch_pconnection(batch); if (pc) { @@ -2367,14 +2443,22 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { if (bp == p) { csguard g(&p->context.cslock); p->context.working = false; - proactor_wakeup(p); + if (p->delayed_interrupt) { + p->delayed_interrupt = false; + p->need_interrupt = true; + } + if (p->timeout_processed) { + p->timeout_processed = false; + wake_if_inactive(p); + } + if (proactor_update_batch(p)) + proactor_wake(p); return; } } static void proactor_add_event(pn_proactor_t *p, pn_event_type_t t) { pn_collector_put(p->collector, pn_proactor__class(), p, t); - p->context.working = true; } static pn_event_batch_t *proactor_process(pn_proactor_t *p) { @@ -2404,24 +2488,26 @@ static pn_event_batch_t *psocket_process(psocket_t *ps, iocp_result_t *result, r return NULL; } -pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) { +static pn_event_batch_t *proactor_completion_loop(struct pn_proactor_t* p, bool can_block) { // Proact! Process inbound completions of async activity until one // of them provides a batch of events. while(true) { pn_event_batch_t *batch = NULL; - DWORD win_timeout = INFINITE; + DWORD win_timeout = can_block ? INFINITE : 0; DWORD num_xfer = 0; ULONG_PTR completion_key = 0; OVERLAPPED *overlapped = 0; bool good_op = GetQueuedCompletionStatus (p->iocp->completion_port, &num_xfer, &completion_key, &overlapped, win_timeout); + if (!overlapped && !can_block && GetLastError() == WAIT_TIMEOUT) + return NULL; // valid timeout + if (!good_op && !overlapped) { // Should never happen. shutdown? // We aren't expecting a timeout, closed completion port, or other error here. - // Logger? - fprintf(stderr, errno_str("Windows Proton proactor internal failure %d\n", false).c_str()); + pn_logf("%s", errno_str("Windows Proton proactor internal failure\n", false).c_str()); abort(); } @@ -2439,7 +2525,7 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) { // data structure for our own use. if (completion_key == psocket_wakeup_key) batch = psocket_process((psocket_t *) overlapped, NULL, p->reaper); - else if (completion_key == proactor_wakeup_key) + else if (completion_key == proactor_wake_key) batch = proactor_process((pn_proactor_t *) overlapped); else if (completion_key == recycle_accept_key) recycle_result((accept_result_t *) overlapped); @@ -2449,26 +2535,71 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) { } } +pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) { + return proactor_completion_loop(p, true); +} + +pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) { + return proactor_completion_loop(p, false); +} void pn_proactor_interrupt(pn_proactor_t *p) { csguard g(&p->context.cslock); - p->interrupt = true; - proactor_wakeup(p); + if (p->context.working) + p->delayed_interrupt = true; + else + p->need_interrupt = true; + proactor_wake(p); } -// runs on a threadpool thread +// runs on a threadpool thread. Must not hold timer_lock. VOID CALLBACK timeout_cb(PVOID arg, BOOLEAN /* ignored*/ ) { pn_proactor_t *p = (pn_proactor_t *) arg; + csguard gtimer(&p->timer_lock); csguard g(&p->context.cslock); - p->timeout_request = true; - proactor_wakeup(p); + if (p->timeout_set) + p->need_timeout = true; // else cancelled + p->timeout_set = false; + if (p->need_timeout) + proactor_wake(p); } void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) { - stop_timer(p->timer_queue, &p->timeout_timer); - csguard g(&p->context.cslock); - p->timeout_request = false; - start_timer(p->timer_queue, &p->timeout_timer, timeout_cb, p, t); + bool ticking = false; + csguard gtimer(&p->timer_lock); + { + csguard g(&p->context.cslock); + ticking = (p->timeout_timer != NULL); + if (t == 0) { + p->need_timeout = true; + p->timeout_set = false; + proactor_wake(p); + } + else + p->timeout_set = true; + } + // Just timer_lock held + if (ticking) { + stop_timer(p->timer_queue, &p->timeout_timer); + } + if (t) { + start_timer(p->timer_queue, &p->timeout_timer, timeout_cb, p, t); + } +} + +void pn_proactor_cancel_timeout(pn_proactor_t *p) { + bool ticking = false; + csguard gtimer(&p->timer_lock); + { + csguard g(&p->context.cslock); + p->timeout_set = false; + ticking = (p->timeout_timer != NULL); + } + if (ticking) { + stop_timer(p->timer_queue, &p->timeout_timer); + csguard g(&p->context.cslock); + wake_if_inactive(p); + } } // Return true if connect_step_done()will handle connection status @@ -2479,50 +2610,52 @@ static bool connect_step(pconnection_t *pc) { pc->ai = pc->ai->ai_next; /* Move to next address in case this fails */ unique_socket fd(::socket(ai->ai_family, SOCK_STREAM, ai->ai_protocol)); if (fd != INVALID_SOCKET) { - pni_configure_sock_2(fd); - if (!pc->psocket.iocpd) { + // Windows ConnectEx requires loosely bound socket. + sockaddr_storage sa; + memset(&sa, 0, sizeof(sa)); + sa.ss_family = ai->ai_family; + if (!bind(fd, (SOCKADDR *) &sa, ai->ai_addrlen)) { + pni_configure_sock_2(fd); pc->psocket.iocpd = pni_iocpdesc_create(p->iocp, fd); assert(pc->psocket.iocpd); pc->psocket.iocpd->write_closed = true; pc->psocket.iocpd->read_closed = true; - } - else - pc->psocket.iocpd->socket = fd; - fd.release(); - iocpdesc_t *iocpd = pc->psocket.iocpd; - if (CreateIoCompletionPort ((HANDLE) iocpd->socket, iocpd->iocp->completion_port, 0, 0)) { - LPFN_CONNECTEX fn_connect_ex = lookup_connect_ex2(iocpd->socket); - // addrinfo is owned by the pconnection so pass NULL to the connect result - connect_result_t *result = connect_result(iocpd, NULL); - DWORD unused; - bool success = fn_connect_ex(iocpd->socket, ai->ai_addr, ai->ai_addrlen, - NULL, 0, &unused, (LPOVERLAPPED) result); - if (success || WSAGetLastError() == ERROR_IO_PENDING) { - iocpd->ops_in_progress++; - iocpd->active_completer = &pc->psocket; - return true; // logic resumes at connect_step_done() + fd.release(); + iocpdesc_t *iocpd = pc->psocket.iocpd; + if (CreateIoCompletionPort ((HANDLE) iocpd->socket, iocpd->iocp->completion_port, 0, 0)) { + LPFN_CONNECTEX fn_connect_ex = lookup_connect_ex2(iocpd->socket); + // addrinfo is owned by the pconnection so pass NULL to the connect result + connect_result_t *result = connect_result(iocpd, NULL); + DWORD unused; + bool success = fn_connect_ex(iocpd->socket, ai->ai_addr, ai->ai_addrlen, + NULL, 0, &unused, (LPOVERLAPPED) result); + if (success || WSAGetLastError() == ERROR_IO_PENDING) { + iocpd->ops_in_progress++; + iocpd->active_completer = &pc->psocket; + return true; // logic resumes at connect_step_done() + } + pn_free(result); } - pn_free(result); } - closesocket(pc->psocket.iocpd->socket); // try again - pc->psocket.iocpd->socket = INVALID_SOCKET; + if (pc->psocket.iocpd) { + pc->context.proactor->reaper->fast_reap(pc->psocket.iocpd); + pc->psocket.iocpd = NULL; + } } } + pc->context.closing = true; return false; } static void connect_step_done(pconnection_t *pc, connect_result_t *result) { csguard g(&pc->context.cslock); - if (result->base.status && pc->ai) { - if (connect_step(pc)) { - // Trying the next addrinfo possibility - g.release(); - pn_free(result); - return; - } - } - complete_connect(result, result->base.status); // starts reading if open, frees result - if (!result->base.status) { + DWORD saved_status = result->base.status; + iocpdesc_t *iocpd = result->base.iocpd; + iocpd->ops_in_progress--; + assert(pc->psocket.iocpd == iocpd); + complete_connect(result, result->base.status); // frees result, starts regular IO if connected + + if (!saved_status) { // Success pc->psocket.iocpd->write_closed = false; pc->psocket.iocpd->read_closed = false; @@ -2533,15 +2666,34 @@ static void connect_step_done(pconnection_t *pc, connect_result_t *result) { pc->ai = NULL; return; } - psocket_error(&pc->psocket, GetLastError(), "connect to "); - wakeup(&pc->psocket); + else { + // Descriptor will never be used. Dispose. + // Connect failed, no IO started, i.e. no pending iocpd based events + pc->context.proactor->reaper->fast_reap(iocpd); + pc->psocket.iocpd = NULL; + // Is there a next connection target in the addrinfo to try? + if (pc->ai && connect_step(pc)) { + // Trying the next addrinfo possibility. Will return here. + return; + } + // Give up + psocket_error(&pc->psocket, saved_status, "connect to "); + pc->context.closing = true; + wakeup(&pc->psocket); + } } void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) { - pconnection_t *pc = new_pconnection_t(p, c, false, addr); + pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t)); assert(pc); // TODO: memory safety + const char *err = pconnection_setup(pc, p, c, false, addr); + if (err) { + pn_logf("pn_proactor_connect failure: %s", err); + return; + } // TODO: check case of proactor shutting down csguard g(&pc->context.cslock); + pc->connecting = true; proactor_add(&pc->context); pn_connection_open(pc->driver.connection); /* Auto-open */ @@ -2559,6 +2711,24 @@ void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) } } +void pn_proactor_release_connection(pn_connection_t *c) { + bool notify = false; + pconnection_t *pc = get_pconnection(c); + if (pc) { + csguard g(&pc->context.cslock); + // reverse lifecycle entanglement of pc and c from new_pconnection_t() + pn_incref(pc); + pn_proactor_t *p = pc->context.proactor; + csguard g2(&p->bind_lock); + pn_record_t *r = pn_connection_attachments(pc->driver.connection); + pn_record_set(r, PN_PROACTOR, NULL); + pn_connection_driver_release_connection(&pc->driver); + pc->bound = false; // Transport unbound + g2.release(); + pconnection_begin_close(pc); + } +} + void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, int backlog) { csguard g(&l->context.cslock); @@ -2598,11 +2768,9 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in psocket_t *ps = &l->psockets[l->psockets_size++]; psocket_init(ps, l, false, addr); ps->iocpd = iocpd; + iocpd->is_mp = true; iocpd->active_completer = ps; pni_iocpdesc_start(ps->iocpd); - // TODO: make configurable or able to grow with load - for (int i=0; i < 4; i++) - begin_accept(ps->iocpd->acceptor, NULL); } } } @@ -2630,11 +2798,11 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in static pn_event_batch_t *batch_owned(pn_listener_t *l) { if (l->close_dispatched) return NULL; if (!l->context.working) { - if (pn_collector_peek(l->collector)) { + if (listener_has_event(l)) { l->context.working = true; return &l->batch; } - assert(!(l->closing && l->pending_events)); + assert(!(l->context.closing && l->pending_events)); if (l->pending_events) { pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT); l->pending_events--; @@ -2654,42 +2822,68 @@ static void listener_close_all(pn_listener_t *l) { } } -static bool listener_maybe_free(pn_listener_t *l, csguard *guard) { - fprintf(stderr, "fixme: dangling listener_maybe_free. aborting\n"); - fflush(stderr); - abort(); - // TODO: revised mechanism that works with disconnect and proactor_remove +static bool listener_can_free(pn_listener_t *l) { + if (!l->close_dispatched) return false; + if (l->context.working || l->context.completion_ops) return false; + for (size_t i = 0; i < l->psockets_size; ++i) { + psocket_t *ps = &l->psockets[i]; + if (ps->iocpd) + return false; + } + return true; +} + +/* Call with lock not held */ +static inline void listener_final_free(pn_listener_t *l) { + pcontext_finalize(&l->context); + free(l->psockets); + if (l->collector) pn_collector_free(l->collector); + if (l->condition) pn_condition_free(l->condition); + if (l->attachments) pn_free(l->attachments); + free(l); +} + +/* Call with listener lock held by lg.*/ +static void internal_listener_free(pn_listener_t *l, csguard &g) { + bool can_free = true; + if (l->context.proactor) { + can_free = proactor_remove(&l->context); + } + g.release(); + if (can_free) + listener_final_free(l); + // else final free is done by proactor_disconnect() +} + +static bool listener_maybe_free(pn_listener_t *l, csguard &g) { + if (listener_can_free(l)) { + internal_listener_free(l, g); + return true; + } return false; } static pn_event_batch_t *listener_process(pn_listener_t *l, iocp_result_t *result) { accept_result_t *accept_result = NULL; - bool reaping = false; psocket_t *ps = NULL; { csguard g(&l->context.cslock); - if (l->closing) { - listener_close_all(l); - reaping = true; - } if (!result) { - ps = l->psockets; wake_complete(&l->context); - if (listener_maybe_free(l, &g)) return NULL; + if (listener_maybe_free(l, g)) return NULL; return batch_owned(l); } else ps = (psocket_t *) result->iocpd->active_completer; - if (!reaping && result->status) { - psocket_error(ps, WSAGetLastError(), "listen on "); - listener_close_all(l); - reaping = true; + if (!l->context.closing && result->status) { + psocket_error(ps, WSAGetLastError(), "listen on "); // initiates close/multi-reap } - if (reaping) { - if (l->context.proactor->reaper->process(result)) + if (l->context.closing) { + if (l->context.proactor->reaper->process(result)) { ps->iocpd = NULL; - if (listener_maybe_free(l, &g)) return NULL; + if (listener_maybe_free(l, g)) return NULL; + } return batch_owned(l); } @@ -2712,12 +2906,17 @@ static pn_event_batch_t *listener_process(pn_listener_t *l, iocp_result_t *resul { csguard g(&l->context.cslock); l->context.completion_ops--; + accept_result->new_sock->ops_in_progress--; + ps->iocpd->ops_in_progress--; + // add even if closing to reuse cleanup code - l->accept_results->push(accept_result); + l->pending_accepts->push(accept_result); + if (!ps->iocpd->ops_in_progress) + begin_accept(ps->iocpd->acceptor, NULL); // Start another, up to IOCP_MAX_ACCEPTS l->pending_events++; - if (l->closing) + if (l->context.closing) release_pending_accepts(l); - if (listener_maybe_free(l, &g)) return NULL; + if (listener_maybe_free(l, g)) return NULL; return batch_owned(l); } } @@ -2730,8 +2929,10 @@ pn_proactor_t *pn_connection_proactor(pn_connection_t* c) { void pn_connection_wake(pn_connection_t* c) { pconnection_t *pc = get_pconnection(c); csguard g(&pc->context.cslock); - pc->wake_count++; - wakeup(&pc->psocket); + if (!pc->context.closing) { + pc->wake_count++; + wakeup(&pc->psocket); + } } pn_proactor_t *pn_proactor() { @@ -2753,6 +2954,10 @@ pn_proactor_t *pn_proactor() { if (!WSAStartup(wsa_ver, &unused)) { wsa = true; if (iocp = pni_iocp()) { + InitializeCriticalSectionAndSpinCount(&p->context.cslock, 4000); + InitializeCriticalSectionAndSpinCount(&p->write_lock, 4000); + InitializeCriticalSectionAndSpinCount(&p->timer_lock, 4000); + InitializeCriticalSectionAndSpinCount(&p->bind_lock, 4000); try { r = new reaper(p, &p->write_lock, iocp); // success @@ -2761,15 +2966,13 @@ pn_proactor_t *pn_proactor() { p->batch.next_event = &proactor_batch_next; p->collector = c; p->timer_queue = tq; - InitializeCriticalSectionAndSpinCount(&p->context.cslock, 4000); - InitializeCriticalSectionAndSpinCount(&p->write_lock, 4000); return p; } catch (...) {} } } } } - fprintf(stderr, errno_str("Windows Proton proactor OS resource failure %d\n", false).c_str()); + fprintf(stderr, "%s\n", errno_str("Windows Proton proactor OS resource failure", false).c_str()); if (iocp) pn_free((void *) iocp); if (wsa) WSACleanup(); free(c); @@ -2780,7 +2983,10 @@ pn_proactor_t *pn_proactor() { void pn_proactor_free(pn_proactor_t *p) { DeleteTimerQueueEx(p->timer_queue, INVALID_HANDLE_VALUE); + DeleteCriticalSection(&p->timer_lock); + DeleteCriticalSection(&p->bind_lock); proactor_shutdown(p); + delete p->reaper; WSACleanup(); pn_collector_free(p->collector); @@ -2791,14 +2997,14 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) { pn_listener_t *l = batch_listener(batch); { csguard g(&l->context.cslock); - if (!pn_collector_peek(l->collector) && l->pending_events > 0) { + if (!listener_has_event(l) && l->pending_events) { pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT); l->pending_events--; } pn_event_t *e = pn_collector_next(l->collector); if (e && pn_event_type(e) == PN_LISTENER_CLOSE) l->close_dispatched = true; - return e; + return log_event(l, e); } } @@ -2807,11 +3013,11 @@ static void listener_done(pn_listener_t *l) { csguard g(&l->context.cslock); l->context.working = false; if (l->close_dispatched) { - listener_maybe_free(l, &g); + listener_maybe_free(l, g); return; } else - if (pn_collector_peek(l->collector)) + if (listener_has_event(l)) wakeup(l->psockets); } } @@ -2823,7 +3029,7 @@ pn_listener_t *pn_listener() { l->collector = pn_collector(); l->condition = pn_condition(); l->attachments = pn_record(); - l->accept_results = new std::queue(); + l->pending_accepts = new std::queue(); if (!l->condition || !l->collector || !l->attachments) { pn_listener_free(l); return NULL; @@ -2834,47 +3040,27 @@ pn_listener_t *pn_listener() { return l; } -static bool listener_can_free(pn_listener_t *l) { - if (!l->close_dispatched) return false; - if (!l->running) return true; - if (l->context.working) return false; - for (size_t i = 0; i < l->psockets_size; ++i) { - psocket_t *ps = &l->psockets[i]; - if (l->context.completion_ops || ps->iocpd) - return false; - } - return true; -} - -static inline void listener_final_free(pn_listener_t *l) { - pcontext_finalize(&l->context); - free(l->psockets); - free(l); -} - void pn_listener_free(pn_listener_t *l) { - /* Note at this point either the listener has never been used (freed by user) - or it has been closed, so all its sockets are closed. + /* Note at this point either the listener has never been used (freed + by user) or it has been closed, and all pending operations + completed, i.e. listener_can_free() is true. */ if (l) { - bool can_free = true; - if (l->collector) pn_collector_free(l->collector); - if (l->condition) pn_condition_free(l->condition); - if (l->attachments) pn_free(l->attachments); csguard g(&l->context.cslock); if (l->context.proactor) { - can_free = proactor_remove(&l->context); + internal_listener_free(l, g); + return; } + // freed by user g.release(); - if (can_free) - listener_final_free(l); + listener_final_free(l); } } static void listener_begin_close(pn_listener_t* l) { - if (l->closing) + if (l->context.closing) return; - l->closing = true; + l->context.closing = true; listener_close_all(l); release_pending_accepts(l); pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE); @@ -2882,10 +3068,8 @@ static void listener_begin_close(pn_listener_t* l) { void pn_listener_close(pn_listener_t* l) { csguard g(&l->context.cslock); - if (!l->context.closing) { - listener_begin_close(l); - wakeup(&l->psockets[0]); - } + listener_begin_close(l); + wakeup(&l->psockets[0]); } pn_proactor_t *pn_listener_proactor(pn_listener_t* l) { @@ -2910,9 +3094,9 @@ pn_record_t *pn_listener_attachments(pn_listener_t *l) { static void release_pending_accepts(pn_listener_t *l) { // called with lock held or at shutdown - while (!l->accept_results->empty()) { - accept_result_t *accept_result = l->accept_results->front(); - l->accept_results->pop(); + while (!l->pending_accepts->empty()) { + accept_result_t *accept_result = l->pending_accepts->front(); + l->pending_accepts->pop(); psocket_t *ps = (psocket_t *) accept_result->base.iocpd->active_completer; accept_result->new_sock->ops_in_progress--; ps->iocpd->ops_in_progress--; @@ -2927,16 +3111,15 @@ static void recycle_result(accept_result_t *accept_result) { pn_listener_t *l = ps->listener; reset_accept_result(accept_result); accept_result->new_sock = create_same_type_socket(ps->iocpd); - accept_result->new_sock->is_mp = true; { csguard g(&l->context.cslock); - if (l->closing) { + if (l->context.closing && accept_result->new_sock) { closesocket(accept_result->new_sock->socket); accept_result->new_sock->socket = INVALID_SOCKET; } begin_accept(ps->iocpd->acceptor, accept_result); // cleans up if closing l->context.completion_ops--; - if (l->closing && listener_maybe_free(l, &g)) + if (l->context.closing && listener_maybe_free(l, g)) return; } } @@ -2949,20 +3132,23 @@ void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) { { csguard g(&l->context.cslock); - pconnection_t *pc = new_pconnection_t(p, c, false, ""); + pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t)); assert(pc); // TODO: memory safety + const char *err_str = pconnection_setup(pc, p, c, true, ""); + if (err_str) { + pn_logf("pn_listener_accept failure: %s", err_str); + return; + } proactor_add(&pc->context); - if (l->closing) + if (l->context.closing) err = WSAESHUTDOWN; - else if (l->accept_results->empty()) + else if (l->pending_accepts->empty()) err = WSAEWOULDBLOCK; else { - accept_result = l->accept_results->front(); - l->accept_results->pop(); + accept_result = l->pending_accepts->front(); + l->pending_accepts->pop(); ps = (psocket_t *) accept_result->base.iocpd->active_completer; - accept_result->new_sock->ops_in_progress--; - ps->iocpd->ops_in_progress--; l->context.completion_ops++; // for recycle_result iocpdesc_t *conn_iocpd = accept_result->new_sock; pc->psocket.iocpd = conn_iocpd; @@ -2984,20 +3170,20 @@ void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) { // Call with lock held. Leave unchanged if events pending. // Return true if there is an event in the collector static bool proactor_update_batch(pn_proactor_t *p) { - if (pn_collector_peek(p->collector)) + if (proactor_has_event(p)) return true; - if (p->timeout_elapsed) { - p->timeout_elapsed = false; + if (p->need_timeout) { + p->need_timeout = false; proactor_add_event(p, PN_PROACTOR_TIMEOUT); return true; } - if (p->interrupt) { - p->interrupt = false; + if (p->need_interrupt) { + p->need_interrupt = false; proactor_add_event(p, PN_PROACTOR_INTERRUPT); return true; } - if (p->inactive) { - p->inactive = false; + if (p->need_inactive) { + p->need_inactive = false; proactor_add_event(p, PN_PROACTOR_INACTIVE); return true; } @@ -3006,12 +3192,15 @@ static bool proactor_update_batch(pn_proactor_t *p) { static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) { pn_proactor_t *p = batch_proactor(batch); - pn_collector_t *c = p->collector; - if (!pn_collector_peek(c)) { + pn_event_t *e = pn_collector_next(p->collector); + if (!e) { csguard g(&p->context.cslock); proactor_update_batch(p); + e = pn_collector_next(p->collector); } - return pn_collector_next(c); + if (e && pn_event_type(e) == PN_PROACTOR_TIMEOUT) + p->timeout_processed = true; + return log_event(p, e); } static void proactor_add(pcontext_t *ctx) { @@ -3051,17 +3240,15 @@ static bool proactor_remove(pcontext_t *ctx) { ctx->next->prev = ctx->prev; } } - proactor_wakeup(p); + wake_if_inactive(p); return can_free; } static void pconnection_forced_shutdown(pconnection_t *pc) { - // Called by proactor_free, no competing threads, no iocp activity. + // Called by proactor_free, no competing threads processing iocp activity. + pconnection_begin_close(pc); + // Timer threads may lurk. No lock held, so no deadlock risk stop_timer(pc->context.proactor->timer_queue, &pc->tick_timer); - if (pc->psocket.iocpd) { - pc->context.proactor->reaper->add(pc->psocket.iocpd); - pc->psocket.iocpd = NULL; - } pconnection_final_free(pc); } @@ -3095,8 +3282,8 @@ static void proactor_shutdown(pn_proactor_t *p) { } void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) { -#ifdef todo_for_0_18 - // TODO: fix mutexes below from epoll to cs locks + pcontext_t *disconnecting_pcontexts = NULL; + pcontext_t *ctx = NULL; { csguard g(&p->context.cslock); // Move the whole contexts list into a disconnecting state @@ -3117,15 +3304,14 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) { return; // Second pass: different locking, close the pcontexts, free them if !disconnect_ops - bool notify = false; for (ctx = disconnecting_pcontexts; ctx; ctx = ctx ? ctx->next : NULL) { bool do_free = false; - bool ctx_notify = true; - pmutex *ctx_mutex = NULL; pconnection_t *pc = pcontext_pconnection(ctx); + pn_listener_t *l = pc ? NULL : pcontext_listener(ctx); + CRITICAL_SECTION *ctx_cslock = pc ? &pc->context.cslock : &l->context.cslock; + csguard ctx_guard(ctx_cslock); if (pc) { - ctx_mutex = &pc->context.mutex; - lock(ctx_mutex); + pc->can_wake = false; if (!ctx->closing) { if (ctx->working) { // Must defer @@ -3145,10 +3331,8 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) { } } } else { - pn_listener_t *l = pcontext_listener(ctx); + assert(l); - ctx_mutex = &l->context.mutex; - lock(ctx_mutex); if (!ctx->closing) { if (cond) { pn_condition_copy(pn_listener_condition(l), cond); @@ -3157,46 +3341,23 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) { } } - lock(&p->context.mutex); + csguard p_guard(&p->context.cslock); if (--ctx->disconnect_ops == 0) { do_free = true; - ctx_notify = false; - notify = wake_if_inactive(p); + wake_if_inactive(p); } else { // If initiating the close, wake the pcontext to do the free. - if (ctx_notify) - ctx_notify = wake(ctx); + wakeup(pc ? &pc->psocket : l->psockets); } - unlock(&p->context.mutex); - unlock(ctx_mutex); + p_guard.release(); + ctx_guard.release(); if (do_free) { if (pc) pconnection_final_free(pc); else listener_final_free(pcontext_listener(ctx)); - } else { - if (ctx_notify) - wake_notify(ctx); } } - if (notify) - wake_notify(&p->context); -#endif -} - - -//TODO!! -void pn_proactor_release_connection(pn_connection_t *c) { - abort(); -} -void pn_proactor_cancel_timeout(pn_proactor_t *p) { - abort(); } -pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) { - abort(); - return NULL; -} - - static int pni2_snprintf(char *buf, size_t count, const char *fmt, ...); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org