qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [34/89] [abbrv] [partial] qpid-proton git commit: PROTON-1728: Reorganize the source tree
Date Tue, 03 Jul 2018 22:13:23 GMT
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
new file mode 100644
index 0000000..8ff5831
--- /dev/null
+++ b/c/src/proactor/epoll.c
@@ -0,0 +1,2217 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/* Enable POSIX features beyond c99 for modern pthread and standard strerror_r() */
+#ifndef _POSIX_C_SOURCE
+#define _POSIX_C_SOURCE 200809L
+#endif
+/* Avoid GNU extensions, in particular the incompatible alternative strerror_r() */
+#undef _GNU_SOURCE
+
+#include "../core/log_private.h"
+#include "./proactor-internal.h"
+
+#include <proton/condition.h>
+#include <proton/connection_driver.h>
+#include <proton/engine.h>
+#include <proton/proactor.h>
+#include <proton/transport.h>
+#include <proton/listener.h>
+
+#include <assert.h>
+#include <stddef.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <errno.h>
+#include <pthread.h>
+#include <sys/timerfd.h>
+#include <sys/epoll.h>
+#include <unistd.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <fcntl.h>
+#include <netinet/tcp.h>
+#include <sys/eventfd.h>
+#include <limits.h>
+#include <time.h>
+
+#include "./netaddr-internal.h" /* Include after socket/inet headers */
+
+// TODO: replace timerfd per connection with global lightweight timer mechanism.
+// logging in general
+// SIGPIPE?
+// Can some of the mutexes be spinlocks (any benefit over adaptive pthread mutex)?
+//   Maybe futex is even better?
+// See other "TODO" in code.
+//
+// Consider case of large number of wakes: proactor_do_epoll() could start by
+// looking for pending wakes before a kernel call to epoll_wait(), or there
+// could be several eventfds with random assignment of wakeables.
+
+
+typedef char strerrorbuf[1024];      /* used for pstrerror message buffer */
+
+/* Like strerror_r but provide a default message if strerror_r fails */
+static void pstrerror(int err, strerrorbuf msg) {
+  int e = strerror_r(err, msg, sizeof(strerrorbuf));
+  if (e) snprintf(msg, sizeof(strerrorbuf), "unknown error %d", err);
+}
+
+/* Internal error, no recovery */
+#define EPOLL_FATAL(EXPR, SYSERRNO)                                     \
+  do {                                                                  \
+    strerrorbuf msg;                                                    \
+    pstrerror((SYSERRNO), msg);                                         \
+    fprintf(stderr, "epoll proactor failure in %s:%d: %s: %s\n",        \
+            __FILE__, __LINE__ , #EXPR, msg);                           \
+    abort();                                                            \
+  } while (0)
+
+// ========================================================================
+// First define a proactor mutex (pmutex) and timer mechanism (ptimer) to taste.
+// ========================================================================
+
+// In general all locks to be held singly and shortly (possibly as spin locks).
+// Exception: psockets+proactor for pn_proactor_disconnect (convention: acquire
+// psocket first to avoid deadlock).  TODO: revisit the exception and its
+// awkwardness in the code (additional mutex? different type?).
+
+typedef pthread_mutex_t pmutex;
+static void pmutex_init(pthread_mutex_t *pm){
+  pthread_mutexattr_t attr;
+
+  pthread_mutexattr_init(&attr);
+  pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ADAPTIVE_NP);
+  if (pthread_mutex_init(pm, &attr)) {
+    perror("pthread failure");
+    abort();
+  }
+}
+
+static void pmutex_finalize(pthread_mutex_t *m) { pthread_mutex_destroy(m); }
+static inline void lock(pmutex *m) { pthread_mutex_lock(m); }
+static inline void unlock(pmutex *m) { pthread_mutex_unlock(m); }
+
+typedef struct acceptor_t acceptor_t;
+
+typedef enum {
+  WAKE,   /* see if any work to do in proactor/psocket context */
+  PCONNECTION_IO,
+  PCONNECTION_TIMER,
+  LISTENER_IO,
+  PROACTOR_TIMER } epoll_type_t;
+
+// Data to use with epoll.
+typedef struct epoll_extended_t {
+  struct psocket_t *psocket;  // pconnection, listener, or NULL -> proactor
+  int fd;
+  epoll_type_t type;   // io/timer/wakeup
+  uint32_t wanted;     // events to poll for
+  bool polling;
+  pmutex barrier_mutex;
+} epoll_extended_t;
+
+/* epoll_ctl()/epoll_wait() do not form a memory barrier, so cached memory
+   writes to struct epoll_extended_t in the EPOLL_ADD thread might not be
+   visible to epoll_wait() thread. This function creates a memory barrier,
+   called before epoll_ctl() and after epoll_wait()
+*/
+static void memory_barrier(epoll_extended_t *ee) {
+  // Mutex lock/unlock has the side-effect of being a memory barrier.
+  lock(&ee->barrier_mutex);
+  unlock(&ee->barrier_mutex);
+}
+
+/*
+ * This timerfd logic assumes EPOLLONESHOT and there never being two
+ * active timeout callbacks.  There can be multiple (or zero)
+ * unclaimed expiries processed in a single callback.
+ *
+ * timerfd_set() documentation implies a crisp relationship between
+ * timer expiry count and oldt's return value, but a return value of
+ * zero is ambiguous.  It can lead to no EPOLLIN, EPOLLIN + expected
+ * read, or
+ *
+ *   event expiry (in kernel) -> EPOLLIN
+ *   cancel/settime(0) (thread A) (number of expiries resets to zero)
+ *   read(timerfd) -> -1, EAGAIN  (thread B servicing epoll event)
+ *
+ * The original implementation with counters to track expiry counts
+ * was abandoned in favor of "in doubt" transitions and resolution
+ * at shutdown.
+ */
+
+typedef struct ptimer_t {
+  pmutex mutex;
+  int timerfd;
+  epoll_extended_t epoll_io;
+  bool timer_active;
+  bool in_doubt;  // 0 or 1 callbacks are possible
+  bool shutting_down;
+} ptimer_t;
+
+static bool ptimer_init(ptimer_t *pt, struct psocket_t *ps) {
+  pt->timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
+  pmutex_init(&pt->mutex);
+  pt->timer_active = false;
+  pt->in_doubt = false;
+  pt->shutting_down = false;
+  epoll_type_t type = ps ? PCONNECTION_TIMER : PROACTOR_TIMER;
+  pt->epoll_io.psocket = ps;
+  pt->epoll_io.fd = pt->timerfd;
+  pt->epoll_io.type = type;
+  pt->epoll_io.wanted = EPOLLIN;
+  pt->epoll_io.polling = false;
+  return (pt->timerfd >= 0);
+}
+
+// Call with ptimer lock held
+static void ptimer_set_lh(ptimer_t *pt, uint64_t t_millis) {
+  struct itimerspec newt, oldt;
+  memset(&newt, 0, sizeof(newt));
+  newt.it_value.tv_sec = t_millis / 1000;
+  newt.it_value.tv_nsec = (t_millis % 1000) * 1000000;
+
+  timerfd_settime(pt->timerfd, 0, &newt, &oldt);
+  if (pt->timer_active && oldt.it_value.tv_nsec == 0 && oldt.it_value.tv_sec == 0) {
+    // EPOLLIN is possible but not assured
+    pt->in_doubt = true;
+  }
+  pt->timer_active = t_millis;
+}
+
+static void ptimer_set(ptimer_t *pt, uint64_t t_millis) {
+  // t_millis == 0 -> cancel
+  lock(&pt->mutex);
+  if ((t_millis == 0 && !pt->timer_active) || pt->shutting_down) {
+    unlock(&pt->mutex);
+    return;  // nothing to do
+  }
+  ptimer_set_lh(pt, t_millis);
+  unlock(&pt->mutex);
+}
+
+/* Read from a timer or event FD */
+static uint64_t read_uint64(int fd) {
+  uint64_t result = 0;
+  ssize_t n = read(fd, &result, sizeof(result));
+  if (n != sizeof(result) && !(n < 0 && errno == EAGAIN)) {
+    EPOLL_FATAL("timerfd or eventfd read error", errno);
+  }
+  return result;
+}
+
+// Callback bookkeeping. Return true if there is an expired timer.
+static bool ptimer_callback(ptimer_t *pt) {
+  lock(&pt->mutex);
+  struct itimerspec current;
+  if (timerfd_gettime(pt->timerfd, &current) == 0) {
+    if (current.it_value.tv_nsec == 0 && current.it_value.tv_sec == 0)
+      pt->timer_active = false;
+  }
+  uint64_t u_exp_count = read_uint64(pt->timerfd);
+  if (!pt->timer_active) {
+    // Expiry counter just cleared, timer not set, timerfd not armed
+    pt->in_doubt = false;
+  }
+  unlock(&pt->mutex);
+  return u_exp_count > 0;
+}
+
+// Return true if timerfd has and will have no pollable expiries in the current armed state
+static bool ptimer_shutdown(ptimer_t *pt, bool currently_armed) {
+  lock(&pt->mutex);
+  if (currently_armed) {
+    ptimer_set_lh(pt, 0);
+    pt->shutting_down = true;
+    if (pt->in_doubt)
+      // Force at least one callback.  If two, second cannot proceed with unarmed timerfd.
+      ptimer_set_lh(pt, 1);
+  }
+  else
+    pt->shutting_down = true;
+  bool rv = !pt->in_doubt;
+  unlock(&pt->mutex);
+  return rv;
+}
+
+static void ptimer_finalize(ptimer_t *pt) {
+  if (pt->timerfd >= 0) close(pt->timerfd);
+  pmutex_finalize(&pt->mutex);
+}
+
+pn_timestamp_t pn_i_now2(void)
+{
+  struct timespec now;
+  clock_gettime(CLOCK_REALTIME, &now);
+  return ((pn_timestamp_t)now.tv_sec) * 1000 + (now.tv_nsec / 1000000);
+}
+
+// ========================================================================
+// Proactor common code
+// ========================================================================
+
+const char *AMQP_PORT = "5672";
+const char *AMQP_PORT_NAME = "amqp";
+
+// The number of times a connection event batch may be replenished for
+// a thread between calls to wait().  Some testing shows that
+// increasing this value above 1 actually slows performance slightly
+// and increases latency.
+#define HOG_MAX 1
+
+/* pn_proactor_t and pn_listener_t are plain C structs with normal memory management.
+   Class definitions are for identification as pn_event_t context only.
+*/
+PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
+PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
+
+static bool start_polling(epoll_extended_t *ee, int epollfd) {
+  if (ee->polling)
+    return false;
+  ee->polling = true;
+  struct epoll_event ev;
+  ev.data.ptr = ee;
+  ev.events = ee->wanted | EPOLLONESHOT;
+  memory_barrier(ee);
+  return (epoll_ctl(epollfd, EPOLL_CTL_ADD, ee->fd, &ev) == 0);
+}
+
+static void stop_polling(epoll_extended_t *ee, int epollfd) {
+  // TODO: check for error, return bool or just log?
+  if (ee->fd == -1 || !ee->polling || epollfd == -1)
+    return;
+  struct epoll_event ev;
+  ev.data.ptr = ee;
+  ev.events = 0;
+  memory_barrier(ee);
+  if (epoll_ctl(epollfd, EPOLL_CTL_DEL, ee->fd, &ev) == -1)
+    EPOLL_FATAL("EPOLL_CTL_DEL", errno);
+  ee->fd = -1;
+  ee->polling = false;
+}
+
+/*
+ * The proactor maintains a number of serialization contexts: each
+ * connection, each listener, the proactor itself.  The serialization
+ * is presented to the application via each associated event batch.
+ *
+ * Multiple threads can be trying to do work on a single context
+ * (i.e. socket IO is ready and wakeup at same time). Mutexes are used
+ * to manage contention.  Some vars are only ever touched by one
+ * "working" thread and are accessed without holding the mutex.
+ *
+ * Currently internal wakeups (via wake()/wake_notify()) are used to
+ * force a context to check if it has work to do.  To minimize trips
+ * through the kernel, wake() is a no-op if the context has a working
+ * thread.  Conversely, a thread must never stop working without
+ * checking if it has newly arrived work.
+ *
+ * External wake operations, like pn_connection_wake() and are built on top of
+ * the internal wake mechanism.  The former coalesces multiple wakes until event
+ * delivery, the latter does not.  The WAKEABLE implementation can be modeled on
+ * whichever is more suited.
+ *
+ * pn_proactor_interrupt() must be async-signal-safe so it has a dedicated
+ * eventfd to allow a lock-free pn_proactor_interrupt() implementation.
+ */
+typedef enum {
+  PROACTOR,
+  PCONNECTION,
+  LISTENER,
+  WAKEABLE } pcontext_type_t;
+
+typedef struct pcontext_t {
+  pmutex mutex;
+  pn_proactor_t *proactor;  /* Immutable */
+  void *owner;              /* Instance governed by the context */
+  pcontext_type_t type;
+  bool working;
+  int wake_ops;             // unprocessed eventfd wake callback (convert to bool?)
+  struct pcontext_t *wake_next; // wake list, guarded by proactor eventfd_mutex
+  bool closing;
+  // Next 4 are protected by the proactor mutex
+  struct pcontext_t* next;  /* Protected by proactor.mutex */
+  struct pcontext_t* prev;  /* Protected by proactor.mutex */
+  int disconnect_ops;           /* ops remaining before disconnect complete */
+  bool disconnecting;           /* pn_proactor_disconnect */
+} pcontext_t;
+
+static void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p, void *o) {
+  memset(ctx, 0, sizeof(*ctx));
+  pmutex_init(&ctx->mutex);
+  ctx->proactor = p;
+  ctx->owner = o;
+  ctx->type = t;
+}
+
+static void pcontext_finalize(pcontext_t* ctx) {
+  pmutex_finalize(&ctx->mutex);
+}
+
+/* common to connection and listener */
+typedef struct psocket_t {
+  pn_proactor_t *proactor;
+  // Remaining protected by the pconnection/listener mutex
+  int sockfd;
+  epoll_extended_t epoll_io;
+  pn_listener_t *listener;      /* NULL for a connection socket */
+  char addr_buf[PN_MAX_ADDR];
+  const char *host, *port;
+} psocket_t;
+
+struct pn_proactor_t {
+  pcontext_t context;
+  int epollfd;
+  ptimer_t timer;
+  pn_collector_t *collector;
+  pcontext_t *contexts;         /* in-use contexts for PN_PROACTOR_INACTIVE and cleanup */
+  epoll_extended_t epoll_wake;
+  epoll_extended_t epoll_interrupt;
+  pn_event_batch_t batch;
+  size_t disconnects_pending;   /* unfinished proactor disconnects*/
+  // need_xxx flags indicate we should generate PN_PROACTOR_XXX on the next update_batch()
+  bool need_interrupt;
+  bool need_inactive;
+  bool need_timeout;
+  bool timeout_set; /* timeout has been set by user and not yet cancelled or generated event */
+  bool timeout_processed;  /* timeout event dispatched in the most recent event batch */
+  bool timer_armed; /* timer is armed in epoll */
+  bool shutting_down;
+  // wake subsystem
+  int eventfd;
+  pmutex eventfd_mutex;
+  bool wakes_in_progress;
+  pcontext_t *wake_list_first;
+  pcontext_t *wake_list_last;
+  // Interrupts have a dedicated eventfd because they must be async-signal safe.
+  int interruptfd;
+  // If the process runs out of file descriptors, disarm listening sockets temporarily and save them here.
+  acceptor_t *overflow;
+  pmutex overflow_mutex;
+};
+
+static void rearm(pn_proactor_t *p, epoll_extended_t *ee);
+
+/*
+ * Wake strategy with eventfd.
+ *  - wakees can be in the list only once
+ *  - wakers only write() if wakes_in_progress is false
+ *  - wakees only read() if about to set wakes_in_progress to false
+ * When multiple wakes are pending, the kernel cost is a single rearm().
+ * Otherwise it is the trio of write/read/rearm.
+ * Only the writes and reads need to be carefully ordered.
+ *
+ * Multiple eventfds could be used and shared amongst the pcontext_t's.
+ */
+
+// part1: call with ctx->owner lock held, return true if notify required by caller
+static bool wake(pcontext_t *ctx) {
+  bool notify = false;
+  if (!ctx->wake_ops) {
+    if (!ctx->working) {
+      ctx->wake_ops++;
+      pn_proactor_t *p = ctx->proactor;
+      lock(&p->eventfd_mutex);
+      if (!p->wake_list_first) {
+        p->wake_list_first = p->wake_list_last = ctx;
+      } else {
+        p->wake_list_last->wake_next = ctx;
+        p->wake_list_last = ctx;
+      }
+      if (!p->wakes_in_progress) {
+        // force a wakeup via the eventfd
+        p->wakes_in_progress = true;
+        notify = true;
+      }
+      unlock(&p->eventfd_mutex);
+    }
+  }
+  return notify;
+}
+
+// part2: make OS call without lock held
+static inline void wake_notify(pcontext_t *ctx) {
+  if (ctx->proactor->eventfd == -1)
+    return;
+  uint64_t increment = 1;
+  if (write(ctx->proactor->eventfd, &increment, sizeof(uint64_t)) != sizeof(uint64_t))
+    EPOLL_FATAL("setting eventfd", errno);
+}
+
+// call with no locks
+static pcontext_t *wake_pop_front(pn_proactor_t *p) {
+  pcontext_t *ctx = NULL;
+  lock(&p->eventfd_mutex);
+  assert(p->wakes_in_progress);
+  if (p->wake_list_first) {
+    ctx = p->wake_list_first;
+    p->wake_list_first = ctx->wake_next;
+    if (!p->wake_list_first) p->wake_list_last = NULL;
+    ctx->wake_next = NULL;
+
+    if (!p->wake_list_first) {
+      /* Reset the eventfd until a future write.
+       * Can the read system call be made without holding the lock?
+       * Note that if the reads/writes happen out of order, the wake
+       * mechanism will hang. */
+      (void)read_uint64(p->eventfd);
+      p->wakes_in_progress = false;
+    }
+  }
+  unlock(&p->eventfd_mutex);
+  rearm(p, &p->epoll_wake);
+  return ctx;
+}
+
+// call with owner lock held, once for each pop from the wake list
+static inline void wake_done(pcontext_t *ctx) {
+  assert(ctx->wake_ops > 0);
+  ctx->wake_ops--;
+}
+
+
+static void psocket_init(psocket_t* ps, pn_proactor_t* p, pn_listener_t *listener, const char *addr)
+{
+  ps->epoll_io.psocket = ps;
+  ps->epoll_io.fd = -1;
+  ps->epoll_io.type = listener ? LISTENER_IO : PCONNECTION_IO;
+  ps->epoll_io.wanted = 0;
+  ps->epoll_io.polling = false;
+  ps->proactor = p;
+  ps->listener = listener;
+  ps->sockfd = -1;
+  pni_parse_addr(addr, ps->addr_buf, sizeof(ps->addr_buf), &ps->host, &ps->port);
+}
+
+typedef struct pconnection_t {
+  psocket_t psocket;
+  pcontext_t context;
+  uint32_t new_events;
+  int wake_count;
+  bool server;                /* accept, not connect */
+  bool tick_pending;
+  bool timer_armed;
+  bool queued_disconnect;     /* deferred from pn_proactor_disconnect() */
+  pn_condition_t *disconnect_condition;
+  ptimer_t timer;  // TODO: review one timerfd per connection
+  // Following values only changed by (sole) working context:
+  uint32_t current_arm;  // active epoll io events
+  bool connected;
+  bool read_blocked;
+  bool write_blocked;
+  bool disconnected;
+  int hog_count; // thread hogging limiter
+  pn_event_batch_t batch;
+  pn_connection_driver_t driver;
+  struct pn_netaddr_t local, remote; /* Actual addresses */
+  struct addrinfo *addrinfo;         /* Resolved address list */
+  struct addrinfo *ai;               /* Current connect address */
+  pmutex rearm_mutex;                /* protects pconnection_rearm from out of order arming*/
+} pconnection_t;
+
+/* Protects read/update of pn_connnection_t pointer to it's pconnection_t
+ *
+ * Global because pn_connection_wake()/pn_connection_proactor() navigate from
+ * the pn_connection_t before we know the proactor or driver. Critical sections
+ * are small: only get/set of the pn_connection_t driver pointer.
+ *
+ * TODO: replace mutex with atomic load/store
+ */
+static pthread_mutex_t driver_ptr_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static pconnection_t *get_pconnection(pn_connection_t* c) {
+  if (!c) return NULL;
+  lock(&driver_ptr_mutex);
+  pn_connection_driver_t *d = *pn_connection_driver_ptr(c);
+  unlock(&driver_ptr_mutex);
+  if (!d) return NULL;
+  return (pconnection_t*)((char*)d-offsetof(pconnection_t, driver));
+}
+
+static void set_pconnection(pn_connection_t* c, pconnection_t *pc) {
+  lock(&driver_ptr_mutex);
+  *pn_connection_driver_ptr(c) = pc ? &pc->driver : NULL;
+  unlock(&driver_ptr_mutex);
+}
+
+/*
+ * A listener can have mutiple sockets (as specified in the addrinfo).  They
+ * are armed separately.  The individual psockets can be part of at most one
+ * list: the global proactor overflow retry list or the per-listener list of
+ * pending accepts (valid inbound socket obtained, but pn_listener_accept not
+ * yet called by the application).  These lists will be small and quick to
+ * traverse.
+ */
+
+struct acceptor_t{
+  psocket_t psocket;
+  int accepted_fd;
+  bool armed;
+  bool overflowed;
+  acceptor_t *next;              /* next listener list member */
+  struct pn_netaddr_t addr;      /* listening address */
+};
+
+struct pn_listener_t {
+  acceptor_t *acceptors;          /* Array of listening sockets */
+  size_t acceptors_size;
+  int active_count;               /* Number of listener sockets registered with epoll */
+  pcontext_t context;
+  pn_condition_t *condition;
+  pn_collector_t *collector;
+  pn_event_batch_t batch;
+  pn_record_t *attachments;
+  void *listener_context;
+  acceptor_t *pending_acceptors;  /* list of those with a valid inbound fd*/
+  int pending_count;
+  bool unclaimed;                 /* attach event dispatched but no pn_listener_attach() call yet */
+  size_t backlog;
+  bool close_dispatched;
+  pmutex rearm_mutex;             /* orders rearms/disarms, nothing else */
+};
+
+static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool topup);
+static void write_flush(pconnection_t *pc);
+static void listener_begin_close(pn_listener_t* l);
+static void proactor_add(pcontext_t *ctx);
+static bool proactor_remove(pcontext_t *ctx);
+
+static inline pconnection_t *psocket_pconnection(psocket_t* ps) {
+  return ps->listener ? NULL : (pconnection_t*)ps;
+}
+
+static inline pn_listener_t *psocket_listener(psocket_t* ps) {
+  return ps->listener;
+}
+
+static inline acceptor_t *psocket_acceptor(psocket_t* ps) {
+  return !ps->listener ? NULL : (acceptor_t *)ps;
+}
+
+static inline pconnection_t *pcontext_pconnection(pcontext_t *c) {
+  return c->type == PCONNECTION ?
+    (pconnection_t*)((char*)c - offsetof(pconnection_t, context)) : NULL;
+}
+
+static inline pn_listener_t *pcontext_listener(pcontext_t *c) {
+  return c->type == LISTENER ?
+    (pn_listener_t*)((char*)c - offsetof(pn_listener_t, context)) : NULL;
+}
+
+static pn_event_t *listener_batch_next(pn_event_batch_t *batch);
+static pn_event_t *proactor_batch_next(pn_event_batch_t *batch);
+static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch);
+
+static inline pn_proactor_t *batch_proactor(pn_event_batch_t *batch) {
+  return (batch->next_event == proactor_batch_next) ?
+    (pn_proactor_t*)((char*)batch - offsetof(pn_proactor_t, batch)) : NULL;
+}
+
+static inline pn_listener_t *batch_listener(pn_event_batch_t *batch) {
+  return (batch->next_event == listener_batch_next) ?
+    (pn_listener_t*)((char*)batch - offsetof(pn_listener_t, batch)) : NULL;
+}
+
+static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
+  return (batch->next_event == pconnection_batch_next) ?
+    (pconnection_t*)((char*)batch - offsetof(pconnection_t, batch)) : NULL;
+}
+
+static inline bool pconnection_has_event(pconnection_t *pc) {
+  return pn_connection_driver_has_event(&pc->driver);
+}
+
+static inline bool listener_has_event(pn_listener_t *l) {
+  return pn_collector_peek(l->collector) || (l->pending_count && !l->unclaimed);
+}
+
+static inline bool proactor_has_event(pn_proactor_t *p) {
+  return pn_collector_peek(p->collector);
+}
+
+static pn_event_t *log_event(void* p, pn_event_t *e) {
+  if (e) {
+    pn_logf("[%p]:(%s)", (void*)p, pn_event_type_name(pn_event_type(e)));
+  }
+  return e;
+}
+
+static void psocket_error_str(psocket_t *ps, const char *msg, const char* what) {
+  if (!ps->listener) {
+    pn_connection_driver_t *driver = &psocket_pconnection(ps)->driver;
+    pn_connection_driver_bind(driver); /* Bind so errors will be reported */
+    pni_proactor_set_cond(pn_transport_condition(driver->transport), what, ps->host, ps->port, msg);
+    pn_connection_driver_close(driver);
+  } else {
+    pn_listener_t *l = psocket_listener(ps);
+    pni_proactor_set_cond(l->condition, what, ps->host, ps->port, msg);
+    listener_begin_close(l);
+  }
+}
+
+static void psocket_error(psocket_t *ps, int err, const char* what) {
+  strerrorbuf msg;
+  pstrerror(err, msg);
+  psocket_error_str(ps, msg, what);
+}
+
+static void psocket_gai_error(psocket_t *ps, int gai_err, const char* what) {
+  psocket_error_str(ps, gai_strerror(gai_err), what);
+}
+
+static void rearm(pn_proactor_t *p, epoll_extended_t *ee) {
+  struct epoll_event ev;
+  ev.data.ptr = ee;
+  ev.events = ee->wanted | EPOLLONESHOT;
+  memory_barrier(ee);
+  if (epoll_ctl(p->epollfd, EPOLL_CTL_MOD, ee->fd, &ev) == -1)
+    EPOLL_FATAL("arming polled file descriptor", errno);
+}
+
+static void listener_list_append(acceptor_t **start, acceptor_t *item) {
+  assert(item->next == NULL);
+  if (*start) {
+    acceptor_t *end = *start;
+    while (end->next)
+      end = end->next;
+    end->next = item;
+  }
+  else *start = item;
+}
+
+static acceptor_t *listener_list_next(acceptor_t **start) {
+  acceptor_t *item = *start;
+  if (*start) *start = (*start)->next;
+  if (item) item->next = NULL;
+  return item;
+}
+
+// Add an overflowing listener to the overflow list. Called with listener context lock held.
+static void listener_set_overflow(acceptor_t *a) {
+  a->overflowed = true;
+  pn_proactor_t *p = a->psocket.proactor;
+  lock(&p->overflow_mutex);
+  listener_list_append(&p->overflow, a);
+  unlock(&p->overflow_mutex);
+}
+
+/* TODO aconway 2017-06-08: we should also call proactor_rearm_overflow after a fixed delay,
+   even if the proactor has not freed any file descriptors, since other parts of the process
+   might have*/
+
+// Activate overflowing listeners, called when there may be available file descriptors.
+static void proactor_rearm_overflow(pn_proactor_t *p) {
+  lock(&p->overflow_mutex);
+  acceptor_t* ovflw = p->overflow;
+  p->overflow = NULL;
+  unlock(&p->overflow_mutex);
+  acceptor_t *a = listener_list_next(&ovflw);
+  while (a) {
+    pn_listener_t *l = a->psocket.listener;
+    lock(&l->context.mutex);
+    bool rearming = !l->context.closing;
+    bool notify = false;
+    assert(!a->armed);
+    assert(a->overflowed);
+    a->overflowed = false;
+    if (rearming) {
+      lock(&l->rearm_mutex);
+      a->armed = true;
+    }
+    else notify = wake(&l->context);
+    unlock(&l->context.mutex);
+    if (rearming) {
+      rearm(p, &a->psocket.epoll_io);
+      unlock(&l->rearm_mutex);
+    }
+    if (notify) wake_notify(&l->context);
+    a = listener_list_next(&ovflw);
+  }
+}
+
+// Close an FD and rearm overflow listeners.  Call with no listener locks held.
+static int pclosefd(pn_proactor_t *p, int fd) {
+  int err = close(fd);
+  if (!err) proactor_rearm_overflow(p);
+  return err;
+}
+
+
+// ========================================================================
+// pconnection
+// ========================================================================
+
+static void pconnection_tick(pconnection_t *pc);
+
+static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, bool server, const char *addr)
+{
+  memset(pc, 0, sizeof(*pc));
+
+  if (pn_connection_driver_init(&pc->driver, c, t) != 0) {
+    free(pc);
+    return "pn_connection_driver_init failure";
+  }
+
+  pcontext_init(&pc->context, PCONNECTION, p, pc);
+  psocket_init(&pc->psocket, p, NULL, addr);
+  pc->new_events = 0;
+  pc->wake_count = 0;
+  pc->tick_pending = false;
+  pc->timer_armed = false;
+  pc->queued_disconnect = false;
+  pc->disconnect_condition = NULL;
+
+  pc->current_arm = 0;
+  pc->connected = false;
+  pc->read_blocked = true;
+  pc->write_blocked = true;
+  pc->disconnected = false;
+  pc->hog_count = 0;
+  pc->batch.next_event = pconnection_batch_next;
+
+  if (server) {
+    pn_transport_set_server(pc->driver.transport);
+  }
+
+  if (!ptimer_init(&pc->timer, &pc->psocket)) {
+    psocket_error(&pc->psocket, errno, "timer setup");
+    pc->disconnected = true;    /* Already failed */
+  }
+  pmutex_init(&pc->rearm_mutex);
+
+  /* Set the pconnection_t backpointer last.
+     Connections that were released by pn_proactor_release_connection() must not reveal themselves
+     to be re-associated with a proactor till setup is complete.
+   */
+  set_pconnection(pc->driver.connection, pc);
+
+  return NULL;
+}
+
+// Call with lock held and closing == true (i.e. pn_connection_driver_finished() == true), timer cancelled.
+// Return true when all possible outstanding epoll events associated with this pconnection have been processed.
+static inline bool pconnection_is_final(pconnection_t *pc) {
+  return !pc->current_arm && !pc->timer_armed && !pc->context.wake_ops;
+}
+
+static void pconnection_final_free(pconnection_t *pc) {
+  if (pc->driver.connection) {
+    set_pconnection(pc->driver.connection, NULL);
+  }
+  if (pc->addrinfo) {
+    freeaddrinfo(pc->addrinfo);
+  }
+  pmutex_finalize(&pc->rearm_mutex);
+  pn_condition_free(pc->disconnect_condition);
+  pn_connection_driver_destroy(&pc->driver);
+  pcontext_finalize(&pc->context);
+  free(pc);
+}
+
+// call without lock, but only if pconnection_is_final() is true
+static void pconnection_cleanup(pconnection_t *pc) {
+  stop_polling(&pc->psocket.epoll_io, pc->psocket.proactor->epollfd);
+  if (pc->psocket.sockfd != -1)
+    pclosefd(pc->psocket.proactor, pc->psocket.sockfd);
+  stop_polling(&pc->timer.epoll_io, pc->psocket.proactor->epollfd);
+  ptimer_finalize(&pc->timer);
+  lock(&pc->context.mutex);
+  bool can_free = proactor_remove(&pc->context);
+  unlock(&pc->context.mutex);
+  if (can_free)
+    pconnection_final_free(pc);
+  // else proactor_disconnect logic owns psocket and its final free
+}
+
+// Call with lock held or from forced_shutdown
+static void pconnection_begin_close(pconnection_t *pc) {
+  if (!pc->context.closing) {
+    pc->context.closing = true;
+    if (pc->current_arm != 0 && !pc->new_events) {
+      // Force io callback via an EPOLLHUP
+      shutdown(pc->psocket.sockfd, SHUT_RDWR);
+    }
+
+    pn_connection_driver_close(&pc->driver);
+    if (ptimer_shutdown(&pc->timer, pc->timer_armed))
+      pc->timer_armed = false;  // disarmed in the sense that the timer will never fire again
+    else if (!pc->timer_armed) {
+      // In doubt.  One last callback to collect
+      rearm(pc->psocket.proactor, &pc->timer.epoll_io);
+      pc->timer_armed = true;
+    }
+  }
+}
+
+static void pconnection_forced_shutdown(pconnection_t *pc) {
+  // Called by proactor_free, no competing threads, no epoll activity.
+  pc->current_arm = 0;
+  pc->new_events = 0;
+  pconnection_begin_close(pc);
+  // pconnection_process will never be called again.  Zero everything.
+  pc->timer_armed = false;
+  pc->context.wake_ops = 0;
+  pn_connection_t *c = pc->driver.connection;
+  pn_collector_release(pn_connection_collector(c));
+  assert(pconnection_is_final(pc));
+  pconnection_cleanup(pc);
+}
+
+static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) {
+  pconnection_t *pc = batch_pconnection(batch);
+  if (!pc->driver.connection) return NULL;
+  pn_event_t *e = pn_connection_driver_next_event(&pc->driver);
+  if (!e) {
+    write_flush(pc);  // May generate transport event
+    e = pn_connection_driver_next_event(&pc->driver);
+    if (!e && pc->hog_count < HOG_MAX) {
+      if (pconnection_process(pc, 0, false, true)) {
+        e = pn_connection_driver_next_event(&pc->driver);
+      }
+    }
+  }
+  return e;
+}
+
+/* Shortcuts */
+static inline bool pconnection_rclosed(pconnection_t  *pc) {
+  return pn_connection_driver_read_closed(&pc->driver);
+}
+
+static inline bool pconnection_wclosed(pconnection_t  *pc) {
+  return pn_connection_driver_write_closed(&pc->driver);
+}
+
+/* Call only from working context (no competitor for pc->current_arm or
+   connection driver).  If true returned, caller must do
+   pconnection_rearm().
+
+   Never rearm(0 | EPOLLONESHOT), since this really means
+   rearm(EPOLLHUP | EPOLLERR | EPOLLONESHOT) and leaves doubt that the
+   EPOLL_CTL_DEL can prevent a parallel HUP/ERR error notification during
+   close/shutdown.  Let read()/write() return 0 or -1 to trigger cleanup logic.
+*/
+static bool pconnection_rearm_check(pconnection_t *pc) {
+  if (pconnection_rclosed(pc) && pconnection_wclosed(pc)) {
+    return false;
+  }
+  uint32_t wanted_now = (pc->read_blocked && !pconnection_rclosed(pc)) ? EPOLLIN : 0;
+  if (!pconnection_wclosed(pc)) {
+    if (pc->write_blocked)
+      wanted_now |= EPOLLOUT;
+    else {
+      pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
+      if (wbuf.size > 0)
+        wanted_now |= EPOLLOUT;
+    }
+  }
+  if (!wanted_now || pc->current_arm == wanted_now) return false;
+
+  lock(&pc->rearm_mutex);      /* unlocked in pconnection_rearm... */
+  pc->psocket.epoll_io.wanted = wanted_now;
+  pc->current_arm = wanted_now;
+  return true;                     /* ... so caller MUST call pconnection_rearm */
+}
+
+/* Call without lock */
+static inline void pconnection_rearm(pconnection_t *pc) {
+  rearm(pc->psocket.proactor, &pc->psocket.epoll_io);
+  unlock(&pc->rearm_mutex);
+}
+
+static inline bool pconnection_work_pending(pconnection_t *pc) {
+  if (pc->new_events || pc->wake_count || pc->tick_pending || pc->queued_disconnect)
+    return true;
+  if (!pc->read_blocked && !pconnection_rclosed(pc))
+    return true;
+  pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
+  return (wbuf.size > 0 && !pc->write_blocked);
+}
+
+static void pconnection_done(pconnection_t *pc) {
+  bool notify = false;
+  lock(&pc->context.mutex);
+  pc->context.working = false;  // So we can wake() ourself if necessary.  We remain the de facto
+                                // working context while the lock is held.
+  pc->hog_count = 0;
+  if (pconnection_has_event(pc) || pconnection_work_pending(pc)) {
+    notify = wake(&pc->context);
+  } else if (pn_connection_driver_finished(&pc->driver)) {
+    pconnection_begin_close(pc);
+    if (pconnection_is_final(pc)) {
+      unlock(&pc->context.mutex);
+      pconnection_cleanup(pc);
+      return;
+    }
+  }
+  bool rearm = pconnection_rearm_check(pc);
+  unlock(&pc->context.mutex);
+  if (rearm) pconnection_rearm(pc);
+  if (notify) wake_notify(&pc->context);
+}
+
+// Return true unless error
+static bool pconnection_write(pconnection_t *pc, pn_bytes_t wbuf) {
+  ssize_t n = send(pc->psocket.sockfd, wbuf.start, wbuf.size, MSG_NOSIGNAL);
+  if (n > 0) {
+    pn_connection_driver_write_done(&pc->driver, n);
+    if ((size_t) n < wbuf.size) pc->write_blocked = true;
+  } else if (errno == EWOULDBLOCK) {
+    pc->write_blocked = true;
+  } else if (!(errno == EAGAIN || errno == EINTR)) {
+    return false;
+  }
+  return true;
+}
+
+static void write_flush(pconnection_t *pc) {
+  if (!pc->write_blocked && !pconnection_wclosed(pc)) {
+    pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
+    if (wbuf.size > 0) {
+      if (!pconnection_write(pc, wbuf)) {
+        psocket_error(&pc->psocket, errno, pc->disconnected ? "disconnected" : "on write to");
+      }
+    }
+    else {
+      if (pn_connection_driver_write_closed(&pc->driver)) {
+        shutdown(pc->psocket.sockfd, SHUT_WR);
+        pc->write_blocked = true;
+      }
+    }
+  }
+}
+
+static void pconnection_connected_lh(pconnection_t *pc);
+static void pconnection_maybe_connect_lh(pconnection_t *pc);
+
+/*
+ * May be called concurrently from multiple threads:
+ *   pn_event_batch_t loop (topup is true)
+ *   timer (timeout is true)
+ *   socket io (events != 0)
+ *   one or more wake()
+ * Only one thread becomes (or always was) the working thread.
+ */
+static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool topup) {
+  bool inbound_wake = !(events | timeout | topup);
+  bool rearm_timer = false;
+  bool timer_fired = false;
+  bool waking = false;
+  bool tick_required = false;
+
+  // Don't touch data exclusive to working thread (yet).
+
+  if (timeout) {
+    rearm_timer = true;
+    timer_fired = ptimer_callback(&pc->timer) != 0;
+  }
+  lock(&pc->context.mutex);
+
+  if (events) {
+    pc->new_events = events;
+    events = 0;
+  }
+  else if (timer_fired) {
+    pc->tick_pending = true;
+    timer_fired = false;
+  }
+  else if (inbound_wake) {
+    wake_done(&pc->context);
+    inbound_wake = false;
+  }
+
+  if (rearm_timer)
+    pc->timer_armed = false;
+
+  if (topup) {
+    // Only called by the batch owner.  Does not loop, just "tops up"
+    // once.  May be back depending on hog_count.
+    assert(pc->context.working);
+  }
+  else {
+    if (pc->context.working) {
+      // Another thread is the working context.
+      unlock(&pc->context.mutex);
+      return NULL;
+    }
+    pc->context.working = true;
+  }
+
+  // Confirmed as working thread.  Review state and unlock ASAP.
+
+ retry:
+
+  if (pc->queued_disconnect) {  // From pn_proactor_disconnect()
+    pc->queued_disconnect = false;
+    if (!pc->context.closing) {
+      if (pc->disconnect_condition) {
+        pn_condition_copy(pn_transport_condition(pc->driver.transport), pc->disconnect_condition);
+      }
+      pn_connection_driver_close(&pc->driver);
+    }
+  }
+
+  if (pconnection_has_event(pc)) {
+    unlock(&pc->context.mutex);
+    return &pc->batch;
+  }
+  bool closed = pconnection_rclosed(pc) && pconnection_wclosed(pc);
+  if (pc->wake_count) {
+    waking = !closed;
+    pc->wake_count = 0;
+  }
+  if (pc->tick_pending) {
+    pc->tick_pending = false;
+    tick_required = !closed;
+  }
+
+  if (pc->new_events) {
+    pc->current_arm = 0;
+    if (!pc->context.closing) {
+      if ((pc->new_events & (EPOLLHUP | EPOLLERR)) && !pconnection_rclosed(pc) && !pconnection_wclosed(pc))
+        pconnection_maybe_connect_lh(pc);
+      else
+        pconnection_connected_lh(pc); /* Non error event means we are connected */
+      if (pc->new_events & EPOLLOUT)
+        pc->write_blocked = false;
+      if (pc->new_events & EPOLLIN)
+        pc->read_blocked = false;
+    }
+    pc->new_events = 0;
+  }
+
+  if (pc->context.closing && pconnection_is_final(pc)) {
+    unlock(&pc->context.mutex);
+    pconnection_cleanup(pc);
+    return NULL;
+  }
+
+  unlock(&pc->context.mutex);
+  pc->hog_count++; // working context doing work
+
+  if (waking) {
+    pn_connection_t *c = pc->driver.connection;
+    pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
+    waking = false;
+  }
+
+  // read... tick... write
+  // perhaps should be: write_if_recent_EPOLLOUT... read... tick... write
+
+  if (!pconnection_rclosed(pc)) {
+    pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
+    if (rbuf.size > 0 && !pc->read_blocked) {
+      ssize_t n = read(pc->psocket.sockfd, rbuf.start, rbuf.size);
+
+      if (n > 0) {
+        pn_connection_driver_read_done(&pc->driver, n);
+        pconnection_tick(pc);         /* check for tick changes. */
+        tick_required = false;
+        if (!pn_connection_driver_read_closed(&pc->driver) && (size_t)n < rbuf.size)
+          pc->read_blocked = true;
+      }
+      else if (n == 0) {
+        pn_connection_driver_read_close(&pc->driver);
+      }
+      else if (errno == EWOULDBLOCK)
+        pc->read_blocked = true;
+      else if (!(errno == EAGAIN || errno == EINTR)) {
+        psocket_error(&pc->psocket, errno, pc->disconnected ? "disconnected" : "on read from");
+      }
+    }
+  }
+
+  if (tick_required) {
+    pconnection_tick(pc);         /* check for tick changes. */
+    tick_required = false;
+  }
+
+  if (topup) {
+    // If there was anything new to topup, we have it by now.
+    return NULL;  // caller already owns the batch
+  }
+
+  if (pconnection_has_event(pc)) {
+    return &pc->batch;
+  }
+
+  write_flush(pc);
+
+  lock(&pc->context.mutex);
+  if (pc->context.closing && pconnection_is_final(pc)) {
+    unlock(&pc->context.mutex);
+    pconnection_cleanup(pc);
+    return NULL;
+  }
+
+  // Never stop working while work remains.  hog_count exception to this rule is elsewhere.
+  if (pconnection_work_pending(pc))
+    goto retry;  // TODO: get rid of goto without adding more locking
+
+  pc->context.working = false;
+  pc->hog_count = 0;
+  if (pn_connection_driver_finished(&pc->driver)) {
+    pconnection_begin_close(pc);
+    if (pconnection_is_final(pc)) {
+      unlock(&pc->context.mutex);
+      pconnection_cleanup(pc);
+      return NULL;
+    }
+  }
+  bool rearm_pc = pconnection_rearm_check(pc);
+
+  if (!pc->timer_armed && !pc->timer.shutting_down && pc->timer.timerfd >= 0) {
+    pc->timer_armed = true;  // about to rearm outside the lock
+    rearm_timer = true;      // so we remember
+  }
+  unlock(&pc->context.mutex);
+
+  if (rearm_timer) {
+    rearm(pc->psocket.proactor, &pc->timer.epoll_io);
+  }
+  if (rearm_pc) pconnection_rearm(pc);
+
+  return NULL;
+}
+
+static void configure_socket(int sock) {
+  int flags = fcntl(sock, F_GETFL);
+  flags |= O_NONBLOCK;
+  (void)fcntl(sock, F_SETFL, flags); // TODO: check for error
+
+  int tcp_nodelay = 1;
+  (void)setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void*) &tcp_nodelay, sizeof(tcp_nodelay));
+}
+
+/* Called with context.lock held */
+void pconnection_connected_lh(pconnection_t *pc) {
+  if (!pc->connected) {
+    pc->connected = true;
+    if (pc->addrinfo) {
+      freeaddrinfo(pc->addrinfo);
+      pc->addrinfo = NULL;
+    }
+    pc->ai = NULL;
+    socklen_t len = sizeof(pc->remote.ss);
+    (void)getpeername(pc->psocket.sockfd, (struct sockaddr*)&pc->remote.ss, &len);
+  }
+}
+
+/* multi-address connections may call pconnection_start multiple times with diffferent FDs  */
+static void pconnection_start(pconnection_t *pc) {
+  int efd = pc->psocket.proactor->epollfd;
+  /* Start timer, a no-op if the timer has already started. */
+  start_polling(&pc->timer.epoll_io, efd);  // TODO: check for error
+
+  /* Get the local socket name now, get the peer name in pconnection_connected */
+  socklen_t len = sizeof(pc->local.ss);
+  (void)getsockname(pc->psocket.sockfd, (struct sockaddr*)&pc->local.ss, &len);
+
+  epoll_extended_t *ee = &pc->psocket.epoll_io;
+  if (ee->polling) {     /* This is not the first attempt, stop polling and close the old FD */
+    int fd = ee->fd;     /* Save fd, it will be set to -1 by stop_polling */
+    stop_polling(ee, efd);
+    pclosefd(pc->psocket.proactor, fd);
+  }
+  ee->fd = pc->psocket.sockfd;
+  pc->current_arm = ee->wanted = EPOLLIN | EPOLLOUT;
+  start_polling(ee, efd);  // TODO: check for error
+}
+
+/* Called on initial connect, and if connection fails to try another address */
+static void pconnection_maybe_connect_lh(pconnection_t *pc) {
+  errno = 0;
+  if (!pc->connected) {         /* Not yet connected */
+    while (pc->ai) {            /* Have an address */
+      struct addrinfo *ai = pc->ai;
+      pc->ai = pc->ai->ai_next; /* Move to next address in case this fails */
+      int fd = socket(ai->ai_family, SOCK_STREAM, 0);
+      if (fd >= 0) {
+        configure_socket(fd);
+        if (!connect(fd, ai->ai_addr, ai->ai_addrlen) || errno == EINPROGRESS) {
+          pc->psocket.sockfd = fd;
+          pconnection_start(pc);
+          return;               /* Async connection started */
+        } else {
+          close(fd);
+        }
+      }
+      /* connect failed immediately, go round the loop to try the next addr */
+    }
+    freeaddrinfo(pc->addrinfo);
+    pc->addrinfo = NULL;
+    /* If there was a previous attempted connection, let the poller discover the
+       errno from its socket, otherwise set the current error. */
+    if (pc->psocket.sockfd < 1) {
+      psocket_error(&pc->psocket, errno ? errno : ENOTCONN, "on connect");
+    }
+  }
+  pc->disconnected = true;
+}
+
+static int pgetaddrinfo(const char *host, const char *port, int flags, struct addrinfo **res)
+{
+  struct addrinfo hints = { 0 };
+  hints.ai_family = AF_UNSPEC;
+  hints.ai_socktype = SOCK_STREAM;
+  hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | flags;
+  return getaddrinfo(host, port, &hints, res);
+}
+
+static inline bool is_inactive(pn_proactor_t *p) {
+  return (!p->contexts && !p->disconnects_pending && !p->timeout_set && !p->shutting_down);
+}
+
+/* If inactive set need_inactive and return true if the proactor needs a wakeup */
+static bool wake_if_inactive(pn_proactor_t *p) {
+  if (is_inactive(p)) {
+    p->need_inactive = true;
+    return wake(&p->context);
+  }
+  return false;
+}
+
+void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, const char *addr) {
+  pconnection_t *pc = (pconnection_t*) calloc(1, sizeof(pconnection_t));
+  assert(pc); // TODO: memory safety
+  const char *err = pconnection_setup(pc, p, c, t, false, addr);
+  if (err) {    /* TODO aconway 2017-09-13: errors must be reported as events */
+    pn_logf("pn_proactor_connect failure: %s", err);
+    return;
+  }
+  // TODO: check case of proactor shutting down
+
+  lock(&pc->context.mutex);
+  proactor_add(&pc->context);
+  pn_connection_open(pc->driver.connection); /* Auto-open */
+
+  bool notify = false;
+  bool notify_proactor = false;
+
+  if (pc->disconnected) {
+    notify = wake(&pc->context);    /* Error during initialization */
+  } else {
+    int gai_error = pgetaddrinfo(pc->psocket.host, pc->psocket.port, 0, &pc->addrinfo);
+    if (!gai_error) {
+      pn_connection_open(pc->driver.connection); /* Auto-open */
+      pc->ai = pc->addrinfo;
+      pconnection_maybe_connect_lh(pc); /* Start connection attempts */
+      if (pc->disconnected) notify = wake(&pc->context);
+    } else {
+      psocket_gai_error(&pc->psocket, gai_error, "connect to ");
+      notify = wake(&pc->context);
+      notify_proactor = wake_if_inactive(p);
+    }
+  }
+  /* We need to issue INACTIVE on immediate failure */
+  unlock(&pc->context.mutex);
+  if (notify) wake_notify(&pc->context);
+  if (notify_proactor) wake_notify(&p->context);
+}
+
+static void pconnection_tick(pconnection_t *pc) {
+  pn_transport_t *t = pc->driver.transport;
+  if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) {
+    ptimer_set(&pc->timer, 0);
+    uint64_t now = pn_i_now2();
+    uint64_t next = pn_transport_tick(t, now);
+    if (next) {
+      ptimer_set(&pc->timer, next - now);
+    }
+  }
+}
+
+void pn_connection_wake(pn_connection_t* c) {
+  bool notify = false;
+  pconnection_t *pc = get_pconnection(c);
+  if (pc) {
+    lock(&pc->context.mutex);
+    if (!pc->context.closing) {
+      pc->wake_count++;
+      notify = wake(&pc->context);
+    }
+    unlock(&pc->context.mutex);
+  }
+  if (notify) wake_notify(&pc->context);
+}
+
+void pn_proactor_release_connection(pn_connection_t *c) {
+  bool notify = false;
+  pconnection_t *pc = get_pconnection(c);
+  if (pc) {
+    set_pconnection(c, NULL);
+    lock(&pc->context.mutex);
+    pn_connection_driver_release_connection(&pc->driver);
+    pconnection_begin_close(pc);
+    notify = wake(&pc->context);
+    unlock(&pc->context.mutex);
+  }
+  if (notify) wake_notify(&pc->context);
+}
+
+// ========================================================================
+// listener
+// ========================================================================
+
+pn_listener_t *pn_event_listener(pn_event_t *e) {
+  return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL;
+}
+
+pn_listener_t *pn_listener() {
+  pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t));
+  if (l) {
+    l->batch.next_event = listener_batch_next;
+    l->collector = pn_collector();
+    l->condition = pn_condition();
+    l->attachments = pn_record();
+    if (!l->condition || !l->collector || !l->attachments) {
+      pn_listener_free(l);
+      return NULL;
+    }
+    pn_proactor_t *unknown = NULL;  // won't know until pn_proactor_listen
+    pcontext_init(&l->context, LISTENER, unknown, l);
+    pmutex_init(&l->rearm_mutex);
+  }
+  return l;
+}
+
+void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, int backlog)
+{
+  // TODO: check listener not already listening for this or another proactor
+  lock(&l->context.mutex);
+  l->context.proactor = p;;
+  l->backlog = backlog;
+
+  char addr_buf[PN_MAX_ADDR];
+  const char *host, *port;
+  pni_parse_addr(addr, addr_buf, PN_MAX_ADDR, &host, &port);
+
+  struct addrinfo *addrinfo = NULL;
+  int gai_err = pgetaddrinfo(host, port, AI_PASSIVE | AI_ALL, &addrinfo);
+  if (!gai_err) {
+    /* Count addresses, allocate enough space for sockets */
+    size_t len = 0;
+    for (struct addrinfo *ai = addrinfo; ai; ai = ai->ai_next) {
+      ++len;
+    }
+    assert(len > 0);            /* guaranteed by getaddrinfo */
+    l->acceptors = (acceptor_t*)calloc(len, sizeof(acceptor_t));
+    assert(l->acceptors);      /* TODO aconway 2017-05-05: memory safety */
+    l->acceptors_size = 0;
+    uint16_t dynamic_port = 0;  /* Record dynamic port from first bind(0) */
+    /* Find working listen addresses */
+    for (struct addrinfo *ai = addrinfo; ai; ai = ai->ai_next) {
+      if (dynamic_port) set_port(ai->ai_addr, dynamic_port);
+      int fd = socket(ai->ai_family, SOCK_STREAM, ai->ai_protocol);
+      static int on = 1;
+      if (fd >= 0) {
+        if (!setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) &&
+            /* We listen to v4/v6 on separate sockets, don't let v6 listen for v4 */
+            (ai->ai_family != AF_INET6 ||
+             !setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on))) &&
+            !bind(fd, ai->ai_addr, ai->ai_addrlen) &&
+            !listen(fd, backlog))
+        {
+          acceptor_t *acceptor = &l->acceptors[l->acceptors_size++];
+          /* Get actual address */
+          socklen_t len = pn_netaddr_socklen(&acceptor->addr);
+          (void)getsockname(fd, (struct sockaddr*)(&acceptor->addr.ss), &len);
+          if (acceptor == l->acceptors) { /* First acceptor, check for dynamic port */
+            dynamic_port = check_dynamic_port(ai->ai_addr, pn_netaddr_sockaddr(&acceptor->addr));
+          } else {              /* Link addr to previous addr */
+            (acceptor-1)->addr.next = &acceptor->addr;
+          }
+
+          acceptor->accepted_fd = -1;
+          psocket_t *ps = &acceptor->psocket;
+          psocket_init(ps, p, l, addr);
+          ps->sockfd = fd;
+          ps->epoll_io.fd = fd;
+          ps->epoll_io.wanted = EPOLLIN;
+          ps->epoll_io.polling = false;
+          lock(&l->rearm_mutex);
+          start_polling(&ps->epoll_io, ps->proactor->epollfd);  // TODO: check for error
+          l->active_count++;
+          acceptor->armed = true;
+          unlock(&l->rearm_mutex);
+        } else {
+          close(fd);
+        }
+      }
+    }
+  }
+  if (addrinfo) {
+    freeaddrinfo(addrinfo);
+  }
+  bool notify = wake(&l->context);
+
+  if (l->acceptors_size == 0) { /* All failed, create dummy socket with an error */
+    l->acceptors = (acceptor_t*)realloc(l->acceptors, sizeof(acceptor_t));
+    l->acceptors_size = 1;
+    memset(l->acceptors, 0, sizeof(acceptor_t));
+    psocket_init(&l->acceptors[0].psocket, p, l, addr);
+    l->acceptors[0].accepted_fd = -1;
+    if (gai_err) {
+      psocket_gai_error(&l->acceptors[0].psocket, gai_err, "listen on");
+    } else {
+      psocket_error(&l->acceptors[0].psocket, errno, "listen on");
+    }
+  } else {
+    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN);
+  }
+  proactor_add(&l->context);
+  unlock(&l->context.mutex);
+  if (notify) wake_notify(&l->context);
+  return;
+}
+
+// call with lock held and context.working false
+static inline bool listener_can_free(pn_listener_t *l) {
+  return l->context.closing && l->close_dispatched && !l->context.wake_ops && !l->active_count;
+}
+
+static inline void listener_final_free(pn_listener_t *l) {
+  pcontext_finalize(&l->context);
+  pmutex_finalize(&l->rearm_mutex);
+  free(l->acceptors);
+  free(l);
+}
+
+void pn_listener_free(pn_listener_t *l) {
+  /* Note at this point either the listener has never been used (freed by user)
+     or it has been closed, so all its sockets are closed.
+  */
+  if (l) {
+    bool can_free = true;
+    if (l->collector) pn_collector_free(l->collector);
+    if (l->condition) pn_condition_free(l->condition);
+    if (l->attachments) pn_free(l->attachments);
+    lock(&l->context.mutex);
+    if (l->context.proactor) {
+      can_free = proactor_remove(&l->context);
+    }
+    unlock(&l->context.mutex);
+    if (can_free)
+      listener_final_free(l);
+  }
+}
+
+/* Always call with lock held so it can be unlocked around overflow processing. */
+static void listener_begin_close(pn_listener_t* l) {
+  if (!l->context.closing) {
+    l->context.closing = true;
+
+    /* Close all listening sockets */
+    for (size_t i = 0; i < l->acceptors_size; ++i) {
+      acceptor_t *a = &l->acceptors[i];
+      psocket_t *ps = &a->psocket;
+      if (ps->sockfd >= 0) {
+        lock(&l->rearm_mutex);
+        if (a->armed) {
+          shutdown(ps->sockfd, SHUT_RD);  // Force epoll event and callback
+        } else {
+          stop_polling(&ps->epoll_io, ps->proactor->epollfd);
+          close(ps->sockfd);
+          ps->sockfd = -1;
+          l->active_count--;
+        }
+        unlock(&l->rearm_mutex);
+      }
+    }
+    /* Close all sockets waiting for a pn_listener_accept2() */
+    if (l->unclaimed) l->pending_count++;
+    acceptor_t *a = listener_list_next(&l->pending_acceptors);
+    while (a) {
+      close(a->accepted_fd);
+      a->accepted_fd = -1;
+      l->pending_count--;
+      a = listener_list_next(&l->pending_acceptors);
+    }
+    assert(!l->pending_count);
+
+    unlock(&l->context.mutex);
+    /* Remove all acceptors from the overflow list.  closing flag prevents re-insertion.*/
+    proactor_rearm_overflow(pn_listener_proactor(l));
+    lock(&l->context.mutex);
+    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
+  }
+}
+
+void pn_listener_close(pn_listener_t* l) {
+  bool notify = false;
+  lock(&l->context.mutex);
+  if (!l->context.closing) {
+    listener_begin_close(l);
+    notify = wake(&l->context);
+  }
+  unlock(&l->context.mutex);
+  if (notify) wake_notify(&l->context);
+}
+
+static void listener_forced_shutdown(pn_listener_t *l) {
+  // Called by proactor_free, no competing threads, no epoll activity.
+  lock(&l->context.mutex); // needed because of interaction with proactor_rearm_overflow
+  listener_begin_close(l);
+  unlock(&l->context.mutex);
+  // pconnection_process will never be called again.  Zero everything.
+  l->context.wake_ops = 0;
+  l->close_dispatched = true;
+  l->active_count = 0;
+  assert(listener_can_free(l));
+  pn_listener_free(l);
+}
+
+/* Accept a connection as part of listener_process(). Called with listener context lock held. */
+static void listener_accept_lh(psocket_t *ps) {
+  pn_listener_t *l = psocket_listener(ps);
+  acceptor_t *acceptor = psocket_acceptor(ps);
+  assert(acceptor->accepted_fd < 0); /* Shouldn't already have an accepted_fd */
+  acceptor->accepted_fd = accept(ps->sockfd, NULL, 0);
+  if (acceptor->accepted_fd >= 0) {
+    //    acceptor_t *acceptor = listener_list_next(pending_acceptors);
+    listener_list_append(&l->pending_acceptors, acceptor);
+    l->pending_count++;
+  } else {
+    int err = errno;
+    if (err == ENFILE || err == EMFILE) {
+      listener_set_overflow(acceptor);
+    } else {
+      psocket_error(ps, err, "accept");
+    }
+  }
+}
+
+/* Process a listening socket */
+static pn_event_batch_t *listener_process(psocket_t *ps, uint32_t events) {
+  // TODO: some parallelization of the accept mechanism.
+  pn_listener_t *l = psocket_listener(ps);
+  acceptor_t *a = psocket_acceptor(ps);
+  lock(&l->context.mutex);
+  if (events) {
+    a->armed = false;
+    if (l->context.closing) {
+      lock(&l->rearm_mutex);
+      stop_polling(&ps->epoll_io, ps->proactor->epollfd);
+      unlock(&l->rearm_mutex);
+      close(ps->sockfd);
+      ps->sockfd = -1;
+      l->active_count--;
+    }
+    else {
+      if (events & EPOLLRDHUP) {
+        /* Calls listener_begin_close which closes all the listener's sockets */
+        psocket_error(ps, errno, "listener epoll");
+      } else if (!l->context.closing && events & EPOLLIN) {
+        listener_accept_lh(ps);
+      }
+    }
+  } else {
+    wake_done(&l->context); // callback accounting
+  }
+  pn_event_batch_t *lb = NULL;
+  if (!l->context.working) {
+    l->context.working = true;
+    if (listener_has_event(l))
+      lb = &l->batch;
+    else {
+      l->context.working = false;
+      if (listener_can_free(l)) {
+        unlock(&l->context.mutex);
+        pn_listener_free(l);
+        return NULL;
+      }
+    }
+  }
+  unlock(&l->context.mutex);
+  return lb;
+}
+
+static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
+  pn_listener_t *l = batch_listener(batch);
+  lock(&l->context.mutex);
+  pn_event_t *e = pn_collector_next(l->collector);
+  if (!e && l->pending_count && !l->unclaimed) {
+    // empty collector means pn_collector_put() will not coalesce
+    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
+    l->unclaimed = true;
+    l->pending_count--;
+    e = pn_collector_next(l->collector);
+  }
+  if (e && pn_event_type(e) == PN_LISTENER_CLOSE)
+    l->close_dispatched = true;
+  unlock(&l->context.mutex);
+  return log_event(l, e);
+}
+
+static void listener_done(pn_listener_t *l) {
+  bool notify = false;
+  lock(&l->context.mutex);
+  l->context.working = false;
+
+  if (listener_can_free(l)) {
+    unlock(&l->context.mutex);
+    pn_listener_free(l);
+    return;
+  } else if (listener_has_event(l))
+    notify = wake(&l->context);
+  unlock(&l->context.mutex);
+  if (notify) wake_notify(&l->context);
+}
+
+pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
+  return l ? l->acceptors[0].psocket.proactor : NULL;
+}
+
+pn_condition_t* pn_listener_condition(pn_listener_t* l) {
+  return l->condition;
+}
+
+void *pn_listener_get_context(pn_listener_t *l) {
+  return l->listener_context;
+}
+
+void pn_listener_set_context(pn_listener_t *l, void *context) {
+  l->listener_context = context;
+}
+
+pn_record_t *pn_listener_attachments(pn_listener_t *l) {
+  return l->attachments;
+}
+
+void pn_listener_accept2(pn_listener_t *l, pn_connection_t *c, pn_transport_t *t) {
+  pconnection_t *pc = (pconnection_t*) calloc(1, sizeof(pconnection_t));
+  assert(pc); // TODO: memory safety
+  const char *err = pconnection_setup(pc, pn_listener_proactor(l), c, t, true, "");
+  if (err) {
+    pn_logf("pn_listener_accept failure: %s", err);
+    return;
+  }
+  // TODO: fuller sanity check on input args
+
+  int err2 = 0;
+  int fd = -1;
+  psocket_t *rearming_ps = NULL;
+  bool notify = false;
+  lock(&l->context.mutex);
+  if (l->context.closing)
+    err2 = EBADF;
+  else if (l->unclaimed) {
+    l->unclaimed = false;
+    acceptor_t *a = listener_list_next(&l->pending_acceptors);
+    assert(a);
+    assert(!a->armed);
+    fd = a->accepted_fd;
+    a->accepted_fd = -1;
+    lock(&l->rearm_mutex);
+    rearming_ps = &a->psocket;
+    a->armed = true;
+  }
+  else err2 = EWOULDBLOCK;
+
+  proactor_add(&pc->context);
+  lock(&pc->context.mutex);
+  pc->psocket.sockfd = fd;
+  if (fd >= 0) {
+    configure_socket(fd);
+    pconnection_start(pc);
+    pconnection_connected_lh(pc);
+  }
+  else
+    psocket_error(&pc->psocket, err2, "pn_listener_accept");
+  if (!l->context.working && listener_has_event(l))
+    notify = wake(&l->context);
+  unlock(&pc->context.mutex);
+  unlock(&l->context.mutex);
+  if (rearming_ps) {
+    rearm(rearming_ps->proactor, &rearming_ps->epoll_io);
+    unlock(&l->rearm_mutex);
+  }
+  if (notify) wake_notify(&l->context);
+}
+
+
+// ========================================================================
+// proactor
+// ========================================================================
+
+/* Set up an epoll_extended_t to be used for wakeup or interrupts */
+static void epoll_wake_init(epoll_extended_t *ee, int eventfd, int epollfd) {
+  ee->psocket = NULL;
+  ee->fd = eventfd;
+  ee->type = WAKE;
+  ee->wanted = EPOLLIN;
+  ee->polling = false;
+  start_polling(ee, epollfd);  // TODO: check for error
+}
+
+pn_proactor_t *pn_proactor() {
+  pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
+  if (!p) return NULL;
+  p->epollfd = p->eventfd = p->timer.timerfd = -1;
+  pcontext_init(&p->context, PROACTOR, p, p);
+  pmutex_init(&p->eventfd_mutex);
+  ptimer_init(&p->timer, 0);
+
+  if ((p->epollfd = epoll_create(1)) >= 0) {
+    if ((p->eventfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
+      if ((p->interruptfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
+        if (p->timer.timerfd >= 0)
+          if ((p->collector = pn_collector()) != NULL) {
+            p->batch.next_event = &proactor_batch_next;
+            start_polling(&p->timer.epoll_io, p->epollfd);  // TODO: check for error
+            p->timer_armed = true;
+            epoll_wake_init(&p->epoll_wake, p->eventfd, p->epollfd);
+            epoll_wake_init(&p->epoll_interrupt, p->interruptfd, p->epollfd);
+            return p;
+          }
+      }
+    }
+  }
+  if (p->epollfd >= 0) close(p->epollfd);
+  if (p->eventfd >= 0) close(p->eventfd);
+  if (p->interruptfd >= 0) close(p->interruptfd);
+  ptimer_finalize(&p->timer);
+  if (p->collector) pn_free(p->collector);
+  free (p);
+  return NULL;
+}
+
+void pn_proactor_free(pn_proactor_t *p) {
+  //  No competing threads, not even a pending timer
+  p->shutting_down = true;
+  close(p->epollfd);
+  p->epollfd = -1;
+  close(p->eventfd);
+  p->eventfd = -1;
+  close(p->interruptfd);
+  p->interruptfd = -1;
+  ptimer_finalize(&p->timer);
+  while (p->contexts) {
+    pcontext_t *ctx = p->contexts;
+    p->contexts = ctx->next;
+    switch (ctx->type) {
+     case PCONNECTION:
+      pconnection_forced_shutdown(pcontext_pconnection(ctx));
+      break;
+     case LISTENER:
+      listener_forced_shutdown(pcontext_listener(ctx));
+      break;
+     default:
+      break;
+    }
+  }
+
+  pn_collector_free(p->collector);
+  pmutex_finalize(&p->eventfd_mutex);
+  pcontext_finalize(&p->context);
+  free(p);
+}
+
+pn_proactor_t *pn_event_proactor(pn_event_t *e) {
+  if (pn_event_class(e) == pn_proactor__class()) return (pn_proactor_t*)pn_event_context(e);
+  pn_listener_t *l = pn_event_listener(e);
+  if (l) return l->acceptors[0].psocket.proactor;
+  pn_connection_t *c = pn_event_connection(e);
+  if (c) return pn_connection_proactor(c);
+  return NULL;
+}
+
+static void proactor_add_event(pn_proactor_t *p, pn_event_type_t t) {
+  pn_collector_put(p->collector, pn_proactor__class(), p, t);
+}
+
+// Call with lock held.  Leave unchanged if events pending.
+// There can be multiple interrupts but only one inside the collector to avoid coalescing.
+// Return true if there is an event in the collector.
+static bool proactor_update_batch(pn_proactor_t *p) {
+  if (proactor_has_event(p))
+    return true;
+
+  if (p->need_timeout) {
+    p->need_timeout = false;
+    p->timeout_set = false;
+    proactor_add_event(p, PN_PROACTOR_TIMEOUT);
+    return true;
+  }
+  if (p->need_interrupt) {
+    p->need_interrupt = false;
+    proactor_add_event(p, PN_PROACTOR_INTERRUPT);
+    return true;
+  }
+  if (p->need_inactive) {
+    p->need_inactive = false;
+    proactor_add_event(p, PN_PROACTOR_INACTIVE);
+    return true;
+  }
+  return false;
+}
+
+static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
+  pn_proactor_t *p = batch_proactor(batch);
+  lock(&p->context.mutex);
+  proactor_update_batch(p);
+  pn_event_t *e = pn_collector_next(p->collector);
+  if (e && pn_event_type(e) == PN_PROACTOR_TIMEOUT)
+    p->timeout_processed = true;
+  unlock(&p->context.mutex);
+  return log_event(p, e);
+}
+
+static pn_event_batch_t *proactor_process(pn_proactor_t *p, pn_event_type_t event) {
+  bool timer_fired = (event == PN_PROACTOR_TIMEOUT) && ptimer_callback(&p->timer) != 0;
+  lock(&p->context.mutex);
+  if (event == PN_PROACTOR_INTERRUPT) {
+    p->need_interrupt = true;
+  } else if (event == PN_PROACTOR_TIMEOUT) {
+    p->timer_armed = false;
+    if (timer_fired && p->timeout_set) {
+      p->need_timeout = true;
+    }
+  } else {
+    wake_done(&p->context);
+  }
+  if (!p->context.working) {       /* Can generate proactor events */
+    if (proactor_update_batch(p)) {
+      p->context.working = true;
+      unlock(&p->context.mutex);
+      return &p->batch;
+    }
+  }
+  bool rearm_timer = !p->timer_armed && !p->timer.shutting_down;
+  p->timer_armed = true;
+  unlock(&p->context.mutex);
+  if (rearm_timer)
+    rearm(p, &p->timer.epoll_io);
+  return NULL;
+}
+
+static void proactor_add(pcontext_t *ctx) {
+  pn_proactor_t *p = ctx->proactor;
+  lock(&p->context.mutex);
+  if (p->contexts) {
+    p->contexts->prev = ctx;
+    ctx->next = p->contexts;
+  }
+  p->contexts = ctx;
+  unlock(&p->context.mutex);
+}
+
+// call with psocket's mutex held
+// return true if safe for caller to free psocket
+static bool proactor_remove(pcontext_t *ctx) {
+  pn_proactor_t *p = ctx->proactor;
+  lock(&p->context.mutex);
+  bool can_free = true;
+  if (ctx->disconnecting) {
+    // No longer on contexts list
+    if (--ctx->disconnect_ops == 0) {
+      --p->disconnects_pending;
+    }
+    else                  // procator_disconnect() still processing
+      can_free = false;   // this psocket
+  }
+  else {
+    // normal case
+    if (ctx->prev)
+      ctx->prev->next = ctx->next;
+    else {
+      p->contexts = ctx->next;
+      ctx->next = NULL;
+      if (p->contexts)
+        p->contexts->prev = NULL;
+    }
+    if (ctx->next) {
+      ctx->next->prev = ctx->prev;
+    }
+  }
+  bool notify = wake_if_inactive(p);
+  unlock(&p->context.mutex);
+  if (notify) wake_notify(&p->context);
+  return can_free;
+}
+
+static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p, epoll_extended_t *ee) {
+  if  (ee->fd == p->interruptfd) {        /* Interrupts have their own dedicated eventfd */
+    (void)read_uint64(p->interruptfd);
+    rearm(p, &p->epoll_interrupt);
+    return proactor_process(p, PN_PROACTOR_INTERRUPT);
+  }
+  pcontext_t *ctx = wake_pop_front(p);
+  if (ctx) {
+    switch (ctx->type) {
+     case PROACTOR:
+      return proactor_process(p, PN_EVENT_NONE);
+     case PCONNECTION:
+      return pconnection_process((pconnection_t *) ctx->owner, 0, false, false);
+     case LISTENER:
+      return listener_process(&((pn_listener_t *) ctx->owner)->acceptors[0].psocket, 0);
+     default:
+      assert(ctx->type == WAKEABLE); // TODO: implement or remove
+    }
+  }
+  return NULL;
+}
+
+static pn_event_batch_t *proactor_do_epoll(struct pn_proactor_t* p, bool can_block) {
+  int timeout = can_block ? -1 : 0;
+  while(true) {
+    pn_event_batch_t *batch = NULL;
+    struct epoll_event ev;
+    int n = epoll_wait(p->epollfd, &ev, 1, timeout);
+
+    if (n < 0) {
+      if (errno != EINTR)
+        perror("epoll_wait"); // TODO: proper log
+      if (!can_block)
+        return NULL;
+      else
+        continue;
+    } else if (n == 0) {
+      if (!can_block)
+        return NULL;
+      else {
+        perror("epoll_wait unexpected timeout"); // TODO: proper log
+        continue;
+      }
+    }
+    assert(n == 1);
+    epoll_extended_t *ee = (epoll_extended_t *) ev.data.ptr;
+    memory_barrier(ee);
+
+    if (ee->type == WAKE) {
+      batch = process_inbound_wake(p, ee);
+    } else if (ee->type == PROACTOR_TIMER) {
+      batch = proactor_process(p, PN_PROACTOR_TIMEOUT);
+    } else {
+      pconnection_t *pc = psocket_pconnection(ee->psocket);
+      if (pc) {
+        if (ee->type == PCONNECTION_IO) {
+          batch = pconnection_process(pc, ev.events, false, false);
+        } else {
+          assert(ee->type == PCONNECTION_TIMER);
+          batch = pconnection_process(pc, 0, true, false);
+        }
+      }
+      else {
+        // TODO: can any of the listener processing be parallelized like IOCP?
+        batch = listener_process(ee->psocket, ev.events);
+      }
+    }
+
+    if (batch) return batch;
+    // No Proton event generated.  epoll_wait() again.
+  }
+}
+
+pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
+  return proactor_do_epoll(p, true);
+}
+
+pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
+  return proactor_do_epoll(p, false);
+}
+
+void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
+  pconnection_t *pc = batch_pconnection(batch);
+  if (pc) {
+    pconnection_done(pc);
+    return;
+  }
+  pn_listener_t *l = batch_listener(batch);
+  if (l) {
+    listener_done(l);
+    return;
+  }
+  pn_proactor_t *bp = batch_proactor(batch);
+  if (bp == p) {
+    bool notify = false;
+    lock(&p->context.mutex);
+    bool rearm_timer = !p->timer_armed && !p->shutting_down;
+    p->timer_armed = true;
+    p->context.working = false;
+    if (p->timeout_processed) {
+      p->timeout_processed = false;
+      if (wake_if_inactive(p))
+        notify = true;
+    }
+    proactor_update_batch(p);
+    if (proactor_has_event(p))
+      if (wake(&p->context))
+        notify = true;
+    unlock(&p->context.mutex);
+    if (notify)
+      wake_notify(&p->context);
+    if (rearm_timer)
+      rearm(p, &p->timer.epoll_io);
+    return;
+  }
+}
+
+void pn_proactor_interrupt(pn_proactor_t *p) {
+  if (p->interruptfd == -1)
+    return;
+  uint64_t increment = 1;
+  if (write(p->interruptfd, &increment, sizeof(uint64_t)) != sizeof(uint64_t))
+    EPOLL_FATAL("setting eventfd", errno);
+}
+
+void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
+  bool notify = false;
+  lock(&p->context.mutex);
+  p->timeout_set = true;
+  if (t == 0) {
+    ptimer_set(&p->timer, 0);
+    p->need_timeout = true;
+    notify = wake(&p->context);
+  } else {
+    ptimer_set(&p->timer, t);
+  }
+  unlock(&p->context.mutex);
+  if (notify) wake_notify(&p->context);
+}
+
+void pn_proactor_cancel_timeout(pn_proactor_t *p) {
+  lock(&p->context.mutex);
+  p->timeout_set = false;
+  p->need_timeout = false;
+  ptimer_set(&p->timer, 0);
+  bool notify = wake_if_inactive(p);
+  unlock(&p->context.mutex);
+  if (notify) wake_notify(&p->context);
+}
+
+pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
+  pconnection_t *pc = get_pconnection(c);
+  return pc ? pc->psocket.proactor : NULL;
+}
+
+void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
+  bool notify = false;
+
+  lock(&p->context.mutex);
+  // Move the whole contexts list into a disconnecting state
+  pcontext_t *disconnecting_pcontexts = p->contexts;
+  p->contexts = NULL;
+  // First pass: mark each pcontext as disconnecting and update global pending count.
+  pcontext_t *ctx = disconnecting_pcontexts;
+  while (ctx) {
+    ctx->disconnecting = true;
+    ctx->disconnect_ops = 2;   // Second pass below and proactor_remove(), in any order.
+    p->disconnects_pending++;
+    ctx = ctx->next;
+  }
+  notify = wake_if_inactive(p);
+  unlock(&p->context.mutex);
+  if (!disconnecting_pcontexts) {
+    if (notify) wake_notify(&p->context);
+    return;
+  }
+
+  // Second pass: different locking, close the pcontexts, free them if !disconnect_ops
+  pcontext_t *next = disconnecting_pcontexts;
+  while (next) {
+    ctx = next;
+    next = ctx->next;           /* Save next pointer in case we free ctx */
+    bool do_free = false;
+    bool ctx_notify = true;
+    pmutex *ctx_mutex = NULL;
+    pconnection_t *pc = pcontext_pconnection(ctx);
+    if (pc) {
+      ctx_mutex = &pc->context.mutex;
+      lock(ctx_mutex);
+      if (!ctx->closing) {
+        if (ctx->working) {
+          // Must defer
+          pc->queued_disconnect = true;
+          if (cond) {
+            if (!pc->disconnect_condition)
+              pc->disconnect_condition = pn_condition();
+            pn_condition_copy(pc->disconnect_condition, cond);
+          }
+        }
+        else {
+          // No conflicting working context.
+          if (cond) {
+            pn_condition_copy(pn_transport_condition(pc->driver.transport), cond);
+          }
+          pn_connection_driver_close(&pc->driver);
+        }
+      }
+    } else {
+      pn_listener_t *l = pcontext_listener(ctx);
+      assert(l);
+      ctx_mutex = &l->context.mutex;
+      lock(ctx_mutex);
+      if (!ctx->closing) {
+        if (cond) {
+          pn_condition_copy(pn_listener_condition(l), cond);
+        }
+        listener_begin_close(l);
+      }
+    }
+
+    lock(&p->context.mutex);
+    if (--ctx->disconnect_ops == 0) {
+      do_free = true;
+      ctx_notify = false;
+      notify = wake_if_inactive(p);
+    } else {
+      // If initiating the close, wake the pcontext to do the free.
+      if (ctx_notify)
+        ctx_notify = wake(ctx);
+    }
+    unlock(&p->context.mutex);
+    unlock(ctx_mutex);
+
+    if (do_free) {
+      if (pc) pconnection_final_free(pc);
+      else listener_final_free(pcontext_listener(ctx));
+    } else {
+      if (ctx_notify)
+        wake_notify(ctx);
+    }
+  }
+  if (notify)
+    wake_notify(&p->context);
+}
+
+const pn_netaddr_t *pn_transport_local_addr(pn_transport_t *t) {
+  pconnection_t *pc = get_pconnection(pn_transport_connection(t));
+  return pc? &pc->local : NULL;
+}
+
+const pn_netaddr_t *pn_transport_remote_addr(pn_transport_t *t) {
+  pconnection_t *pc = get_pconnection(pn_transport_connection(t));
+  return pc ? &pc->remote : NULL;
+}
+
+const pn_netaddr_t *pn_listener_addr(pn_listener_t *l) {
+  return l->acceptors_size > 0 ? &l->acceptors[0].addr : NULL;
+}
+
+pn_millis_t pn_proactor_now(void) {
+  struct timespec t;
+  clock_gettime(CLOCK_MONOTONIC, &t);
+  return t.tv_sec*1000 + t.tv_nsec/1000000;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message