qpid-commits mailing list archives

From: acon...@apache.org
Subject: [2/3] qpid-proton git commit: PROTON-1438: C proactor listening behavior
Date: Wed, 22 Mar 2017 16:01:06 GMT
PROTON-1438: C proactor listening behavior

Improved listening behavior for pn_proactor_listen to allow selective listening by protocol (IPV4/IPV6) or portable "listen to everything".

Host can be a host name, IPV4 or IPV6 literal, or the empty string/NULL (treated the same). The empty string listens on all local addresses. A host name listens on all addresses associated with the name. An IPV6 literal address (or wildcard '[::]') listens only for IPV6. An IPV4 literal address (or wildcard '0.0.0.0') listens only for IPV4.
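
A minimal usage sketch (illustrative only; the port 5672 and the example host name are assumptions, not part of the commit) showing how the address string selects what is listened on:

  #include <proton/listener.h>
  #include <proton/proactor.h>

  static void listen_examples(pn_proactor_t *p) {
    /* Empty host: listen on all local addresses, IPV4 and IPV6 */
    pn_proactor_listen(p, pn_listener(), ":5672", 4);
    /* IPV4 wildcard: IPV4 only */
    pn_proactor_listen(p, pn_listener(), "0.0.0.0:5672", 4);
    /* IPV6 wildcard: IPV6 only */
    pn_proactor_listen(p, pn_listener(), "[::]:5672", 4);
    /* Host name (hypothetical): listen on every address the name resolves to */
    pn_proactor_listen(p, pn_listener(), "broker.example.com:5672", 4);
  }

The proactor takes ownership of each listener, so the pn_listener() calls need no matching free.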

- pn_proactor_listen may listen on more than one socket, for IPV4 and IPV6 or for DNS names with multiple address records.

- the 'backlog' applies to *each* socket

- an error on any socket closes all the sockets of the listener; the PN_LISTENER_CLOSE event indicates that all sockets are closed and provides the error that triggered the close (see the event-loop sketch below).
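
A minimal event-loop sketch (illustrative only, not part of the commit; the stderr error reporting is an assumption) showing how PN_LISTENER_ACCEPT and PN_LISTENER_CLOSE are typically handled:

  #include <stdbool.h>
  #include <stdio.h>
  #include <proton/condition.h>
  #include <proton/connection.h>
  #include <proton/event.h>
  #include <proton/listener.h>
  #include <proton/proactor.h>

  static void run(pn_proactor_t *p) {
    bool done = false;
    while (!done) {
      pn_event_batch_t *batch = pn_proactor_wait(p);
      pn_event_t *e;
      while ((e = pn_event_batch_next(batch))) {
        switch (pn_event_type(e)) {
         case PN_LISTENER_ACCEPT:
          /* Accept the incoming connection on whichever socket it arrived on */
          pn_listener_accept(pn_event_listener(e), pn_connection());
          break;
         case PN_LISTENER_CLOSE: {
          /* All sockets of the listener are closed; report the error, if any */
          pn_condition_t *cond = pn_listener_condition(pn_event_listener(e));
          if (pn_condition_is_set(cond))
            fprintf(stderr, "listener closed: %s\n", pn_condition_get_description(cond));
          done = true;
          break;
         }
         default:
          break;
        }
      }
      pn_proactor_done(p, batch);
    }
  }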


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/ce1b3d1f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ce1b3d1f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ce1b3d1f

Branch: refs/heads/master
Commit: ce1b3d1f84f40963f5ea1ec391a5f589ffb62ac1
Parents: d48bf9b
Author: Alan Conway <aconway@redhat.com>
Authored: Sun Mar 19 14:57:13 2017 -0400
Committer: Alan Conway <aconway@redhat.com>
Committed: Wed Mar 22 10:59:04 2017 -0400

----------------------------------------------------------------------
 proton-c/include/proton/cid.h      |   4 +-
 proton-c/include/proton/listener.h |   8 +
 proton-c/include/proton/proactor.h |  25 +-
 proton-c/src/proactor/libuv.c      | 791 +++++++++++++++++++-------------
 proton-c/src/tests/proactor.c      | 249 +++++++---
 proton-c/src/tests/test_tools.h    |  86 ++--
 6 files changed, 746 insertions(+), 417 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce1b3d1f/proton-c/include/proton/cid.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/cid.h b/proton-c/include/proton/cid.h
index 2d68896..e0766a0 100644
--- a/proton-c/include/proton/cid.h
+++ b/proton-c/include/proton/cid.h
@@ -64,7 +64,9 @@ typedef enum {
   CID_pn_url,
 
   CID_pn_listener,
-  CID_pn_proactor
+  CID_pn_proactor,
+
+  CID_pn_listener_socket
 } pn_cid_t;
 
 /**

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce1b3d1f/proton-c/include/proton/listener.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/listener.h b/proton-c/include/proton/listener.h
index 2038c06..6646dd1 100644
--- a/proton-c/include/proton/listener.h
+++ b/proton-c/include/proton/listener.h
@@ -22,6 +22,7 @@
 
 #include <proton/import_export.h>
 #include <proton/types.h>
+#include <proton/event.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -89,6 +90,9 @@ PNP_EXTERN pn_record_t *pn_listener_attachments(pn_listener_t *listener);
 
 /**
  * Close the listener (thread safe).
+ *
+ * The PN_LISTENER_CLOSE event is generated when the listener has stopped listening.
+ *
  */
 PNP_EXTERN void pn_listener_close(pn_listener_t *l);
 
@@ -97,6 +101,10 @@ PNP_EXTERN void pn_listener_close(pn_listener_t *l);
  */
 PNP_EXTERN pn_proactor_t *pn_listener_proactor(pn_listener_t *c);
 
+/**
+ * Return the listener associated with an event or NULL.
+ */
+PNP_EXTERN pn_listener_t *pn_event_listener(pn_event_t *event);
 
 /**
  *@}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce1b3d1f/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index 43b8ccb..185a1af 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -79,15 +79,23 @@ PNP_EXTERN int pn_proactor_connect(
   pn_proactor_t *proactor, pn_connection_t *connection, const char *addr);
 
 /**
- * Start listening with listener.  pn_proactor_wait() will return a
- * PN_LISTENER_ACCEPT event when a connection can be accepted.
+ * Start listening with listener.
+ *
+ * pn_proactor_wait() will return a PN_LISTENER_ACCEPT event when a connection can be
+ * accepted.
+ *
  *
  * @param[in] proactor the proactor object
  * @param[in] listener proactor takes ownership of listener, do not free
- * @param[in] addr the network address (not AMQP address) to connect to. May
- * be in the form "host:port" or an "amqp://" or "amqps://" URL. The `/path` part of
- * the URL is ignored.
- * @param[in] backlog number of connection requests to queue
+ * @param[in] addr the network address (not AMQP address) to listen on in "host:port"
+ *
+ * The host can be a host name, IPV4 or IPV6 literal, or the empty string. The empty
+ * string listens on all local addresses. A host name listens on all addresses associated
+ * with the name. An IPV6 literal address (or wildcard '[::]') listens only for IPV6. An
+ * IPV4 literal address (or wildcard '0.0.0.0') listens only for IPV4.
+ *
+ * @param[in] backlog number of connection requests to queue. If the host resolves
+ * to multiple addresses, this backlog applies to each address.
  *
  * @return error on immediate error, e.g. an allocation failure.
  * Other errors are indicated by pn_listener_condition() on the
@@ -187,11 +195,6 @@ PNP_EXTERN pn_proactor_t *pn_connection_proactor(pn_connection_t *connection);
 PNP_EXTERN pn_proactor_t *pn_event_proactor(pn_event_t *event);
 
 /**
- * Return the listener associated with an event or NULL.
- */
-PNP_EXTERN pn_listener_t *pn_event_listener(pn_event_t *event);
-
-/**
  * @}
  */
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce1b3d1f/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
index 1d16972..a077a5f 100644
--- a/proton-c/src/proactor/libuv.c
+++ b/proton-c/src/proactor/libuv.c
@@ -77,76 +77,126 @@ PN_HANDLE(PN_PROACTOR)
 PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
 PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
 
-/* common to connection and listener */
-typedef struct psocket_t {
+
+/* ================ Queues ================ */
+static int unqueued;            /* Provide invalid address for _unqueued pointers */
+
+#define QUEUE_DECL(T)                                                   \
+  typedef struct T##_queue_t { T##_t *front, *back; } T##_queue_t;      \
+                                                                        \
+  static T##_t *T##_unqueued = (T##_t*)&unqueued;                       \
+                                                                        \
+  static void T##_push(T##_queue_t *q, T##_t *x) {                      \
+    assert(x->next == T##_unqueued);                                    \
+    x->next = NULL;                                                     \
+    if (!q->front) {                                                    \
+      q->front = q->back = x;                                           \
+    } else {                                                            \
+      q->back->next = x;                                                \
+      q->back =  x;                                                     \
+    }                                                                   \
+  }                                                                     \
+                                                                        \
+  static T##_t* T##_pop(T##_queue_t *q) {                               \
+    T##_t *x = q->front;                                                \
+    if (x) {                                                            \
+      q->front = x->next;                                               \
+      x->next = T##_unqueued;                                           \
+    }                                                                   \
+    return x;                                                           \
+  }
+
+
+/* All work structs and UV callback data structs start with a struct_type member  */
+typedef enum { T_CONNECTION, T_LISTENER, T_LSOCKET } struct_type;
+
+/* A stream of serialized work for the proactor */
+typedef struct work_t {
   /* Immutable */
+  struct_type type;
   pn_proactor_t *proactor;
-  char host[NI_MAXHOST];
-  char port[NI_MAXSERV];
-  bool is_conn;
 
   /* Protected by proactor.lock */
-  struct psocket_t* next;
+  struct work_t* next;
   bool working;                      /* Owned by a worker thread */
+} work_t;
 
-  /* Only used by leader thread when it owns the psocket */
-  uv_tcp_t tcp;
-} psocket_t;
+QUEUE_DECL(work)
 
-typedef struct queue { psocket_t *front, *back; } queue;
+static void work_init(work_t* w, pn_proactor_t* p, struct_type type) {
+  w->proactor = p;
+  w->next = work_unqueued;
+  w->type = type;
+  w->working = true;
+}
 
-/* Special value for psocket.next pointer when socket is not on any any list. */
-psocket_t UNLISTED;
+/* ================ IO ================ */
 
-static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const char *host, const char *port) {
-  ps->proactor = p;
-  ps->next = &UNLISTED;
-  ps->is_conn = is_conn;
-  ps->tcp.data = NULL;          /* Set in leader_init */
-  ps->working = true;
+#define MAXADDR (NI_MAXHOST+NI_MAXSERV)
 
-  /* For platforms that don't know about "amqp" and "amqps" service names. */
-  if (port && strcmp(port, AMQP_PORT_NAME) == 0)
-    port = AMQP_PORT;
-  else if (port && strcmp(port, AMQPS_PORT_NAME) == 0)
-    port = AMQPS_PORT;
-  /* Set to "\001" to indicate a NULL as opposed to an empty string "" */
-  strncpy(ps->host, host ? host : "\001", sizeof(ps->host));
-  strncpy(ps->port, port ? port : "\001", sizeof(ps->port));
-}
+/* A resolvable address */
+typedef struct addr_t {
+  char addr[MAXADDR];
+  char *host, *port;            /* Point into addr after destructive pni_url_parse */
+  uv_getaddrinfo_t getaddrinfo; /* UV getaddrinfo request, contains list of addrinfo */
+  struct addrinfo* addrinfo;    /* The current addrinfo being tried */
+} addr_t;
 
-/* Turn "\001" back to NULL */
-static inline const char* fixstr(const char* str) {
-  return str[0] == '\001' ? NULL : str;
-}
+/* A single listening socket, a listener can have more than one */
+typedef struct lsocket_t {
+  struct_type type;             /* Always T_LSOCKET */
+  pn_listener_t *parent;
+  uv_tcp_t tcp;
+} lsocket_t;
+
+PN_STRUCT_CLASSDEF(lsocket, CID_pn_listener_socket)
 
 typedef enum { W_NONE, W_PENDING, W_CLOSED } wake_state;
 
 /* An incoming or outgoing connection. */
 typedef struct pconnection_t {
-  psocket_t psocket;
+  work_t work;                  /* Must be first to allow casting */
+  struct pconnection_t *next;   /* For listener list */
 
   /* Only used by owner thread */
   pn_connection_driver_t driver;
 
   /* Only used by leader */
+  uv_tcp_t tcp;
+  addr_t addr;
+
+  uv_connect_t connect;         /* Outgoing connection only */
+  int connected;      /* 0: not connected, <0: connecting after error, 1 = connected ok */
+
+  lsocket_t *lsocket;         /* Incoming connection only */
+
   uv_timer_t timer;
   uv_write_t write;
-  uv_shutdown_t shutdown;
   size_t writing;               /* size of pending write request, 0 if none pending */
-
-  /* Outgoing connection only */
-  uv_connect_t connect;
+  uv_shutdown_t shutdown;
 
   /* Locked for thread-safe access */
   uv_mutex_t lock;
   wake_state wake;
 } pconnection_t;
 
+QUEUE_DECL(pconnection)
+
+typedef enum {
+  L_UNINIT,                     /**<< Not yet listening */
+  L_LISTENING,                  /**<< Listening */
+  L_CLOSE,                      /**<< Close requested  */
+  L_CLOSING,                    /**<< Socket close initiated, wait for close */
+  L_CLOSED                      /**<< User saw PN_LISTENER_CLOSED, all done  */
+} listener_state;
 
-/* a listener socket */
+/* A listener */
 struct pn_listener_t {
-  psocket_t psocket;
+  work_t work;                  /* Must be first to allow casting */
+
+  size_t nsockets;
+  lsocket_t *sockets;
+  lsocket_t prealloc[1];       /* Pre-allocated socket array, allocate larger if needed */
 
   /* Only used by owner thread */
   pn_event_batch_t batch;
@@ -154,14 +204,17 @@ struct pn_listener_t {
   void *context;
   size_t backlog;
 
+  /* Only used by leader */
+  addr_t addr;
+
   /* Locked for thread-safe access. uv_listen can't be stopped or cancelled so we can't
    * detach a listener from the UV loop to prevent concurrent access.
    */
   uv_mutex_t lock;
   pn_condition_t *condition;
   pn_collector_t *collector;
-  queue          accept;        /* pconnection_t for uv_accept() */
-  bool closed;
+  pconnection_queue_t accept;   /* pconnection_t list for accepting */
+  listener_state state;
 };
 
 struct pn_proactor_t {
@@ -177,11 +230,11 @@ struct pn_proactor_t {
 
   /* Protected by lock */
   uv_mutex_t lock;
-  queue worker_q;               /* psockets ready for work, to be returned via pn_proactor_wait()  */
-  queue leader_q;               /* psockets waiting for attention by the leader thread */
+  work_queue_t worker_q;               /* ready for work, to be returned via pn_proactor_wait()  */
+  work_queue_t leader_q;               /* waiting for attention by the leader thread */
   size_t interrupt;             /* pending interrupts */
   pn_millis_t timeout;
-  size_t count;                 /* psocket count */
+  size_t count;                 /* connection/listener count for INACTIVE events */
   bool inactive;
   bool timeout_request;
   bool timeout_elapsed;
@@ -189,62 +242,40 @@ struct pn_proactor_t {
   bool batch_working;          /* batch is being processed in a worker thread */
 };
 
-static void push_lh(queue *q, psocket_t *ps) {
-  assert(ps->next == &UNLISTED);
-  ps->next = NULL;
-  if (!q->front) {
-    q->front = q->back = ps;
-  } else {
-    q->back->next = ps;
-    q->back =  ps;
-  }
-}
-
-/* Pop returns front of q or NULL if empty */
-static psocket_t* pop_lh(queue *q) {
-  psocket_t *ps = q->front;
-  if (ps) {
-    q->front = ps->next;
-    ps->next = &UNLISTED;
-  }
-  return ps;
-}
 
 /* Notify the leader thread that there is something to do outside of uv_run() */
 static inline void notify(pn_proactor_t* p) {
   uv_async_send(&p->async);
 }
 
-/* Notify that this socket needs attention from the leader at the next opportunity */
-static void psocket_notify(psocket_t *ps) {
-  uv_mutex_lock(&ps->proactor->lock);
+/* Notify that this work item needs attention from the leader at the next opportunity */
+static void work_notify(work_t *w) {
+  uv_mutex_lock(&w->proactor->lock);
   /* If the socket is in use by a worker or is already queued then leave it where it is.
      It will be processed in pn_proactor_done() or when the queue it is on is processed.
   */
-  if (!ps->working && ps->next == &UNLISTED) {
-    push_lh(&ps->proactor->leader_q, ps);
-    notify(ps->proactor);
+  if (!w->working && w->next == work_unqueued) {
+    work_push(&w->proactor->leader_q, w);
+    notify(w->proactor);
   }
-  uv_mutex_unlock(&ps->proactor->lock);
+  uv_mutex_unlock(&w->proactor->lock);
 }
 
-/* Notify the leader of a newly-created socket */
-static void psocket_start(psocket_t *ps) {
-  uv_mutex_lock(&ps->proactor->lock);
-  if (ps->next == &UNLISTED) {  /* No-op if already queued */
-    ps->working = false;
-    push_lh(&ps->proactor->leader_q, ps);
-    notify(ps->proactor);
-    uv_mutex_unlock(&ps->proactor->lock);
+/* Notify the leader of a newly-created work item */
+static void work_start(work_t *w) {
+  uv_mutex_lock(&w->proactor->lock);
+  if (w->next == work_unqueued) {  /* No-op if already queued */
+    w->working = false;
+    work_push(&w->proactor->leader_q, w);
+    notify(w->proactor);
+    uv_mutex_unlock(&w->proactor->lock);
   }
 }
 
-static inline pconnection_t *as_pconnection(psocket_t* ps) {
-  return ps->is_conn ? (pconnection_t*)ps : NULL;
-}
-
-static inline pn_listener_t *as_listener(psocket_t* ps) {
-  return ps->is_conn ? NULL: (pn_listener_t*)ps;
+static void parse_addr(addr_t *addr, const char *str) {
+  strncpy(addr->addr, str, sizeof(addr->addr));
+  char *scheme, *user, *pass, *path;
+  pni_parse_url(addr->addr, &scheme, &user, &pass, &addr->host, &addr->port, &path);
 }
 
 /* Make a pn_class for pconnection_t since it is attached to a pn_connection_t record */
@@ -262,7 +293,7 @@ static void pconnection_finalize(void *vp_pconnection) {
 
 static const pn_class_t pconnection_class = PN_CLASS(pconnection);
 
-static pconnection_t *pconnection(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) {
+static pconnection_t *pconnection(pn_proactor_t *p, pn_connection_t *c, bool server) {
   /* pconnection_t is a pn_class instance so we can attach it to the pn_connection_t and
      it will be finalized when the pn_connection_t is freed.
   */
@@ -272,15 +303,18 @@ static pconnection_t *pconnection(pn_proactor_t *p, pn_connection_t *c, bool ser
   if (!pc || pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
     return NULL;
   }
-  psocket_init(&pc->psocket, p,  true, host, port);
-  pc->write.data = &pc->psocket;
+  work_init(&pc->work, p,  T_CONNECTION);
+  pc->next = pconnection_unqueued;
+  pc->write.data = &pc->work;
   if (server) {
     pn_transport_set_server(pc->driver.transport);
   }
+  pc->addr.host = pc->addr.port = pc->addr.addr; /* Set host/port to "" by default */
   pn_record_t *r = pn_connection_attachments(pc->driver.connection);
   pn_record_def(r, PN_PROACTOR, &pconnection_class);
   pn_record_set(r, PN_PROACTOR, pc);
   pn_decref(pc);                /* Will be deleted when the connection is */
+  pc->addr.host = pc->addr.port = pc->addr.addr; /* Set host/port to "" by default */
   return pc;
 }
 
@@ -302,38 +336,51 @@ static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
   return d ? (pconnection_t*)((char*)d - offsetof(pconnection_t, driver)) : NULL;
 }
 
-static inline psocket_t *batch_psocket(pn_event_batch_t *batch) {
+static inline work_t *batch_work(pn_event_batch_t *batch) {
   pconnection_t *pc = batch_pconnection(batch);
-  if (pc) return &pc->psocket;
+  if (pc) return &pc->work;
   pn_listener_t *l = batch_listener(batch);
-  if (l) return &l->psocket;
+  if (l) return &l->work;
   return NULL;
 }
 
-static void leader_count(pn_proactor_t *p, int change) {
+/* Total count of listener and connections for PN_PROACTOR_INACTIVE */
+static void leader_inc(pn_proactor_t *p) {
   uv_mutex_lock(&p->lock);
-  p->count += change;
-  if (p->count == 0) {
+  ++p->count;
+  uv_mutex_unlock(&p->lock);
+}
+
+static void leader_dec(pn_proactor_t *p) {
+  uv_mutex_lock(&p->lock);
+  assert(p->count > 0);
+  if (--p->count == 0) {
     p->inactive = true;
     notify(p);
   }
   uv_mutex_unlock(&p->lock);
 }
 
-static void pn_listener_free(pn_listener_t *l);
+static void pconnection_free(pconnection_t *pc) {
+  if (pc->addr.getaddrinfo.addrinfo) {
+    uv_freeaddrinfo(pc->addr.getaddrinfo.addrinfo); /* Interrupted after resolve */
+  }
+  pn_incref(pc);                /* Make sure we don't do a circular free */
+  pn_connection_driver_destroy(&pc->driver);
+  pn_decref(pc);
+  /* Now pc is freed iff the connection is, otherwise remains till the pn_connection_t is freed. */
+}
 
-/* Final close event for for a pconnection_t, closes the timer */
+/* Final close event for a pconnection_t, disconnects from proactor */
 static void on_close_pconnection_final(uv_handle_t *h) {
-  /* If the life of the pn_connection_t has been extended with reference counts
-     we want the pconnection_t to have the same lifespan so calls to pn_connection_wake
-     will be valid (but no-ops)
+  /* Free resources associated with a pconnection_t.
+     If the life of the pn_connection_t has been extended with reference counts
+     we want the pconnection_t to have the same lifespan so calls to pn_connection_wake()
+     will be valid, but no-ops.
   */
   pconnection_t *pc = (pconnection_t*)h->data;
-  /* Break circular references */
-  pn_incref(pc);                /* Don't let driver_destroy free pc */
-  pn_connection_driver_destroy(&pc->driver);
-  pn_decref(pc);
-  /* Now pc is freed iff the connection is, otherwise remains till the is freed. */
+  leader_dec(pc->work.proactor);
+  pconnection_free(pc);
 }
 
 static void uv_safe_close(uv_handle_t *h, uv_close_cb cb) {
@@ -342,18 +389,27 @@ static void uv_safe_close(uv_handle_t *h, uv_close_cb cb) {
   }
 }
 
-/* Close event for uv_tcp_t of a psocket_t */
-static void on_close_psocket(uv_handle_t *h) {
-  psocket_t *ps = (psocket_t*)h->data;
-  leader_count(ps->proactor, -1);
-  if (ps->is_conn) {
-    pconnection_t *pc = as_pconnection(ps);
-    uv_timer_stop(&pc->timer);
-    /* Delay the free till the timer handle is also closed */
-    uv_safe_close((uv_handle_t*)&pc->timer, on_close_pconnection_final);
-  } else {
-    pn_listener_free(as_listener(ps));
+static void on_close_pconnection(uv_handle_t *h) {
+  pconnection_t *pc = (pconnection_t*)h->data;
+  /* Delay the free till the timer handle is also closed */
+  uv_timer_stop(&pc->timer);
+  uv_safe_close((uv_handle_t*)&pc->timer, on_close_pconnection_final);
+}
+
+static void listener_close_lh(pn_listener_t* l) {
+  if (l->state < L_CLOSE) {
+    l->state = L_CLOSE;
   }
+  work_notify(&l->work);
+}
+
+static void on_close_lsocket(uv_handle_t *h) {
+  lsocket_t* ls = (lsocket_t*)h->data;
+  pn_listener_t *l = ls->parent;
+  uv_mutex_lock(&l->lock);
+  --l->nsockets;
+  listener_close_lh(l);
+  uv_mutex_unlock(&l->lock);
 }
 
 static pconnection_t *get_pconnection(pn_connection_t* c) {
@@ -364,174 +420,280 @@ static pconnection_t *get_pconnection(pn_connection_t* c) {
   return (pconnection_t*) pn_record_get(r, PN_PROACTOR);
 }
 
+static inline void pconnection_bad_connect(pconnection_t *pc, int err) {
+  if (!pc->connected) {
+    pc->connected = err;        /* Remember first connect error in case they all fail  */
+  }
+}
+
 static void pconnection_error(pconnection_t *pc, int err, const char* what) {
   assert(err);
+  pconnection_bad_connect(pc, err);
   pn_connection_driver_t *driver = &pc->driver;
   pn_connection_driver_bind(driver); /* Bind so errors will be reported */
   if (!pn_condition_is_set(pn_transport_condition(driver->transport))) {
     pn_connection_driver_errorf(driver, uv_err_name(err), "%s %s:%s: %s",
-                                what, fixstr(pc->psocket.host), fixstr(pc->psocket.port),
+                                what, pc->addr.host, pc->addr.port,
                                 uv_strerror(err));
   }
   pn_connection_driver_close(driver);
 }
 
-static void listener_error(pn_listener_t *l, int err, const char* what) {
+static void listener_error_lh(pn_listener_t *l, int err, const char* what) {
   assert(err);
-  uv_mutex_lock(&l->lock);
   if (!pn_condition_is_set(l->condition)) {
     pn_condition_format(l->condition, uv_err_name(err), "%s %s:%s: %s",
-                        what, fixstr(l->psocket.host), fixstr(l->psocket.port),
+                        what, l->addr.host, l->addr.port,
                         uv_strerror(err));
   }
+  listener_close_lh(l);
+}
+
+static void listener_error(pn_listener_t *l, int err, const char* what) {
+  uv_mutex_lock(&l->lock);
+  listener_error_lh(l, err, what);
   uv_mutex_unlock(&l->lock);
-  pn_listener_close(l);
 }
 
-static void psocket_error(psocket_t *ps, int err, const char* what) {
-  if (ps->is_conn) {
-    pconnection_error(as_pconnection(ps), err, what);
+static int pconnection_init(pconnection_t *pc) {
+  int err = 0;
+  err = uv_tcp_init(&pc->work.proactor->loop, &pc->tcp);
+  if (!err) {
+    pc->tcp.data = pc;
+    pc->connect.data = pc;
+    err = uv_timer_init(&pc->work.proactor->loop, &pc->timer);
+    if (!err) {
+      pc->timer.data = pc;
+    } else {
+      uv_close((uv_handle_t*)&pc->tcp, NULL);
+    }
+  }
+  if (!err) {
+    leader_inc(pc->work.proactor);
   } else {
-    listener_error(as_listener(ps), err, what);
+    pconnection_error(pc, err, "initialization");
   }
+  return err;
 }
 
-/* psocket uv-initialization */
-static int leader_init(psocket_t *ps) {
-  ps->working = false;
-  ps->tcp.data = ps;
-  leader_count(ps->proactor, +1);
-  int err = uv_tcp_init(&ps->proactor->loop, &ps->tcp);
+static void try_connect(pconnection_t *pc);
+
+static void on_connect_fail(uv_handle_t *handle) {
+  pconnection_t *pc = (pconnection_t*)handle->data;
+  /* Create a new TCP socket, the current one is closed */
+  int err = uv_tcp_init(&pc->work.proactor->loop, &pc->tcp);
   if (err) {
-    psocket_error(ps, err, "initialization");
+    pc->connected = err;
+    pc->addr.addrinfo = NULL; /* No point in trying anymore, we can't create a socket */
   } else {
-    pconnection_t *pc = as_pconnection(ps);
-    if (pc) {
-      pc->connect.data = ps;
-      int err = uv_timer_init(&ps->proactor->loop, &pc->timer);
-      if (!err) {
-        pc->timer.data = ps;
-      }
-    }
+    try_connect(pc);
   }
-  return err;
 }
 
 /* Outgoing connection */
 static void on_connect(uv_connect_t *connect, int err) {
   pconnection_t *pc = (pconnection_t*)connect->data;
-  if (err) pconnection_error(pc, err, "on connect to");
-  psocket_notify(&pc->psocket);
+  if (!err) {
+    pc->connected = 1;
+    pn_connection_open(pc->driver.connection);
+    work_notify(&pc->work);
+    uv_freeaddrinfo(pc->addr.getaddrinfo.addrinfo); /* Done with address info */
+    pc->addr.getaddrinfo.addrinfo = NULL;
+  } else {
+    pconnection_bad_connect(pc, err);
+    uv_safe_close((uv_handle_t*)&pc->tcp, on_connect_fail); /* Try the next addr if there is one */
+  }
 }
 
 /* Incoming connection ready to be accepted */
 static void on_connection(uv_stream_t* server, int err) {
   /* Unlike most on_* functions, this can be called by the leader thread when the listener
-   * is ON_WORKER or ON_LEADER, because there's no way to stop libuv from calling
-   * on_connection(). Update the state of the listener and queue it for leader attention.
+   * is ON_WORKER or ON_LEADER, because
+   *
+   * 1. There's no way to stop libuv from calling on_connection().
+   * 2. There can be multiple lsockets per listener.
+   *
+   * Update the state of the listener and queue it for leader attention.
    */
-  pn_listener_t *l = (pn_listener_t*) server->data;
+  lsocket_t *ls = (lsocket_t*)server->data;
+  pn_listener_t *l = ls->parent;
   if (err) {
     listener_error(l, err, "on incoming connection");
   } else {
     uv_mutex_lock(&l->lock);
-    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
+    pn_collector_put(l->collector, lsocket__class(), ls, PN_LISTENER_ACCEPT);
     uv_mutex_unlock(&l->lock);
-    psocket_notify(&l->psocket);
   }
+  work_notify(&l->work);
 }
 
 /* Common address resolution for leader_listen and leader_connect */
-static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) {
-  int err = leader_init(ps);
+static int leader_resolve(pn_proactor_t *p, addr_t *addr, bool listen) {
   struct addrinfo hints = { 0 };
-  if (server) hints.ai_flags = AI_PASSIVE;
-  if (!err) {
-    err = uv_getaddrinfo(&ps->proactor->loop, info, NULL, fixstr(ps->host), fixstr(ps->port), &hints);
+  hints.ai_family = AF_UNSPEC;
+  hints.ai_socktype = SOCK_STREAM;
+  /* Note this looks contradictory since we disable V4 mapping in bind() but it is
+     correct - read the getaddrinfo man page carefully!
+  */
+  hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG;
+  if (listen) {
+    hints.ai_flags |= AI_PASSIVE | AI_ALL;
   }
+  int err = uv_getaddrinfo(&p->loop, &addr->getaddrinfo, NULL,
+                           *addr->host ? addr->host : NULL, addr->port, &hints);
+  addr->addrinfo = addr->getaddrinfo.addrinfo; /* Start with the first addrinfo */
   return err;
 }
 
-static void leader_connect(pconnection_t *pc) {
-  uv_getaddrinfo_t info;
-  int err = leader_resolve(&pc->psocket, &info, false);
-  if (!err) {
-    err = uv_tcp_connect(&pc->connect, &pc->psocket.tcp, info.addrinfo->ai_addr, on_connect);
-    uv_freeaddrinfo(info.addrinfo);
+/* Try to connect to the current addrinfo. Called by leader and via callbacks for retry.*/
+static void try_connect(pconnection_t *pc) {
+  struct addrinfo *ai = pc->addr.addrinfo;
+  if (!ai) {                    /* End of list, connect fails */
+    uv_freeaddrinfo(pc->addr.getaddrinfo.addrinfo);
+    pc->addr.getaddrinfo.addrinfo = NULL;
+    pconnection_bad_connect(pc, UV_EAI_NODATA);
+    pconnection_error(pc, pc->connected, "connecting to");
+    work_notify(&pc->work);
+  } else {
+    pc->addr.addrinfo = ai->ai_next; /* Advance for next attempt */
+    int err = uv_tcp_connect(&pc->connect, &pc->tcp, ai->ai_addr, on_connect);
+    if (err) {
+      pconnection_bad_connect(pc, err);
+      uv_close((uv_handle_t*)&pc->tcp, on_connect_fail); /* Queue up next attempt */
+    }
   }
+}
+
+static bool leader_connect(pconnection_t *pc) {
+  int err = pconnection_init(pc);
+  if (!err) err = leader_resolve(pc->work.proactor, &pc->addr, false);
   if (err) {
-    pconnection_error(pc, err, "connecting to");
+    pconnection_error(pc, err, "connect resolving");
+    return true;
   } else {
-    pn_connection_open(pc->driver.connection);
+    try_connect(pc);
+    return false;
   }
 }
 
-static void leader_listen(pn_listener_t *l) {
-  uv_getaddrinfo_t info;
-  int err = leader_resolve(&l->psocket, &info, true);
+static int lsocket_init(lsocket_t *ls, pn_listener_t *l, struct addrinfo *ai) {
+  ls->type = T_LSOCKET;
+  ls->parent = l;
+  ls->tcp.data = ls;
+  int err = uv_tcp_init(&l->work.proactor->loop, &ls->tcp);
   if (!err) {
-    err = uv_tcp_bind(&l->psocket.tcp, info.addrinfo->ai_addr, 0);
-    uv_freeaddrinfo(info.addrinfo);
+    int flags = (ai->ai_family == AF_INET6) ? UV_TCP_IPV6ONLY : 0;
+    err = uv_tcp_bind(&ls->tcp, ai->ai_addr, flags);
+    if (!err) err = uv_listen((uv_stream_t*)&ls->tcp, l->backlog, on_connection);
+    if (err) uv_close((uv_handle_t*)&ls->tcp, NULL);
   }
+  return err;
+}
+
+#define ARRAY_LEN(A) (sizeof(A)/sizeof(*(A)))
+
+/* Listen on all available addresses */
+static void leader_listen_lh(pn_listener_t *l) {
+  leader_inc(l->work.proactor);
+  int err = leader_resolve(l->work.proactor, &l->addr, true);
   if (!err) {
-    err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_connection);
+    /* Count addresses, allocate enough space */
+    size_t len = 0;
+    for (struct addrinfo *ai = l->addr.getaddrinfo.addrinfo; ai; ai = ai->ai_next) {
+      ++len;
+    }
+    assert(len > 0);            /* Guaranteed by getaddrinfo() */
+    l->sockets = (len > ARRAY_LEN(l->prealloc)) ? (lsocket_t*)calloc(len, sizeof(lsocket_t)) : l->prealloc;
+    /* Find the working addresses */
+    l->nsockets = 0;
+    int first_err = 0;
+    for (struct addrinfo *ai = l->addr.getaddrinfo.addrinfo; ai; ai = ai->ai_next) {
+      lsocket_t *ls = &l->sockets[l->nsockets];
+      int err2 = lsocket_init(ls, l, ai);
+      if (!err2) {
+        ++l->nsockets;                    /* Next socket */
+      } else if (!first_err) {
+        first_err = err2;
+      }
+    }
+    uv_freeaddrinfo(l->addr.getaddrinfo.addrinfo);
+    l->addr.getaddrinfo.addrinfo = NULL;
+    if (l->nsockets == 0) err = first_err;
   }
-  uv_mutex_lock(&l->lock);
   /* Always put an OPEN event for symmetry, even if we immediately close with err */
   pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN);
-  uv_mutex_unlock(&l->lock);
   if (err) {
-    listener_error(l, err, "listening on");
+    listener_error_lh(l, err, "listening on");
   }
 }
 
-static bool listener_has_work(pn_listener_t *l) {
-  uv_mutex_lock(&l->lock);
-  bool has_work = pn_collector_peek(l->collector);
-  uv_mutex_unlock(&l->lock);
-  return has_work;
-}
-
-static pconnection_t *listener_pop(pn_listener_t *l) {
-  uv_mutex_lock(&l->lock);
-  pconnection_t *pc = (pconnection_t*)pop_lh(&l->accept);
-  uv_mutex_unlock(&l->lock);
-  return pc;
-}
-
-static bool listener_finished(pn_listener_t *l) {
-  uv_mutex_lock(&l->lock);
-  bool finished = l->closed && !pn_collector_peek(l->collector) && !l->accept.front;
-  uv_mutex_unlock(&l->lock);
-  return finished;
+static void pn_listener_free(pn_listener_t *l) {
+  if (l) {
+    if (l->addr.getaddrinfo.addrinfo) { /* Interrupted after resolve */
+      uv_freeaddrinfo(l->addr.getaddrinfo.addrinfo);
+    }
+    if (l->collector) pn_collector_free(l->collector);
+    if (l->condition) pn_condition_free(l->condition);
+    if (l->attachments) pn_free(l->attachments);
+    if (l->sockets && l->sockets != l->prealloc) free(l->sockets);
+    assert(!l->accept.front);
+    free(l);
+  }
 }
 
 /* Process a listener, return true if it has events for a worker thread */
 static bool leader_process_listener(pn_listener_t *l) {
   /* NOTE: l may be concurrently accessed by on_connection() */
+  bool closed = false;
+  uv_mutex_lock(&l->lock);
+  switch (l->state) {
 
-  if (l->psocket.tcp.data == NULL) {
-    /* Start listening if not already listening */
-    leader_listen(l);
-  } else if (listener_finished(l)) {
-    /* Close if listener is finished. */
-    uv_safe_close((uv_handle_t*)&l->psocket.tcp, on_close_psocket);
-    return false;
-  } else {
-    /* Process accepted connections if any */
-    pconnection_t *pc;
-    while ((pc = listener_pop(l))) {
-      int err = leader_init(&pc->psocket);
-      if (!err) {
-        err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
-      } else {
-        listener_error(l, err, "accepting from");
-        pconnection_error(pc, err, "accepting from");
-      }
-      psocket_start(&pc->psocket);
+   case L_UNINIT:
+    l->state = L_LISTENING;
+    leader_listen_lh(l);
+    break;
+
+   case L_LISTENING:
+    break;
+
+   case L_CLOSE:                /* Close requested, start closing lsockets */
+    l->state = L_CLOSING;
+    for (size_t i = 0; i < l->nsockets; ++i) {
+      uv_safe_close((uv_handle_t*)&l->sockets[i].tcp, on_close_lsocket);
+    }
+    /* NOTE: Fall through in case we have 0 sockets - e.g. resolver error */
+
+   case L_CLOSING:              /* Closing - can we send PN_LISTENER_CLOSE? */
+    if (l->nsockets == 0) {
+      l->state = L_CLOSED;
+      pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
+    }
+    break;
+
+   case L_CLOSED:              /* Closed, has LISTENER_CLOSE been processed? */
+    if (!pn_collector_peek(l->collector)) {
+      leader_dec(l->work.proactor);
+      closed = true;
+    }
+  }
+  /* Process accepted connections - if we are closed they will get an error */
+  for (pconnection_t *pc = pconnection_pop(&l->accept); pc; pc = pconnection_pop(&l->accept)) {
+    int err = pconnection_init(pc);
+    if (!err) {
+      err = uv_accept((uv_stream_t*)&pc->lsocket->tcp, (uv_stream_t*)&pc->tcp);
+    } else {
+      listener_error(l, err, "accepting from");
+      pconnection_error(pc, err, "accepting from");
     }
+    work_start(&pc->work);      /* Process events for the accepted/failed connection */
   }
-  return listener_has_work(l);
+  bool has_work = !closed && pn_collector_peek(l->collector);
+  uv_mutex_unlock(&l->lock);
+
+  if (closed) {
+    pn_listener_free(l);
+  }
+  return has_work;
 }
 
 /* Generate tick events and return millis till next tick or 0 if no tick is required */
@@ -544,7 +706,7 @@ static pn_millis_t leader_tick(pconnection_t *pc) {
 static void on_tick(uv_timer_t *timer) {
   pconnection_t *pc = (pconnection_t*)timer->data;
   leader_tick(pc);
-  psocket_notify(&pc->psocket);
+  work_notify(&pc->work);
 }
 
 static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
@@ -556,7 +718,7 @@ static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
   } else {
     pconnection_error(pc, nread, "on read from");
   }
-  psocket_notify(&pc->psocket);
+  work_notify(&pc->work);
 }
 
 static void on_write(uv_write_t* write, int err) {
@@ -568,7 +730,7 @@ static void on_write(uv_write_t* write, int err) {
   } else {
     pn_connection_driver_write_done(&pc->driver, size);
   }
-  psocket_notify(&pc->psocket);
+  work_notify(&pc->work);
 }
 
 static void on_timeout(uv_timer_t *timer) {
@@ -592,13 +754,6 @@ static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t)
   return &p->batch;
 }
 
-static void on_stopping(uv_handle_t* h, void* v) {
-  /* Mark all sockets with an error, pn_proactor_free will clear the resulting events */
-  if (h->type == UV_TCP) {
-    psocket_error((psocket_t*)h->data, UV_ESHUTDOWN, "proactor free");
-  }
-}
-
 static pn_event_t *log_event(void* p, pn_event_t *e) {
   if (e) {
     pn_logf("[%p]:(%s)", (void*)p, pn_event_type_name(pn_event_type(e)));
@@ -620,16 +775,6 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
   return log_event(p, pn_collector_next(p->collector));
 }
 
-static void pn_listener_free(pn_listener_t *l) {
-  if (l) {
-    if (l->collector) pn_collector_free(l->collector);
-    if (l->condition) pn_condition_free(l->condition);
-    if (l->attachments) pn_free(l->attachments);
-    assert(!l->accept.front);
-    free(l);
-  }
-}
-
 /* Return the next event batch or NULL if no events are available */
 static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) {
   if (!p->batch_working) {       /* Can generate proactor events */
@@ -646,12 +791,15 @@ static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) {
       return proactor_batch_lh(p, PN_PROACTOR_TIMEOUT);
     }
   }
-  for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) {
-    assert(ps->working);
-    if (ps->is_conn) {
-      return &as_pconnection(ps)->driver.batch;
-    } else {                    /* Listener */
-      return &as_listener(ps)->batch;
+  for (work_t *w = work_pop(&p->worker_q); w; w = work_pop(&p->worker_q)) {
+    assert(w->working);
+    switch (w->type) {
+     case T_CONNECTION:
+      return &((pconnection_t*)w)->driver.batch;
+     case T_LISTENER:
+      return &((pn_listener_t*)w)->batch;
+     default:
+      break;
     }
   }
   return NULL;
@@ -671,18 +819,18 @@ static void check_wake(pconnection_t *pc) {
 /* Process a pconnection, return true if it has events for a worker thread */
 static bool leader_process_pconnection(pconnection_t *pc) {
   /* Important to do the following steps in order */
+  if (!pc->connected) {
+    return leader_connect(pc);
+  }
   if (pc->writing) {
     /* We can't do anything while a write request is pending */
     return false;
   }
-  if (pc->psocket.tcp.data == NULL) {
-    /* Start the connection if not already connected */
-    leader_connect(pc);
-  } else if (pn_connection_driver_finished(&pc->driver)) {
+  if (pn_connection_driver_finished(&pc->driver)) {
     uv_mutex_lock(&pc->lock);
-    pc->wake = W_CLOSED;        /* wake() cannot notify anymore */
+    pc->wake = W_CLOSED;        /* wake() is a no-op from now on */
     uv_mutex_unlock(&pc->lock);
-    uv_safe_close((uv_handle_t*)&pc->psocket.tcp, on_close_psocket);
+    uv_safe_close((uv_handle_t*)&pc->tcp, on_close_pconnection);
   } else {
     /* Check for events that can be generated without blocking for IO */
     check_wake(pc);
@@ -702,17 +850,17 @@ static bool leader_process_pconnection(pconnection_t *pc) {
         what = "write";
         if (wbuf.size > 0) {
           uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
-          err = uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
+          err = uv_write(&pc->write, (uv_stream_t*)&pc->tcp, &buf, 1, on_write);
           if (!err) {
             pc->writing = wbuf.size;
           }
         } else if (pn_connection_driver_write_closed(&pc->driver)) {
-          uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, NULL);
+          uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->tcp, NULL);
         }
       }
       if (!err && rbuf.size > 0) {
         what = "read";
-        err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read);
+        err = uv_read_start((uv_stream_t*)&pc->tcp, alloc_read_buffer, on_read);
       }
       if (err) {
         /* Some IO requests failed, generate the error events */
@@ -725,8 +873,8 @@ static bool leader_process_pconnection(pconnection_t *pc) {
 
 /* Detach a connection from the UV loop so it can be used safely by a worker */
 void pconnection_detach(pconnection_t *pc) {
-  if (!pc->writing) {           /* Can't detach while a write is pending */
-    uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
+  if (pc->connected && !pc->writing) {           /* Can't detach while a write is pending */
+    uv_read_stop((uv_stream_t*)&pc->tcp);
     uv_timer_stop(&pc->timer);
   }
 }
@@ -734,21 +882,29 @@ void pconnection_detach(pconnection_t *pc) {
 /* Process the leader_q and the UV loop, in the leader thread */
 static pn_event_batch_t *leader_lead_lh(pn_proactor_t *p, uv_run_mode mode) {
   pn_event_batch_t *batch = NULL;
-  for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
-    assert(!ps->working);
+  for (work_t *w = work_pop(&p->leader_q); w; w = work_pop(&p->leader_q)) {
+    assert(!w->working);
 
     uv_mutex_unlock(&p->lock);  /* Unlock to process each item, may add more items to leader_q */
-    bool has_work = ps->is_conn ?
-      leader_process_pconnection(as_pconnection(ps)) :
-      leader_process_listener(as_listener(ps));
+    bool has_work = false;
+    switch (w->type) {
+     case T_CONNECTION:
+      has_work = leader_process_pconnection((pconnection_t*)w);
+      break;
+     case T_LISTENER:
+      has_work = leader_process_listener((pn_listener_t*)w);
+      break;
+     default:
+      break;
+    }
     uv_mutex_lock(&p->lock);
 
-    if (has_work && !ps->working && ps->next == &UNLISTED) {
-      if (ps->is_conn) {
-        pconnection_detach(as_pconnection(ps));
+    if (has_work && !w->working && w->next == work_unqueued) {
+      if (w->type == T_CONNECTION) {
+        pconnection_detach((pconnection_t*)w);
       }
-      ps->working = true;
-      push_lh(&p->worker_q, ps);
+      w->working = true;
+      work_push(&p->worker_q, w);
     }
   }
   batch = get_batch_lh(p);      /* Check for work */
@@ -805,13 +961,14 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
 }
 
 void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
+  if (!batch) return;
   uv_mutex_lock(&p->lock);
-  psocket_t *ps = batch_psocket(batch);
-  if (ps) {
-    assert(ps->working);
-    assert(ps->next == &UNLISTED);
-    ps->working = false;
-    push_lh(&p->leader_q, ps);
+  work_t *w = batch_work(batch);
+  if (w) {
+    assert(w->working);
+    assert(w->next == work_unqueued);
+    w->working = false;
+    work_push(&p->leader_q, w);
   }
   pn_proactor_t *bp = batch_proactor(batch); /* Proactor events */
   if (bp == p) {
@@ -822,7 +979,13 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
 }
 
 pn_listener_t *pn_event_listener(pn_event_t *e) {
-  return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL;
+  if (pn_event_class(e) == pn_listener__class()) {
+    return (pn_listener_t*)pn_event_context(e);
+  } else if (pn_event_class(e) == lsocket__class()) {
+    return ((lsocket_t*)pn_event_context(e))->parent;
+  } else {
+    return NULL;
+  }
 }
 
 pn_proactor_t *pn_event_proactor(pn_event_t *e) {
@@ -831,7 +994,7 @@ pn_proactor_t *pn_event_proactor(pn_event_t *e) {
   }
   pn_listener_t *l = pn_event_listener(e);
   if (l) {
-    return l->psocket.proactor;
+    return l->work.proactor;
   }
   pn_connection_t *c = pn_event_connection(e);
   if (c) {
@@ -856,33 +1019,21 @@ void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
 }
 
 int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) {
-  char *buf = strdup(addr);
-  if (!buf) {
-    return PN_OUT_OF_MEMORY;
-  }
-  char *scheme, *user, *pass, *host, *port, *path;
-  pni_parse_url(buf, &scheme, &user, &pass, &host, &port, &path);
-  pconnection_t *pc = pconnection(p, c, false, host, port);
-  free(buf);
-  if (!pc) {
+  pconnection_t *pc = pconnection(p, c, false);
+  if (pc) {
+    parse_addr(&pc->addr, addr);
+    work_start(&pc->work);
+  } else {
     return PN_OUT_OF_MEMORY;
   }
-  psocket_start(&pc->psocket);
   return 0;
 }
 
 int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, int backlog) {
-  assert(!l->closed);
-  char *buf = strdup(addr);
-  if (!buf) {
-    return PN_OUT_OF_MEMORY;
-  }
-  char *scheme, *user, *pass, *host, *port, *path;
-  pni_parse_url(buf, &scheme, &user, &pass, &host, &port, &path);
-  psocket_init(&l->psocket, p, false, host, port);
-  free(buf);
+  work_init(&l->work, p, T_LISTENER);
+  parse_addr(&l->addr, addr);
   l->backlog = backlog;
-  psocket_start(&l->psocket);
+  work_start(&l->work);
   return 0;
 }
 
@@ -900,26 +1051,41 @@ pn_proactor_t *pn_proactor() {
   return p;
 }
 
-void pn_proactor_free(pn_proactor_t *p) {
-  if (p->count > 0) {
-    uv_walk(&p->loop, on_stopping, NULL); /* Set errors on all sockets */
-    /* Drain all events so sockets can close normally */
-    pn_event_t *e = NULL;
-    do {
-      pn_event_batch_t *eb = pn_proactor_wait(p);
-      e = pn_event_batch_next(eb);
-      while (e && pn_event_type(e) != PN_PROACTOR_INACTIVE) {
-        e = pn_event_batch_next(eb);
-      }
-      pn_proactor_done(p, eb);
-    } while (pn_event_type(e) != PN_PROACTOR_INACTIVE);
+static void on_proactor_free(uv_handle_t* h, void* v) {
+  uv_safe_close(h, NULL);       /* Close the handle */
+  if (h->type == UV_TCP) {      /* Put the corresponding work item on the leader_q for cleanup */
+    work_t *w = NULL;
+    switch (*(struct_type*)h->data) {
+     case T_CONNECTION: w = (work_t*)h->data; break;
+     case T_LSOCKET: w = &((lsocket_t*)h->data)->parent->work; break;
+     default: break;
+    }
+    if (w && w->next == work_unqueued) {
+      work_push(&w->proactor->leader_q, w); /* Save to be freed after all closed */
+    }
   }
-  /* Close the the proactor handles */
-  uv_timer_stop(&p->timer);
-  uv_safe_close((uv_handle_t*)&p->timer, NULL);
-  uv_safe_close((uv_handle_t*)&p->async, NULL);
+}
+
+static void work_free(work_t *w) {
+  switch (w->type) {
+   case T_CONNECTION: pconnection_free((pconnection_t*)w); break;
+   case T_LISTENER: pn_listener_free((pn_listener_t*)w); break;
+   default: break;
+  }
+}
+
+void pn_proactor_free(pn_proactor_t *p) {
+  /* Close all open handles */
+  uv_walk(&p->loop, on_proactor_free, NULL);
   while (uv_loop_alive(&p->loop)) {
-    uv_run(&p->loop, UV_RUN_NOWAIT); /* Run till all handles closed */
+    uv_run(&p->loop, UV_RUN_DEFAULT); /* Finish closing the proactor handles */
+  }
+  /* Free all work items */
+  for (work_t *w = work_pop(&p->leader_q); w; w = work_pop(&p->leader_q)) {
+    work_free(w);
+  }
+  for (work_t *w = work_pop(&p->worker_q); w; w = work_pop(&p->worker_q)) {
+    work_free(w);
   }
   uv_loop_close(&p->loop);
   uv_mutex_destroy(&p->lock);
@@ -930,7 +1096,7 @@ void pn_proactor_free(pn_proactor_t *p) {
 
 pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
   pconnection_t *pc = get_pconnection(c);
-  return pc ? pc->psocket.proactor : NULL;
+  return pc ? pc->work.proactor : NULL;
 }
 
 void pn_connection_wake(pn_connection_t* c) {
@@ -945,7 +1111,7 @@ void pn_connection_wake(pn_connection_t* c) {
     }
     uv_mutex_unlock(&pc->lock);
     if (notify) {
-      psocket_notify(&pc->psocket);
+      work_notify(&pc->work);
     }
   }
 }
@@ -968,16 +1134,12 @@ pn_listener_t *pn_listener(void) {
 void pn_listener_close(pn_listener_t* l) {
   /* May be called from any thread */
   uv_mutex_lock(&l->lock);
-  if (!l->closed) {
-    l->closed = true;
-    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
-  }
+  listener_close_lh(l);
   uv_mutex_unlock(&l->lock);
-  psocket_notify(&l->psocket);
 }
 
 pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
-  return l ? l->psocket.proactor : NULL;
+  return l ? l->work.proactor : NULL;
 }
 
 pn_condition_t* pn_listener_condition(pn_listener_t* l) {
@@ -998,13 +1160,18 @@ pn_record_t *pn_listener_attachments(pn_listener_t *l) {
 
 int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
   uv_mutex_lock(&l->lock);
-  assert(!l->closed);
-  pconnection_t *pc = pconnection(l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
+  pconnection_t *pc = pconnection(l->work.proactor, c, true);
   if (!pc) {
     return PN_OUT_OF_MEMORY;
   }
-  push_lh(&l->accept, &pc->psocket);
+  /* Get the socket from the accept event that we are processing */
+  pn_event_t *e = pn_collector_prev(l->collector);
+  assert(pn_event_type(e) == PN_LISTENER_ACCEPT);
+  assert(pn_event_listener(e) == l);
+  pc->lsocket = (lsocket_t*)pn_event_context(e);
+  pc->connected = 1;            /* Don't need to connect() */
+  pconnection_push(&l->accept, pc);
   uv_mutex_unlock(&l->lock);
-  psocket_notify(&l->psocket);
+  work_notify(&l->work);
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce1b3d1f/proton-c/src/tests/proactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c
index 41d889b..38237b0 100644
--- a/proton-c/src/tests/proactor.c
+++ b/proton-c/src/tests/proactor.c
@@ -32,7 +32,7 @@ static pn_millis_t timeout = 7*1000; /* timeout for hanging tests */
 
 static const char *localhost = "127.0.0.1"; /* host for connect/listen */
 
-typedef int (*test_handler_fn)(test_t *, pn_event_t*);
+typedef pn_event_t *(*test_handler_fn)(test_t *, pn_event_t*);
 
 /* Proactor and handler that take part in a test */
 typedef struct proactor_test_t {
@@ -61,27 +61,50 @@ static void proactor_test_free(proactor_test_t *pts, size_t n) {
 
 #define PROACTOR_TEST_FREE(A) proactor_test_free(A, sizeof(A)/sizeof(*A))
 
-/* Run an array of proactors till a handler returns non-0 */
-static int proactor_test_run(proactor_test_t *pts, size_t n) {
-  int ret = 0;
-  while (!ret) {
+/* Process events on a proactor array until a handler returns an event, or
+ * all proactors return NULL
+ */
+static pn_event_t *proactor_test_get(proactor_test_t *pts, size_t n) {
+  while (true) {
+    bool busy = false;
     for (proactor_test_t *pt = pts; pt < pts + n; ++pt) {
-      pn_event_batch_t *events = pn_proactor_get(pt->proactor);
-      if (events) {
-          pn_event_t *e = pn_event_batch_next(events);
-          TEST_CHECKF(pts->t, e, "empty batch");
-          while (e && !ret) {
-            if (!(ret = pt->handler(pt->t, e)))
-              e = pn_event_batch_next(events);
-          }
-          pn_proactor_done(pt->proactor, events);
+      pn_event_batch_t *eb =  pn_proactor_get(pt->proactor);
+      if (eb) {
+        busy = true;
+        pn_event_t *ret = NULL;
+        for (pn_event_t* e = pn_event_batch_next(eb); e; e = pn_event_batch_next(eb)) {
+          ret = pt->handler(pt->t, e);
+          if (ret) break;
+        }
+        pn_proactor_done(pt->proactor, eb);
+        if (ret) return ret;
       }
     }
+    if (!busy) {
+      return NULL;
+    }
   }
-  return ret;
 }
 
+/* Run an array of proactors till a handler returns an event. */
+static pn_event_t *proactor_test_run(proactor_test_t *pts, size_t n) {
+  pn_event_t *e;
+  while ((e = proactor_test_get(pts, n)) == NULL)
+         ;
+  return e;
+}
+
+
+/* Drain and discard outstanding events from an array of proactors */
+static void proactor_test_drain(proactor_test_t *pts, size_t n) {
+  while (proactor_test_get(pts, n))
+         ;
+}
+
+
+#define PROACTOR_TEST_GET(A) proactor_test_get((A), sizeof(A)/sizeof(*A))
 #define PROACTOR_TEST_RUN(A) proactor_test_run((A), sizeof(A)/sizeof(*A))
+#define PROACTOR_TEST_DRAIN(A) proactor_test_drain((A), sizeof(A)/sizeof(*A))
 
 /* Wait for the next single event, return its type */
 static pn_event_type_t wait_next(pn_proactor_t *proactor) {
@@ -104,31 +127,31 @@ static void test_interrupt_timeout(test_t *t) {
 }
 
 /* Common handler for simple client/server interactions,  */
-static int common_handler(test_t *t, pn_event_t *e) {
+static pn_event_t *common_handler(test_t *t, pn_event_t *e) {
   pn_connection_t *c = pn_event_connection(e);
   pn_listener_t *l = pn_event_listener(e);
 
   switch (pn_event_type(e)) {
 
     /* Stop on these events */
+   case PN_LISTENER_CLOSE:
    case PN_LISTENER_OPEN:
+   case PN_PROACTOR_INACTIVE:
    case PN_PROACTOR_TIMEOUT:
    case PN_TRANSPORT_CLOSED:
-   case PN_PROACTOR_INACTIVE:
-   case PN_LISTENER_CLOSE:
-    return pn_event_type(e);
+    return e;
 
    case PN_LISTENER_ACCEPT:
     pn_listener_accept(l, pn_connection());
-    return 0;
+    return NULL;
 
    case PN_CONNECTION_REMOTE_OPEN:
     pn_connection_open(c);      /* Return the open (no-op if already open) */
-    return 0;
+    return NULL;
 
    case PN_CONNECTION_REMOTE_CLOSE:
     pn_connection_close(c);     /* Return the close */
-    return 0;
+    return NULL;
 
     /* Ignored these events */
    case PN_CONNECTION_INIT:
@@ -139,47 +162,54 @@ static int common_handler(test_t *t, pn_event_t *e) {
    case PN_TRANSPORT_ERROR:
    case PN_TRANSPORT_HEAD_CLOSED:
    case PN_TRANSPORT_TAIL_CLOSED:
-    return 0;
+    return NULL;
 
    default:
     TEST_ERRORF(t, "unexpected event %s", pn_event_type_name(pn_event_type(e)));
-    return 0;                   /* Fail the test but keep going */
+    return NULL;                   /* Fail the test but keep going */
   }
 }
 
 /* close a connection when it is remote open */
-static int open_close_handler(test_t *t, pn_event_t *e) {
+static pn_event_t *open_close_handler(test_t *t, pn_event_t *e) {
   switch (pn_event_type(e)) {
    case PN_CONNECTION_REMOTE_OPEN:
     pn_connection_close(pn_event_connection(e));
-    return 0;          /* common_handler will finish on TRANSPORT_CLOSED */
+    return NULL;          /* common_handler will finish on TRANSPORT_CLOSED */
    default:
     return common_handler(t, e);
   }
 }
 
-/* Simple client/server connection with 2 proactors */
+/* Test several client/server connection with 2 proactors */
 static void test_client_server(test_t *t) {
   proactor_test_t pts[] ={ { open_close_handler }, { common_handler } };
   PROACTOR_TEST_INIT(pts, t);
   pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
   test_port_t port = test_port(localhost);
   pn_proactor_listen(server, pn_listener(), port.host_port, 4);
-  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
-  pn_proactor_connect(client, pn_connection(), port.host_port);
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  TEST_EVENT_TYPE(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
   sock_close(port.sock);
+  /* Connect and wait for close at both ends */
+  pn_proactor_connect(client, pn_connection(), port.host_port);
+  TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  /* Connect and wait for close at both ends */
+  pn_proactor_connect(client, pn_connection(), port.host_port);
+  TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+
   PROACTOR_TEST_FREE(pts);
 }
 
 /* Return on connection open, close and return on wake */
-static int open_wake_handler(test_t *t, pn_event_t *e) {
+static pn_event_t *open_wake_handler(test_t *t, pn_event_t *e) {
   switch (pn_event_type(e)) {
    case PN_CONNECTION_REMOTE_OPEN:
-    return pn_event_type(e);
+    return e;
    case PN_CONNECTION_WAKE:
     pn_connection_close(pn_event_connection(e));
-    return pn_event_type(e);
+    return e;
    default:
     return common_handler(t, e);
   }
@@ -192,20 +222,21 @@ static void test_connection_wake(test_t *t) {
   pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
   test_port_t port = test_port(localhost);          /* Hold a port */
   pn_proactor_listen(server, pn_listener(), port.host_port, 4);
-  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  TEST_EVENT_TYPE(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
   sock_close(port.sock);
 
   pn_connection_t *c = pn_connection();
   pn_incref(c);                 /* Keep c alive after proactor frees it */
   pn_proactor_connect(client, c, port.host_port);
-  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
+  TEST_EVENT_TYPE(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
   TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */
   pn_connection_wake(c);
-  TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts));
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
-  PROACTOR_TEST_FREE(pts);
+  TEST_EVENT_TYPE(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts));
+  TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  /* The pn_connection_t is still valid so wake is legal but a no-op */
+  pn_connection_wake(c);
 
+  PROACTOR_TEST_FREE(pts);
   /* The pn_connection_t is still valid so wake is legal but a no-op */
   pn_connection_wake(c);
   pn_decref(c);
@@ -220,34 +251,26 @@ static void test_inactive(test_t *t) {
 
   pn_listener_t *l = pn_listener();
   pn_proactor_listen(server, l, port.host_port,  4);
-  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  TEST_EVENT_TYPE(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
   pn_connection_t *c = pn_connection();
   pn_proactor_connect(client, c, port.host_port);
-  TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
+  TEST_EVENT_TYPE(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts));
   pn_connection_wake(c);
-  TEST_ETYPE_EQUAL(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts));
+  TEST_EVENT_TYPE(t, PN_CONNECTION_WAKE, PROACTOR_TEST_RUN(pts));
   /* expect TRANSPORT_CLOSED from client and server, INACTIVE from client */
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+  TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  TEST_EVENT_TYPE(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
   /* server won't be INACTIVE until listener is closed */
   TEST_CHECK(t, pn_proactor_get(server) == NULL);
   pn_listener_close(l);
-  TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+  TEST_EVENT_TYPE(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
+  TEST_EVENT_TYPE(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
 
   sock_close(port.sock);
   PROACTOR_TEST_FREE(pts);
 }
 
-#define TEST_CHECK_ERROR(T, WANT, COND) do {                            \
-    TEST_CHECKF((T), pn_condition_is_set(COND), "expecting error");     \
-    const char* description = pn_condition_get_description(COND);       \
-    if (!strstr(description, (WANT))) {                                 \
-      TEST_ERRORF((T), "bad error, expected '%s' in '%s'", (WANT), description); \
-    }                                                                   \
-  } while(0)
-
 /* Tests for error handling */
 static void test_errors(test_t *t) {
   proactor_test_t pts[] =  { { open_wake_handler }, { common_handler } };
@@ -258,29 +281,121 @@ static void test_errors(test_t *t) {
   /* Invalid connect/listen parameters */
   pn_connection_t *c = pn_connection();
   pn_proactor_connect(client, c, "127.0.0.1:xxx");
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
-  TEST_CHECK_ERROR(t, "xxx", pn_transport_condition(pn_connection_transport(c)));
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+  TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
+  TEST_CHECK_COND(t, "xxx", pn_transport_condition(pn_connection_transport(c)));
+  TEST_EVENT_TYPE(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
 
   pn_listener_t *l = pn_listener();
   pn_proactor_listen(server, l, "127.0.0.1:xxx", 1);
-  TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
-  TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
-  TEST_CHECK_ERROR(t, "xxx", pn_listener_condition(l));
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+  TEST_EVENT_TYPE(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  TEST_EVENT_TYPE(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts));
+  TEST_CHECK_COND(t, "xxx", pn_listener_condition(l));
+  TEST_EVENT_TYPE(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
 
   /* Connect with no listener */
   c = pn_connection();
   pn_proactor_connect(client, c, port.host_port);
-  TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
-  TEST_CHECK(t, pn_condition_is_set(pn_transport_condition(pn_connection_transport(c))));
-  TEST_CHECK_ERROR(t, "connection refused", pn_transport_condition(pn_connection_transport(c)));
-  TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+  if (TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts))) {
+    TEST_CHECK_COND(t, "connection refused", pn_transport_condition(pn_connection_transport(c)));
+    TEST_EVENT_TYPE(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts));
+    sock_close(port.sock);
+    PROACTOR_TEST_FREE(pts);
+  }
+}
 
+static inline const char *event_listener_desc(pn_event_t *e) {
+  return pn_condition_get_description(pn_listener_condition(pn_event_listener(e)));
+}
+
+/* Test that we can selectively listen on IPv6 or IPv4, and listen on both by default */
+static void test_ipv4_ipv6(test_t *t) {
+  proactor_test_t pts[] ={ { open_close_handler }, { common_handler } };
+  PROACTOR_TEST_INIT(pts, t);
+  pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
+
+  /* Listen on all interfaces for IPv6 only. If this fails, skip IPv6 tests */
+  test_port_t port6 = test_port("[::]");
+  pn_proactor_listen(server, pn_listener(), port6.host_port, 4);
+  TEST_EVENT_TYPE(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  sock_close(port6.sock);
+  pn_event_t *e = PROACTOR_TEST_GET(pts);
+  bool has_ipv6 = (pn_event_type(e) != PN_LISTENER_CLOSE);
+  if (!has_ipv6) {
+    TEST_LOGF(t, "skip IPv6 tests: %s", event_listener_desc(e));
+  }
+  PROACTOR_TEST_DRAIN(pts);
+
+  /* Listen on all interfaces for IPv4 only. */
+  test_port_t port4 = test_port("0.0.0.0");
+  pn_proactor_listen(server, pn_listener(), port4.host_port, 4);
+  TEST_EVENT_TYPE(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+  sock_close(port4.sock);
+  e = PROACTOR_TEST_GET(pts);
+  if (pn_event_type(e) == PN_LISTENER_CLOSE) {
+    TEST_ERRORF(t, "listener error: %s",  event_listener_desc(e));
+  }
+  PROACTOR_TEST_DRAIN(pts);
+
+  /* Empty address listens on both IPv4 and IPv6 on all interfaces */
+  test_port_t port = test_port("");
+  pn_proactor_listen(server, pn_listener(), port.host_port, 4);
+  TEST_EVENT_TYPE(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
   sock_close(port.sock);
+  e = PROACTOR_TEST_GET(pts);
+  if (pn_event_type(e) == PN_LISTENER_CLOSE) {
+    TEST_ERRORF(t, "listener error: %s",  event_listener_desc(e));
+  }
+  PROACTOR_TEST_DRAIN(pts);
+
+#define EXPECT_CONNECT(TP, HOST) do {                                   \
+    pn_proactor_connect(client, pn_connection(), test_port_use_host(&(TP), (HOST))); \
+    pn_event_t *e = TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); \
+    if (e) TEST_CHECK_NO_COND(t, pn_transport_condition(pn_event_transport(e))); \
+    PROACTOR_TEST_DRAIN(pts);                                           \
+  } while(0)
+
+#define EXPECT_FAIL(TP, HOST) do {                                      \
+    pn_proactor_connect(client, pn_connection(), test_port_use_host(&(TP), (HOST))); \
+    pn_event_t *e = TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); \
+    if (e) TEST_CHECK_COND(t, "refused", pn_transport_condition(pn_event_transport(e))); \
+    PROACTOR_TEST_DRAIN(pts);                                           \
+  } while(0)
+
+  EXPECT_CONNECT(port4, "127.0.0.1"); /* v4->v4 */
+  EXPECT_CONNECT(port4, "");          /* local->v4*/
+
+  EXPECT_CONNECT(port, "127.0.0.1"); /* v4->all */
+  EXPECT_CONNECT(port, "");          /* local->all */
+
+  if (has_ipv6) {
+    EXPECT_CONNECT(port6, "[::]"); /* v6->v6 */
+    EXPECT_CONNECT(port6, "");     /* local->v6 */
+    EXPECT_CONNECT(port, "[::1]"); /* v6->all */
+
+    EXPECT_FAIL(port6, "127.0.0.1"); /* fail v4->v6 */
+    EXPECT_FAIL(port4, "[::1]");     /* fail v6->v4 */
+  }
+
+  PROACTOR_TEST_FREE(pts);
+}
+
+/* Make sure pn_proactor_free cleans up open sockets */
+static void test_free_cleanup(test_t *t) {
+  proactor_test_t pts[] = { { open_wake_handler }, { common_handler } };
+  PROACTOR_TEST_INIT(pts, t);
+  pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;
+  test_port_t ports[3] = { test_port(localhost), test_port(localhost), test_port(localhost) };
+  for (int i = 0; i < 3; ++i) {
+    pn_proactor_listen(server, pn_listener(), ports[i].host_port, 2);
+    TEST_EVENT_TYPE(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
+    sock_close(ports[i].sock);
+    pn_proactor_connect(client, pn_connection(), ports[i].host_port);
+    pn_proactor_connect(client, pn_connection(), ports[i].host_port);
+  }
   PROACTOR_TEST_FREE(pts);
 }
 
+
 int main(int argc, char **argv) {
   int failed = 0;
   RUN_ARGV_TEST(failed, t, test_inactive(&t));
@@ -288,5 +403,7 @@ int main(int argc, char **argv) {
   RUN_ARGV_TEST(failed, t, test_errors(&t));
   RUN_ARGV_TEST(failed, t, test_client_server(&t));
   RUN_ARGV_TEST(failed, t, test_connection_wake(&t));
+  RUN_ARGV_TEST(failed, t, test_ipv4_ipv6(&t));
+  RUN_ARGV_TEST(failed, t, test_free_cleanup(&t));
   return failed;
 }

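The handler changes in proactor.c above replace the old int return (an event type, or 0) with a pn_event_t*: a handler returns NULL to let the run loop keep dispatching, or returns the event itself to hand it back to the test, as open_wake_handler does for PN_CONNECTION_REMOTE_OPEN. A minimal sketch of a handler written against that convention follows; the handler name is hypothetical and the fragment is not part of this commit, while common_handler and the test macros are the ones defined in the patch above.

/* Hypothetical handler using the new pn_event_t* convention */
static pn_event_t *remote_open_handler(test_t *t, pn_event_t *e) {
  switch (pn_event_type(e)) {
   case PN_CONNECTION_REMOTE_OPEN:
    pn_connection_close(pn_event_connection(e)); /* respond to the open */
    return e;                    /* hand this event back to the test */
   default:
    return common_handler(t, e); /* defer everything else to the shared handler */
  }
}

A test built on this handler would then wait for the event with TEST_EVENT_TYPE(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts)).
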
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ce1b3d1f/proton-c/src/tests/test_tools.h
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/test_tools.h b/proton-c/src/tests/test_tools.h
index f70a327..26f3b35 100644
--- a/proton-c/src/tests/test_tools.h
+++ b/proton-c/src/tests/test_tools.h
@@ -38,28 +38,49 @@ typedef struct test_t {
 } test_t;
 
 /* Internal, use macros. Print error message and increase the t->errors count.
-   All output from test marcros goes to stdout not stderr, error messages are normal for a test.
+   All output from test macros goes to stderr so it interleaves with PN_TRACE logs.
 */
+
 static void test_vlogf_(test_t *t, const char *prefix, const char* expr,
                         const char* file, int line, const char *fmt, va_list ap)
 {
-  printf("%s:%d", file, line);
-  if (prefix && *prefix) printf(": %s", prefix);
-  if (expr && *expr) printf(": %s", expr);
+  fprintf(stderr, "%s:%d", file, line);
+  if (prefix && *prefix) fprintf(stderr, ": %s", prefix);
+  if (expr && *expr) fprintf(stderr, ": %s", expr);
   if (fmt && *fmt) {
-    printf(": ");
-    vprintf(fmt, ap);
+    fprintf(stderr, ": ");
+    vfprintf(stderr, fmt, ap);
   }
-  if (t) printf(" [%s]", t->name);
-  printf("\n");
+  if (t) fprintf(stderr, " [%s]", t->name);
+  fprintf(stderr, "\n");
   fflush(stdout);
 }
 
-static void test_errorf_(test_t *t, const char *prefix, const char* expr,
+static void test_errorf_(test_t *t, const char* expr,
                          const char* file, int line, const char *fmt, ...) {
+  ++t->errors;
+  va_list ap;
+  va_start(ap, fmt);
+  test_vlogf_(t, "error", expr, file, line, fmt, ap);
+  va_end(ap);
+}
+
+static bool test_check_(test_t *t, bool expr, const char *sexpr,
+                        const char *file, int line, const char* fmt, ...) {
+  if (!expr) {
+    ++t->errors;
+    va_list ap;
+    va_start(ap, fmt);
+    test_vlogf_(t, "error: check failed", sexpr, file, line, fmt, ap);
+    va_end(ap);
+  }
+  return expr;
+}
+
+static void test_logf_(test_t *t, const char *prefix, const char* expr,
+                       const char* file, int line, const char *fmt, ...) {
   va_list ap;
   va_start(ap, fmt);
-  ++t->errors;
   test_vlogf_(t, prefix, expr, file, line, fmt, ap);
   va_end(ap);
 }
@@ -87,25 +108,13 @@ static void assert_fail_(const char* expr, const char* file, int line, const cha
   TEST_ASSERTF((expr), "%s", strerror(err))
 
 
-/* Internal, use macros */
-static inline bool test_check_(test_t *t, bool expr, const char *sexpr, const char *file, int line, const char* fmt, ...) {
-  if (!expr) {
-    ++t->errors;
-    va_list ap;
-    va_start(ap, fmt);
-    test_vlogf_(t, "check failed", sexpr, file, line, fmt, ap);
-    va_end(ap);
-  }
-  return expr;
-}
-
 /* Print a message but don't mark the test as having an error */
 #define TEST_LOGF(TEST, ...) \
   test_logf_((TEST), "info", NULL, __FILE__, __LINE__, __VA_ARGS__)
 
 /* Print an error with printf-style message, increment TEST->errors */
 #define TEST_ERRORF(TEST, ...) \
-  test_errorf_((TEST), "error", NULL, __FILE__, __LINE__, __VA_ARGS__)
+  test_errorf_((TEST), NULL, __FILE__, __LINE__, __VA_ARGS__)
 
 /* If EXPR is false, print and record an error for t  */
 #define TEST_CHECKF(TEST, EXPR, ...) \
@@ -121,6 +130,23 @@ static inline bool test_etype_equal_(test_t *t, int want, int got, const char *f
                      pn_event_type_name((pn_event_type_t)got));
 }
 
+#define TEST_CHECK_COND(T, WANT, COND) do {                             \
+    pn_condition_t *cond = (COND);                                      \
+    if (TEST_CHECKF((T), pn_condition_is_set(cond), "expecting error")) { \
+      const char* description = pn_condition_get_description(cond);     \
+      if (!strstr(description, (WANT))) {                               \
+        TEST_ERRORF((T), "expected '%s' in '%s'", (WANT), description); \
+      }                                                                 \
+    }                                                                   \
+  } while(0)
+
+#define TEST_CHECK_NO_COND(T, COND) do {                                \
+    pn_condition_t *cond = (COND);                                      \
+    if (cond && pn_condition_is_set(cond)) {                            \
+      TEST_ERRORF((T), "unexpected condition: %s", pn_condition_get_description(cond)); \
+    }                                                                   \
+  } while(0)
+
 #define TEST_ETYPE_EQUAL(TEST, WANT, GOT) \
   test_etype_equal_((TEST), (WANT), (GOT), __FILE__, __LINE__)
 
@@ -131,7 +157,7 @@ static inline pn_event_t *test_event_type_(test_t *t, pn_event_type_t want, pn_e
   if (want != pn_event_type(got)) {
     pn_condition_t *cond = pn_event_condition(got);
     if (cond && pn_condition_is_set(cond)) {
-      test_errorf_(t, NULL, NULL, file, line, "condition: %s:%s",
+      test_errorf_(t, NULL, file, line, "condition: %s:%s",
                    pn_condition_get_name(cond), pn_condition_get_description(cond));
     }
     return NULL;
@@ -146,12 +172,12 @@ static inline pn_event_t *test_event_type_(test_t *t, pn_event_type_t want, pn_e
    FAILED is incremented if the test has errors
 */
 #define RUN_TEST(FAILED, T, EXPR) do {                          \
-    printf("TEST: %s\n", #EXPR);                                \
+    fprintf(stderr, "TEST: %s\n", #EXPR);                                \
     fflush(stdout);                                             \
     test_t T = { #EXPR, 0 };                                    \
     (EXPR);                                                     \
     if (T.errors) {                                             \
-      printf("FAIL: %s (%d errors)\n", #EXPR, T.errors);        \
+      fprintf(stderr, "FAIL: %s (%d errors)\n", #EXPR, T.errors);        \
       ++(FAILED);                                               \
     }                                                           \
   } while(0)
@@ -226,13 +252,19 @@ typedef struct test_port_t {
   char host_port[256];          /* host:port string */
 } test_port_t;
 
+/* Modifies tp->host_port to use host, returns the new tp->host_port */
+static const char *test_port_use_host(test_port_t *tp, const char *host) {
+  snprintf(tp->host_port, sizeof(tp->host_port), "%s:%d", host, tp->port);
+  return tp->host_port;
+}
+
 /* Create a test_port_t  */
 static inline test_port_t test_port(const char* host) {
   test_port_t tp = {0};
   tp.sock = sock_bind0();
   tp.port = sock_port(tp.sock);
   snprintf(tp.str, sizeof(tp.str), "%d", tp.port);
-  snprintf(tp.host_port, sizeof(tp.host_port), "%s:%d", host, tp.port);
+  test_port_use_host(&tp, host);
   return tp;
 }
 

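Taken together, the new test_tools.h helpers (test_port_use_host, TEST_EVENT_TYPE, TEST_CHECK_NO_COND) support the connect/listen checks used by test_ipv4_ipv6. The sketch below shows the pattern in a single hypothetical test; it is illustrative only, is not part of the commit, and assumes the proactor_test_t harness and handlers defined in the patch above.

/* Listen on the IPv4 wildcard, connect via loopback, expect a clean close */
static void example_ipv4_connect(test_t *t) {
  proactor_test_t pts[] = { { open_close_handler }, { common_handler } };
  PROACTOR_TEST_INIT(pts, t);
  pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor;

  test_port_t tp = test_port("0.0.0.0");   /* hold a free port on the IPv4 wildcard */
  pn_proactor_listen(server, pn_listener(), tp.host_port, 4);
  TEST_EVENT_TYPE(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts));
  sock_close(tp.sock);                     /* release the socket that was holding the port */

  /* Rewrite host_port to connect to the same port via 127.0.0.1 */
  pn_proactor_connect(client, pn_connection(), test_port_use_host(&tp, "127.0.0.1"));
  pn_event_t *e = TEST_EVENT_TYPE(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts));
  if (e) TEST_CHECK_NO_COND(t, pn_transport_condition(pn_event_transport(e)));
  PROACTOR_TEST_DRAIN(pts);

  PROACTOR_TEST_FREE(pts);
}

A connect that is expected to fail (for example IPv4 loopback against an IPv6-only listener) would instead check the error text with TEST_CHECK_COND(t, "refused", pn_transport_condition(pn_event_transport(e))), as the EXPECT_FAIL macro above does.
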

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org

