qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [32/89] [abbrv] [partial] qpid-proton git commit: PROTON-1728: Reorganize the source tree
Date Tue, 03 Jul 2018 22:13:21 GMT
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/proactor/win_iocp.c
----------------------------------------------------------------------
diff --git a/c/src/proactor/win_iocp.c b/c/src/proactor/win_iocp.c
new file mode 100644
index 0000000..e41ac5b
--- /dev/null
+++ b/c/src/proactor/win_iocp.c
@@ -0,0 +1,3429 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/condition.h>
+#include <proton/connection_driver.h>
+#include <proton/engine.h>
+#include <proton/message.h>
+#include <proton/object.h>
+#include <proton/proactor.h>
+#include <proton/transport.h>
+#include <proton/listener.h>
+#include <proton/proactor.h>
+
+#include <assert.h>
+#include <stddef.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <queue>
+#include <list>
+
+#include <winsock2.h>
+#include <mswsock.h>
+#include <Ws2tcpip.h>
+
+#include <iostream>
+#include <sstream>
+
+#include "./netaddr-internal.h" /* Include after socket/inet headers */
+
+/*
+ * Proactor for Windows using IO completion ports.
+ *
+ * There is no specific threading code other than locking and indirect (and
+ * brief) calls to the default threadpool via timerqueue timers.
+ *
+ * The IOCP system remembers what threads have called pn_proactor_wait and
+ * assumes they are doing proactor work until they call wait again.  Using a
+ * dedicated threadpool to drive the proactor, as in the examples, is
+ * recommended.  The use of other long-lived threads to occasionally drive the
+ * proactor will hurt the completion port scheduler and cause fewer threads to
+ * run.
+ *
+ * Each proactor connection maintains its own queue of pending completions and
+ * its work is serialized on that.  The proactor listener runs parallel where
+ * possible, but its event batch is serialized.
+ */
+
+// TODO: make all code C++ or all C90-ish
+//       change INACTIVE to be from begin_close instead of zombie reap, to be more like Posix
+//       make the global write lock window much smaller
+//       2 exclusive write buffers per connection
+//       make the zombie processing thread safe
+//       configurable completion port concurrency
+//       proton objects and ref counting: just say no.
+
+
+
+// From selector.h
+// Event bit flags kept in iocpdesc_t::events; they are combined with | and
+// cleared with & ~ (see pni_events_update and its callers).
+#define PN_READABLE (1)
+#define PN_WRITABLE (2)
+#define PN_EXPIRED (4)
+#define PN_ERROR (8)
+
+// Native Windows socket handle type used throughout this file.
+typedef SOCKET pn_socket_t;
+
+
+namespace pn_experimental {
+// Code borrowed from old messenger/reactor io.c iocp.c write_pipeline.c select.c
+// Mostly unchanged except for some thread safety (i.e. dropping the old descriptor map)
+
+typedef struct pni_acceptor_t pni_acceptor_t;
+typedef struct write_result_t write_result_t;
+typedef struct read_result_t read_result_t;
+typedef struct write_pipeline_t write_pipeline_t;
+typedef struct iocpdesc_t iocpdesc_t;
+typedef struct iocp_t iocp_t;
+
+
+// One per completion port.
+// Holds the completion port handle, the shared write buffer pool (set up by
+// pni_shared_pool_create) and internal debugging knobs.
+
+struct iocp_t {
+  HANDLE completion_port;
+  pn_list_t *zombie_list;
+  // Shared write buffer pool: number of buffers, their backing memory, every
+  // pool result, and a stack of the currently available results
+  // (shared_available_count entries).
+  unsigned shared_pool_size;
+  char *shared_pool_memory;
+  write_result_t **shared_results;
+  write_result_t **available_results;
+  size_t shared_available_count;
+  // Pipelines currently registered as writers (see confirm_as_writer).
+  size_t writer_count;
+  // Primary buffer size override for loopback sockets; 0 means keep the default.
+  int loopback_bufsize;
+  // When true, error details are logged to stderr via iocp_log.
+  bool iocp_trace;
+};
+
+// One for each socket.
+// It should remain ref counted in the
+// zombie_list until ops_in_progress == 0 or the completion port is closed.
+
+struct iocpdesc_t {
+  pn_socket_t socket;
+  iocp_t *iocp;
+  void *active_completer;
+  // Non-NULL for listening sockets (this is what is_listener tests).
+  pni_acceptor_t *acceptor;
+  pn_error_t *error;
+  // Overlapped operations submitted on this socket and not yet completed.
+  int ops_in_progress;
+  bool read_in_progress;
+  write_pipeline_t *pipeline;
+  read_result_t *read_result;
+  bool bound;          // associated with the completion port
+  bool closing;        // close called by application
+  bool read_closed;    // EOF or read error
+  bool write_closed;   // shutdown sent or write error
+  bool poll_error;     // flag posix-like POLLERR/POLLHUP/POLLNVAL
+  bool is_mp;          // Special multi thread care required
+  bool write_blocked;
+  // Current PN_READABLE/PN_WRITABLE/PN_EXPIRED/PN_ERROR bits (see pni_events_update).
+  int events;
+  pn_timestamp_t reap_time;
+};
+
+
+// Max number of overlapped accepts per listener.  TODO: configurable.
+#define IOCP_MAX_ACCEPTS 4
+
+// AcceptEx squishes the local and remote addresses and optional data
+// all together when accepting the connection. Reserve enough for
+// IPv6 addresses, even if the socket is IPv4. The 16 bytes padding
+// per address is required by AcceptEx.
+#define IOCP_SOCKADDRMAXLEN (sizeof(sockaddr_in6) + 16)
+#define IOCP_SOCKADDRBUFLEN (2 * IOCP_SOCKADDRMAXLEN)
+
+typedef enum { IOCP_ACCEPT, IOCP_CONNECT, IOCP_READ, IOCP_WRITE } iocp_type_t;
+
+// Common header of every overlapped operation.  OVERLAPPED must remain the
+// first member: result pointers are handed to Winsock as (LPOVERLAPPED) casts.
+typedef struct {
+  OVERLAPPED overlapped;
+  iocp_type_t type;
+  iocpdesc_t *iocpd;
+  HRESULT status;
+  DWORD num_transferred;
+} iocp_result_t;
+
+// Result record for reads (type IOCP_READ).
+struct read_result_t {
+  iocp_result_t base;
+  size_t drain_count;
+  char unused_buf[1];
+};
+
+// One overlapped send: requested is the byte count submitted, in_use is true
+// while the result is handed out by the write pipeline, and buffer is the
+// storage the outgoing data is copied into.
+struct write_result_t {
+  iocp_result_t base;
+  size_t requested;
+  bool in_use;
+  pn_bytes_t buffer;
+};
+
+// ConnectEx result; owns addrinfo until the result is freed
+// (see connect_result_finalize).
+typedef struct {
+  iocp_result_t base;
+  char address_buffer[IOCP_SOCKADDRBUFLEN];
+  struct addrinfo *addrinfo;
+} connect_result_t;
+
+// AcceptEx result: new_sock is the socket being accepted into;
+// address_buffer and unused receive AcceptEx's address data and byte count.
+typedef struct {
+  iocp_result_t base;
+  iocpdesc_t *new_sock;
+  char address_buffer[IOCP_SOCKADDRBUFLEN];
+  DWORD unused;
+} accept_result_t;
+
+
+// Forward declarations for functions defined later in this file.
+void complete_connect(connect_result_t *result, HRESULT status);
+void complete_write(write_result_t *result, DWORD xfer_count, HRESULT status);
+void complete_read(read_result_t *result, DWORD xfer_count, HRESULT status);
+void start_reading(iocpdesc_t *iocpd);
+void begin_accept(pni_acceptor_t *acceptor, accept_result_t *result);
+void reset_accept_result(accept_result_t *result);
+iocpdesc_t *create_same_type_socket(iocpdesc_t *iocpd);
+void pni_iocp_reap_check(iocpdesc_t *iocpd);
+connect_result_t *connect_result(iocpdesc_t *iocpd, struct addrinfo *addr);
+iocpdesc_t *pni_iocpdesc_create(iocp_t *, pn_socket_t s);
+iocpdesc_t *pni_deadline_desc(iocp_t *);
+void pni_iocpdesc_start(iocpdesc_t *iocpd);
+void pni_iocp_drain_completions(iocp_t *);
+int pni_iocp_wait_one(iocp_t *, int timeout, pn_error_t *);
+void pni_iocp_start_accepting(iocpdesc_t *iocpd);
+pn_socket_t pni_iocp_end_accept(iocpdesc_t *ld, sockaddr *addr, socklen_t *addrlen, bool *would_block, pn_error_t *error);
+pn_socket_t pni_iocp_begin_connect(iocp_t *, pn_socket_t sock, struct addrinfo *addr, pn_error_t *error);
+ssize_t pni_iocp_begin_write(iocpdesc_t *, const void *, size_t, bool *, pn_error_t *);
+ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, size_t size, bool *would_block, pn_error_t *error);
+void pni_iocp_begin_close(iocpdesc_t *iocpd);
+iocp_t *pni_iocp();
+
+void pni_events_update(iocpdesc_t *iocpd, int events);
+write_result_t *pni_write_result(iocpdesc_t *iocpd, const char *buf, size_t buflen);
+void pni_zombie_check(iocp_t *, pn_timestamp_t);
+pn_timestamp_t pni_zombie_deadline(iocp_t *);
+
+int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code);
+
+pn_timestamp_t pn_i_now2(void);
+}
+
+// ======================================================================
+// Write pipeline.
+// ======================================================================
+
+/*
+ * A simple write buffer pool.  Each socket has a dedicated "primary"
+ * buffer and can borrow from a shared pool with limited size tuning.
+ * Could enhance e.g. with separate pools per network interface and fancier
+ * memory tuning based on interface speed, system resources, and
+ * number of connections, etc.
+ */
+
+namespace pn_experimental {
+
+// Max overlapped writes per socket
+#define IOCP_MAX_OWRITES 16
+// Write buffer size
+#define IOCP_WBUFSIZE 16384
+
+// printf-style diagnostic output for the write pipeline; writes to stderr and flushes.
+static void pipeline_log(const char *fmt, ...)
+{
+  va_list ap;
+  va_start(ap, fmt);
+  vfprintf(stderr, fmt, ap);
+  va_end(ap);
+  fflush(stderr);
+}
+
+// Create iocp's shared write buffer pool.
+//
+// Defaults to 16 buffers of IOCP_WBUFSIZE bytes.  The internal debugging
+// variables PNI_WRITE_BUFFERS (0..255 buffers) and PNI_LB_BUFSIZE
+// (0..128K bytes, loopback primary buffer size) override the defaults.
+// On any allocation failure the pool is left empty (shared_pool_size == 0,
+// shared_available_count == 0), so pipelines use only their primary buffers.
+void pni_shared_pool_create(iocp_t *iocp)
+{
+  // Start from a well-defined empty pool so every early return below, and
+  // pni_shared_pool_free, sees consistent fields.
+  iocp->shared_pool_memory = NULL;
+  iocp->shared_results = NULL;
+  iocp->available_results = NULL;
+  iocp->shared_available_count = 0;
+
+  // TODO: more pools (or larger one) when using multiple non-loopback interfaces
+  iocp->shared_pool_size = 16;
+  char *env = getenv("PNI_WRITE_BUFFERS"); // Internal: for debugging
+  if (env) {
+    int sz = atoi(env);
+    if (sz >= 0 && sz < 256) {
+      iocp->shared_pool_size = sz;
+    }
+  }
+  iocp->loopback_bufsize = 0;
+  env = getenv("PNI_LB_BUFSIZE"); // Internal: for debugging
+  if (env) {
+    int sz = atoi(env);
+    if (sz >= 0 && sz <= 128 * 1024) {
+      iocp->loopback_bufsize = sz;
+    }
+  }
+
+  if (!iocp->shared_pool_size)
+    return;
+
+  iocp->shared_pool_memory = (char *) VirtualAlloc(NULL, IOCP_WBUFSIZE * iocp->shared_pool_size, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
+  if (!iocp->shared_pool_memory) {
+    // VirtualAlloc reports failure through GetLastError(), not errno, so
+    // perror() would print an unrelated message.
+    pipeline_log("Proton write buffer pool allocation failure: error %lu\n", GetLastError());
+    iocp->shared_pool_size = 0;
+    return;
+  }
+
+  // calloc: any slot not filled below stays NULL, which the cleanup relies on.
+  iocp->shared_results = (write_result_t **) calloc(iocp->shared_pool_size, sizeof(write_result_t *));
+  iocp->available_results = (write_result_t **) calloc(iocp->shared_pool_size, sizeof(write_result_t *));
+  bool ok = iocp->shared_results != NULL && iocp->available_results != NULL;
+  char *mem = iocp->shared_pool_memory;
+  for (unsigned i = 0; ok && i < iocp->shared_pool_size; i++) {
+    write_result_t *r = pni_write_result(NULL, mem, IOCP_WBUFSIZE);
+    if (!r) {
+      ok = false;
+      break;
+    }
+    iocp->shared_results[i] = iocp->available_results[i] = r;
+    mem += IOCP_WBUFSIZE;
+  }
+
+  if (!ok) {
+    pipeline_log("Proton write buffer pool allocation failure\n");
+    if (iocp->shared_results) {
+      for (unsigned i = 0; i < iocp->shared_pool_size; i++)
+        free(iocp->shared_results[i]);
+    }
+    free(iocp->shared_results);
+    free(iocp->available_results);
+    iocp->shared_results = NULL;
+    iocp->available_results = NULL;
+    VirtualFree(iocp->shared_pool_memory, 0, MEM_RELEASE);
+    iocp->shared_pool_memory = NULL;
+    iocp->shared_pool_size = 0;
+    return;
+  }
+  iocp->shared_available_count = iocp->shared_pool_size;
+}
+
+// Release the pool created by pni_shared_pool_create.  Results still marked
+// in_use are reported as leaks and left allocated.
+void pni_shared_pool_free(iocp_t *iocp)
+{
+  for (unsigned i = 0; i < iocp->shared_pool_size; i++) {
+    write_result_t *result = iocp->shared_results[i];
+    if (result->in_use)
+      pipeline_log("Proton buffer pool leak\n");
+    else
+      free(result);
+  }
+  if (iocp->shared_pool_size) {
+    free(iocp->shared_results);
+    free(iocp->available_results);
+    iocp->shared_results = NULL;
+    iocp->available_results = NULL;
+    if (iocp->shared_pool_memory) {
+      if (!VirtualFree(iocp->shared_pool_memory, 0, MEM_RELEASE)) {
+        // As above: VirtualFree does not set errno.
+        pipeline_log("write buffers release failed: error %lu\n", GetLastError());
+      }
+      iocp->shared_pool_memory = NULL;
+    }
+  }
+}
+
+// Return a borrowed buffer to the shared pool's available stack.
+static void shared_pool_push(write_result_t *result)
+{
+  iocp_t *iocp = result->base.iocpd->iocp;
+  assert(iocp->shared_available_count < iocp->shared_pool_size);
+  iocp->available_results[iocp->shared_available_count++] = result;
+}
+
+// Borrow a buffer from the shared pool, or NULL if none is available.
+static write_result_t *shared_pool_pop(iocp_t *iocp)
+{
+  return iocp->shared_available_count ? iocp->available_results[--iocp->shared_available_count] : NULL;
+}
+
+// Per-socket write state: a dedicated primary buffer plus bookkeeping for
+// buffers borrowed from the shared pool.
+struct write_pipeline_t {
+  iocpdesc_t *iocpd;
+  size_t pending_count;       // buffers handed out and not yet returned
+  write_result_t *primary;    // this socket's own buffer
+  size_t reserved_count;      // limit for pni_write_pipeline_next (set by reserve)
+  size_t next_primary_index;  // pending_count value at which the primary is used
+  size_t depth;               // max overlapped writes; 0 until set_depth runs
+  bool is_writer;             // counted in iocp->writer_count
+};
+
+// Unused pn_class_t hooks for PN_CLASS(write_pipeline).
+#define write_pipeline_compare NULL
+#define write_pipeline_inspect NULL
+#define write_pipeline_hashcode NULL
+
+// pn_class initializer: zero the bookkeeping and allocate the primary buffer.
+// If allocation fails, primary is left NULL; pni_write_pipeline detects that.
+static void write_pipeline_initialize(void *object)
+{
+  write_pipeline_t *pl = (write_pipeline_t *) object;
+  pl->iocpd = NULL;
+  pl->pending_count = 0;
+  pl->reserved_count = 0;
+  pl->next_primary_index = 0;
+  pl->depth = 0;
+  pl->is_writer = false;
+  pl->primary = NULL;
+  char *pribuf = (char *) malloc(IOCP_WBUFSIZE);
+  if (pribuf) {
+    pl->primary = pni_write_result(NULL, pribuf, IOCP_WBUFSIZE);
+    if (!pl->primary)
+      free(pribuf);
+  }
+}
+
+// pn_class finalizer: free the primary buffer and its result record.
+static void write_pipeline_finalize(void *object)
+{
+  write_pipeline_t *pl = (write_pipeline_t *) object;
+  if (pl->primary) {
+    free((void *)pl->primary->buffer.start);
+    free(pl->primary);
+  }
+}
+
+// Create the write pipeline for iocpd.  Returns NULL if allocation fails.
+write_pipeline_t *pni_write_pipeline(iocpdesc_t *iocpd)
+{
+  static const pn_cid_t CID_write_pipeline = CID_pn_void;
+  static const pn_class_t clazz = PN_CLASS(write_pipeline);
+  write_pipeline_t *pipeline = (write_pipeline_t *) pn_class_new(&clazz, sizeof(write_pipeline_t));
+  if (!pipeline)
+    return NULL;
+  if (!pipeline->primary) {
+    // Primary buffer allocation failed in write_pipeline_initialize.
+    pn_free(pipeline);
+    return NULL;
+  }
+  pipeline->iocpd = iocpd;
+  pipeline->primary->base.iocpd = iocpd;
+  return pipeline;
+}
+
+// Count this pipeline in iocp->writer_count (idempotent).
+static void confirm_as_writer(write_pipeline_t *pl)
+{
+  if (!pl->is_writer) {
+    iocp_t *iocp = pl->iocpd->iocp;
+    iocp->writer_count++;
+    pl->is_writer = true;
+  }
+}
+
+// Undo confirm_as_writer (idempotent).
+static void remove_as_writer(write_pipeline_t *pl)
+{
+  if (!pl->is_writer)
+    return;
+  iocp_t *iocp = pl->iocpd->iocp;
+  assert(iocp->writer_count);
+  pl->is_writer = false;
+  iocp->writer_count--;
+}
+
+/*
+ * Optimal depth will depend on properties of the NIC, server, and driver.  For now,
+ * just distinguish between loopback interfaces and the rest.  Optimizations in the
+ * loopback stack allow decent performance with depth 1 and actually cause major
+ * performance hiccups if set to large values.
+ *
+ * Depth stays 1 if the local address cannot be determined.
+ */
+static void set_depth(write_pipeline_t *pl)
+{
+  pl->depth = 1;
+  sockaddr_storage sa;
+  socklen_t salen = sizeof(sa);
+  char buf[INET6_ADDRSTRLEN];
+  DWORD buflen = sizeof(buf);
+
+  if (getsockname(pl->iocpd->socket,(sockaddr*) &sa, &salen) == 0 &&
+      getnameinfo((sockaddr*) &sa, salen, buf, buflen, NULL, 0, NI_NUMERICHOST) == 0) {
+    if ((sa.ss_family == AF_INET6 && strcmp(buf, "::1")) ||
+        (sa.ss_family == AF_INET && strncmp(buf, "127.", 4))) {
+      // not loopback
+      pl->depth = IOCP_MAX_OWRITES;
+    } else {
+      iocp_t *iocp = pl->iocpd->iocp;
+      if (iocp->loopback_bufsize) {
+        // Keep the old buffer if realloc fails.
+        const char *p = (const char *) realloc((void *) pl->primary->buffer.start, iocp->loopback_bufsize);
+        if (p) {
+          pl->primary->buffer.start = p;
+          pl->primary->buffer.size = iocp->loopback_bufsize;
+        }
+      }
+    }
+  }
+}
+
+static size_t pni_min(size_t a, size_t b) { return a < b ? a : b; }
+
+// Reserve as many buffers as possible for count bytes.
+//
+// Returns how many buffers the caller may take with pni_write_pipeline_next,
+// or 0 while the primary buffer is still in use (treat as would-block).  Also
+// decides at which position the primary buffer is used, leaving it spare
+// when the shared pool is plentiful.
+size_t pni_write_pipeline_reserve(write_pipeline_t *pl, size_t count)
+{
+  if (pl->primary->in_use)
+    return 0;  // I.e. io->wouldblock
+  if (!pl->depth)
+    set_depth(pl);
+  if (pl->depth == 1) {
+    // always use the primary
+    pl->reserved_count = 1;
+    pl->next_primary_index = 0;
+    return 1;
+  }
+
+  iocp_t *iocp = pl->iocpd->iocp;
+  confirm_as_writer(pl);  // guarantees writer_count >= 1 for the division below
+  size_t wanted = (count / IOCP_WBUFSIZE);
+  if (count % IOCP_WBUFSIZE)
+    wanted++;
+  size_t pending = pl->pending_count;
+  assert(pending < pl->depth);
+  size_t bufs = pni_min(wanted, pl->depth - pending);
+  // Can draw from shared pool or the primary... but share with others.
+  size_t writers = iocp->writer_count;
+  size_t shared_count = (iocp->shared_available_count + writers - 1) / writers;
+  bufs = pni_min(bufs, shared_count + 1);
+  pl->reserved_count = pending + bufs;
+
+  if (bufs == wanted &&
+      pl->reserved_count < (pl->depth / 2) &&
+      iocp->shared_available_count > (2 * writers + bufs)) {
+    // No shortage: keep the primary as spare for future use
+    pl->next_primary_index = pl->reserved_count;
+  } else if (bufs == 1) {
+    pl->next_primary_index = pending;
+  } else {
+    // let approx 1/3 drain before replenishing
+    pl->next_primary_index = ((pl->reserved_count + 2) / 3) - 1;
+    if (pl->next_primary_index < pending)
+      pl->next_primary_index = pending;
+  }
+  return bufs;
+}
+
+// Hand out the next reserved buffer (the primary or a shared one), marking
+// it in_use.  Returns NULL once the reservation is exhausted.
+write_result_t *pni_write_pipeline_next(write_pipeline_t *pl)
+{
+  size_t sz = pl->pending_count;
+  if (sz >= pl->reserved_count)
+    return NULL;
+  write_result_t *result;
+  if (sz == pl->next_primary_index) {
+    result = pl->primary;
+  } else {
+    assert(pl->iocpd->iocp->shared_available_count > 0);
+    result = shared_pool_pop(pl->iocpd->iocp);
+  }
+
+  result->in_use = true;
+  pl->pending_count++;
+  return result;
+}
+
+// Give back a buffer from a completed or failed write.  Cancels any
+// remaining reservation; shared buffers go back to the pool.
+void pni_write_pipeline_return(write_pipeline_t *pl, write_result_t *result)
+{
+  result->in_use = false;
+  pl->pending_count--;
+  pl->reserved_count = 0;
+  if (result != pl->primary)
+    shared_pool_push(result);
+  if (pl->pending_count == 0)
+    remove_as_writer(pl);
+}
+
+bool pni_write_pipeline_writable(write_pipeline_t *pl)
+{
+  // Only writable if not full and we can guarantee a buffer:
+  return pl->pending_count < pl->depth && !pl->primary->in_use;
+}
+
+// Number of buffers currently handed out (outstanding writes).
+size_t pni_write_pipeline_size(write_pipeline_t *pl)
+{
+  return pl->pending_count;
+}
+
+}
+
+
+// ======================================================================
+// IOCP infrastructure code
+// ======================================================================
+
+/*
+ * Socket IO including graceful or forced zombie teardown.
+ *
+ * Overlapped writes are used to avoid lengthy stalls between write
+ * completion and starting a new write.  Non-overlapped reads are used
+ * since Windows accumulates inbound traffic without stalling and
+ * managing read buffers would not avoid a memory copy at the pn_read
+ * boundary.
+ *
+ * A socket must not get a Windows closesocket() unless the
+ * application has called pn_connection_close on the connection or a
+ * global single threaded shutdown is in progress. On error, the
+ * internal accounting for write_closed or read_closed may be updated
+ * along with the external event notification.  A socket may be
+ * directly closed if it never gets started (accept/listener_close
+ * collision); otherwise it is turned into a zombie.
+ */
+
+namespace pn_experimental {
+
+pn_timestamp_t pn_i_now2(void)
+{
+  // Wall clock in milliseconds since the Unix epoch.  FILETIME counts
+  // 100ns ticks since 1601-01-01; 11644473600000 ms separates the two epochs.
+  FILETIME ft;
+  GetSystemTimeAsFileTime(&ft);
+  ULARGE_INTEGER ticks;
+  ticks.u.LowPart = ft.dwLowDateTime;
+  ticks.u.HighPart = ft.dwHighDateTime;
+  const ULONGLONG ms_since_1601 = ticks.QuadPart / 10000;
+  return ms_since_1601 - 11644473600000;
+}
+
+// printf-style trace output to stderr, flushed immediately.
+static void iocp_log(const char *fmt, ...)
+{
+  va_list args;
+  va_start(args, fmt);
+  vfprintf(stderr, fmt, args);
+  va_end(args);
+  fflush(stderr);
+}
+
+static void set_iocp_error_status(pn_error_t *error, int code, HRESULT status)
+{
+  char buf[512];
+  if (FormatMessage(FORMAT_MESSAGE_MAX_WIDTH_MASK | FORMAT_MESSAGE_FROM_SYSTEM,
+                    0, status, 0, buf, sizeof(buf), 0))
+    pn_error_set(error, code, buf);
+  else {
+    fprintf(stderr, "pn internal Windows error: %lu\n", GetLastError());
+  }
+}
+
+static void reap_check(iocpdesc_t *);
+static void bind_to_completion_port(iocpdesc_t *iocpd);
+static void iocp_shutdown(iocpdesc_t *iocpd);
+static bool is_listener(iocpdesc_t *iocpd);
+static void release_sys_sendbuf(SOCKET s);
+
+int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code)
+{
+  // Error code can be from GetLastError or WSAGetLastError,
+  char err[1024] = {0};
+  FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS |
+                FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, code, 0, (LPSTR)&err, sizeof(err), NULL);
+  return pn_error_format(error, PN_ERR, "%s: %s", msg, err);
+}
+
+static void iocpdesc_fail(iocpdesc_t *iocpd, HRESULT status, const char* text)
+{
+  pni_win32_error(iocpd->error, text, status);
+  if (iocpd->iocp->iocp_trace) {
+    iocp_log("connection terminated: %s\n", pn_error_text(iocpd->error));
+  }
+  iocpd->write_closed = true;
+  iocpd->read_closed = true;
+  iocpd->poll_error = true;
+  pni_events_update(iocpd, iocpd->events & ~(PN_READABLE | PN_WRITABLE));
+}
+
+// Case-insensitive comparison (per tolower) of a against b.
+// Returns 0 when they match; otherwise the difference of the first
+// mismatching lowered characters, or *a when a continues past the end of b.
+static int pni_strcasecmp(const char *a, const char *b)
+{
+  while (*b) {
+    // <ctype.h> functions take an unsigned char value (or EOF); passing a
+    // negative plain char is undefined behavior.
+    int diff = tolower((unsigned char) *a++) - tolower((unsigned char) *b++);
+    if (diff != 0) return diff;
+  }
+  return *a;
+}
+
+// Replace iocpd's event bits with events.  Once poll_error is set, PN_ERROR
+// is always included.
+//
+// Not 'static': this function is forward-declared with external linkage
+// earlier in the file, and C++ requires every declaration of an entity to
+// agree on linkage (GCC rejects "declared extern and later static").
+void pni_events_update(iocpdesc_t *iocpd, int events)
+{
+  // If set, a poll error is permanent
+  if (iocpd->poll_error)
+    events |= PN_ERROR;
+  // (Old code that maintained a list of ready selectables was dropped.)
+  iocpd->events = events;
+}
+
+// Helper functions to use specialized IOCP AcceptEx() and ConnectEx()
+// Resolve the AcceptEx extension function for socket s.
+// Returns NULL (and asserts in debug builds) if the lookup fails.
+static LPFN_ACCEPTEX lookup_accept_ex(SOCKET s)
+{
+  GUID guid = WSAID_ACCEPTEX;
+  DWORD bytes = 0;
+  // Initialized so a failed WSAIoctl never leaves an indeterminate pointer.
+  LPFN_ACCEPTEX fn = NULL;
+  if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
+               &fn, sizeof(fn), &bytes, NULL, NULL) == SOCKET_ERROR)
+    fn = NULL;
+  assert(fn);
+  return fn;
+}
+
+// Resolve the ConnectEx extension function for socket s.
+// Returns NULL (and asserts in debug builds) if the lookup fails.
+static LPFN_CONNECTEX lookup_connect_ex(SOCKET s)
+{
+  GUID guid = WSAID_CONNECTEX;
+  DWORD bytes = 0;
+  // Initialized so a failed WSAIoctl never leaves an indeterminate pointer.
+  LPFN_CONNECTEX fn = NULL;
+  if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
+               &fn, sizeof(fn), &bytes, NULL, NULL) == SOCKET_ERROR)
+    fn = NULL;
+  assert(fn);
+  return fn;
+}
+
+// Resolve the GetAcceptExSockaddrs extension function for socket s.
+// Returns NULL (and asserts in debug builds) if the lookup fails.
+static LPFN_GETACCEPTEXSOCKADDRS lookup_get_accept_ex_sockaddrs(SOCKET s)
+{
+  GUID guid = WSAID_GETACCEPTEXSOCKADDRS;
+  DWORD bytes = 0;
+  // Initialized so a failed WSAIoctl never leaves an indeterminate pointer.
+  LPFN_GETACCEPTEXSOCKADDRS fn = NULL;
+  if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
+               &fn, sizeof(fn), &bytes, NULL, NULL) == SOCKET_ERROR)
+    fn = NULL;
+  assert(fn);
+  return fn;
+}
+
+// Match accept socket to listener socket: create a new SOCK_STREAM socket in
+// the same address family as iocpd's socket and wrap it in an iocpdesc_t.
+// Returns NULL if the family cannot be read or socket() fails.
+iocpdesc_t *create_same_type_socket(iocpdesc_t *iocpd)
+{
+  sockaddr_storage local;
+  socklen_t local_len = sizeof(local);
+  if (getsockname(iocpd->socket, (sockaddr*) &local, &local_len) == -1)
+    return NULL;
+  // Currently only work with SOCK_STREAM
+  SOCKET fresh = socket(local.ss_family, SOCK_STREAM, 0);
+  return fresh == INVALID_SOCKET ? NULL : pni_iocpdesc_create(iocpd->iocp, fresh);
+}
+
+// True when iocpd is non-NULL and has an acceptor attached.
+static bool is_listener(iocpdesc_t *iocpd)
+{
+  if (iocpd == NULL)
+    return false;
+  return iocpd->acceptor != NULL;
+}
+
+// === Async accept processing
+
+// Allocate a zeroed IOCP_ACCEPT result bound to listen_sock (NULL on OOM).
+static accept_result_t *accept_result(iocpdesc_t *listen_sock) {
+  accept_result_t *res = (accept_result_t *) calloc(1, sizeof(*res));
+  if (!res)
+    return NULL;
+  res->base.type = IOCP_ACCEPT;
+  res->base.iocpd = listen_sock;
+  return res;
+}
+
+// Clear an accept result's OVERLAPPED, address buffer and new socket so it
+// can be passed to AcceptEx again.
+void reset_accept_result(accept_result_t *result) {
+  result->new_sock = NULL;
+  memset(&result->address_buffer, 0, sizeof(result->address_buffer));
+  memset(&result->base.overlapped, 0, sizeof(result->base.overlapped));
+}
+
+// Per-listener accept state; a listening iocpdesc_t points at one of these
+// through its acceptor field.
+struct pni_acceptor_t {
+  // Accept results created by begin_accept (decremented when one is
+  // discarded while the listener is closing).
+  int accept_queue_size;
+  // Completed accept results queued by complete_accept (non-mp only).
+  pn_list_t *accepts;
+  iocpdesc_t *listen_sock;
+  // Set by begin_accept once the listener is closing and no accepts remain.
+  bool signalled;
+  // Winsock extension functions resolved against listen_sock.
+  LPFN_ACCEPTEX fn_accept_ex;
+  LPFN_GETACCEPTEXSOCKADDRS fn_get_accept_ex_sockaddrs;
+};
+
+// Unused pn_class_t hooks for PN_CLASS(pni_acceptor).
+#define pni_acceptor_compare NULL
+#define pni_acceptor_inspect NULL
+#define pni_acceptor_hashcode NULL
+
+// pn_class initializer: create the list of completed accepts, sized by
+// IOCP_MAX_ACCEPTS.
+static void pni_acceptor_initialize(void *object)
+{
+  ((pni_acceptor_t *) object)->accepts = pn_list(PN_VOID, IOCP_MAX_ACCEPTS);
+}
+
+// pn_class finalizer.  The list holds raw (PN_VOID) accept result pointers,
+// so each entry is freed explicitly before the list itself.
+static void pni_acceptor_finalize(void *object)
+{
+  pni_acceptor_t *acc = (pni_acceptor_t *) object;
+  for (size_t idx = 0, n = pn_list_size(acc->accepts); idx < n; ++idx)
+    free(pn_list_get(acc->accepts, idx));
+  pn_free(acc->accepts);
+}
+
+// Create the acceptor for listening descriptor iocpd and resolve the
+// AcceptEx helper functions against its socket.
+static pni_acceptor_t *pni_acceptor(iocpdesc_t *iocpd)
+{
+  static const pn_cid_t CID_pni_acceptor = CID_pn_void;
+  static const pn_class_t clazz = PN_CLASS(pni_acceptor);
+  pni_acceptor_t *acc = (pni_acceptor_t *) pn_class_new(&clazz, sizeof(pni_acceptor_t));
+  acc->listen_sock = iocpd;
+  acc->accept_queue_size = 0;
+  acc->signalled = false;
+  acc->fn_accept_ex = lookup_accept_ex(iocpd->socket);
+  acc->fn_get_accept_ex_sockaddrs = lookup_get_accept_ex_sockaddrs(iocpd->socket);
+  return acc;
+}
+
+// Start (or restart) an overlapped AcceptEx on the acceptor's listening socket.
+//
+// result == NULL: allocate a new accept result, but only while fewer than
+//   IOCP_MAX_ACCEPTS exist and (non-mp only) every existing one has completed
+//   into acceptor->accepts.
+// result != NULL: reuse a completed result (reset first when not mp).
+//
+// While the listener is closing, a passed-in result is discarded instead,
+// and acceptor->signalled is set once accept_queue_size reaches 0.
+void begin_accept(pni_acceptor_t *acceptor, accept_result_t *result)
+{
+  // flag to divide this routine's logic into locked/unlocked mp portions
+  bool mp = acceptor->listen_sock->is_mp;
+  bool created = false;
+
+  if (acceptor->listen_sock->closing) {
+    if (result) {
+      if (mp && result->new_sock && result->new_sock->socket != INVALID_SOCKET)
+        closesocket(result->new_sock->socket);
+      free(result);
+      acceptor->accept_queue_size--;
+    }
+    if (acceptor->accept_queue_size == 0)
+      acceptor->signalled = true;
+    return;
+  }
+
+  if (result) {
+    if (!mp)
+      reset_accept_result(result);
+  } else {
+    if (acceptor->accept_queue_size < IOCP_MAX_ACCEPTS && (mp ||
+           pn_list_size(acceptor->accepts) == acceptor->accept_queue_size )) {
+      result = accept_result(acceptor->listen_sock);
+      if (!result) {
+        // Out of memory: do not count an accept that was never started.
+        iocpdesc_fail(acceptor->listen_sock, WSAENOBUFS, "accept result allocation");
+        return;
+      }
+      acceptor->accept_queue_size++;
+      created = true;
+    } else {
+      // an async accept is still pending or max concurrent accepts already hit
+      return;
+    }
+  }
+
+  if (created || !mp)
+    result->new_sock = create_same_type_socket(acceptor->listen_sock);
+  if (result->new_sock) {
+    // Not yet connected.
+    result->new_sock->read_closed = true;
+    result->new_sock->write_closed = true;
+
+    bool success = acceptor->fn_accept_ex(acceptor->listen_sock->socket, result->new_sock->socket,
+                     result->address_buffer, 0, IOCP_SOCKADDRMAXLEN, IOCP_SOCKADDRMAXLEN,
+                     &result->unused, (LPOVERLAPPED) result);
+    if (!success) {
+      DWORD err = WSAGetLastError();
+      if (err != ERROR_IO_PENDING) {
+        if (err == WSAECONNRESET) {
+          // other side gave up.  Ignore and try again.
+          // NOTE(review): in the non-mp case the retry resets result and
+          // creates a fresh new_sock without closing the previous one here;
+          // confirm it is reclaimed elsewhere.
+          begin_accept(acceptor, result);
+          return;
+        }
+        else {
+          iocpdesc_fail(acceptor->listen_sock, err, "AcceptEX call failure");
+          return;
+        }
+      }
+    }
+    // The operation is in flight whether AcceptEx completed immediately (TRUE)
+    // or is pending: in both cases a completion packet is queued to the port
+    // (assuming FILE_SKIP_COMPLETION_PORT_ON_SUCCESS is not set), and
+    // complete_accept() decrements new_sock's count.  This matches the
+    // connect and write paths, which also count synchronous success.
+    acceptor->listen_sock->ops_in_progress++;
+    // This socket is equally involved in the async operation.
+    result->new_sock->ops_in_progress++;
+  } else {
+    iocpdesc_fail(acceptor->listen_sock, WSAGetLastError(), "create accept socket");
+  }
+}
+
+// Completion handler for an overlapped AcceptEx.  If the listener is closed
+// for reading, the new socket is closed and the result discarded; otherwise
+// (non-mp only) the result is queued on the acceptor and the listener is
+// marked readable.
+static void complete_accept(accept_result_t *result, HRESULT status)
+{
+  iocpdesc_t *new_sock = result->new_sock;
+  iocpdesc_t *listener = result->base.iocpd;
+  new_sock->ops_in_progress--;
+
+  if (!listener->read_closed) {
+    assert(!listener->is_mp);  // Non mp only
+    result->base.status = status;
+    pn_list_add(listener->acceptor->accepts, result);
+    pni_events_update(listener, listener->events | PN_READABLE);
+    return;
+  }
+
+  // Listener no longer accepting: discard.
+  if (!new_sock->closing)
+    pni_iocp_begin_close(new_sock);
+  pn_decref(new_sock);
+  free(result);
+  reap_check(listener);
+}
+
+
+// === Async connect processing
+
+
+// Unused pn_class_t hooks for PN_CLASS(connect_result); only
+// connect_result_finalize is provided.
+#define connect_result_initialize NULL
+#define connect_result_compare NULL
+#define connect_result_inspect NULL
+#define connect_result_hashcode NULL
+
+// pn_class finalizer: release the address list owned by the result.
+static void connect_result_finalize(void *object)
+{
+  struct addrinfo *ai = ((connect_result_t *) object)->addrinfo;
+  // Do not release addrinfo until ConnectEx completes
+  if (ai != NULL)
+    freeaddrinfo(ai);
+}
+
+// Create a zeroed IOCP_CONNECT result for iocpd that takes ownership of addr
+// (freed by connect_result_finalize).  Returns NULL if allocation fails.
+connect_result_t *connect_result(iocpdesc_t *iocpd, struct addrinfo *addr) {
+  static const pn_cid_t CID_connect_result = CID_pn_void;
+  static const pn_class_t clazz = PN_CLASS(connect_result);
+  connect_result_t *res = (connect_result_t *) pn_class_new(&clazz, sizeof(connect_result_t));
+  if (!res)
+    return NULL;
+  memset(res, 0, sizeof(connect_result_t));
+  res->base.type = IOCP_CONNECT;
+  res->base.iocpd = iocpd;
+  res->addrinfo = addr;
+  return res;
+}
+
+// Start an overlapped ConnectEx of sock to addr.  Takes ownership of addr.
+//
+// Returns INVALID_SOCKET (sock closed, addr freed, error set) if the
+// pre-bind fails.  Otherwise returns sock: on a later setup failure, error
+// is set and the new descriptor is marked read/write closed; on success the
+// connect is in progress.
+pn_socket_t pni_iocp_begin_connect(iocp_t *iocp, pn_socket_t sock, struct addrinfo *addr, pn_error_t *error)
+{
+  // addr lives for the duration of the async connect.  Caller has passed ownership here.
+  // See connect_result_finalize().
+  // Use of Windows-specific ConnectEx() requires our socket to be "loosely" pre-bound:
+  sockaddr_storage sa;
+  memset(&sa, 0, sizeof(sa));
+  sa.ss_family = addr->ai_family;
+  if (bind(sock, (SOCKADDR *) &sa, addr->ai_addrlen)) {
+    pni_win32_error(error, "begin async connection", WSAGetLastError());
+    if (iocp->iocp_trace)
+      iocp_log("%s\n", pn_error_text(error));
+    closesocket(sock);
+    freeaddrinfo(addr);
+    return INVALID_SOCKET;
+  }
+
+  iocpdesc_t *iocpd = pni_iocpdesc_create(iocp, sock);
+  bind_to_completion_port(iocpd);
+  LPFN_CONNECTEX fn_connect_ex = lookup_connect_ex(iocpd->socket);
+  connect_result_t *result = connect_result(iocpd, addr);
+  if (!result) {
+    // Nothing else owns addr yet, so release it here.
+    freeaddrinfo(addr);
+    pni_win32_error(error, "begin async connection", WSAENOBUFS);
+    iocpd->write_closed = true;
+    iocpd->read_closed = true;
+    if (iocp->iocp_trace)
+      iocp_log("%s\n", pn_error_text(error));
+    return sock;
+  }
+
+  DWORD unused;
+  bool success = fn_connect_ex(iocpd->socket, result->addrinfo->ai_addr, result->addrinfo->ai_addrlen,
+                               NULL, 0, &unused, (LPOVERLAPPED) result);
+  // Read the Winsock error once, before any other call can overwrite it.
+  int err = success ? 0 : WSAGetLastError();
+  if (!success && err != ERROR_IO_PENDING) {
+    pni_win32_error(error, "ConnectEx failure", err);
+    pn_free(result);  // also frees addr via connect_result_finalize
+    iocpd->write_closed = true;
+    iocpd->read_closed = true;
+    if (iocp->iocp_trace)
+      iocp_log("%s\n", pn_error_text(error));
+  } else {
+    iocpd->ops_in_progress++;
+  }
+  return sock;
+}
+
+// Completion handler for ConnectEx; always frees result.
+// Closing descriptor: only a reap check.  Failure: descriptor failed and its
+// events set to PN_READABLE | PN_EXPIRED (as Posix does).  Success: connect
+// context updated, PN_WRITABLE set and reading started.
+void complete_connect(connect_result_t *result, HRESULT status)
+{
+  iocpdesc_t *iocpd = result->base.iocpd;
+
+  if (iocpd->closing) {
+    pn_free(result);
+    reap_check(iocpd);
+    return;
+  }
+
+  if (status != 0) {
+    iocpdesc_fail(iocpd, status, "Connect failure");
+    // Posix sets selectable events as follows:
+    pni_events_update(iocpd, PN_READABLE | PN_EXPIRED);
+  } else {
+    release_sys_sendbuf(iocpd->socket);
+    const bool update_failed =
+      setsockopt(iocpd->socket, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT,  NULL, 0) != 0;
+    if (update_failed) {
+      iocpdesc_fail(iocpd, WSAGetLastError(), "Internal connect failure (update context)");
+    } else {
+      pni_events_update(iocpd, PN_WRITABLE);
+      start_reading(iocpd);
+    }
+  }
+  pn_free(result);
+}
+
+
+// === Async writes
+
+// True while any write buffer of iocpd's pipeline is still outstanding.
+static bool write_in_progress(iocpdesc_t *iocpd)
+{
+  size_t outstanding = pni_write_pipeline_size(iocpd->pipeline);
+  return outstanding > 0;
+}
+
+// Allocate a zeroed IOCP_WRITE result whose buffer refers to buf/buflen
+// (the memory is referenced, not copied).  Returns NULL on OOM.
+write_result_t *pni_write_result(iocpdesc_t *iocpd, const char *buf, size_t buflen)
+{
+  write_result_t *wr = (write_result_t *) calloc(1, sizeof(write_result_t));
+  if (wr == NULL)
+    return NULL;
+  wr->base.type = IOCP_WRITE;
+  wr->base.iocpd = iocpd;
+  wr->buffer.start = buf;
+  wr->buffer.size = buflen;
+  return wr;
+}
+
+// Post an overlapped WSASend of len bytes at buf on the result's socket,
+// using result as the OVERLAPPED.  Returns WSASend's return value.
+static int submit_write(write_result_t *result, const void *buf, size_t len)
+{
+  memset(&result->base.overlapped, 0, sizeof (OVERLAPPED));
+  WSABUF chunk;
+  chunk.len = len;
+  chunk.buf = (char *) buf;
+  SOCKET s = result->base.iocpd->socket;
+  return WSASend(s, &chunk, 1, NULL, 0, (LPOVERLAPPED) result, 0);
+}
+
+// Copy up to len bytes from buf into write-pipeline buffers and submit them
+// as overlapped WSASends.
+//
+// Returns the number of bytes submitted, 0 if len == 0, or SOCKET_ERROR.
+// On SOCKET_ERROR, *would_block is true when the caller should retry after
+// the descriptor becomes writable; otherwise error describes the failure.
+ssize_t pni_iocp_begin_write(iocpdesc_t *iocpd, const void *buf, size_t len, bool *would_block, pn_error_t *error)
+{
+  if (len == 0) return 0;
+  *would_block = false;
+  if (is_listener(iocpd)) {
+    set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP);
+    // SOCKET_ERROR, like every other failure path (INVALID_SOCKET is a
+    // socket handle value, not a byte-count result).
+    return SOCKET_ERROR;
+  }
+  if (iocpd->closing) {
+    set_iocp_error_status(error, PN_ERR, WSAESHUTDOWN);
+    return SOCKET_ERROR;
+  }
+  if (iocpd->write_closed) {
+    assert(pn_error_code(iocpd->error));
+    pn_error_copy(error, iocpd->error);
+    if (iocpd->iocp->iocp_trace)
+      iocp_log("write error: %s\n", pn_error_text(error));
+    return SOCKET_ERROR;
+  }
+  if (!(iocpd->events & PN_WRITABLE)) {
+    *would_block = true;
+    return SOCKET_ERROR;
+  }
+
+  // Number of pipeline buffers reserved for this call; 0 means none is free.
+  size_t available = pni_write_pipeline_reserve(iocpd->pipeline, len);
+  if (!available) {
+    *would_block = true;
+    return SOCKET_ERROR;
+  }
+
+  size_t written = 0;
+  const char *outgoing = (const char *) buf;
+  for (size_t wr_count = 0; wr_count < available; wr_count++) {
+    write_result_t *result = pni_write_pipeline_next(iocpd->pipeline);
+    assert(result);
+    result->base.iocpd = iocpd;
+    // Each buffer takes as much of the remaining data as it can hold.
+    size_t actual_len = len < result->buffer.size ? len : result->buffer.size;
+    result->requested = actual_len;
+    memmove((void *)result->buffer.start, outgoing, actual_len);
+    outgoing += actual_len;
+    written += actual_len;
+    len -= actual_len;
+
+    if (submit_write(result, result->buffer.start, actual_len) != 0) {
+      // Read the Winsock error once, before any other call can overwrite it.
+      int wsa_err = WSAGetLastError();
+      if (wsa_err != ERROR_IO_PENDING) {
+        pni_write_pipeline_return(iocpd->pipeline, result);
+        iocpdesc_fail(iocpd, wsa_err, "overlapped send");
+        return SOCKET_ERROR;
+      }
+    }
+    iocpd->ops_in_progress++;
+  }
+
+  if (!pni_write_pipeline_writable(iocpd->pipeline))
+    pni_events_update(iocpd, iocpd->events & ~PN_WRITABLE);
+  return written;
+}
+
+/*
+ * Note: iocp write completion is not "bytes on the wire", it is "peer
+ * acked the sent bytes".  Completion can be seconds on a slow
+ * consuming peer.
+ */
+void complete_write(write_result_t *result, DWORD xfer_count, HRESULT status)
+{
+  // Completion handler for an overlapped WSASend issued by
+  // pni_iocp_begin_write().  The write buffer goes back to the pipeline
+  // on every path, so classify the outcome first and recycle it once.
+  iocpdesc_t *desc = result->base.iocpd;
+  const bool transferred = (status == 0 && xfer_count > 0);
+  const bool short_write = transferred && xfer_count != result->requested;
+  pni_write_pipeline_return(desc->pipeline, result);
+
+  if (desc->closing) {
+    // Deferred SD_SEND shutdown once the last in-flight write drains.
+    if (!desc->write_closed && !write_in_progress(desc))
+      iocp_shutdown(desc);
+    reap_check(desc);
+    return;
+  }
+
+  if (short_write) {
+    // Is this recoverable?  How to preserve order if multiple overlapped writes?
+    iocpdesc_fail(desc, WSA_OPERATION_ABORTED, "Partial overlapped write on socket");
+    return;
+  }
+
+  if (transferred) {
+    // Success: a pipeline slot was freed, so writing may be possible again.
+    if (pni_write_pipeline_writable(desc->pipeline))
+      pni_events_update(desc, desc->events | PN_WRITABLE);
+    return;
+  }
+
+  // Other error: distinguish loss of the peer from anything else.
+  const bool peer_gone = status == WSAECONNABORTED || status == WSAECONNRESET
+      || status == WSAENOTCONN || status == ERROR_NETNAME_DELETED;
+  if (!peer_gone) {
+    iocpdesc_fail(desc, status, "IOCP async write error");
+    return;
+  }
+  desc->write_closed = true;
+  desc->poll_error = true;
+  pni_events_update(desc, desc->events & ~PN_WRITABLE);
+  pni_win32_error(desc->error, "Remote close or timeout", status);
+}
+
+
+// === Async reads
+
+
+// Allocate a zeroed read_result_t tied to iocpd (used for its zero-byte
+// overlapped reads).  Returns NULL if allocation fails.
+static read_result_t *read_result(iocpdesc_t *iocpd)
+{
+  // calloc(count, size): element count first, element size second.
+  read_result_t *result = (read_result_t *) calloc(1, sizeof(read_result_t));
+  if (result) {
+    result->base.type = IOCP_READ;
+    result->base.iocpd = iocpd;
+  }
+  return result;
+}
+
+// Post a zero-length overlapped WSARecv so the completion port reports
+// when the socket becomes readable, without committing a data buffer.
+// No-op if such a read is already outstanding.  If the read side is
+// already closed, PN_READABLE is raised directly instead.
+static void begin_zero_byte_read(iocpdesc_t *iocpd)
+{
+  if (iocpd->read_in_progress)
+    return;
+  if (iocpd->read_closed) {
+    pni_events_update(iocpd, iocpd->events | PN_READABLE);
+    return;
+  }
+
+  read_result_t *rr = iocpd->read_result;
+  memset(&rr->base.overlapped, 0, sizeof (OVERLAPPED));
+  WSABUF empty;
+  empty.buf = rr->unused_buf;
+  empty.len = 0;
+  DWORD recv_flags = 0;
+  const int rc = WSARecv(iocpd->socket, &empty, 1, NULL, &recv_flags,
+                         &rr->base.overlapped, 0);
+  const bool accepted = (rc == 0) || (WSAGetLastError() == ERROR_IO_PENDING);
+  if (!accepted) {
+    iocpdesc_fail(iocpd, WSAGetLastError(), "IOCP read error");
+    return;
+  }
+  iocpd->ops_in_progress++;
+  iocpd->read_in_progress = true;
+}
+
+/*
+ * Used on a closing descriptor whose application is no longer reading:
+ * discard incoming data with recv() while waiting for the peer's
+ * graceful close (a 0-byte recv).  Up to max_drain bytes are discarded.
+ * On WSAEWOULDBLOCK another zero-byte read is posted to keep waiting.
+ * If the limit is hit or recv() fails, the graceful close is abandoned
+ * and the read side is marked closed anyway.
+ */
+static void drain_until_closed(iocpdesc_t *iocpd) {
+  const size_t max_drain = 16 * 1024;
+  char buf[512];
+  read_result_t *result = iocpd->read_result;
+  while (result->drain_count < max_drain) {
+    int rv = recv(iocpd->socket, buf, (int) sizeof(buf), 0);
+    if (rv > 0) {
+      result->drain_count += rv;
+    } else if (rv == 0) {
+      iocpd->read_closed = true;   // peer's graceful close observed
+      return;
+    } else if (WSAGetLastError() == WSAEWOULDBLOCK) {
+      // wait a little longer
+      start_reading(iocpd);
+      return;
+    } else {
+      break;
+    }
+  }
+  // Graceful close indication unlikely, force the issue
+  if (iocpd->iocp->iocp_trace) {
+    if (result->drain_count >= max_drain)
+      iocp_log("graceful close on reader abandoned (too many chars)\n");
+    else
+      iocp_log("graceful close on reader abandoned: %d\n", WSAGetLastError());
+  }
+  iocpd->read_closed = true;
+}
+
+
+void complete_read(read_result_t *result, DWORD xfer_count, HRESULT status)
+{
+  // Completion handler for the zero-byte WSARecv posted by
+  // begin_zero_byte_read().  Clears read_in_progress, then either keeps
+  // a closing descriptor draining toward its graceful close, or reports
+  // readability (or failure) to the owner.
+  iocpdesc_t *desc = result->base.iocpd;
+  desc->read_in_progress = false;
+
+  if (!desc->closing) {
+    const bool ok = (status == 0) && (xfer_count == 0);
+    if (ok)
+      pni_events_update(desc, desc->events | PN_READABLE);
+    else
+      iocpdesc_fail(desc, status, "IOCP read complete error");
+    return;
+  }
+
+  // Application no longer reading, but we are looking for a zero length read
+  if (!desc->read_closed)
+    drain_until_closed(desc);
+  reap_check(desc);
+}
+
+/*
+ * recv() up to size bytes into buf.
+ *
+ * Returns the byte count, 0 at EOF (the read side is then marked closed),
+ * or SOCKET_ERROR.  On SOCKET_ERROR, *would_block is set if no data is
+ * available yet; otherwise error is filled in (and for a socket error the
+ * read side is marked closed).  A successful read clears PN_READABLE;
+ * the caller must post the next zero-byte read.
+ */
+ssize_t pni_iocp_recv(iocpdesc_t *iocpd, void *buf, size_t size, bool *would_block, pn_error_t *error)
+{
+  if (size == 0) return 0;
+  *would_block = false;
+  if (is_listener(iocpd)) {
+    set_iocp_error_status(error, PN_ERR, WSAEOPNOTSUPP);
+    return SOCKET_ERROR;
+  }
+  if (iocpd->closing) {
+    // Previous call to pn_close()
+    set_iocp_error_status(error, PN_ERR, WSAESHUTDOWN);
+    return SOCKET_ERROR;
+  }
+  if (iocpd->read_closed) {
+    if (pn_error_code(iocpd->error))
+      pn_error_copy(error, iocpd->error);
+    else
+      set_iocp_error_status(error, PN_ERR, WSAENOTCONN);
+    return SOCKET_ERROR;
+  }
+
+  // recv() takes an int length: clamp so a size above INT_MAX cannot be
+  // converted into a negative or truncated length.  A short read is fine.
+  const size_t max_recv = 0x7FFFFFFF;  /* INT_MAX */
+  int want = (int) (size < max_recv ? size : max_recv);
+  int count = recv(iocpd->socket, (char *) buf, want, 0);
+  if (count > 0) {
+    pni_events_update(iocpd, iocpd->events & ~PN_READABLE);
+    // caller must initiate the next begin_zero_byte_read(iocpd)
+    return (ssize_t) count;
+  } else if (count == 0) {
+    iocpd->read_closed = true;
+    return 0;
+  }
+  int err = WSAGetLastError();  // capture once, before anything can overwrite it
+  if (err == WSAEWOULDBLOCK) {
+    *would_block = true;
+  } else {
+    set_iocp_error_status(error, PN_ERR, err);
+    iocpd->read_closed = true;
+  }
+  return SOCKET_ERROR;
+}
+
+// Arm the read side of iocpd by posting a zero-byte overlapped read
+// (see begin_zero_byte_read()); readiness then arrives via the port.
+void start_reading(iocpdesc_t *iocpd)
+{
+  begin_zero_byte_read(iocpd);
+}
+
+
+// === The iocp descriptor
+
+static void pni_iocpdesc_initialize(void *object)
+{
+  // pn_class_t initializer: zero every field, then mark "no socket yet".
+  iocpdesc_t *desc = static_cast<iocpdesc_t *>(object);
+  memset(desc, 0, sizeof(*desc));
+  desc->socket = INVALID_SOCKET;
+}
+
+// pn_class_t finalizer for iocpdesc_t.  The write pipeline and read result
+// are freed only when no overlapped IO still references them; otherwise
+// they are deliberately leaked (and the leak logged) rather than freed
+// under an in-flight operation.
+static void pni_iocpdesc_finalize(void *object)
+{
+  iocpdesc_t *iocpd = (iocpdesc_t *) object;
+  pn_free(iocpd->acceptor);
+  pn_error_free(iocpd->error);
+  // Braced: the nested unbraced if/else here was a dangling-else hazard.
+  if (iocpd->pipeline) {
+    if (write_in_progress(iocpd))
+      iocp_log("iocp descriptor write leak\n");
+    else
+      pn_free(iocpd->pipeline);
+  }
+  if (iocpd->read_in_progress)
+    iocp_log("iocp descriptor read leak\n");
+  else
+    free(iocpd->read_result);
+}
+
+static uintptr_t pni_iocpdesc_hashcode(void *object)
+{
+  // Hash an iocp descriptor by its socket handle value.
+  const iocpdesc_t *desc = (const iocpdesc_t *) object;
+  return (uintptr_t) desc->socket;
+}
+
+#define pni_iocpdesc_compare NULL
+#define pni_iocpdesc_inspect NULL
+
+// Allocate a new reference-counted iocp descriptor object for socket s.
+// Reference counted in the iocpdesc map, zombie_list, selector.
+static iocpdesc_t *pni_iocpdesc(pn_socket_t s)
+{
+  static const pn_cid_t CID_pni_iocpdesc = CID_pn_void;
+  static pn_class_t clazz = PN_CLASS(pni_iocpdesc);
+  void *obj = pn_class_new(&clazz, sizeof(iocpdesc_t));
+  assert(obj);
+  iocpdesc_t *desc = (iocpdesc_t *) obj;
+  desc->socket = s;
+  return desc;
+}
+
+static bool is_listener_socket(pn_socket_t s)
+{
+  // SO_ACCEPTCONN reports whether listen() has been called on s; a failed
+  // query is treated as "not a listener".
+  BOOL accepting = FALSE;
+  int optlen = sizeof(accepting);
+  if (getsockopt(s, SOL_SOCKET, SO_ACCEPTCONN, (char *) &accepting, &optlen) != 0)
+    return false;
+  return accepting != FALSE;
+}
+
+// Create an iocp descriptor for socket s owned by iocp.  Listening sockets
+// get an acceptor; connected sockets get a write pipeline and a read
+// result for zero-byte reads.  Returns NULL if allocation fails.
+iocpdesc_t *pni_iocpdesc_create(iocp_t *iocp, pn_socket_t s) {
+  assert (s != INVALID_SOCKET);
+  bool listening = is_listener_socket(s);
+  iocpdesc_t *iocpd = pni_iocpdesc(s);
+  // Check before the first dereference (previously iocpd->iocp was set
+  // ahead of the NULL check).
+  if (!iocpd)
+    return NULL;
+  iocpd->iocp = iocp;
+  iocpd->error = pn_error();
+  if (listening) {
+    iocpd->acceptor = pni_acceptor(iocpd);
+  } else {
+    iocpd->pipeline = pni_write_pipeline(iocpd);
+    iocpd->read_result = read_result(iocpd);
+  }
+  return iocpd;
+}
+
+
+// Associate iocpd's socket with the iocp's completion port (once).
+// Failures are reported through iocpdesc_fail().
+static void bind_to_completion_port(iocpdesc_t *iocpd)
+{
+  if (iocpd->bound)
+    return;
+  HANDLE port = iocpd->iocp->completion_port;
+  if (!port) {
+    iocpdesc_fail(iocpd, WSAEINVAL, "Incomplete setup, no completion port.");
+    return;
+  }
+  if (!CreateIoCompletionPort((HANDLE) iocpd->socket, port, 0, 0)) {
+    iocpdesc_fail(iocpd, GetLastError(), "IOCP socket setup.");
+    return;
+  }
+  iocpd->bound = true;
+}
+
+static void release_sys_sendbuf(SOCKET s)
+{
+  // Set the socket's send buffer size to zero.  Failure is only caught by
+  // the assert in debug builds.
+  const int zero = 0;
+  const int rc = setsockopt(s, SOL_SOCKET, SO_SNDBUF, (const char *) &zero, sizeof(zero));
+  assert(rc == 0);
+  (void) rc;  // unused when NDEBUG
+}
+
+// Bind iocpd to the completion port and start its first async operation:
+// an accept for listeners; for connections, release the system send
+// buffer, mark writable and post the first zero-byte read.  Only the
+// first call (while not yet bound) does anything.
+void pni_iocpdesc_start(iocpdesc_t *iocpd)
+{
+  if (iocpd->bound)
+    return;
+  bind_to_completion_port(iocpd);
+  if (!is_listener(iocpd)) {
+    release_sys_sendbuf(iocpd->socket);
+    pni_events_update(iocpd, PN_WRITABLE);
+    start_reading(iocpd);
+    return;
+  }
+  begin_accept(iocpd->acceptor, NULL);
+}
+
+// Dispatch a dequeued completion to its type-specific handler.  The
+// descriptor's ops_in_progress count is dropped first, and on failure the
+// status comes from GetLastError() (read before any other API call).
+static void complete(iocp_result_t *result, bool success, DWORD num_transferred) {
+  result->iocpd->ops_in_progress--;
+  const DWORD status = success ? 0 : GetLastError();
+
+  if (result->type == IOCP_ACCEPT) {
+    complete_accept((accept_result_t *) result, status);
+  } else if (result->type == IOCP_CONNECT) {
+    complete_connect((connect_result_t *) result, status);
+  } else if (result->type == IOCP_WRITE) {
+    complete_write((write_result_t *) result, num_transferred, status);
+  } else if (result->type == IOCP_READ) {
+    complete_read((read_result_t *) result, num_transferred, status);
+  } else {
+    assert(false);
+  }
+}
+
+// Process every completion already queued on the port without waiting.
+// Completions carrying a non-zero key belong to the proactor layer and
+// are consumed without processing.
+void pni_iocp_drain_completions(iocp_t *iocp)
+{
+  for (;;) {
+    DWORD nbytes = 0;
+    ULONG_PTR key = 0;
+    OVERLAPPED *ov = 0;
+
+    bool ok = GetQueuedCompletionStatus (iocp->completion_port, &nbytes,
+                                         &key, &ov, (DWORD) 0);
+    if (ov == 0)
+      return;  // timed out: queue is empty
+    if (key == 0)
+      complete((iocp_result_t *) ov, ok, nbytes);
+  }
+}
+
+// Wait up to timeout ms (negative: forever) for one IO completion that
+// belongs to this layer and process it.
+// returns: -1 on error, 0 on timeout, 1 successful completion
+// proactor layer uses completion_key, but we don't, so ignore those (shutdown in progress)
+// and wait again with the same timeout.  A loop is used instead of
+// recursion so a burst of keyed completions cannot grow the stack.
+int pni_iocp_wait_one(iocp_t *iocp, int timeout, pn_error_t *error) {
+  DWORD win_timeout = (timeout < 0) ? INFINITE : (DWORD) timeout;
+  for (;;) {
+    DWORD num_transferred = 0;
+    ULONG_PTR completion_key = 0;
+    OVERLAPPED *overlapped = 0;
+
+    bool good_op = GetQueuedCompletionStatus (iocp->completion_port, &num_transferred,
+                                              &completion_key, &overlapped, win_timeout);
+    if (!overlapped) {
+      DWORD err = GetLastError();
+      if (err == WAIT_TIMEOUT)
+        return 0;
+      if (error)
+        pni_win32_error(error, "GetQueuedCompletionStatus", err);
+      return -1;
+    }
+
+    if (completion_key)
+      continue;   // proactor-owned completion: ignore and wait again
+    iocp_result_t *result = (iocp_result_t *) overlapped;
+    complete(result, good_op, num_transferred);
+    return 1;
+  }
+}
+
+// === Close (graceful and otherwise)
+
+// zombie_list is for sockets transitioning out of iocp on their way to zero ops_in_progress
+// and fully closed.
+
+// Retire a closing descriptor.  With async IO still outstanding it joins
+// the zombie list with a 2 second graceful-close deadline; otherwise its
+// socket (if any) is closed right away and no zombie is needed.
+static void zombie_list_add(iocpdesc_t *iocpd)
+{
+  assert(iocpd->closing);
+  if (iocpd->ops_in_progress) {
+    // Allow 2 seconds for graceful shutdown before releasing socket resource.
+    iocpd->reap_time = pn_i_now2() + 2000;
+    pn_list_add(iocpd->iocp->zombie_list, iocpd);
+    return;
+  }
+  if (iocpd->socket == INVALID_SOCKET)
+    return;
+  closesocket(iocpd->socket);
+  iocpd->socket = INVALID_SOCKET;
+  iocpd->read_closed = true;
+}
+
+// Once a closing descriptor has no ops in progress: close its socket and
+// drop it from the zombie list.
+static void reap_check(iocpdesc_t *iocpd)
+{
+  if (!iocpd->closing || iocpd->ops_in_progress)
+    return;
+  if (iocpd->socket != INVALID_SOCKET) {
+    closesocket(iocpd->socket);
+    iocpd->socket = INVALID_SOCKET;
+  }
+  pn_list_remove(iocpd->iocp->zombie_list, iocpd);
+  // iocpd is decref'ed and possibly released
+}
+
+// Externally visible entry point for the file-local reap_check().
+void pni_iocp_reap_check(iocpdesc_t *iocpd)
+{
+  reap_check(iocpd);
+}
+
+// reap_time of the first zombie on the list, or 0 when there are none.
+pn_timestamp_t pni_zombie_deadline(iocp_t *iocp)
+{
+  if (!pn_list_size(iocp->zombie_list))
+    return 0;
+  iocpdesc_t *first = (iocpdesc_t *) pn_list_get(iocp->zombie_list, 0);
+  return first->reap_time;
+}
+
+// Hard close the socket of each zombie whose reap_time is not after now.
+// Stops at the first zombie that is not yet due (presumably the list is
+// in reap_time order, since zombie_list_add() appends with now + 2000).
+void pni_zombie_check(iocp_t *iocp, pn_timestamp_t now)
+{
+  pn_list_t *zombies = iocp->zombie_list;
+  size_t idx = 0;
+  while (idx < pn_list_size(zombies)) {
+    iocpdesc_t *z = (iocpdesc_t *) pn_list_get(zombies, idx);
+    idx++;
+    if (z->reap_time > now)
+      return;
+    if (z->socket == INVALID_SOCKET)
+      continue;   // socket already closed
+    if (iocp->iocp_trace)
+      iocp_log("async close: graceful close timeout exceeded\n");
+    closesocket(z->socket);
+    z->socket = INVALID_SOCKET;
+    z->read_closed = true;
+    // outstanding ops should complete immediately now
+  }
+}
+
+// Shutdown-time care for zombies: process queued completions, drop the
+// zombies that have no pending IO, then keep waiting for completions
+// until the list empties or the grace period (default 2000 ms,
+// overridable via PN_SHUTDOWN_GRACE in the range 1..59999) expires.
+static void drain_zombie_completions(iocp_t *iocp)
+{
+  // No more pn_selector_select() from App, but zombies still need care and feeding
+  // until their outstanding async actions complete.
+  pni_iocp_drain_completions(iocp);
+
+  // Discard any that have no pending async IO
+  size_t sz = pn_list_size(iocp->zombie_list);
+  for (size_t idx = 0; idx < sz;) {
+    iocpdesc_t *iocpd = (iocpdesc_t *) pn_list_get(iocp->zombie_list, idx);
+    if (!iocpd->ops_in_progress) {
+      pn_list_del(iocp->zombie_list, idx, 1);
+      sz--;
+    } else {
+      idx++;
+    }
+  }
+
+  unsigned shutdown_grace = 2000;
+  const char *grace_env = getenv("PN_SHUTDOWN_GRACE");
+  if (grace_env) {
+    // strtol rather than atoi: atoi has undefined behavior on out-of-range
+    // input, strtol saturates and the range check then rejects it.
+    long grace = strtol(grace_env, NULL, 10);
+    if (grace > 0 && grace < 60000)
+      shutdown_grace = (unsigned) grace;
+  }
+  pn_timestamp_t now = pn_i_now2();
+  pn_timestamp_t deadline = now + shutdown_grace;
+
+  while (pn_list_size(iocp->zombie_list)) {
+    if (now >= deadline)
+      break;
+    // deadline - now is at most shutdown_grace (< 60000), so it fits an int.
+    int rv = pni_iocp_wait_one(iocp, (int) (deadline - now), NULL);
+    if (rv < 0) {
+      iocp_log("unexpected IOCP failure on Proton IO shutdown %d\n", GetLastError());
+      break;
+    }
+    now = pn_i_now2();
+  }
+  if (now >= deadline && pn_list_size(iocp->zombie_list) && iocp->iocp_trace) {
+    // Should only happen if really slow TCP handshakes, i.e. total network failure
+    iocp_log("network failure on Proton shutdown\n");
+  }
+}
+
+// Force-close every remaining zombie socket so outstanding ops complete,
+// drain those completions, and log any zombies that still linger.
+static void zombie_list_hard_close_all(iocp_t *iocp)
+{
+  pni_iocp_drain_completions(iocp);
+  size_t count = pn_list_size(iocp->zombie_list);
+  for (size_t i = 0; i < count; i++) {
+    iocpdesc_t *z = (iocpdesc_t *) pn_list_get(iocp->zombie_list, i);
+    if (z->socket == INVALID_SOCKET)
+      continue;
+    closesocket(z->socket);
+    z->socket = INVALID_SOCKET;
+    z->read_closed = true;
+    z->write_closed = true;
+  }
+  pni_iocp_drain_completions(iocp);
+
+  // Zombies should be all gone.  Do a sanity check.
+  size_t left = pn_list_size(iocp->zombie_list);
+  int ops = 0;
+  for (size_t i = 0; i < left; i++)
+    ops += ((iocpdesc_t *) pn_list_get(iocp->zombie_list, i))->ops_in_progress;
+  int remaining = (int) left;
+  if (remaining)
+    iocp_log("Proton: %d unfinished close operations (ops count = %d)\n", remaining, ops);
+}
+
+// Half-close the send side (SD_SEND) and mark writes closed.  Errors that
+// just mean the peer is already gone are not traced.
+static void iocp_shutdown(iocpdesc_t *iocpd)
+{
+  if (iocpd->socket == INVALID_SOCKET)
+    return;    // Hard close in progress
+  if (shutdown(iocpd->socket, SD_SEND) != 0) {
+    const int err = WSAGetLastError();
+    const bool peer_gone = (err == WSAECONNABORTED || err == WSAECONNRESET || err == WSAENOTCONN);
+    if (!peer_gone && iocpd->iocp->iocp_trace)
+      iocp_log("socket shutdown failed %d\n", err);
+  }
+  iocpd->write_closed = true;
+}
+
+// Start closing iocpd.  A listener's socket is closed at once, which makes
+// its pending accepts complete.  A connection stays open so the graceful
+// close can be observed.  Its send side is shut down now if no write is
+// in flight; otherwise that is left to complete_write().  Either way the
+// descriptor is handed to zombie_list_add().
+void pni_iocp_begin_close(iocpdesc_t *iocpd)
+{
+  assert (!iocpd->closing);
+  if (is_listener(iocpd)) {
+    // Listening socket is easy.  Close the socket which will cancel async ops.
+    pn_socket_t old_sock = iocpd->socket;
+    iocpd->socket = INVALID_SOCKET;
+    iocpd->closing = true;
+    iocpd->read_closed = true;
+    iocpd->write_closed = true;
+    closesocket(old_sock);
+    // Pending accepts will now complete.  Zombie can die when all consumed.
+    zombie_list_add(iocpd);
+  } else {
+    // Continue async operation looking for graceful close confirmation or timeout.
+    // (The unused old_sock local that used to be here has been removed.)
+    iocpd->closing = true;
+    if (!iocpd->write_closed && !write_in_progress(iocpd))
+      iocp_shutdown(iocpd);
+    zombie_list_add(iocpd);
+  }
+}
+
+
+// === iocp_t
+
+#define pni_iocp_hashcode NULL
+#define pni_iocp_compare NULL
+#define pni_iocp_inspect NULL
+
+// pn_class_t initializer for iocp_t: zeroed state, shared pool, a fresh
+// completion port, an empty (ref-counting) zombie list, tracing off.
+void pni_iocp_initialize(void *obj)
+{
+  iocp_t *self = (iocp_t *) obj;
+  memset(self, 0, sizeof(iocp_t));
+  pni_shared_pool_create(self);
+  HANDLE port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
+  self->completion_port = port;
+  assert(port != NULL);
+  self->zombie_list = pn_list(PN_OBJECT, 0);
+  self->iocp_trace = false;
+}
+
+// pn_class_t finalizer for iocp_t.
+void pni_iocp_finalize(void *obj)
+{
+  iocp_t *self = (iocp_t *) obj;
+  // Give zombies a last chance to close gracefully, then force the rest.
+  drain_zombie_completions(self);
+  zombie_list_hard_close_all(self);
+  // Closing the port cancels any of our async ops still outstanding.
+  CloseHandle(self->completion_port);
+  self->completion_port = NULL;
+  // Only now is it safe to free anything a former async op could touch.
+  pn_free(self->zombie_list);
+  pni_shared_pool_free(self);
+}
+
+// Allocate a new iocp_t via the proton object system (runs
+// pni_iocp_initialize(); pn_free() runs pni_iocp_finalize()).
+iocp_t *pni_iocp()
+{
+  static const pn_cid_t CID_pni_iocp = CID_pn_void;
+  static const pn_class_t clazz = PN_CLASS(pni_iocp);
+  return (iocp_t *) pn_class_new(&clazz, sizeof(iocp_t));
+}
+
+}
+
+
+// ======================================================================
+// Proton Proactor support
+// ======================================================================
+
+#include "../core/log_private.h"
+#include "./proactor-internal.h"
+
+// Scoped lock for a Win32 CRITICAL_SECTION: entered by the constructor,
+// left by the destructor unless release() has already left it.
+class csguard {
+  public:
+    csguard(CRITICAL_SECTION *cs) : section_(cs), held_(true) {
+        EnterCriticalSection(section_);
+    }
+    ~csguard() {
+        release();
+    }
+    // Leave the critical section early; later calls are no-ops.
+    void release() {
+        if (!held_)
+            return;
+        held_ = false;
+        LeaveCriticalSection(section_);
+    }
+  private:
+    LPCRITICAL_SECTION section_;
+    bool held_;
+};
+
+
+// Get string from error status
+// Returns the system message text for status, or a generic fallback if
+// the system has no message for it.
+std::string errno_str2(DWORD status) {
+  char buf[512];
+  // FormatMessageA explicitly: buf is char[], so the TCHAR-mapped
+  // FormatMessage would not compile when UNICODE is defined.
+  // IGNORE_INSERTS is needed with FROM_SYSTEM because system messages may
+  // contain %n inserts and no arguments are supplied.
+  if (FormatMessageA(FORMAT_MESSAGE_MAX_WIDTH_MASK | FORMAT_MESSAGE_FROM_SYSTEM |
+                     FORMAT_MESSAGE_IGNORE_INSERTS,
+                     NULL, status, 0, buf, (DWORD) sizeof(buf), NULL))
+      return std::string(buf);
+  return std::string("internal proactor error");
+}
+
+
+std::string errno_str(const std::string& msg, bool is_wsa) {
+  DWORD e = is_wsa ? WSAGetLastError() : GetLastError();
+  return msg + ": " + errno_str2(e);
+}
+
+
+
+using namespace pn_experimental;
+
+// Completion keys that tag non-IO completions posted with
+// PostQueuedCompletionStatus().  Each key is the address of a distinct
+// static object, so the keys are unique and non-zero (the iocp layer
+// ignores completions with a non-zero key).  Writable data objects are
+// used instead of the addresses of empty stub functions: MSVC's identical
+// COMDAT folding (/OPT:ICF) may merge identical empty functions, which
+// would give all three keys the same value.
+static char proactor_wake_tag;
+ULONG_PTR proactor_wake_key = (ULONG_PTR) &proactor_wake_tag;
+
+static char psocket_wakeup_tag;
+ULONG_PTR psocket_wakeup_key = (ULONG_PTR) &psocket_wakeup_tag;
+
+static char recycle_accept_tag;
+ULONG_PTR recycle_accept_key = (ULONG_PTR) &recycle_accept_tag;
+
+// getaddrinfo() for TCP streams (any address family, v4-mapped and
+// address-config filtering plus caller flags).  Returns 0 or the WSA error.
+static int pgetaddrinfo(const char *host, const char *port, int flags, struct addrinfo **res)
+{
+  struct addrinfo hints;
+  memset(&hints, 0, sizeof(hints));
+  hints.ai_family = AF_UNSPEC;
+  hints.ai_socktype = SOCK_STREAM;
+  hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | flags;
+  if (getaddrinfo(host, port, &hints, res) != 0)
+    return WSAGetLastError();
+  return 0;
+}
+
+// Presumably the condition name for proactor errors.
+// NOTE(review): not referenced in this section; confirm where it is used.
+const char *COND_NAME = "proactor";
+// Declares a proton record handle (PN_HANDLE macro).
+// NOTE(review): presumably used to attach proactor data to records; confirm.
+PN_HANDLE(PN_PROACTOR)
+
+// The number of times a connection event batch may be replenished for
+// a thread between calls to wait().
+// TODO: consider some instrumentation to determine an optimal number
+// or switch to cpu time based limit.
+#define HOG_MAX 3
+
+/* pn_proactor_t and pn_listener_t are plain C structs with normal memory management.
+   Class definitions are for identification as pn_event_t context only.
+*/
+PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
+PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
+
+/* Completion serialization context common to connection and listener. */
+/* And also the reaper singleton (which has no socket). */
+/* The proactor embeds one as well (pn_proactor_t::context). */
+
+/* Kind of object a pcontext_t is embedded in.  pcontext_pconnection() and
+   pcontext_listener() check it before recovering the owner via offsetof(). */
+typedef enum {
+  PROACTOR,
+  PCONNECTION,
+  LISTENER,
+  WAKEABLE } pcontext_type_t;
+
+typedef struct pcontext_t {
+  CRITICAL_SECTION cslock;  /* Initialized in pcontext_init(), deleted in pcontext_finalize() */
+  pn_proactor_t *proactor;  /* Immutable */
+  void *owner;              /* Instance governed by the context */
+  pcontext_type_t type;
+  bool working;             /* While set, wakeup()/proactor_wake() do not post a wake */
+  bool wake_pending;        /* Wake completion posted, not yet consumed by wake_complete() */
+  int completion_ops;  // uncompleted ops that are not socket IO related
+  // NOTE(review): pn_proactor_t has no eventfd_mutex; this comment looks
+  // inherited from the epoll proactor -- confirm which lock guards wake_next.
+  struct pcontext_t *wake_next; // wake list, guarded by proactor eventfd_mutex
+  bool closing;
+  // Next 4 are protected by the proactor mutex
+  struct pcontext_t* next;  /* Protected by proactor.mutex */
+  struct pcontext_t* prev;  /* Protected by proactor.mutex */
+  int disconnect_ops;           /* ops remaining before disconnect complete */
+  bool disconnecting;           /* pn_proactor_disconnect */
+} pcontext_t;
+
+// Zero ctx, create its lock, and record its kind, proactor and owner.
+static void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p, void *o) {
+  memset(ctx, 0, sizeof(pcontext_t));
+  InitializeCriticalSectionAndSpinCount(&ctx->cslock, 4000);
+  ctx->type = t;
+  ctx->owner = o;
+  ctx->proactor = p;
+}
+
+// Release the lock created by pcontext_init().
+static void pcontext_finalize(pcontext_t* ctx)
+{
+  DeleteCriticalSection(&ctx->cslock);
+}
+
+// Socket state shared by connections, listener sockets and the reaper.
+// A connection's psocket_t is the first member of pconnection_t (see
+// as_pconnection_t()); listener sockets have listener set.
+typedef struct psocket_t {
+  iocpdesc_t *iocpd;            /* NULL if reaper, or socket open failure. */
+  pn_listener_t *listener;      /* NULL for a connection socket */
+  pn_netaddr_t listen_addr;     /* Not filled in for connection sockets */
+  char addr_buf[PN_MAX_ADDR];   /* Storage for host/port, filled by pni_parse_addr() */
+  const char *host, *port;      /* Set by pni_parse_addr() in psocket_init() */
+  bool is_reaper;
+} psocket_t;
+
+// Initialize ps.  The reaper's psocket records only is_reaper; other
+// sockets also record their listener (NULL for connections) and split
+// addr into host/port.
+static void psocket_init(psocket_t* ps, pn_listener_t *listener, bool is_reaper, const char *addr) {
+  ps->is_reaper = is_reaper;
+  if (!is_reaper) {
+    ps->listener = listener;
+    pni_parse_addr(addr, ps->addr_buf, sizeof(ps->addr_buf), &ps->host, &ps->port);
+  }
+}
+
+// Proactor instance state.
+struct pn_proactor_t {
+  pcontext_t context;           /* Wake state used by proactor_wake()/proactor_wake_complete() */
+  CRITICAL_SECTION write_lock;
+  CRITICAL_SECTION timer_lock;
+  CRITICAL_SECTION bind_lock;
+  HANDLE timer_queue;
+  HANDLE timeout_timer;
+  iocp_t *iocp;                 /* Completion port owner; proactor wakes are posted to it */
+  class reaper *reaper;
+  pn_collector_t *collector;
+  pcontext_t *contexts;         /* in-use contexts for PN_PROACTOR_INACTIVE and cleanup */
+  pn_event_batch_t batch;       /* Mapped back to the proactor by batch_proactor() */
+  size_t disconnects_pending;   /* unfinished proactor disconnects*/
+
+  // need_xxx flags indicate we should generate PN_PROACTOR_XXX on the next update_batch()
+  bool need_interrupt;
+  bool need_inactive;
+  bool need_timeout;
+  bool timeout_set; /* timeout has been set by user and not yet cancelled or generated event */
+  bool timeout_processed;  /* timeout event dispatched in the most recent event batch */
+  bool delayed_interrupt;
+  bool shutting_down;
+};
+
+// Per-connection state.
+typedef struct pconnection_t {
+  psocket_t psocket;            /* Must stay first: as_pconnection_t() casts psocket_t* to pconnection_t* */
+  pcontext_t context;           /* Mapped back via pcontext_pconnection() (offsetof) */
+  pn_connection_driver_t driver;
+  std::queue<iocp_result_t *> *completion_queue;
+  std::queue<iocp_result_t *> *work_queue;
+  pn_condition_t *disconnect_condition;
+  pn_event_batch_t batch;       /* Mapped back via batch_pconnection() (offsetof) */
+  int wake_count;
+  int hog_count; // thread hogging limiter
+  bool server;                /* accept, not connect */
+  bool started;
+  bool connecting;
+  bool tick_pending;
+  bool queued_disconnect;     /* deferred from pn_proactor_disconnect() */
+  bool bound;
+  bool stop_timer_required;
+  bool can_wake;
+  HANDLE tick_timer;
+  struct pn_netaddr_t local, remote; /* Actual addresses */
+  struct addrinfo *addrinfo;         /* Resolved address list */
+  struct addrinfo *ai;               /* Current connect address */
+} pconnection_t;
+
+// Listener state; may own several listening sockets.
+struct pn_listener_t {
+  psocket_t *psockets;          /* Array of listening sockets */
+  size_t psockets_size;         /* Presumably the element count of psockets */
+  pcontext_t context;           /* Mapped back via pcontext_listener() (offsetof) */
+  std::queue<accept_result_t *> *pending_accepts;  // sockets awaiting a pn_listener_accept
+  int pending_events;          // number of PN_LISTENER_ACCEPT events to be delivered
+  pn_condition_t *condition;
+  pn_collector_t *collector;
+  pn_event_batch_t batch;       /* Mapped back via batch_listener() (offsetof) */
+  pn_record_t *attachments;
+  void *listener_context;
+  size_t backlog;
+  bool close_dispatched;
+};
+
+
+// Forward declarations for functions defined later in this file.
+static bool proactor_remove(pcontext_t *ctx);
+static void listener_done(pn_listener_t *l);
+static bool proactor_update_batch(pn_proactor_t *p);
+static pn_event_batch_t *listener_process(pn_listener_t *l, iocp_result_t *result);
+static void recycle_result(accept_result_t *accept_result);
+static void proactor_add(pcontext_t *p);
+static void release_pending_accepts(pn_listener_t *l);
+static void proactor_shutdown(pn_proactor_t *p);
+
+// Queue a synthetic completion on iocp's port: arg1 becomes the
+// completion key, arg2 the OVERLAPPED pointer, with a byte count of 0.
+// The PostQueuedCompletionStatus() result is not checked.
+static void post_completion(iocp_t *iocp, ULONG_PTR arg1, void *arg2) {
+    // Despite the vagueness of the official documentation, the
+    // following args to Post are passed undisturbed and unvalidated
+    // to GetQueuedCompletionStatus().  In particular, arg2 does not
+    // have to be an OVERLAPPED struct and may be NULL.
+    LPOVERLAPPED passthrough = static_cast<LPOVERLAPPED>(arg2);
+    PostQueuedCompletionStatus(iocp->completion_port, (DWORD) 0, arg1, passthrough);
+}
+
+// NULL-safe type tests for iocp results.
+static inline bool is_write_result(iocp_result_t *r) {
+    if (!r) return false;
+    return r->type == IOCP_WRITE;
+}
+
+static inline bool is_read_result(iocp_result_t *r) {
+    if (!r) return false;
+    return r->type == IOCP_READ;
+}
+
+static inline bool is_connect_result(iocp_result_t *r) {
+    if (!r) return false;
+    return r->type == IOCP_CONNECT;
+}
+
+// From old io.c
+// Disable the Nagle algorithm (TCP_NODELAY) and put sock in non-blocking
+// mode.  Failures of either call are currently ignored.
+static void pni_configure_sock_2(pn_socket_t sock) {
+  int nodelay = 1;
+  (void) setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&nodelay, sizeof(nodelay));
+  // TODO: log/err on failure
+
+  u_long nonblocking = 1;
+  (void) ioctlsocket(sock, FIONBIO, &nonblocking);
+  // TODO: log/err on failure
+}
+
+// Look up the ConnectEx extension function for socket s.  Returns NULL
+// (and asserts in debug builds) if WSAIoctl fails.
+static LPFN_CONNECTEX lookup_connect_ex2(SOCKET s)
+{
+  GUID guid = WSAID_CONNECTEX;
+  DWORD bytes = 0;
+  // Initialized: on WSAIoctl failure fn was previously left indeterminate
+  // and read by assert()/return.
+  LPFN_CONNECTEX fn = NULL;
+  if (WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid),
+               &fn, sizeof(fn), &bytes, NULL, NULL) != 0)
+    fn = NULL;
+  assert(fn);
+  return fn;
+}
+
+
+// Socket wrapper that calls ::closesocket in its destructor unless
+// ownership has been handed off with release().
+// NOTE(review): the implicit copy constructor is not disabled, so copying
+// an instance would close the same socket twice; avoid copies.
+class unique_socket {
+  public:
+    unique_socket(pn_socket_t socket) : socket_(socket) {}
+    ~unique_socket() {
+        if (socket_ == INVALID_SOCKET)
+            return;
+        ::closesocket(socket_);
+    }
+    operator pn_socket_t() const { return socket_; }
+    pn_socket_t release() {
+        const pn_socket_t owned = socket_;
+        socket_ = INVALID_SOCKET;
+        return owned;
+    }
+
+  protected:
+    pn_socket_t socket_;
+};
+
+// Dispatch a completion processed after the fact (status and byte count
+// are stored in the result), then drop the ops_in_progress count taken by
+// the begin_xxx call.  The descriptor is captured first because handlers
+// may free the result.
+void do_complete(iocp_result_t *result) {
+  iocpdesc_t *desc = result->iocpd;  // connect result gets deleted
+
+  if (result->type == IOCP_ACCEPT) {
+    /* accept is now processed inline to do in parallel, except on teardown */
+    assert(desc->closing);
+    complete_accept((accept_result_t *) result, result->status);  // free's result and retires new_sock
+  } else if (result->type == IOCP_CONNECT) {
+    complete_connect((connect_result_t *) result, result->status);
+  } else if (result->type == IOCP_WRITE) {
+    complete_write((write_result_t *) result, result->num_transferred, result->status);
+  } else if (result->type == IOCP_READ) {
+    complete_read((read_result_t *) result, result->num_transferred, result->status);
+  } else {
+    assert(false);
+  }
+
+  desc->ops_in_progress--;  // Set in each begin_xxx call
+}
+
+
+static inline pconnection_t *as_pconnection_t(psocket_t* ps) {
+  // Connection sockets are neither the reaper nor owned by a listener;
+  // psocket is the first member of pconnection_t, so the cast is direct.
+  if (ps->is_reaper || ps->listener)
+    return NULL;
+  return (pconnection_t*)ps;
+}
+
+// Owning listener of ps, or NULL for connection and reaper sockets.
+static inline pn_listener_t *as_listener(psocket_t* ps) {
+  pn_listener_t *owner = ps->listener;
+  return owner;
+}
+
+// Recover the object embedding context c, or NULL if c is another kind.
+static inline pconnection_t *pcontext_pconnection(pcontext_t *c) {
+  if (c->type != PCONNECTION)
+    return NULL;
+  return (pconnection_t*)((char*)c - offsetof(pconnection_t, context));
+}
+static inline pn_listener_t *pcontext_listener(pcontext_t *c) {
+  if (c->type != LISTENER)
+    return NULL;
+  return (pn_listener_t*)((char*)c - offsetof(pn_listener_t, context));
+}
+
+// Serialization context for ps: its listener's context for a listening
+// socket, otherwise its connection's context.
+static pcontext_t *psocket_context(psocket_t *ps) {
+  pn_listener_t *l = ps->listener;
+  return l ? &l->context : &as_pconnection_t(ps)->context;
+}
+
+
+// Call with lock held.
+// Consume a posted wake: clear wake_pending and drop the completion_ops
+// count that wakeup()/proactor_wake() took when posting it.
+static inline void wake_complete(pcontext_t *ctx) {
+  ctx->completion_ops--;
+  ctx->wake_pending = false;
+}
+
+// Call with lock held.
+// Post a psocket wakeup completion for ps's context unless the context is
+// working or a wake is already pending.
+static void wakeup(psocket_t *ps) {
+  pcontext_t *ctx = psocket_context(ps);
+  if (ctx->working || ctx->wake_pending)
+    return;
+  ctx->wake_pending = true;
+  ctx->completion_ops++;
+  post_completion(ctx->proactor->iocp, psocket_wakeup_key, ps);
+}
+
+// Call with lock held.
+// Consume the proactor's own posted wake.
+static inline void proactor_wake_complete(pn_proactor_t *p) {
+  pcontext_t *ctx = &p->context;
+  wake_complete(ctx);
+}
+
+// Call with lock held.
+// Post a proactor wake completion unless the proactor context is working
+// or a wake is already pending.
+static void proactor_wake(pn_proactor_t *p) {
+  pcontext_t *ctx = &p->context;
+  if (ctx->working || ctx->wake_pending)
+    return;
+  ctx->wake_pending = true;
+  ctx->completion_ops++;
+  post_completion(p->iocp, proactor_wake_key, p);
+}
+
+VOID CALLBACK reap_check_cb(PVOID arg, BOOLEAN /* ignored*/ );
+
+// Serialize handling listener and connection closing IO completions
+// after the engine has lost interest.  A temporary convenience while
+// using the old single threaded iocp/io/select driver code.
+//
+// Members are guarded by lock_.  A one-shot timer-queue timer (timer_)
+// re-runs pni_zombie_check() via reap_check_cb() while zombie deadlines
+// remain.
+class reaper {
+  public:
+    // p is not used.  wlock is taken around write completions in process().
+    // Aborts the process if the timer queue cannot be created.
+    reaper(pn_proactor_t *p, CRITICAL_SECTION *wlock, iocp_t *iocp)
+      : iocp_(iocp), global_wlock_(wlock), timer_(NULL), running(true) {
+      InitializeCriticalSectionAndSpinCount(&lock_, 4000);
+      timer_queue_ = CreateTimerQueue();
+      if (!timer_queue_) {
+          perror("CreateTimerQueue");
+          abort();
+      }
+    }
+
+    // Connection or listener lock must also be held by caller.
+    // Begin closing iocpd and drop the caller's reference to it.  Returns
+    // true if iocpd had no IO in progress.  Returns false, doing nothing,
+    // if iocpd is NULL or already closing.
+    bool add(iocpdesc_t *iocpd) {
+        if (!iocpd) return false;
+        csguard g(&lock_);
+        if (iocpd->closing) return false;
+        bool rval = !iocpd->ops_in_progress;
+        pni_iocp_begin_close(iocpd); // sets iocpd->closing
+        pn_decref(iocpd);            // may still be ref counted on zombie list
+        reap_timer();
+        return rval;
+    }
+
+    // For cases where the close will be immediate.  I.E. after a failed
+    // connection attempt where there is no follow-on IO.
+    void fast_reap(iocpdesc_t *iocpd) {
+        assert(iocpd && iocpd->ops_in_progress == 0 && !iocpd->closing);
+        csguard g(&lock_);
+        pni_iocp_begin_close(iocpd);
+        pn_decref(iocpd);
+    }
+
+    // Process one completion for a closing descriptor.  Returns true if the
+    // descriptor has no IO left in progress afterwards.
+    bool process(iocp_result_t *result) {
+        // No queue of completions for the reaper.  Just process
+        // serialized by the lock assuming all actions are "short".
+        // That may be wrong, and if so the real fix is not a
+        // consumer/producer setup but just replace the reaper with a
+        // multi threaded alternative.
+        csguard g(&lock_);
+        iocpdesc_t *iocpd = result->iocpd;
+        if (is_write_result(result)) {
+            csguard wg(global_wlock_);
+            do_complete(result);
+        }
+        else do_complete(result);
+        // result may have been freed by do_complete(); use only iocpd below.
+        bool rval = (iocpd->ops_in_progress == 0);
+        pni_iocp_reap_check(iocpd);
+        return rval;
+    }
+
+    // Called when all competing threads have terminated except our own reap_check timer.
+    void final_shutdown() {
+        {
+            // reap_timer() reads running under lock_ from the timer
+            // callback, so write it under lock_ too (previously an
+            // unsynchronized write).  The lock is released again before
+            // DeleteTimerQueueEx(), which waits for in-flight callbacks
+            // that themselves need lock_.
+            csguard g(&lock_);
+            running = false;
+        }
+        DeleteTimerQueueEx(timer_queue_, INVALID_HANDLE_VALUE);
+        // No pending or active timers from thread pool remain.  Truly single threaded now.
+        pn_free((void *) iocp_); // calls pni_iocp_finalize(); cleans up all sockets, completions, completion port.
+        DeleteCriticalSection(&lock_);
+    }
+
+    // Timer callback body: retire the one-shot timer that fired and re-arm
+    // if another zombie deadline is pending.
+    void reap_check() {
+        csguard g(&lock_);
+        DeleteTimerQueueTimer(timer_queue_, timer_, NULL);
+        timer_ = NULL;
+        reap_timer();
+    }
+
+
+  private:
+    // Hard close overdue zombies, then arm a one-shot timer for the next
+    // zombie deadline, if any.  No-op while a timer is armed or once
+    // running is false.  Aborts if the timer cannot be created.
+    void reap_timer() {
+        // Call with lock
+        if (timer_ || !running)
+            return;
+        pn_timestamp_t now = pn_i_now2();
+        pni_zombie_check(iocp_, now);
+        pn_timestamp_t zd = pni_zombie_deadline(iocp_);
+        if (zd) {
+            DWORD tm = (zd > now) ? zd - now : 1;
+            if (!CreateTimerQueueTimer(&timer_, timer_queue_, reap_check_cb, this, tm,
+                                       0, WT_EXECUTEONLYONCE)) {
+                perror("CreateTimerQueueTimer");
+                abort();
+            }
+        }
+    }
+
+    iocp_t *iocp_;
+    CRITICAL_SECTION lock_;
+    CRITICAL_SECTION *global_wlock_;
+    HANDLE timer_queue_;
+    HANDLE timer_;
+    bool running;
+};
+
+VOID CALLBACK reap_check_cb(PVOID arg, BOOLEAN /* ignored*/ ) {
+    // Timer-queue callback; arg is the reaper that armed the timer.
+    static_cast<reaper *>(arg)->reap_check();
+}
+
+static void listener_begin_close(pn_listener_t* l);
+static void connect_step_done(pconnection_t *pc, connect_result_t *result);
+static pn_event_t *listener_batch_next(pn_event_batch_t *batch);
+static pn_event_t *proactor_batch_next(pn_event_batch_t *batch);
+static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch);
+
+
+/* Map an event batch back to the pn_proactor_t that embeds it, or NULL when
+ * the batch belongs to a different kind of object. */
+static inline pn_proactor_t *batch_proactor(pn_event_batch_t *batch) {
+  if (batch->next_event != proactor_batch_next)
+    return NULL;
+  char *owner = (char*)batch - offsetof(pn_proactor_t, batch);
+  return (pn_proactor_t*)owner;
+}
+
+/* Map an event batch back to the pn_listener_t that embeds it, or NULL when
+ * the batch belongs to a different kind of object. */
+static inline pn_listener_t *batch_listener(pn_event_batch_t *batch) {
+  if (batch->next_event != listener_batch_next)
+    return NULL;
+  char *owner = (char*)batch - offsetof(pn_listener_t, batch);
+  return (pn_listener_t*)owner;
+}
+
+/* Map an event batch back to the pconnection_t that embeds it, or NULL when
+ * the batch belongs to a different kind of object. */
+static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
+  if (batch->next_event != pconnection_batch_next)
+    return NULL;
+  char *owner = (char*)batch - offsetof(pconnection_t, batch);
+  return (pconnection_t*)owner;
+}
+
+// True if the connection driver has an event waiting to be dispatched.
+static inline bool pconnection_has_event(pconnection_t *pc) {
+  pn_connection_driver_t *drv = &pc->driver;
+  return pn_connection_driver_has_event(drv);
+}
+
+// True if the listener's collector holds at least one event.
+static inline bool listener_has_event(pn_listener_t *l) {
+  return pn_collector_peek(l->collector) != NULL;
+}
+
+// True if the proactor's own collector holds at least one event.
+static inline bool proactor_has_event(pn_proactor_t *p) {
+  return pn_collector_peek(p->collector) != NULL;
+}
+
+// Log the type of event e (if any) against object p.  Returns e unchanged so
+// the call can wrap a "next event" expression.
+static pn_event_t *log_event(void* p, pn_event_t *e) {
+  if (!e)
+    return e;
+  const char *type_name = pn_event_type_name(pn_event_type(e));
+  pn_logf("[%p]:(%s)", p, type_name);
+  return e;
+}
+
+static void psocket_error_str(psocket_t *ps, const char *msg, const char* what) {
+  if (ps->is_reaper)
+    return;
+  if (!ps->listener) {
+    pn_connection_driver_t *driver = &as_pconnection_t(ps)->driver;
+    pn_connection_driver_bind(driver); /* Bind so errors will be reported */
+    pni_proactor_set_cond(pn_transport_condition(driver->transport), what, ps->host, ps->port, msg);
+    pn_connection_driver_close(driver);
+  } else {
+    pn_listener_t *l = as_listener(ps);
+    pni_proactor_set_cond(l->condition, what, ps->host, ps->port, msg);
+    listener_begin_close(l);
+  }
+}
+
+static void psocket_error(psocket_t *ps, int err, const char* what) {
+  psocket_error_str(ps, errno_str2(err).c_str(), what);
+}
+
+
+
+// ========================================================================
+// pconnection
+// ========================================================================
+
+/* Make a pn_class for pconnection_t since it is attached to a pn_connection_t record */
+#define CID_pconnection CID_pn_object
+#define pconnection_inspect NULL
+#define pconnection_initialize NULL
+#define pconnection_hashcode NULL
+#define pconnection_compare NULL
+
+/* pn_class finalizer: tear down the pcontext embedded in the pconnection_t. */
+static void pconnection_finalize(void *vp_pconnection) {
+  pconnection_t *self = static_cast<pconnection_t *>(vp_pconnection);
+  pcontext_finalize(&self->context);
+}
+
+static const pn_class_t pconnection_class = PN_CLASS(pconnection);
+
+/* Initialise pc for connection c / transport t on proactor p and attach pc to
+ * the connection's PN_PROACTOR record.  `server` selects the server role on
+ * the transport; `addr` is handed to psocket_init().
+ * Returns NULL on success, or a static error string on failure.  On failure
+ * pc has already been released and must not be used by the caller.
+ */
+static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, bool server, const char *addr)
+{
+  if (pn_connection_driver_init(&pc->driver, c, t) != 0) {
+    // NOTE(review): pc is created with pn_class_new() in pn_proactor_connect2(),
+    // so releasing it with free() looks mismatched with the pn object
+    // allocator.  Before switching to pn_free(), note that pconnection_finalize()
+    // would run pcontext_finalize() on a context not yet initialised here.
+    free(pc);
+    return "pn_connection_driver_init failure";
+  }
+  {
+    // bind_lock makes the check-and-set of the PN_PROACTOR attachment atomic.
+    csguard g(&p->bind_lock);
+    pn_record_t *r = pn_connection_attachments(pc->driver.connection);
+    if (pn_record_get(r, PN_PROACTOR)) {
+      pn_connection_driver_destroy(&pc->driver);
+      // NOTE(review): same free()/pn_class_new() concern as above.
+      free(pc);
+      return "pn_connection_t already in use";
+    }
+    pn_record_def(r, PN_PROACTOR, &pconnection_class);
+    pn_record_set(r, PN_PROACTOR, pc);
+    pc->bound = true;
+    pc->can_wake = true;
+  }
+
+  // Heap-allocated so pconnection_process() can std::swap them cheaply.
+  pc->completion_queue = new std::queue<iocp_result_t *>();
+  pc->work_queue = new std::queue<iocp_result_t *>();
+  pcontext_init(&pc->context, PCONNECTION, p, pc);
+
+  psocket_init(&pc->psocket, NULL, false, addr);
+  pc->batch.next_event = pconnection_batch_next;
+  if (server) {
+    pn_transport_set_server(pc->driver.transport);
+  }
+
+  pn_decref(pc);                /* Will be deleted when the connection is */
+  return NULL;
+}
+
+// Cancel *timer (if armed) on tqueue.  DeleteTimerQueueTimer() is given
+// INVALID_HANDLE_VALUE, so this either stops the timer before it fires or
+// returns only after its callback has completed (in the threadpool thread).
+// The outcome is never "in doubt".  On success *timer is reset to NULL.
+// Returns false only if DeleteTimerQueueTimer() fails.
+static bool stop_timer(HANDLE tqueue, HANDLE *timer) {
+  if (*timer == NULL)
+    return true;
+  if (!DeleteTimerQueueTimer(tqueue, *timer, INVALID_HANDLE_VALUE))
+    return false;  // error
+  *timer = NULL;
+  return true;
+}
+
+// Arm a one-shot timer on tqueue that calls cb(cb_arg) after `time`
+// milliseconds, storing its handle in *timer.  Refuses (returns false) if
+// *timer is already set; otherwise returns whether creation succeeded.
+static bool start_timer(HANDLE tqueue, HANDLE *timer, WAITORTIMERCALLBACK cb, void *cb_arg, DWORD time) {
+  const bool already_armed = (*timer != NULL);
+  if (already_armed)
+    return false;  // TODO: log err
+  BOOL created = CreateTimerQueueTimer(timer, tqueue, cb, cb_arg, time, 0, WT_EXECUTEONLYONCE);
+  return created != FALSE;
+}
+
+// Threadpool callback for a connection's idle-timeout tick timer.  Under the
+// connection lock, flag a pending tick and wake the connection, unless its
+// descriptor is gone or already closing.
+VOID CALLBACK tick_timer_cb(PVOID arg, BOOLEAN /* ignored*/ ) {
+  pconnection_t *pc = static_cast<pconnection_t *>(arg);
+  csguard g(&pc->context.cslock);
+  iocpdesc_t *desc = pc->psocket.iocpd;
+  if (!desc || desc->closing)
+    return;
+  pc->tick_pending = true;
+  wakeup(&pc->psocket);
+}
+
+
+// Call with no lock held or stop_timer and callback may deadlock
+// (tick_timer_cb takes the connection lock).
+// When either side uses an idle timeout: stop the tick timer, let the
+// transport process the tick, and re-arm the timer for the next deadline it
+// returns (0 means no deadline).
+static void pconnection_tick(pconnection_t *pc) {
+  pn_transport_t *t = pc->driver.transport;
+  const bool uses_idle_timeout =
+    pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t);
+  if (!uses_idle_timeout)
+    return;
+  HANDLE tq = pc->context.proactor->timer_queue;
+  if (!stop_timer(tq, &pc->tick_timer)) {
+    // TODO: handle error
+  }
+  uint64_t now = pn_i_now2();
+  uint64_t next = pn_transport_tick(t, now);
+  if (next == 0)
+    return;
+  if (!start_timer(tq, &pc->tick_timer, tick_timer_cb, pc, next - now)) {
+    // TODO: handle error
+  }
+}
+
+// Return the pconnection_t attached to c under PN_PROACTOR, or NULL if c is
+// NULL or has no such attachment.
+static pconnection_t *get_pconnection(pn_connection_t* c) {
+  return c ? (pconnection_t*) pn_record_get(pn_connection_attachments(c), PN_PROACTOR)
+           : NULL;
+}
+
+
+// Return the listener that is the context of e, or NULL for non-listener events.
+pn_listener_t *pn_event_listener(pn_event_t *e) {
+  if (pn_event_class(e) != pn_listener__class())
+    return NULL;
+  return (pn_listener_t*)pn_event_context(e);
+}
+
+// Return the proactor associated with e.  For proactor events it is the
+// context itself; for listener events it is the listener's proactor;
+// otherwise it comes from the event's connection.  NULL if none applies.
+pn_proactor_t *pn_event_proactor(pn_event_t *e) {
+  if (pn_event_class(e) == pn_proactor__class())
+    return (pn_proactor_t*)pn_event_context(e);
+  if (pn_listener_t *lsn = pn_event_listener(e))
+    return lsn->context.proactor;
+  if (pn_connection_t *conn = pn_event_connection(e))
+    return pn_connection_proactor(conn);
+  return NULL;
+}
+
+// Call after successful accept: fill pc->local and pc->remote from the
+// accepted socket.  GetAcceptExSockaddrs() would be a possible alternative.
+// Results of getsockname()/getpeername() are not checked.
+static void set_sock_names(pconnection_t *pc) {
+  pn_socket_t s = pc->psocket.iocpd->socket;
+  socklen_t local_len = sizeof(pc->local.ss);
+  getsockname(s, (struct sockaddr*)&pc->local.ss, &local_len);
+  socklen_t remote_len = sizeof(pc->remote.ss);
+  getpeername(s, (struct sockaddr*)&pc->remote.ss, &remote_len);
+}
+
+
+// Call with lock held when closing and transitioning away from working context.
+// True only when: the descriptor has been handed off (iocpd NULL), no
+// completion ops are outstanding, the tick timer no longer needs stopping,
+// no events remain undispatched and no disconnect is queued.
+static inline bool pconnection_can_free(pconnection_t *pc) {
+  if (pc->psocket.iocpd != NULL) return false;
+  if (pc->context.completion_ops != 0) return false;
+  if (pc->stop_timer_required) return false;
+  if (pconnection_has_event(pc)) return false;
+  return !pc->queued_disconnect;
+}
+
+// Release what pc still owns once it is safe to free: its resolved address
+// list, its disconnect condition and its connection driver.
+static void pconnection_final_free(pconnection_t *pc) {
+  if (pc->addrinfo) {
+    freeaddrinfo(pc->addrinfo);
+  }
+  pn_condition_free(pc->disconnect_condition);
+  // pc is attached to the connection's record (see pconnection_setup()), so
+  // presumably destroying the driver can drop a reference to pc; hold one meanwhile.
+  pn_incref(pc);                /* Make sure we don't do a circular free */
+  pn_connection_driver_destroy(&pc->driver);
+  pn_decref(pc);
+  /* Now pc is freed iff the connection is, otherwise remains till the pn_connection_t is freed. */
+}
+
+// Call with lock held or from forced shutdown
+// Start closing pc.  Only the first call acts (guarded by context.closing):
+// close the driver, request that the tick timer be stopped, offer the
+// descriptor to the reaper and wake the connection to finish the close.
+static void pconnection_begin_close(pconnection_t *pc) {
+  if (!pc->context.closing) {
+    pc->context.closing = true;
+    pn_connection_driver_close(&pc->driver);
+    // stop_timer must run without the context lock; pconnection_process() does it.
+    pc->stop_timer_required = true;
+    // add() returning true appears to mean the reaper now owns the descriptor.
+    if (pc->context.proactor->reaper->add(pc->psocket.iocpd))
+      pc->psocket.iocpd = NULL;
+    wakeup(&pc->psocket);
+  }
+}
+
+// call with lock held.  return true if caller must call pconnection_final_free()
+// Deletes the completion/work queues allocated in pconnection_setup() and
+// removes pc's context from the proactor via proactor_remove().
+// NOTE(review): the queue pointers are left dangling; nothing may touch them
+// after this call.
+static bool pconnection_cleanup(pconnection_t *pc) {
+  delete pc->completion_queue;
+  delete pc->work_queue;
+  return proactor_remove(&pc->context);
+}
+
+// Call with pc->context.cslock held (as pconnection_process() and
+// pconnection_done() do).
+// True if pc has queued completions, wakeups, ticks or a queued disconnect,
+// or, once started, has output ready and a writable descriptor.
+static inline bool pconnection_work_pending(pconnection_t *pc) {
+  if (pc->completion_queue->size() || pc->wake_count || pc->tick_pending || pc->queued_disconnect)
+    return true;
+  if (!pc->started)
+    return false;
+  pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
+  // psocket.iocpd is set to NULL once the descriptor is handed to the reaper;
+  // nothing can be written then, so don't dereference it.
+  return (wbuf.size > 0 && pc->psocket.iocpd && (pc->psocket.iocpd->events & PN_WRITABLE));
+}
+
+// Return true unless reaped
+// Write as much of wbuf as the socket accepts (under the proactor-wide
+// write_lock) and consume the written bytes from the driver.  A non-blocking
+// write error is reported on the connection.  When nothing was written,
+// wbuf is empty and the driver's write side is closed, the descriptor is
+// offered to the reaper; returns false if that cleared pc->psocket.iocpd.
+static bool pconnection_write(pconnection_t *pc, pn_bytes_t wbuf) {
+  ssize_t n;
+  bool wouldblock;
+  {
+    csguard g(&pc->context.proactor->write_lock);
+    n = pni_iocp_begin_write(pc->psocket.iocpd, wbuf.start, wbuf.size, &wouldblock, pc->psocket.iocpd->error);
+  }
+  if (n > 0) {
+    pn_connection_driver_write_done(&pc->driver, n);
+  } else if (n < 0 && !wouldblock) {
+    psocket_error(&pc->psocket, WSAGetLastError(), "on write to");
+  }
+  else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) {
+    if (pc->context.proactor->reaper->add(pc->psocket.iocpd)) {
+      pc->psocket.iocpd = NULL;
+      return false;
+    }
+  }
+  return true;
+}
+
+// Queue the result (if any) for the one worker thread.  Become the worker if possible.
+// NULL result is a wakeup or a topup.
+// topup signifies that the working thread is the caller looking for additional events.
+// Returns the connection's batch when it has events to dispatch, else NULL.
+// Each loop iteration: under the lock, collect completions and flags; then,
+// without the lock, drain completions and do socket/engine IO.
+// NOTE(review): in topup mode a `continue` (e.g. after pconnection_write()
+// reaps the descriptor) reaches the "re-acquired lock" branch, which may clear
+// context.working and even free pc while pconnection_batch_next() still uses
+// it.  Confirm this path cannot occur.
+static pn_event_batch_t *pconnection_process(pconnection_t *pc, iocp_result_t *result, bool topup) {
+  bool first = true;
+  bool waking = false;
+  bool tick_required = false;
+  bool open = false;
+
+  while (true) {
+    {
+      csguard g(&pc->context.cslock);
+      if (first) {
+        first = false;
+        if (result)
+          pc->completion_queue->push(result);
+        else if (!topup)
+          wake_complete(&pc->context);
+        if (!topup) {
+          // Another thread is already the worker; it will see the queued work.
+          if (pc->context.working)
+            return NULL;
+          pc->context.working = true;
+        }
+        // IO is only done once started, connected and not closing.
+        open = pc->started && !pc->connecting && !pc->context.closing;
+      }
+      else {
+        // Just re-acquired lock after processing IO and engine work
+        if (pconnection_has_event(pc))
+          return &pc->batch;
+
+        if (!pconnection_work_pending(pc)) {
+          pc->context.working = false;
+          if (pn_connection_driver_finished(&pc->driver)) {
+            pconnection_begin_close(pc);
+          }
+          if (pc->context.closing && pconnection_can_free(pc)) {
+            if (pconnection_cleanup(pc)) {
+              g.release();
+              pconnection_final_free(pc);
+              return NULL;
+            } // else disconnect logic has the free obligation
+          }
+          return NULL;
+        }
+      }
+
+      if (pc->queued_disconnect) {  // From pn_proactor_disconnect()
+        pc->queued_disconnect = false;
+        if (!pc->context.closing) {
+          if (pc->disconnect_condition) {
+            pn_condition_copy(pn_transport_condition(pc->driver.transport), pc->disconnect_condition);
+          }
+          pn_connection_driver_close(&pc->driver);
+        }
+      }
+
+      // Take all queued completions for lock-free processing below.
+      assert(pc->work_queue->empty());
+      if (pc->completion_queue->size())
+          std::swap(pc->work_queue, pc->completion_queue);
+
+      if (pc->wake_count) {
+        waking = open && pc->can_wake && !pn_connection_driver_finished(&pc->driver);
+        pc->wake_count = 0;
+      }
+      if (pc->tick_pending) {
+        pc->tick_pending = false;
+        if (open)
+          tick_required = true;
+      }
+    }
+
+    // No lock
+
+    // Drain completions prior to engine work
+    while (pc->work_queue->size()) {
+      result = (iocp_result_t *) pc->work_queue->front();
+      pc->work_queue->pop();
+      if (result->iocpd->closing) {
+        // Descriptor is being torn down: let the reaper account for the result.
+        if (pc->context.proactor->reaper->process(result)) {
+          pc->psocket.iocpd = NULL;  // reaped
+          open = false;
+        }
+      }
+      else {
+        if (is_write_result(result)) {
+          csguard g(&pc->context.proactor->write_lock);
+          do_complete(result);
+        }
+        else if (is_connect_result(result)) {
+          connect_step_done(pc, (connect_result_t *) result);
+          // Connected once the new descriptor is writable.
+          if (pc->psocket.iocpd && (pc->psocket.iocpd->events & PN_WRITABLE)) {
+            pc->connecting = false;
+            if (pc->started)
+              open = true;
+          }
+        }
+        else do_complete(result);
+      }
+    }
+
+    if (!open) {
+      if (pc->stop_timer_required) {
+        pc->stop_timer_required = false;
+        // Do without context lock to avoid possible deadlock
+        stop_timer(pc->context.proactor->timer_queue, &pc->tick_timer);
+      }
+    } else {
+
+      pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
+      pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
+
+      /* Ticks and checking buffers can generate events, process before proceeding */
+      bool ready = pconnection_has_event(pc);
+      if (waking) {
+        pn_connection_t *c = pc->driver.connection;
+        pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
+        waking = false;
+      }
+      if (ready) {
+        continue;
+      }
+
+      // Flush a large backlog of output before reading more input.
+      if (wbuf.size >= 16384 && (pc->psocket.iocpd->events & PN_WRITABLE)) {
+        if (!pconnection_write(pc, wbuf))
+          continue;
+      }
+      if (rbuf.size > 0 && !pc->psocket.iocpd->read_in_progress) {
+        bool wouldblock;
+        ssize_t n = pni_iocp_recv(pc->psocket.iocpd, rbuf.start, rbuf.size, &wouldblock, pc->psocket.iocpd->error);
+
+        if (n > 0) {
+          pn_connection_driver_read_done(&pc->driver, n);
+          pconnection_tick(pc);         /* check for tick changes. */
+          tick_required = false;
+        }
+        else if (n == 0)
+          pn_connection_driver_read_close(&pc->driver);
+        else if (!wouldblock)
+          psocket_error(&pc->psocket, WSAGetLastError(), "on read from");
+      }
+      if (!pc->psocket.iocpd->read_in_progress)
+        start_reading(pc->psocket.iocpd);
+
+
+      if (tick_required) {
+        pconnection_tick(pc);         /* check for tick changes. */
+        tick_required = false;
+      }
+      // Reading and ticking may have produced more output.
+      wbuf = pn_connection_driver_write_buffer(&pc->driver);
+      if (wbuf.size > 0 && (pc->psocket.iocpd->events & PN_WRITABLE)) {
+        if (!pconnection_write(pc, wbuf))
+          open = false;
+      }
+    }
+
+    if (topup)
+      return NULL;  // regardless if new events made available
+  }
+}
+
+
+// next_event hook for a connection batch.  When the driver runs dry, top up
+// by processing more work in place, bounded by HOG_MAX per batch (hog_count
+// is reset in pconnection_done()).  The connection becomes "started" when
+// PN_CONNECTION_BOUND is handed out.
+static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) {
+  pconnection_t *pc = batch_pconnection(batch);
+  pn_connection_driver_t *drv = &pc->driver;
+  pn_event_t *e = pn_connection_driver_next_event(drv);
+  if (e == NULL && ++pc->hog_count < HOG_MAX) {
+    pconnection_process(pc, NULL, true);  // top up
+    e = pn_connection_driver_next_event(drv);
+  }
+  const bool first_bound = e && !pc->started && pn_event_type(e) == PN_CONNECTION_BOUND;
+  if (first_bound)
+    pc->started = true; // SSL will be set up on return and safe to do IO with correct transport layers
+  return e;
+}
+
+// The application has finished with pc's batch: give up the worker role and
+// reset the hog counter.  Wake the connection if it still has events or
+// work.  Otherwise, if the driver is finished, begin closing and wake it.
+static void pconnection_done(pconnection_t *pc) {
+  csguard g(&pc->context.cslock);
+  pc->context.working = false;
+  pc->hog_count = 0;
+  bool needs_wake = pconnection_has_event(pc) || pconnection_work_pending(pc);
+  if (!needs_wake && pn_connection_driver_finished(&pc->driver)) {
+    pconnection_begin_close(pc);
+    needs_wake = true;
+  }
+  if (needs_wake)
+    wakeup(&pc->psocket);
+}
+
+// True when the proactor has nothing outstanding: no contexts, no pending
+// disconnects, no timeout armed or due, and not shutting down.
+static inline bool is_inactive(pn_proactor_t *p) {
+  if (p->contexts || p->disconnects_pending)
+    return false;
+  if (p->timeout_set || p->need_timeout)
+    return false;
+  return !p->shutting_down;
+}
+
+// Call whenever transitioning from "definitely active" to "maybe inactive".
+// If the proactor is now idle, flag need_inactive and wake it.
+static void wake_if_inactive(pn_proactor_t *p) {
+  if (!is_inactive(p))
+    return;
+  p->need_inactive = true;
+  proactor_wake(p);
+}
+
+// Hand a batch back to the proactor.  Dispatches on the kind of object that
+// owns the batch: connection, listener, or the proactor itself.
+void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
+  if (pconnection_t *pc = batch_pconnection(batch)) {
+    pconnection_done(pc);
+  } else if (pn_listener_t *lsn = batch_listener(batch)) {
+    listener_done(lsn);
+  } else if (batch_proactor(batch) == p) {
+    csguard g(&p->context.cslock);
+    p->context.working = false;
+    // An interrupt requested while the batch was being worked on is applied now.
+    if (p->delayed_interrupt) {
+      p->delayed_interrupt = false;
+      p->need_interrupt = true;
+    }
+    if (p->timeout_processed) {
+      p->timeout_processed = false;
+      wake_if_inactive(p);
+    }
+    if (proactor_update_batch(p))
+      proactor_wake(p);
+  }
+}
+
+// Put a proactor-level event of type t, with p as context, on p's collector.
+static void proactor_add_event(pn_proactor_t *p, pn_event_type_t t) {
+  pn_collector_t *sink = p->collector;
+  pn_collector_put(sink, pn_proactor__class(), p, t);
+}
+
+// Handle a proactor wakeup: acknowledge it and, if no thread is working on
+// the proactor batch and proactor_update_batch() produced events, claim the
+// batch for the caller.  Otherwise return NULL.
+static pn_event_batch_t *proactor_process(pn_proactor_t *p) {
+  csguard g(&p->context.cslock);
+  proactor_wake_complete(p);
+  if (p->context.working || !proactor_update_batch(p))
+    return NULL;
+  p->context.working = true;   /* Can generate proactor events */
+  return &p->batch;
+}
+
+// Route a completion (or a wakeup, when result is NULL) to the connection or
+// listener that owns ps.  Anything without such an owner goes to the reaper.
+static pn_event_batch_t *psocket_process(psocket_t *ps, iocp_result_t *result, reaper *rpr) {
+  pconnection_t *pc = ps ? as_pconnection_t(ps) : NULL;
+  if (pc)
+    return pconnection_process(pc, result, false);
+  pn_listener_t *lsn = ps ? as_listener(ps) : NULL;
+  if (lsn)
+    return listener_process(lsn, result);
+  rpr->process(result);
+  return NULL;
+}
+
+// Shared body of pn_proactor_wait() (can_block true: wait INFINITE) and
+// pn_proactor_get() (can_block false: poll).  Dequeue completions from the
+// IOCP port and dispatch each one until one yields an event batch.  In
+// polling mode, returns NULL when the port has nothing queued.
+static pn_event_batch_t *proactor_completion_loop(struct pn_proactor_t* p, bool can_block) {
+  // Proact! Process inbound completions of async activity until one
+  // of them provides a batch of events.
+  while(true) {
+    pn_event_batch_t *batch = NULL;
+
+    DWORD win_timeout = can_block ? INFINITE : 0;
+    DWORD num_xfer = 0;
+    ULONG_PTR completion_key = 0;
+    OVERLAPPED *overlapped = 0;
+
+    bool good_op = GetQueuedCompletionStatus (p->iocp->completion_port, &num_xfer,
+                                              &completion_key, &overlapped, win_timeout);
+    if (!overlapped && !can_block && GetLastError() == WAIT_TIMEOUT)
+      return NULL;  // valid timeout
+
+    if (!good_op && !overlapped) {
+      // Should never happen.  shutdown?
+      // We aren't expecting a timeout, closed completion port, or other error here.
+      pn_logf("%s", errno_str("Windows Proton proactor internal failure\n", false).c_str());
+      abort();
+    }
+
+    if (completion_key == NULL) {
+      // Normal IO case for connections and listeners
+      // Record the IO outcome in the result before dispatching it to its owner.
+      iocp_result_t *result = (iocp_result_t *) overlapped;
+      result->status = good_op ? 0 : GetLastError();
+      result->num_transferred = num_xfer;
+      psocket_t *ps = (psocket_t *) result->iocpd->active_completer;
+      batch = psocket_process(ps, result, p->reaper);
+    }
+    else {
+      // completion_key on our completion port is always null unless set by us
+      // in PostQueuedCompletionStatus.  In which case, we hijack the overlapped
+      // data structure for our own use.
+      // Keys matching none of the branches below are ignored.
+      if (completion_key == psocket_wakeup_key)
+        batch = psocket_process((psocket_t *) overlapped, NULL, p->reaper);
+      else if (completion_key == proactor_wake_key)
+        batch = proactor_process((pn_proactor_t *) overlapped);
+      else if (completion_key == recycle_accept_key)
+        recycle_result((accept_result_t *) overlapped);
+    }
+    if (batch) return batch;
+    // No event generated.  Try again with next completion.
+  }
+}
+
+// Block until some completion yields a batch of events, and return it.
+pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
+  const bool can_block = true;
+  return proactor_completion_loop(p, can_block);
+}
+
+// Non-blocking variant of pn_proactor_wait(): returns NULL once no queued
+// completion is left to process.
+pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
+  const bool can_block = false;
+  return proactor_completion_loop(p, can_block);
+}
+
+// Request an interrupt and wake the proactor.  While the proactor batch is
+// being worked on, the request is parked in delayed_interrupt and applied by
+// pn_proactor_done().
+void pn_proactor_interrupt(pn_proactor_t *p) {
+  csguard g(&p->context.cslock);
+  if (!p->context.working) {
+    p->need_interrupt = true;
+  } else {
+    p->delayed_interrupt = true;
+  }
+  proactor_wake(p);
+}
+
+// runs on a threadpool thread.  Must not hold timer_lock.
+// Fires when the timer armed by pn_proactor_set_timeout() expires.  If the
+// timeout was not cancelled meanwhile (timeout_set still true), flag
+// need_timeout and wake the proactor.
+// NOTE(review): this callback takes timer_lock, while pn_proactor_set_timeout()
+// and pn_proactor_cancel_timeout() hold timer_lock across stop_timer().
+// stop_timer() uses DeleteTimerQueueTimer(..., INVALID_HANDLE_VALUE), which
+// waits for a running callback to finish.  If this callback is blocked on
+// timer_lock at that moment, the two threads appear to deadlock.  Confirm.
+VOID CALLBACK timeout_cb(PVOID arg, BOOLEAN /* ignored*/ ) {
+  pn_proactor_t *p = (pn_proactor_t *) arg;
+  csguard gtimer(&p->timer_lock);
+  csguard g(&p->context.cslock);
+  if (p->timeout_set)
+    p->need_timeout = true;  // else cancelled
+  p->timeout_set = false;
+  if (p->need_timeout)
+    proactor_wake(p);
+}
+
+// Arrange for the proactor timeout to fire after t milliseconds, replacing
+// any pending timeout.  t == 0 flags the timeout (need_timeout) immediately
+// and wakes the proactor.
+// NOTE(review): start_timer()'s result is ignored.  If it fails, timeout_set
+// stays true with no timer armed.  stop_timer() also runs here with
+// timer_lock held while timeout_cb itself takes timer_lock.
+void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
+  bool ticking = false;
+  csguard gtimer(&p->timer_lock);
+  {
+    csguard g(&p->context.cslock);
+    ticking = (p->timeout_timer != NULL);
+    if (t == 0) {
+      p->need_timeout = true;
+      p->timeout_set = false;
+      proactor_wake(p);
+    }
+    else
+      p->timeout_set = true;
+  }
+  // Just timer_lock held
+  // Timers are manipulated outside the context lock (timeout_cb takes it).
+  if (ticking) {
+    stop_timer(p->timer_queue, &p->timeout_timer);
+  }
+  if (t) {
+    start_timer(p->timer_queue, &p->timeout_timer, timeout_cb, p, t);
+  }
+}
+
+// Cancel any pending proactor timeout.  If a timer was armed, it is stopped
+// and the proactor is woken if that leaves it inactive.
+// NOTE(review): stop_timer() runs with timer_lock held while timeout_cb
+// itself takes timer_lock.  Confirm this cannot deadlock.
+void pn_proactor_cancel_timeout(pn_proactor_t *p) {
+  bool ticking = false;
+  csguard gtimer(&p->timer_lock);
+  {
+    csguard g(&p->context.cslock);
+    // Clearing timeout_set makes a concurrently firing timeout_cb treat it as cancelled.
+    p->timeout_set = false;
+    ticking = (p->timeout_timer != NULL);
+  }
+  if (ticking) {
+    stop_timer(p->timer_queue, &p->timeout_timer);
+    csguard g(&p->context.cslock);
+    wake_if_inactive(p);
+  }
+}
+
+// Return true if connect_step_done()will handle connection status
+// Try each remaining address in pc->ai in turn: create a socket, bind it to
+// the wildcard address, wrap it in an iocpdesc, associate it with the
+// completion port and start an overlapped ConnectEx.  Returns true as soon
+// as one connect is under way.  If every address fails, marks the context
+// closing and returns false.
+static bool connect_step(pconnection_t *pc) {
+  pn_proactor_t *p = pc->context.proactor;
+  while (pc->ai) {            /* Have an address */
+    struct addrinfo *ai = pc->ai;
+    pc->ai = pc->ai->ai_next; /* Move to next address in case this fails */
+    unique_socket fd(::socket(ai->ai_family, SOCK_STREAM, ai->ai_protocol));
+    if (fd != INVALID_SOCKET) {
+      // Windows ConnectEx requires loosely bound socket.
+      sockaddr_storage sa;
+      memset(&sa, 0, sizeof(sa));
+      sa.ss_family = ai->ai_family;
+      if (!bind(fd, (SOCKADDR *) &sa, ai->ai_addrlen)) {
+        pni_configure_sock_2(fd);
+        pc->psocket.iocpd = pni_iocpdesc_create(p->iocp, fd);
+        assert(pc->psocket.iocpd);
+        // Both directions stay closed until connect_step_done() sees success.
+        pc->psocket.iocpd->write_closed = true;
+        pc->psocket.iocpd->read_closed = true;
+        // The iocpdesc presumably owns the socket from here on.
+        fd.release();
+        iocpdesc_t *iocpd = pc->psocket.iocpd;
+        // Completion key 0 marks normal IO completions in proactor_completion_loop().
+        if (CreateIoCompletionPort ((HANDLE) iocpd->socket, iocpd->iocp->completion_port, 0, 0)) {
+          // NOTE(review): fn_connect_ex is called without a NULL check.
+          LPFN_CONNECTEX fn_connect_ex = lookup_connect_ex2(iocpd->socket);
+          // addrinfo is owned by the pconnection so pass NULL to the connect result
+          connect_result_t *result = connect_result(iocpd, NULL);
+          DWORD unused;
+          bool success = fn_connect_ex(iocpd->socket, ai->ai_addr, ai->ai_addrlen,
+                                       NULL, 0, &unused, (LPOVERLAPPED) result);
+          // ERROR_IO_PENDING: the connect started and will complete via the port.
+          if (success || WSAGetLastError() == ERROR_IO_PENDING) {
+            iocpd->ops_in_progress++;
+            iocpd->active_completer = &pc->psocket;
+            // getpeername unreliable for outgoing connections, but we know it at this point
+            memcpy(&pc->remote.ss, ai->ai_addr, ai->ai_addrlen);
+            return true;  // logic resumes at connect_step_done()
+          }
+          pn_free(result);
+        }
+      }
+      // This address failed: dispose of any descriptor created for it.
+      if (pc->psocket.iocpd) {
+        pc->context.proactor->reaper->fast_reap(pc->psocket.iocpd);
+        pc->psocket.iocpd = NULL;
+      }
+    }
+  }
+  pc->context.closing = true;
+  return false;
+}
+
+// Completion handler for a ConnectEx started by connect_step(); takes the
+// connection lock.  On success, reopens both directions of the descriptor,
+// records the local address (if an address list is held) and frees the
+// address list.  On failure, disposes of the descriptor and tries the next
+// address.  If none connects, reports the error and wakes the connection
+// so it can close.
+static void connect_step_done(pconnection_t *pc, connect_result_t *result) {
+  csguard g(&pc->context.cslock);
+  // result is freed by complete_connect() below, so keep what is needed afterwards.
+  DWORD saved_status = result->base.status;
+  iocpdesc_t *iocpd = result->base.iocpd;
+  iocpd->ops_in_progress--;
+  assert(pc->psocket.iocpd == iocpd);
+  complete_connect(result, result->base.status);  // frees result, starts regular IO if connected
+
+  if (!saved_status) {
+    // Success
+    pc->psocket.iocpd->write_closed = false;
+    pc->psocket.iocpd->read_closed = false;
+    if (pc->addrinfo) {
+      socklen_t len = sizeof(pc->local.ss);
+      getsockname(pc->psocket.iocpd->socket, (struct sockaddr*)&pc->local.ss, &len);
+      freeaddrinfo(pc->addrinfo);
+      pc->addrinfo = NULL;
+    }
+    pc->ai = NULL;
+    return;
+  }
+  else {
+    // Descriptor will never be used.  Dispose.
+    // Connect failed, no IO started, i.e. no pending iocpd based events
+    pc->context.proactor->reaper->fast_reap(iocpd);
+    pc->psocket.iocpd = NULL;
+    memset(&pc->remote.ss, 0, sizeof(pc->remote.ss));
+    // Is there a next connection target in the addrinfo to try?
+    if (pc->ai && connect_step(pc)) {
+      // Trying the next addrinfo possibility.  Will return here.
+      return;
+    }
+    // Give up
+    psocket_error(&pc->psocket, saved_status, "connect to ");
+    pc->context.closing = true;
+    wakeup(&pc->psocket);
+  }
+}
+
+// Start an outgoing connection for c (and transport t) to addr on proactor p.
+// The connection is auto-opened.  If address resolution fails, or no
+// asynchronous connect can be started, the error is reported on the
+// connection and it is woken to close.
+// NOTE(review): the error code reported comes from WSAGetLastError() even when
+// pgetaddrinfo() failed.  Confirm that pgetaddrinfo sets it.  When
+// pconnection_setup() fails, the error is only logged and c/t do not appear
+// to be released here.
+void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, const char *addr) {
+  pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t));
+  assert(pc); // TODO: memory safety
+  const char *err = pconnection_setup(pc, p, c, t, false, addr);
+  if (err) {
+    // pc has already been released by pconnection_setup().
+    pn_logf("pn_proactor_connect failure: %s", err);
+    return;
+  }
+  // TODO: check case of proactor shutting down
+  csguard g(&pc->context.cslock);
+  pc->connecting = true;
+  proactor_add(&pc->context);
+  pn_connection_open(pc->driver.connection); /* Auto-open */
+
+  if (!pgetaddrinfo(pc->psocket.host, pc->psocket.port, 0, &pc->addrinfo)) {
+    pc->ai = pc->addrinfo;
+    if (connect_step(pc)) {
+      return;
+    }
+  }
+  psocket_error(&pc->psocket, WSAGetLastError(), "connect to ");
+  wakeup(&pc->psocket);
+  if (p->reaper->add(pc->psocket.iocpd)) {
+    pc->psocket.iocpd = NULL;
+  }
+}
+
+void pn_proactor_release_connection(pn_connection_t *c) {
+  bool notify = false;
+  pconnection_t *pc = g

<TRUNCATED>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message