qpid-commits mailing list archives

From cliffjan...@apache.org
Subject qpid-proton git commit: PROTON-1349: completed and improved implementation, but still fails many tests
Date Fri, 15 Sep 2017 06:29:23 GMT
Repository: qpid-proton
Updated Branches:
  refs/heads/master 83fa38808 -> 6a57a8c98


PROTON-1349: completed and improved implementation, but still fails many tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/6a57a8c9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/6a57a8c9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/6a57a8c9

Branch: refs/heads/master
Commit: 6a57a8c986fbf86ad8ad109d673a89a5ae84c544
Parents: 83fa388
Author: Clifford Jansen <cliffjansen@apache.org>
Authored: Thu Sep 14 23:29:14 2017 -0700
Committer: Clifford Jansen <cliffjansen@apache.org>
Committed: Thu Sep 14 23:29:14 2017 -0700

----------------------------------------------------------------------
 proton-c/src/proactor/win_iocp.c | 659 +++++++++++++++++++++-------------
 1 file changed, 410 insertions(+), 249 deletions(-)
----------------------------------------------------------------------
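
A note for readers of the diff below: the patch reworks the proactor around a
single IOCP dispatch loop (proactor_completion_loop()), which blocks for
pn_proactor_wait() and polls for the newly implemented pn_proactor_get(). A
minimal sketch of that Win32 pattern follows; handle_result() is a
hypothetical stand-in for the patch's dispatch on completion_key, not code
from the commit.

/* Sketch of the GetQueuedCompletionStatus dispatch pattern used in the
 * patch.  handle_result() is hypothetical; the real code switches on
 * completion keys (proactor wake, psocket wake, recycled accept, ...). */
#include <winsock2.h>
#include <windows.h>
#include <stdbool.h>
#include <stdlib.h>

static void handle_result(ULONG_PTR key, OVERLAPPED *ov, DWORD n, DWORD err);

static bool completion_step(HANDLE port, bool can_block) {
  DWORD num_xfer = 0;
  ULONG_PTR key = 0;
  OVERLAPPED *ov = NULL;
  DWORD timeout = can_block ? INFINITE : 0;   /* wait() vs. get() */
  BOOL ok = GetQueuedCompletionStatus(port, &num_xfer, &key, &ov, timeout);
  if (!ok && !ov) {
    if (GetLastError() == WAIT_TIMEOUT)
      return false;             /* non-blocking poll found nothing */
    abort();                    /* port failure: fatal, as in the patch */
  }
  /* ok == FALSE with ov != NULL: the async op failed but still completed;
     dispatch it so the owner can observe the error and clean up. */
  handle_result(key, ov, num_xfer, ok ? ERROR_SUCCESS : GetLastError());
  return true;
}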


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6a57a8c9/proton-c/src/proactor/win_iocp.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/win_iocp.c b/proton-c/src/proactor/win_iocp.c
index 1a79023..0ebaf90 100644
--- a/proton-c/src/proactor/win_iocp.c
+++ b/proton-c/src/proactor/win_iocp.c
@@ -134,8 +134,8 @@ struct iocpdesc_t {
 };
 
 
-// Max number of overlapped accepts per listener
-#define IOCP_MAX_ACCEPTS 10
+// Max number of overlapped accepts per listener.  TODO: configurable.
+#define IOCP_MAX_ACCEPTS 4
 
 // AcceptEx squishes the local and remote addresses and optional data
 // all together when accepting the connection. Reserve enough for
@@ -715,11 +715,12 @@ static pni_acceptor_t *pni_acceptor(iocpdesc_t *iocpd)
 void begin_accept(pni_acceptor_t *acceptor, accept_result_t *result)
 {
   // flag to divide this routine's logic into locked/unlocked mp portions
-  bool mp = result && result->new_sock && result->new_sock->is_mp;
+  bool mp = acceptor->listen_sock->is_mp;
+  bool created = false;
 
   if (acceptor->listen_sock->closing) {
     if (result) {
-      if (mp && result->new_sock->socket != INVALID_SOCKET)
+      if (mp && result->new_sock && result->new_sock->socket != INVALID_SOCKET)
         closesocket(result->new_sock->socket);
       free(result);
       acceptor->accept_queue_size--;
@@ -733,17 +734,18 @@ void begin_accept(pni_acceptor_t *acceptor, accept_result_t *result)
     if (!mp)
       reset_accept_result(result);
   } else {
-    if (acceptor->accept_queue_size < IOCP_MAX_ACCEPTS &&
-        pn_list_size(acceptor->accepts) == acceptor->accept_queue_size ) {
+    if (acceptor->accept_queue_size < IOCP_MAX_ACCEPTS && (mp ||
+           pn_list_size(acceptor->accepts) == acceptor->accept_queue_size )) {
       result = accept_result(acceptor->listen_sock);
       acceptor->accept_queue_size++;
+      created = true;
     } else {
       // an async accept is still pending or max concurrent accepts already hit
       return;
     }
   }
 
-  if (!mp)
+  if (created || !mp)
     result->new_sock = create_same_type_socket(acceptor->listen_sock);
   if (result->new_sock) {
     // Not yet connected.
@@ -782,9 +784,11 @@ static void complete_accept(accept_result_t *result, HRESULT status)
   if (ld->read_closed) {
     if (!result->new_sock->closing)
       pni_iocp_begin_close(result->new_sock);
+    pn_decref(result->new_sock);
     free(result);    // discard
     reap_check(ld);
   } else {
+    assert(!ld->is_mp);  // non-mp path only
     result->base.status = status;
     pn_list_add(ld->acceptor->accepts, result);
     pni_events_update(ld, ld->events | PN_READABLE);
@@ -953,7 +957,7 @@ ssize_t pni_iocp_begin_write(iocpdesc_t *iocpd, const void *buf, size_t len, boo
     assert(result);
     result->base.iocpd = iocpd;
     ssize_t actual_len = len;
-    if (actual_len > result->buffer.size) actual_len = result->buffer.size;
+    if (len > result->buffer.size) actual_len = result->buffer.size;
     result->requested = actual_len;
     memmove((void *)result->buffer.start, outgoing, actual_len);
     outgoing += actual_len;
@@ -1579,8 +1583,8 @@ std::string errno_str(const std::string& msg, bool is_wsa) {
 
 using namespace pn_experimental;
 
-static void proactor_wakeup_stub() {}
-ULONG_PTR proactor_wakeup_key = (ULONG_PTR) &proactor_wakeup_stub;
+static void proactor_wake_stub() {}
+ULONG_PTR proactor_wake_key = (ULONG_PTR) &proactor_wake_stub;
 
 static void psocket_wakeup_stub() {}
 ULONG_PTR psocket_wakeup_key = (ULONG_PTR) &psocket_wakeup_stub;
@@ -1651,7 +1655,7 @@ static void pcontext_finalize(pcontext_t* ctx) {
 }
 
 typedef struct psocket_t {
-  iocpdesc_t *iocpd;    // NULL if reaper, or socket open failure. Reconnect will change this value.
+  iocpdesc_t *iocpd;    // NULL if reaper, or socket open failure.
   pn_listener_t *listener;      /* NULL for a connection socket */
   char addr_buf[PN_MAX_ADDR];
   const char *host, *port;
@@ -1668,6 +1672,8 @@ static void psocket_init(psocket_t* ps, pn_listener_t *listener, bool is_reaper,
 struct pn_proactor_t {
   pcontext_t context;
   CRITICAL_SECTION write_lock;
+  CRITICAL_SECTION timer_lock;
+  CRITICAL_SECTION bind_lock;
   HANDLE timer_queue;
   HANDLE timeout_timer;
   iocp_t *iocp;
@@ -1676,10 +1682,14 @@ struct pn_proactor_t {
   pcontext_t *contexts;         /* in-use contexts for PN_PROACTOR_INACTIVE and cleanup */
   pn_event_batch_t batch;
   size_t disconnects_pending;   /* unfinished proactor disconnects */
-  bool interrupt;
-  bool inactive;
-  bool timeout_request;
-  bool timeout_elapsed;
+
+  // need_xxx flags indicate we should generate PN_PROACTOR_XXX on the next update_batch()
+  bool need_interrupt;
+  bool need_inactive;
+  bool need_timeout;
+  bool timeout_set; /* timeout has been set by the user and has not yet been cancelled or generated an event */
+  bool timeout_processed;  /* timeout event dispatched in the most recent event batch */
+  bool delayed_interrupt;
   bool shutting_down;
 };
 
@@ -1700,6 +1710,10 @@ typedef struct pconnection_t {
   bool server;                /* accept, not connect */
   bool connecting;
   bool tick_pending;
+  bool queued_disconnect;     /* deferred from pn_proactor_disconnect() */
+  bool bound;
+  bool stop_timer_required;
+  bool can_wake;
   HANDLE tick_timer;
   struct pn_netaddr_t local, remote; /* Actual addresses */
   struct addrinfo *addrinfo;         /* Resolved address list */
@@ -1710,7 +1724,7 @@ struct pn_listener_t {
   psocket_t *psockets;          /* Array of listening sockets */
   size_t psockets_size;
   pcontext_t context;
-  std::queue<accept_result_t *> *accept_results;  // completions awaiting a pn_listener_accept
+  std::queue<accept_result_t *> *pending_accepts;  // sockets awaiting a pn_listener_accept
   int pending_events;          // number of PN_LISTENER_ACCEPT events to be delivered
   pn_condition_t *condition;
   pn_collector_t *collector;
@@ -1718,9 +1732,7 @@ struct pn_listener_t {
   pn_record_t *attachments;
   void *listener_context;
   size_t backlog;
-  bool closing;
   bool close_dispatched;
-  bool running;
 };
 
 
@@ -1799,11 +1811,12 @@ class unique_socket {
 void do_complete(iocp_result_t *result) {
   iocpdesc_t *iocpd = result->iocpd;  // connect result gets deleted
   switch (result->type) {
-/* accept is now processed inline to do in parallel
+
   case IOCP_ACCEPT:
-    complete_accept((accept_result_t *) result, result->status);
+    /* accept is now processed inline, in parallel, except on teardown */
+    assert(iocpd->closing);
+    complete_accept((accept_result_t *) result, result->status);  // frees result and retires new_sock
     break;
-*/
   case IOCP_CONNECT:
     complete_connect((connect_result_t *) result, result->status);
     break;
@@ -1868,11 +1881,11 @@ static inline void proactor_wake_complete(pn_proactor_t *p) {
 }
 
 // Call with lock held
-static void proactor_wakeup(pn_proactor_t *p) {
+static void proactor_wake(pn_proactor_t *p) {
   if (!p->context.working && !p->context.wake_pending) {
     p->context.wake_pending = true;
     p->context.completion_ops++;
-    post_completion(p->iocp, proactor_wakeup_key, p);
+    post_completion(p->iocp, proactor_wake_key, p);
   }
 }
 
@@ -1900,10 +1913,20 @@ class reaper {
         if (iocpd->closing) return false;
         bool rval = !iocpd->ops_in_progress;
         pni_iocp_begin_close(iocpd); // sets iocpd->closing
-        start_timer();
+        pn_decref(iocpd);            // may still be ref counted on zombie list
+        reap_timer();
         return rval;
     }
 
+    // For cases where the close will be immediate, i.e. after a failed
+    // connection attempt where there is no follow-on IO.
+    void fast_reap(iocpdesc_t *iocpd) {
+        assert(iocpd && iocpd->ops_in_progress == 0 && !iocpd->closing);
+        csguard g(&lock_);
+        pni_iocp_begin_close(iocpd);
+        pn_decref(iocpd);
+    }
+
     bool process(iocp_result_t *result) {
         // No queue of completions for the reaper.  Just process
         // serialized by the lock assuming all actions are "short".
@@ -1911,13 +1934,15 @@ class reaper {
         // consumer/producer setup but just replace the reaper with a
         // multi threaded alternative.
         csguard g(&lock_);
+        iocpdesc_t *iocpd = result->iocpd;
         if (is_write_result(result)) {
             csguard wg(global_wlock_);
             do_complete(result);
         }
         else do_complete(result);
-        bool rval = (result->iocpd->ops_in_progress == 0);
-        pni_iocp_reap_check(result->iocpd);
+        // result may now be NULL
+        bool rval = (iocpd->ops_in_progress == 0);
+        pni_iocp_reap_check(iocpd);
         return rval;
     }
 
@@ -1933,12 +1958,12 @@ class reaper {
         csguard g(&lock_);
         DeleteTimerQueueTimer(timer_queue_, timer_, NULL);
         timer_ = NULL;
-        start_timer();
+        reap_timer();
     }
 
 
   private:
-    void start_timer() {
+    void reap_timer() {
         // Call with lock
         if (timer_)
             return;
@@ -2016,13 +2041,13 @@ static void psocket_error(psocket_t *ps, int err, const char* what) {
     pn_connection_driver_bind(driver); /* Bind so errors will be reported */
     pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s",
                                 what, ps->host, ps->port,
-                                errno_str2(err));
+                                errno_str2(err).c_str());
     pn_connection_driver_close(driver);
   } else {
     pn_listener_t *l = as_listener(ps);
     pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
                         what, ps->host, ps->port,
-                        errno_str2(err));
+                        errno_str2(err).c_str());
     listener_begin_close(l);
   }
 }
@@ -2045,12 +2070,23 @@ static void pconnection_finalize(void *vp_pconnection) {
 
 static const pn_class_t pconnection_class = PN_CLASS(pconnection);
 
-static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bool server, const char *addr) {
-  pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t));
-  if (!pc) return NULL;
+static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_connection_t *c, bool server, const char *addr) {
+  {
+    csguard g(&p->bind_lock);
+    pn_record_t *r = pn_connection_attachments(c);
+    if (pn_record_get(r, PN_PROACTOR)) {
+      free(pc);
+      return "pn_connection_t already in use";
+    }
+    pn_record_def(r, PN_PROACTOR, &pconnection_class);
+    pn_record_set(r, PN_PROACTOR, pc);
+    pc->bound = true;
+    pc->can_wake = true;
+  }
+
   if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
     free(pc);
-    return NULL;
+    return "pn_connection_driver_init failure";
   }
   pc->completion_queue = new std::queue<iocp_result_t *>();
   pc->work_queue = new std::queue<iocp_result_t *>();
@@ -2061,20 +2097,20 @@ static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bo
   if (server) {
     pn_transport_set_server(pc->driver.transport);
   }
-  pn_record_t *r = pn_connection_attachments(pc->driver.connection);
-  pn_record_def(r, PN_PROACTOR, &pconnection_class);
-  pn_record_set(r, PN_PROACTOR, pc);
+
   pn_decref(pc);                /* Will be deleted when the connection is */
-  return pc;
+  return NULL;
 }
 
+// Either stops a timer before firing or returns after the callback has 
+// completed (in the threadpool thread).  Never "in doubt".
 static bool stop_timer(HANDLE tqueue, HANDLE *timer) {
   if (!*timer) return true;
   if (DeleteTimerQueueTimer(tqueue, *timer, INVALID_HANDLE_VALUE)) {
     *timer = NULL;
     return true;
   }
-  return false;
+  return false;  // error
 }
 
 static bool start_timer(HANDLE tqueue, HANDLE *timer, WAITORTIMERCALLBACK cb, void *cb_arg, DWORD time) {
@@ -2082,7 +2118,7 @@ static bool start_timer(HANDLE tqueue, HANDLE *timer, WAITORTIMERCALLBACK cb, vo
     // TODO: log err
     return false;
   }
-  return CreateTimerQueueTimer(timer, tqueue, reap_check_cb, cb_arg, time, 0, WT_EXECUTEONLYONCE);
+  return CreateTimerQueueTimer(timer, tqueue, cb, cb_arg, time, 0, WT_EXECUTEONLYONCE);
 }
 
 VOID CALLBACK tick_timer_cb(PVOID arg, BOOLEAN /* ignored*/ ) {
@@ -2132,9 +2168,10 @@ pn_proactor_t *pn_event_proactor(pn_event_t *e) {
   return NULL;
 }
 
+// Call with lock held when closing and transitioning away from working context
 static inline bool pconnection_can_free(pconnection_t *pc) {
   return pc->psocket.iocpd == NULL && pc->context.completion_ops == 0
-    && pc->tick_timer == NULL && !pconnection_has_event(pc);
+    && !pc->stop_timer_required && !pconnection_has_event(pc) && !pc->queued_disconnect;
 }
 
 static void pconnection_final_free(pconnection_t *pc) {
@@ -2148,6 +2185,18 @@ static void pconnection_final_free(pconnection_t *pc) {
   /* Now pc is freed iff the connection is, otherwise remains till the pn_connection_t is freed. */
 }
 
+// Call with lock held or from forced shutdown
+static void pconnection_begin_close(pconnection_t *pc) {
+  if (!pc->context.closing) {
+    pc->context.closing = true;
+    pn_connection_driver_close(&pc->driver);
+    pc->stop_timer_required = true;
+    if (pc->context.proactor->reaper->add(pc->psocket.iocpd))
+      pc->psocket.iocpd = NULL;
+    wakeup(&pc->psocket);
+  }
+}
+
 // call with lock held.  return true if caller must call pconnection_final_free()
 static bool pconnection_cleanup(pconnection_t *pc) {
   delete pc->completion_queue;
@@ -2156,7 +2205,7 @@ static bool pconnection_cleanup(pconnection_t *pc) {
 }
 
 static inline bool pconnection_work_pending(pconnection_t *pc) {
-  if (pc->completion_queue->size() || pc->wake_count || pc->tick_pending)
+  if (pc->completion_queue->size() || pc->wake_count || pc->tick_pending || pc->queued_disconnect)
     return true;
   pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
   return (wbuf.size > 0 && (pc->psocket.iocpd->events & PN_WRITABLE));
@@ -2207,33 +2256,45 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, iocp_result_t *r
             return NULL;
           pc->context.working = true;
         }
-        open = pc->psocket.iocpd && !pc->psocket.iocpd->closing;  // && !reconnecting (TODO)
+        open = !pc->connecting && !pc->context.closing;
       }
       else {
+        // Just re-acquired lock after processing IO and engine work
         if (pconnection_has_event(pc))
           return &pc->batch;
 
-        if (!open && pconnection_can_free(pc)) {
-          if (pconnection_cleanup(pc)) {
-            g.release();
-            pconnection_final_free(pc);
-          }
-          return NULL;
-        }
-
         if (!pconnection_work_pending(pc)) {
           pc->context.working = false;
+          if (pn_connection_driver_finished(&pc->driver)) {
+            pconnection_begin_close(pc);
+          }
+          if (pc->context.closing && pconnection_can_free(pc)) {
+            if (pconnection_cleanup(pc)) {
+              g.release();
+              pconnection_final_free(pc);
+              return NULL;
+            } // else disconnect logic has the free obligation
+          }
           return NULL;
         }
       }
 
+      if (pc->queued_disconnect) {  // From pn_proactor_disconnect()
+        pc->queued_disconnect = false;
+        if (!pc->context.closing) {
+          if (pc->disconnect_condition) {
+            pn_condition_copy(pn_transport_condition(pc->driver.transport), pc->disconnect_condition);
+          }
+          pn_connection_driver_close(&pc->driver);
+        }
+      }
 
       assert(pc->work_queue->empty());
       if (pc->completion_queue->size())
           std::swap(pc->work_queue, pc->completion_queue);
 
       if (pc->wake_count) {
-        waking = true;
+        waking = open && pc->can_wake && !pn_connection_driver_finished(&pc->driver);
         pc->wake_count = 0;
       }
       if (pc->tick_pending) {
@@ -2250,8 +2311,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, iocp_result_t *r
       result = (iocp_result_t *) pc->work_queue->front();
       pc->work_queue->pop();
       if (result->iocpd->closing) {
-        bool is_current = result->iocpd == pc->psocket.iocpd;  // in case of reconnect
-        if (pc->context.proactor->reaper->process(result) && is_current) {
+        if (pc->context.proactor->reaper->process(result)) {
           pc->psocket.iocpd = NULL;  // reaped
           open = false;
         }
@@ -2263,13 +2323,21 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, iocp_result_t *r
         }
         else if (is_connect_result(result)) {
           connect_step_done(pc, (connect_result_t *) result);
-          open = pc->psocket.iocpd && (pc->psocket.iocpd->events & PN_WRITABLE);  // && !reconnecting (TODO)
+          open = pc->psocket.iocpd && (pc->psocket.iocpd->events & PN_WRITABLE);
+          if (open)
+            pc->connecting = false;
         }
         else do_complete(result);
       }
     }
 
-    if (open) {
+    if (!open) {
+      if (pc->stop_timer_required) {
+        pc->stop_timer_required = false;
+        // Do without context lock to avoid possible deadlock
+        stop_timer(pc->context.proactor->timer_queue, &pc->tick_timer);
+      }
+    } else {
 
       pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
       pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
@@ -2292,6 +2360,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, iocp_result_t *r
       if (rbuf.size > 0 && !pc->psocket.iocpd->read_in_progress) {
         bool wouldblock;
         ssize_t n = pni_iocp_recv(pc->psocket.iocpd, rbuf.start, rbuf.size, &wouldblock, pc->psocket.iocpd->error);
+
         if (n > 0) {
           pn_connection_driver_read_done(&pc->driver, n);
           pconnection_tick(pc);         /* check for tick changes. */
@@ -2341,17 +2410,24 @@ static void pconnection_done(pconnection_t *pc) {
     if (pconnection_has_event(pc) || pconnection_work_pending(pc)) {
       wakeup(&pc->psocket);
     } else if (pn_connection_driver_finished(&pc->driver)) {
-      if (pc->context.proactor->reaper->add(pc->psocket.iocpd))
-        pc->psocket.iocpd = NULL;
-      pn_connection_driver_close(&pc->driver);
-      g.release(); // possible deadlock
-      stop_timer(pc->context.proactor->timer_queue, &pc->tick_timer);
-      csguard g2(&pc->context.cslock);
+      pconnection_begin_close(pc);
       wakeup(&pc->psocket);
     }
   }
 }
 
+static inline bool is_inactive(pn_proactor_t *p) {
+  return (!p->contexts && !p->disconnects_pending && !p->timeout_set && !p->need_timeout && !p->shutting_down);
+}
+
+// Call whenever transitioning from "definitely active" to "maybe inactive"
+static void wake_if_inactive(pn_proactor_t *p) {
+  if (is_inactive(p)) {
+    p->need_inactive = true;
+    proactor_wake(p);
+  }
+}
+
 void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
   pconnection_t *pc = batch_pconnection(batch);
   if (pc) {
@@ -2367,14 +2443,22 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
   if (bp == p) {
     csguard g(&p->context.cslock);
     p->context.working = false;
-    proactor_wakeup(p);
+    if (p->delayed_interrupt) {
+      p->delayed_interrupt = false;
+      p->need_interrupt = true;
+    }
+    if (p->timeout_processed) {
+      p->timeout_processed = false;
+      wake_if_inactive(p);
+    }
+    if (proactor_update_batch(p))
+      proactor_wake(p);
     return;
   }
 }
 
 static void proactor_add_event(pn_proactor_t *p, pn_event_type_t t) {
   pn_collector_put(p->collector, pn_proactor__class(), p, t);
-  p->context.working = true;
 }
 
 static pn_event_batch_t *proactor_process(pn_proactor_t *p) {
@@ -2404,24 +2488,26 @@ static pn_event_batch_t *psocket_process(psocket_t *ps, iocp_result_t *result, r
   return NULL;
 }
 
-pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
+static pn_event_batch_t *proactor_completion_loop(struct pn_proactor_t* p, bool can_block) {
   // Proact! Process inbound completions of async activity until one
   // of them provides a batch of events.
   while(true) {
     pn_event_batch_t *batch = NULL;
 
-    DWORD win_timeout = INFINITE;
+    DWORD win_timeout = can_block ? INFINITE : 0;
     DWORD num_xfer = 0;
     ULONG_PTR completion_key = 0;
     OVERLAPPED *overlapped = 0;
 
     bool good_op = GetQueuedCompletionStatus (p->iocp->completion_port, &num_xfer,
                                               &completion_key, &overlapped, win_timeout);
+    if (!overlapped && !can_block && GetLastError() == WAIT_TIMEOUT)
+      return NULL;  // valid timeout
+
     if (!good_op && !overlapped) {
       // Should never happen.  shutdown?
       // We aren't expecting a timeout, closed completion port, or other error here.
-      // Logger?
-      fprintf(stderr, errno_str("Windows Proton proactor internal failure %d\n", false).c_str());
+      pn_logf("%s", errno_str("Windows Proton proactor internal failure\n", false).c_str());
       abort();
     }
 
@@ -2439,7 +2525,7 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
       // data structure for our own use.
       if (completion_key == psocket_wakeup_key)
         batch = psocket_process((psocket_t *) overlapped, NULL, p->reaper);
-      else if (completion_key == proactor_wakeup_key)
+      else if (completion_key == proactor_wake_key)
         batch = proactor_process((pn_proactor_t *) overlapped);
       else if (completion_key == recycle_accept_key)
         recycle_result((accept_result_t *) overlapped);
@@ -2449,26 +2535,71 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
   }
 }
 
+pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
+  return proactor_completion_loop(p, true);
+}
+
+pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
+  return proactor_completion_loop(p, false);
+}
 
 void pn_proactor_interrupt(pn_proactor_t *p) {
   csguard g(&p->context.cslock);
-  p->interrupt = true;
-  proactor_wakeup(p);
+  if (p->context.working)
+    p->delayed_interrupt = true;
+  else
+    p->need_interrupt = true;
+  proactor_wake(p);
 }
 
-// runs on a threadpool thread
+// runs on a threadpool thread.  Must not hold timer_lock.
 VOID CALLBACK timeout_cb(PVOID arg, BOOLEAN /* ignored*/ ) {
   pn_proactor_t *p = (pn_proactor_t *) arg;
+  csguard gtimer(&p->timer_lock);
   csguard g(&p->context.cslock);
-  p->timeout_request = true;
-  proactor_wakeup(p);
+  if (p->timeout_set)
+    p->need_timeout = true;  // else cancelled
+  p->timeout_set = false;
+  if (p->need_timeout)
+    proactor_wake(p);
 }
 
 void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
-  stop_timer(p->timer_queue, &p->timeout_timer);
-  csguard g(&p->context.cslock);
-  p->timeout_request = false;
-  start_timer(p->timer_queue, &p->timeout_timer, timeout_cb, p, t);
+  bool ticking = false;
+  csguard gtimer(&p->timer_lock);
+  {
+    csguard g(&p->context.cslock);
+    ticking = (p->timeout_timer != NULL);
+    if (t == 0) {
+      p->need_timeout = true;
+      p->timeout_set = false;
+      proactor_wake(p);
+    }
+    else
+      p->timeout_set = true;
+  }
+  // Just timer_lock held
+  if (ticking) {
+    stop_timer(p->timer_queue, &p->timeout_timer);
+  }
+  if (t) {
+    start_timer(p->timer_queue, &p->timeout_timer, timeout_cb, p, t);
+  }
+}
+
+void pn_proactor_cancel_timeout(pn_proactor_t *p) {
+  bool ticking = false;
+  csguard gtimer(&p->timer_lock);
+  {
+    csguard g(&p->context.cslock);
+    p->timeout_set = false;
+    ticking = (p->timeout_timer != NULL);
+  }
+  if (ticking) {
+    stop_timer(p->timer_queue, &p->timeout_timer);
+    csguard g(&p->context.cslock);
+    wake_if_inactive(p);
+  }
 }
 
 // Return true if connect_step_done() will handle connection status
@@ -2479,50 +2610,52 @@ static bool connect_step(pconnection_t *pc) {
     pc->ai = pc->ai->ai_next; /* Move to next address in case this fails */
     unique_socket fd(::socket(ai->ai_family, SOCK_STREAM, ai->ai_protocol));
     if (fd != INVALID_SOCKET) {
-      pni_configure_sock_2(fd);
-      if (!pc->psocket.iocpd) {
+      // Windows ConnectEx requires loosely bound socket.
+      sockaddr_storage sa;
+      memset(&sa, 0, sizeof(sa));
+      sa.ss_family = ai->ai_family;
+      if (!bind(fd, (SOCKADDR *) &sa, ai->ai_addrlen)) {
+        pni_configure_sock_2(fd);
         pc->psocket.iocpd = pni_iocpdesc_create(p->iocp, fd);
         assert(pc->psocket.iocpd);
         pc->psocket.iocpd->write_closed = true;
         pc->psocket.iocpd->read_closed = true;
-      }
-      else
-        pc->psocket.iocpd->socket = fd;
-      fd.release();
-      iocpdesc_t *iocpd = pc->psocket.iocpd;
-      if (CreateIoCompletionPort ((HANDLE) iocpd->socket, iocpd->iocp->completion_port, 0, 0)) {
-        LPFN_CONNECTEX fn_connect_ex = lookup_connect_ex2(iocpd->socket);
-        // addrinfo is owned by the pconnection so pass NULL to the connect result
-        connect_result_t *result = connect_result(iocpd, NULL);
-        DWORD unused;
-        bool success = fn_connect_ex(iocpd->socket, ai->ai_addr, ai->ai_addrlen,
-                                     NULL, 0, &unused, (LPOVERLAPPED) result);
-        if (success || WSAGetLastError() == ERROR_IO_PENDING) {
-          iocpd->ops_in_progress++;
-          iocpd->active_completer = &pc->psocket;
-          return true;  // logic resumes at connect_step_done()
+        fd.release();
+        iocpdesc_t *iocpd = pc->psocket.iocpd;
+        if (CreateIoCompletionPort ((HANDLE) iocpd->socket, iocpd->iocp->completion_port, 0, 0)) {
+          LPFN_CONNECTEX fn_connect_ex = lookup_connect_ex2(iocpd->socket);
+          // addrinfo is owned by the pconnection so pass NULL to the connect result
+          connect_result_t *result = connect_result(iocpd, NULL);
+          DWORD unused;
+          bool success = fn_connect_ex(iocpd->socket, ai->ai_addr, ai->ai_addrlen,
+                                       NULL, 0, &unused, (LPOVERLAPPED) result);
+          if (success || WSAGetLastError() == ERROR_IO_PENDING) {
+            iocpd->ops_in_progress++;
+            iocpd->active_completer = &pc->psocket;
+            return true;  // logic resumes at connect_step_done()
+          }
+          pn_free(result);
         }
-        pn_free(result);
       }
-      closesocket(pc->psocket.iocpd->socket);  // try again
-      pc->psocket.iocpd->socket = INVALID_SOCKET;
+      if (pc->psocket.iocpd) {
+        pc->context.proactor->reaper->fast_reap(pc->psocket.iocpd);
+        pc->psocket.iocpd = NULL;
+      }
     }
   }
+  pc->context.closing = true;
   return false;
 }
 
 static void connect_step_done(pconnection_t *pc, connect_result_t *result) {
   csguard g(&pc->context.cslock);
-  if (result->base.status && pc->ai) {
-    if (connect_step(pc)) {
-      // Trying the next addrinfo possibility
-      g.release();
-      pn_free(result);
-      return;
-    }
-  }
-  complete_connect(result, result->base.status);  // starts reading if open, frees result
-  if (!result->base.status) {
+  DWORD saved_status = result->base.status;
+  iocpdesc_t *iocpd = result->base.iocpd;
+  iocpd->ops_in_progress--;
+  assert(pc->psocket.iocpd == iocpd);
+  complete_connect(result, result->base.status);  // frees result, starts regular IO if connected
+
+  if (!saved_status) {
     // Success
     pc->psocket.iocpd->write_closed = false;
     pc->psocket.iocpd->read_closed = false;
@@ -2533,15 +2666,34 @@ static void connect_step_done(pconnection_t *pc, connect_result_t *result) {
     pc->ai = NULL;
     return;
   }
-  psocket_error(&pc->psocket, GetLastError(), "connect to ");
-  wakeup(&pc->psocket);
+  else {
+    // Descriptor will never be used.  Dispose.
+    // Connect failed, no IO started, i.e. no pending iocpd based events
+    pc->context.proactor->reaper->fast_reap(iocpd);
+    pc->psocket.iocpd = NULL;
+    // Is there a next connection target in the addrinfo to try?
+    if (pc->ai && connect_step(pc)) {
+      // Trying the next addrinfo possibility.  Will return here.
+      return;
+    }
+    // Give up
+    psocket_error(&pc->psocket, saved_status, "connect to ");
+    pc->context.closing = true;
+    wakeup(&pc->psocket);
+  }
 }
 
 void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) {
-  pconnection_t *pc = new_pconnection_t(p, c, false, addr);
+  pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t));
   assert(pc); // TODO: memory safety
+  const char *err = pconnection_setup(pc, p, c, false, addr);
+  if (err) {
+    pn_logf("pn_proactor_connect failure: %s", err);
+    return;
+  }
   // TODO: check case of proactor shutting down
   csguard g(&pc->context.cslock);
+  pc->connecting = true;
   proactor_add(&pc->context);
   pn_connection_open(pc->driver.connection); /* Auto-open */
 
@@ -2559,6 +2711,24 @@ void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr)
   }
 }
 
+void pn_proactor_release_connection(pn_connection_t *c) {
+  bool notify = false;
+  pconnection_t *pc = get_pconnection(c);
+  if (pc) {
+    csguard g(&pc->context.cslock);
+    // reverse lifecycle entanglement of pc and c from pconnection_setup()
+    pn_incref(pc);
+    pn_proactor_t *p = pc->context.proactor;
+    csguard g2(&p->bind_lock);
+    pn_record_t *r = pn_connection_attachments(pc->driver.connection);
+    pn_record_set(r, PN_PROACTOR, NULL);
+    pn_connection_driver_release_connection(&pc->driver);
+    pc->bound = false;  // Transport unbound
+    g2.release();
+    pconnection_begin_close(pc);
+  }
+}
+
 void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, int backlog)
 {
   csguard g(&l->context.cslock);
@@ -2598,11 +2768,9 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
                 psocket_t *ps = &l->psockets[l->psockets_size++];
                 psocket_init(ps, l, false, addr);
                 ps->iocpd = iocpd;
+                iocpd->is_mp = true;
                 iocpd->active_completer = ps;
                 pni_iocpdesc_start(ps->iocpd);
-                // TODO: make configurable or able to grow with load
-                for (int i=0; i < 4; i++)
-                  begin_accept(ps->iocpd->acceptor, NULL);
               }
             }
       }
@@ -2630,11 +2798,11 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
 static pn_event_batch_t *batch_owned(pn_listener_t *l) {
   if (l->close_dispatched) return NULL;
   if (!l->context.working) {
-    if (pn_collector_peek(l->collector)) {
+    if (listener_has_event(l)) {
       l->context.working = true;
       return &l->batch;
     }
-    assert(!(l->closing && l->pending_events));
+    assert(!(l->context.closing && l->pending_events));
     if (l->pending_events) {
       pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
       l->pending_events--;
@@ -2654,42 +2822,68 @@ static void listener_close_all(pn_listener_t *l) {
   }
 }
 
-static bool listener_maybe_free(pn_listener_t *l, csguard *guard) {
-  fprintf(stderr, "fixme: dangling listener_maybe_free.  aborting\n");
-  fflush(stderr);
-  abort();
-  // TODO: revised mechanism that works with disconnect and proactor_remove
+static bool listener_can_free(pn_listener_t *l) {
+  if (!l->close_dispatched) return false;
+  if (l->context.working || l->context.completion_ops) return false;
+  for (size_t i = 0; i < l->psockets_size; ++i) {
+    psocket_t *ps = &l->psockets[i];
+    if (ps->iocpd)
+      return false;
+  }
+  return true;
+}
+
+/* Call with lock not held */
+static inline void listener_final_free(pn_listener_t *l) {
+  pcontext_finalize(&l->context);
+  free(l->psockets);
+  if (l->collector) pn_collector_free(l->collector);
+  if (l->condition) pn_condition_free(l->condition);
+  if (l->attachments) pn_free(l->attachments);
+  free(l);
+}
+
+/* Call with listener lock held by g. */
+static void internal_listener_free(pn_listener_t *l, csguard &g) {
+  bool can_free = true;
+  if (l->context.proactor) {
+    can_free = proactor_remove(&l->context);
+  }
+  g.release();
+  if (can_free)
+    listener_final_free(l);
+  // else final free is done by proactor_disconnect()
+}
+
+static bool listener_maybe_free(pn_listener_t *l, csguard &g) {
+  if (listener_can_free(l)) {
+    internal_listener_free(l, g);
+    return true;
+  }
   return false;
 }
 
 static pn_event_batch_t *listener_process(pn_listener_t *l, iocp_result_t *result) {
   accept_result_t *accept_result = NULL;
-  bool reaping = false;
   psocket_t *ps = NULL;
   {
     csguard g(&l->context.cslock);
-    if (l->closing) {
-      listener_close_all(l);
-      reaping = true;
-    }
     if (!result) {
-      ps = l->psockets;
       wake_complete(&l->context);
-      if (listener_maybe_free(l, &g)) return NULL;
+      if (listener_maybe_free(l, g)) return NULL;
       return batch_owned(l);
     }
     else
       ps = (psocket_t *) result->iocpd->active_completer;
 
-    if (!reaping && result->status) {
-      psocket_error(ps, WSAGetLastError(), "listen on ");
-      listener_close_all(l);
-      reaping = true;
+    if (!l->context.closing && result->status) {
+      psocket_error(ps, WSAGetLastError(), "listen on "); // initiates close/multi-reap
     }
-    if (reaping) {
-      if (l->context.proactor->reaper->process(result))
+    if (l->context.closing) {
+      if (l->context.proactor->reaper->process(result)) {
         ps->iocpd = NULL;
-      if (listener_maybe_free(l, &g)) return NULL;
+        if (listener_maybe_free(l, g)) return NULL;
+      }
       return batch_owned(l);
     }
 
@@ -2712,12 +2906,17 @@ static pn_event_batch_t *listener_process(pn_listener_t *l, iocp_result_t *resul
   {
     csguard g(&l->context.cslock);
     l->context.completion_ops--;
+    accept_result->new_sock->ops_in_progress--;
+    ps->iocpd->ops_in_progress--;
+
     // add even if closing to reuse cleanup code
-    l->accept_results->push(accept_result);
+    l->pending_accepts->push(accept_result);
+    if (!ps->iocpd->ops_in_progress)
+      begin_accept(ps->iocpd->acceptor, NULL);  // Start another, up to IOCP_MAX_ACCEPTS
     l->pending_events++;
-    if (l->closing)
+    if (l->context.closing)
       release_pending_accepts(l);
-    if (listener_maybe_free(l, &g)) return NULL;
+    if (listener_maybe_free(l, g)) return NULL;
     return batch_owned(l);
   }
 }
@@ -2730,8 +2929,10 @@ pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
 void pn_connection_wake(pn_connection_t* c) {
   pconnection_t *pc = get_pconnection(c);
   csguard g(&pc->context.cslock);
-  pc->wake_count++;
-  wakeup(&pc->psocket);
+  if (!pc->context.closing) {
+    pc->wake_count++;
+    wakeup(&pc->psocket);
+  }
 }
 
 pn_proactor_t *pn_proactor() {
@@ -2753,6 +2954,10 @@ pn_proactor_t *pn_proactor() {
       if (!WSAStartup(wsa_ver, &unused)) {
         wsa = true;
         if (iocp = pni_iocp()) {
+          InitializeCriticalSectionAndSpinCount(&p->context.cslock, 4000);
+          InitializeCriticalSectionAndSpinCount(&p->write_lock, 4000);
+          InitializeCriticalSectionAndSpinCount(&p->timer_lock, 4000);
+          InitializeCriticalSectionAndSpinCount(&p->bind_lock, 4000);
           try {
             r = new reaper(p, &p->write_lock, iocp);
             // success
@@ -2761,15 +2966,13 @@ pn_proactor_t *pn_proactor() {
             p->batch.next_event = &proactor_batch_next;
             p->collector = c;
             p->timer_queue = tq;
-            InitializeCriticalSectionAndSpinCount(&p->context.cslock, 4000);
-            InitializeCriticalSectionAndSpinCount(&p->write_lock, 4000);
             return p;
           } catch (...) {}
         }
       }
     }
   }
-  fprintf(stderr, errno_str("Windows Proton proactor OS resource failure %d\n", false).c_str());
+  fprintf(stderr, "%s\n", errno_str("Windows Proton proactor OS resource failure", false).c_str());
   if (iocp) pn_free((void *) iocp);
   if (wsa) WSACleanup();
   free(c);
@@ -2780,7 +2983,10 @@ pn_proactor_t *pn_proactor() {
 
 void pn_proactor_free(pn_proactor_t *p) {
   DeleteTimerQueueEx(p->timer_queue, INVALID_HANDLE_VALUE);
+  DeleteCriticalSection(&p->timer_lock);
+  DeleteCriticalSection(&p->bind_lock);
   proactor_shutdown(p);
+
   delete p->reaper;
   WSACleanup();
   pn_collector_free(p->collector);
@@ -2791,14 +2997,14 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
   pn_listener_t *l = batch_listener(batch);
   {
     csguard g(&l->context.cslock);
-    if (!pn_collector_peek(l->collector) && l->pending_events > 0) {
+    if (!listener_has_event(l) && l->pending_events) {
       pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
       l->pending_events--;
     }
     pn_event_t *e = pn_collector_next(l->collector);
     if (e && pn_event_type(e) == PN_LISTENER_CLOSE)
       l->close_dispatched = true;
-    return e;
+    return log_event(l, e);
   }
 }
 
@@ -2807,11 +3013,11 @@ static void listener_done(pn_listener_t *l) {
     csguard g(&l->context.cslock);
     l->context.working = false;
     if (l->close_dispatched) {
-      listener_maybe_free(l, &g);
+      listener_maybe_free(l, g);
       return;
     }
     else
-      if (pn_collector_peek(l->collector))
+      if (listener_has_event(l))
         wakeup(l->psockets);
   }
 }
@@ -2823,7 +3029,7 @@ pn_listener_t *pn_listener() {
     l->collector = pn_collector();
     l->condition = pn_condition();
     l->attachments = pn_record();
-    l->accept_results = new std::queue<accept_result_t *>();
+    l->pending_accepts = new std::queue<accept_result_t *>();
     if (!l->condition || !l->collector || !l->attachments) {
       pn_listener_free(l);
       return NULL;
@@ -2834,47 +3040,27 @@ pn_listener_t *pn_listener() {
   return l;
 }
 
-static bool listener_can_free(pn_listener_t *l) {
-  if (!l->close_dispatched) return false;
-  if (!l->running) return true;
-  if (l->context.working) return false;
-  for (size_t i = 0; i < l->psockets_size; ++i) {
-    psocket_t *ps = &l->psockets[i];
-    if (l->context.completion_ops || ps->iocpd)
-      return false;
-  }
-  return true;
-}
-
-static inline void listener_final_free(pn_listener_t *l) {
-  pcontext_finalize(&l->context);
-  free(l->psockets);
-  free(l);
-}
-
 void pn_listener_free(pn_listener_t *l) {
-  /* Note at this point either the listener has never been used (freed by user)
-     or it has been closed, so all its sockets are closed.
+  /* Note at this point either the listener has never been used (freed
+     by user) or it has been closed, and all pending operations
+     completed, i.e. listener_can_free() is true.
   */
   if (l) {
-    bool can_free = true;
-    if (l->collector) pn_collector_free(l->collector);
-    if (l->condition) pn_condition_free(l->condition);
-    if (l->attachments) pn_free(l->attachments);
     csguard g(&l->context.cslock);
     if (l->context.proactor) {
-      can_free = proactor_remove(&l->context);
+      internal_listener_free(l, g);
+      return;
     }
+    // freed by user
     g.release();
-    if (can_free)
-      listener_final_free(l);
+    listener_final_free(l);
   }
 }
 
 static void listener_begin_close(pn_listener_t* l) {
-  if (l->closing)
+  if (l->context.closing)
     return;
-  l->closing = true;
+  l->context.closing = true;
   listener_close_all(l);
   release_pending_accepts(l);
   pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
@@ -2882,10 +3068,8 @@ static void listener_begin_close(pn_listener_t* l) {
 
 void pn_listener_close(pn_listener_t* l) {
   csguard g(&l->context.cslock);
-  if (!l->context.closing) {
-    listener_begin_close(l);
-    wakeup(&l->psockets[0]);
-  }
+  listener_begin_close(l);
+  wakeup(&l->psockets[0]);
 }
 
 pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
@@ -2910,9 +3094,9 @@ pn_record_t *pn_listener_attachments(pn_listener_t *l) {
 
 static void release_pending_accepts(pn_listener_t *l) {
   // called with lock held or at shutdown
-  while (!l->accept_results->empty()) {
-    accept_result_t *accept_result = l->accept_results->front();
-    l->accept_results->pop();
+  while (!l->pending_accepts->empty()) {
+    accept_result_t *accept_result = l->pending_accepts->front();
+    l->pending_accepts->pop();
     psocket_t *ps = (psocket_t *) accept_result->base.iocpd->active_completer;
     accept_result->new_sock->ops_in_progress--;
     ps->iocpd->ops_in_progress--;
@@ -2927,16 +3111,15 @@ static void recycle_result(accept_result_t *accept_result) {
   pn_listener_t *l = ps->listener;
   reset_accept_result(accept_result);
   accept_result->new_sock = create_same_type_socket(ps->iocpd);
-  accept_result->new_sock->is_mp = true;
   {
     csguard g(&l->context.cslock);
-    if (l->closing) {
+    if (l->context.closing && accept_result->new_sock) {
       closesocket(accept_result->new_sock->socket);
       accept_result->new_sock->socket = INVALID_SOCKET;
     }
     begin_accept(ps->iocpd->acceptor, accept_result);  // cleans up if closing
     l->context.completion_ops--;
-    if (l->closing && listener_maybe_free(l, &g))
+    if (l->context.closing && listener_maybe_free(l, g))
       return;
   }
 }
@@ -2949,20 +3132,23 @@ void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
 
   {
     csguard g(&l->context.cslock);
-    pconnection_t *pc = new_pconnection_t(p, c, false, "");
+    pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t));
     assert(pc);  // TODO: memory safety
+    const char *err_str = pconnection_setup(pc, p, c, true, "");
+    if (err_str) {
+      pn_logf("pn_listener_accept failure: %s", err_str);
+      return;
+    }
     proactor_add(&pc->context);
 
-    if (l->closing)
+    if (l->context.closing)
       err = WSAESHUTDOWN;
-    else if (l->accept_results->empty())
+    else if (l->pending_accepts->empty())
       err = WSAEWOULDBLOCK;
     else {
-      accept_result = l->accept_results->front();
-      l->accept_results->pop();
+      accept_result = l->pending_accepts->front();
+      l->pending_accepts->pop();
       ps = (psocket_t *) accept_result->base.iocpd->active_completer;
-      accept_result->new_sock->ops_in_progress--;
-      ps->iocpd->ops_in_progress--;
       l->context.completion_ops++; // for recycle_result
       iocpdesc_t *conn_iocpd = accept_result->new_sock;
       pc->psocket.iocpd = conn_iocpd;
@@ -2984,20 +3170,20 @@ void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
 // Call with lock held.  Leave unchanged if events pending.
 // Return true if there is an event in the collector
 static bool proactor_update_batch(pn_proactor_t *p) {
-  if (pn_collector_peek(p->collector))
+  if (proactor_has_event(p))
     return true;
-  if (p->timeout_elapsed) {
-    p->timeout_elapsed = false;
+  if (p->need_timeout) {
+    p->need_timeout = false;
     proactor_add_event(p, PN_PROACTOR_TIMEOUT);
     return true;
   }
-  if (p->interrupt) {
-    p->interrupt = false;
+  if (p->need_interrupt) {
+    p->need_interrupt = false;
     proactor_add_event(p, PN_PROACTOR_INTERRUPT);
     return true;
   }
-  if (p->inactive) {
-    p->inactive = false;
+  if (p->need_inactive) {
+    p->need_inactive = false;
     proactor_add_event(p, PN_PROACTOR_INACTIVE);
     return true;
   }
@@ -3006,12 +3192,15 @@ static bool proactor_update_batch(pn_proactor_t *p) {
 
 static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
   pn_proactor_t *p = batch_proactor(batch);
-  pn_collector_t *c = p->collector;
-  if (!pn_collector_peek(c)) {
+  pn_event_t *e = pn_collector_next(p->collector);
+  if (!e) {
     csguard g(&p->context.cslock);
     proactor_update_batch(p);
+    e = pn_collector_next(p->collector);
   }
-  return pn_collector_next(c);
+  if (e && pn_event_type(e) == PN_PROACTOR_TIMEOUT)
+    p->timeout_processed = true;
+  return log_event(p, e);
 }
 
 static void proactor_add(pcontext_t *ctx) {
@@ -3051,17 +3240,15 @@ static bool proactor_remove(pcontext_t *ctx) {
       ctx->next->prev = ctx->prev;
     }
   }
-  proactor_wakeup(p);
+  wake_if_inactive(p);
   return can_free;
 }
 
 static void pconnection_forced_shutdown(pconnection_t *pc) {
-  // Called by proactor_free, no competing threads, no iocp activity.
+  // Called by proactor_free, no competing threads processing iocp activity.
+  pconnection_begin_close(pc);
+  // Timer threads may lurk. No lock held, so no deadlock risk
   stop_timer(pc->context.proactor->timer_queue, &pc->tick_timer);
-  if (pc->psocket.iocpd) {
-    pc->context.proactor->reaper->add(pc->psocket.iocpd);
-    pc->psocket.iocpd = NULL;
-  }
   pconnection_final_free(pc);
 }
 
@@ -3095,8 +3282,8 @@ static void proactor_shutdown(pn_proactor_t *p) {
 }
 
 void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
-#ifdef todo_for_0_18
-  // TODO: fix mutexes below from epoll to cs locks
+  pcontext_t *disconnecting_pcontexts = NULL;
+  pcontext_t *ctx = NULL;
   {
     csguard g(&p->context.cslock);
     // Move the whole contexts list into a disconnecting state
@@ -3117,15 +3304,14 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
     return;
 
   // Second pass: different locking, close the pcontexts, free them if !disconnect_ops
-  bool notify = false;
   for (ctx = disconnecting_pcontexts; ctx; ctx = ctx ? ctx->next : NULL) {
     bool do_free = false;
-    bool ctx_notify = true;
-    pmutex *ctx_mutex = NULL;
     pconnection_t *pc = pcontext_pconnection(ctx);
+    pn_listener_t *l = pc ? NULL : pcontext_listener(ctx);
+    CRITICAL_SECTION *ctx_cslock = pc ? &pc->context.cslock : &l->context.cslock;
+    csguard ctx_guard(ctx_cslock);
     if (pc) {
-      ctx_mutex = &pc->context.mutex;
-      lock(ctx_mutex);
+      pc->can_wake = false;
       if (!ctx->closing) {
         if (ctx->working) {
           // Must defer
@@ -3145,10 +3331,8 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
         }
       }
     } else {
-      pn_listener_t *l = pcontext_listener(ctx);
+
       assert(l);
-      ctx_mutex = &l->context.mutex;
-      lock(ctx_mutex);
       if (!ctx->closing) {
         if (cond) {
           pn_condition_copy(pn_listener_condition(l), cond);
@@ -3157,46 +3341,23 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
       }
     }
 
-    lock(&p->context.mutex);
+    csguard p_guard(&p->context.cslock);
     if (--ctx->disconnect_ops == 0) {
       do_free = true;
-      ctx_notify = false;
-      notify = wake_if_inactive(p);
+      wake_if_inactive(p);
     } else {
       // If initiating the close, wake the pcontext to do the free.
-      if (ctx_notify)
-        ctx_notify = wake(ctx);
+      wakeup(pc ? &pc->psocket : l->psockets);
     }
-    unlock(&p->context.mutex);
-    unlock(ctx_mutex);
+    p_guard.release();
+    ctx_guard.release();
 
     if (do_free) {
       if (pc) pconnection_final_free(pc);
       else listener_final_free(pcontext_listener(ctx));
-    } else {
-      if (ctx_notify)
-        wake_notify(ctx);
     }
   }
-  if (notify)
-    wake_notify(&p->context);
-#endif
-}
-
-
-//TODO!!
-void pn_proactor_release_connection(pn_connection_t *c) {
-  abort();
-}
-void pn_proactor_cancel_timeout(pn_proactor_t *p) {
-  abort();
 }
-pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
-  abort();
-  return NULL;
-}
-
-
 
 
 static int pni2_snprintf(char *buf, size_t count, const char *fmt, ...);
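
A note on the timer handling in the patch: stop_timer() relies on the
blocking-cancel semantics of DeleteTimerQueueTimer, which is what makes it
"never in doubt". A stand-alone sketch of that Win32 pattern follows; the
callback body is a hypothetical placeholder, not code from the commit.

/* Sketch: blocking-cancel timer-queue pattern relied on by stop_timer().
 * Passing INVALID_HANDLE_VALUE as the completion event makes
 * DeleteTimerQueueTimer wait for any in-flight callback to finish, so the
 * timer is either stopped before firing or the callback has completed. */
#include <windows.h>
#include <stdio.h>

static VOID CALLBACK on_timer(PVOID arg, BOOLEAN timer_fired) {
  (void)arg; (void)timer_fired;
  printf("tick\n");   /* hypothetical callback work */
}

int main(void) {
  HANDLE tq = CreateTimerQueue();
  HANDLE timer = NULL;
  /* One-shot timer due in 100 ms (period 0, WT_EXECUTEONLYONCE). */
  CreateTimerQueueTimer(&timer, tq, on_timer, NULL, 100, 0, WT_EXECUTEONLYONCE);
  /* ... later: synchronous cancel; blocks if the callback is running. */
  DeleteTimerQueueTimer(tq, timer, INVALID_HANDLE_VALUE);
  DeleteTimerQueueEx(tq, INVALID_HANDLE_VALUE);
  return 0;
}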

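Relatedly, the "AcceptEx squishes the local and remote addresses" comment near
the top of the diff refers to AcceptEx's output-buffer contract: each address
slot must be at least 16 bytes larger than the transport's sockaddr. A hedged
sketch of that sizing (socket creation and the listener loop elided;
post_accept() is a hypothetical helper):

/* Sketch: AcceptEx output-buffer sizing.  Each address slot must be at
 * least 16 bytes more than the maximum sockaddr for the transport; using
 * sockaddr_storage keeps it family-agnostic.  Assumes listen_sock is
 * listening and accept_sock is an unconnected socket of the same type. */
#include <winsock2.h>
#include <mswsock.h>

#define ADDR_SLOT (sizeof(struct sockaddr_storage) + 16)

static BOOL post_accept(SOCKET listen_sock, SOCKET accept_sock,
                        char *buf /* >= 2 * ADDR_SLOT bytes */,
                        OVERLAPPED *ov) {
  DWORD bytes = 0;
  /* dwReceiveDataLength == 0: complete on connect, don't wait for data. */
  return AcceptEx(listen_sock, accept_sock, buf, 0,
                  ADDR_SLOT, ADDR_SLOT, &bytes, ov);
}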

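Finally, the rewritten connect_step() encodes a Windows quirk its new comment
calls out: ConnectEx(), unlike connect(), fails on an unbound socket, so the
code first binds to the wildcard address of the matching family. A sketch of
just that preparation step (prepare_connectex_socket() is a hypothetical
helper; assumes ai is a resolved addrinfo entry, error reporting elided):

/* Sketch: the "loosely bound socket" preparation connect_step() performs
 * before ConnectEx().  Bind to the zeroed wildcard address (port 0) of the
 * same family, since ConnectEx fails on an unbound socket. */
#include <winsock2.h>
#include <ws2tcpip.h>
#include <string.h>

static SOCKET prepare_connectex_socket(const struct addrinfo *ai) {
  SOCKET fd = socket(ai->ai_family, SOCK_STREAM, ai->ai_protocol);
  if (fd == INVALID_SOCKET) return INVALID_SOCKET;
  struct sockaddr_storage sa;
  memset(&sa, 0, sizeof(sa));
  sa.ss_family = (ADDRESS_FAMILY) ai->ai_family;  /* wildcard addr, port 0 */
  if (bind(fd, (struct sockaddr *) &sa, (int) ai->ai_addrlen) != 0) {
    closesocket(fd);
    return INVALID_SOCKET;
  }
  return fd;  /* now eligible for ConnectEx() */
}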