qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [1/2] qpid-proton git commit: PROTON-1403: c proactor library
Date Sat, 11 Feb 2017 02:51:49 GMT
Repository: qpid-proton
Updated Branches:
  refs/heads/master b9a57e8a7 -> ec70d73dd


PROTON-1403: c proactor library

Move the libuv example proactor into an installed library.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/afacb165
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/afacb165
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/afacb165

Branch: refs/heads/master
Commit: afacb16527e9f231ae76d5e16ca0d9ac7edcff86
Parents: b9a57e8
Author: Alan Conway <aconway@redhat.com>
Authored: Fri Feb 10 21:43:05 2017 -0500
Committer: Alan Conway <aconway@redhat.com>
Committed: Fri Feb 10 21:49:32 2017 -0500

----------------------------------------------------------------------
 examples/c/proactor/CMakeLists.txt         |  33 +-
 examples/c/proactor/broker.c               |  44 +-
 examples/c/proactor/libuv_proactor.c       | 873 ------------------------
 examples/c/proactor/test.py                |  14 +-
 examples/c/proactor/thread.h               |  49 ++
 proton-c/CMakeLists.txt                    |  74 +-
 proton-c/include/proton/import_export.h    |   7 +
 proton-c/include/proton/listener.h         |   4 +-
 proton-c/include/proton/proactor.h         |  26 +-
 proton-c/include/proton/types.h            |  17 +-
 proton-c/src/libqpid-proton-proactor.pc.in |  30 +
 proton-c/src/proactor/libuv.c              | 873 ++++++++++++++++++++++++
 12 files changed, 1074 insertions(+), 970 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/examples/c/proactor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/proactor/CMakeLists.txt b/examples/c/proactor/CMakeLists.txt
index f701651..2ed4f94 100644
--- a/examples/c/proactor/CMakeLists.txt
+++ b/examples/c/proactor/CMakeLists.txt
@@ -23,21 +23,20 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${Proton_INCLUDE_DIRS})
 
 add_definitions(${COMPILE_WARNING_FLAGS} ${WERROR} ${COMPILE_PLATFORM_FLAGS} ${LINK_TIME_OPTIMIZATION})
 
-find_package(Libuv)
-if (Libuv_FOUND)
-  foreach(name broker send receive)
-    add_executable(libuv_${name} ${name}.c libuv_proactor.c)
-    target_link_libraries(libuv_${name} ${Proton_LIBRARIES} ${Libuv_LIBRARIES})
-    set_target_properties(libuv_${name} PROPERTIES
-      COMPILE_DEFINITIONS  "PN_PROACTOR_INCLUDE=\"libuv_proactor.h\"")
-  endforeach()
+# Add a test with the correct environment to find test executables and valgrind.
+if(WIN32)
+  set(test_path "$<TARGET_FILE_DIR:broker>;$<TARGET_FILE_DIR:qpid-proton>")
+else(WIN32)
+  set(test_path "${CMAKE_CURRENT_BINARY_DIR}:$ENV{PATH}")
+  set(PLATFORM_LIBS pthread)
+endif(WIN32)
+
+foreach(name broker send receive)
+  add_executable(proactor-${name} ${name}.c)
+  target_link_libraries(proactor-${name} ${Proton_LIBRARIES} ${PLATFORM_LIBS})
+  set_target_properties(proactor-${name} PROPERTIES OUTPUT_NAME ${name})
+endforeach()
+
+set(run_env ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV})
+add_test(c-proactor ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/test.py -v)
 
-  # Add a test with the correct environment to find test executables and valgrind.
-  if(WIN32)
-    set(test_path "$<TARGET_FILE_DIR:libuv_broker>;$<TARGET_FILE_DIR:qpid-proton>")
-  else(WIN32)
-    set(test_path "${CMAKE_CURRENT_BINARY_DIR}:$ENV{PATH}")
-  endif(WIN32)
-  set(run_env ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV})
-  add_test(c-proactor-libuv ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/test.py)
-endif()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
index ca52336..d6261f4 100644
--- a/examples/c/proactor/broker.c
+++ b/examples/c/proactor/broker.c
@@ -17,6 +17,8 @@
  * under the License.
  */
 
+#include "thread.h"
+
 #include <proton/connection_driver.h>
 #include <proton/proactor.h>
 #include <proton/engine.h>
@@ -29,11 +31,6 @@
 #include <string.h>
 #include <unistd.h>
 
-/* TODO aconway 2016-10-14: this example does not require libuv IO,
-   it uses uv.h only for portable mutex and thread functions.
-*/
-#include <uv.h>
-
 bool enable_debug = false;
 
 void debug(const char* fmt, ...) {
@@ -91,7 +88,7 @@ void pcheck(int err, const char* s) {
 
 /* Simple thread-safe queue implementation */
 typedef struct queue_t {
-  uv_mutex_t lock;
+  pthread_mutex_t lock;
   char* name;
   VEC(pn_rwbytes_t) messages;   /* Messages on the queue_t */
   VEC(pn_connection_t*) waiting; /* Connections waiting to send messages from this queue */
@@ -101,7 +98,7 @@ typedef struct queue_t {
 
 static void queue_init(queue_t *q, const char* name, queue_t *next) {
   debug("created queue %s", name);
-  uv_mutex_init(&q->lock);
+  pthread_mutex_init(&q->lock, NULL);
   q->name = strdup(name);
   VEC_INIT(q->messages);
   VEC_INIT(q->waiting);
@@ -110,7 +107,7 @@ static void queue_init(queue_t *q, const char* name, queue_t *next) {
 }
 
 static void queue_destroy(queue_t *q) {
-  uv_mutex_destroy(&q->lock);
+  pthread_mutex_destroy(&q->lock);
   free(q->name);
   for (size_t i = 0; i < q->messages.len; ++i)
     free(q->messages.data[i].start);
@@ -126,7 +123,7 @@ static void queue_destroy(queue_t *q) {
 static void queue_send(queue_t *q, pn_link_t *s) {
   pn_rwbytes_t m = { 0 };
   size_t tag = 0;
-  uv_mutex_lock(&q->lock);
+  pthread_mutex_lock(&q->lock);
   if (q->messages.len == 0) { /* Empty, record connection as waiting */
     debug("queue is empty %s", q->name);
     /* Record connection for wake-up if not already on the list. */
@@ -143,7 +140,7 @@ static void queue_send(queue_t *q, pn_link_t *s) {
     VEC_POP(q->messages);
     tag = ++q->sent;
   }
-  uv_mutex_unlock(&q->lock);
+  pthread_mutex_unlock(&q->lock);
   if (m.start) {
     pn_delivery_t *d = pn_delivery(s, pn_dtag((char*)&tag, sizeof(tag)));
     pn_link_send(s, m.start, m.size);
@@ -172,7 +169,7 @@ bool pn_connection_get_check_queues(pn_connection_t *c) {
 */
 static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) {
   debug("received to queue %s", q->name);
-  uv_mutex_lock(&q->lock);
+  pthread_mutex_lock(&q->lock);
   VEC_PUSH(q->messages, m);
   if (q->messages.len == 1) { /* Was empty, notify waiting connections */
     for (size_t i = 0; i < q->waiting.len; ++i) {
@@ -182,18 +179,18 @@ static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) {
     }
     q->waiting.len = 0;
   }
-  uv_mutex_unlock(&q->lock);
+  pthread_mutex_unlock(&q->lock);
 }
 
 /* Thread safe set of queues */
 typedef struct queues_t {
-  uv_mutex_t lock;
+  pthread_mutex_t lock;
   queue_t *queues;
   size_t sent;
 } queues_t;
 
 void queues_init(queues_t *qs) {
-  uv_mutex_init(&qs->lock);
+  pthread_mutex_init(&qs->lock, NULL);
   qs->queues = NULL;
 }
 
@@ -202,12 +199,12 @@ void queues_destroy(queues_t *qs) {
     queue_destroy(q);
     free(q);
   }
-  uv_mutex_destroy(&qs->lock);
+  pthread_mutex_destroy(&qs->lock);
 }
 
 /** Get or create the named queue. */
 queue_t* queues_get(queues_t *qs, const char* name) {
-  uv_mutex_lock(&qs->lock);
+  pthread_mutex_lock(&qs->lock);
   queue_t *q;
   for (q = qs->queues; q && strcmp(q->name, name) != 0; q = q->next)
     ;
@@ -216,7 +213,7 @@ queue_t* queues_get(queues_t *qs, const char* name) {
     queue_init(q, name, qs->queues);
     qs->queues = q;
   }
-  uv_mutex_unlock(&qs->lock);
+  pthread_mutex_unlock(&qs->lock);
   return q;
 }
 
@@ -255,7 +252,7 @@ static void link_send(broker_t *b, pn_link_t *s) {
 }
 
 static void queue_unsub(queue_t *q, pn_connection_t *c) {
-  uv_mutex_lock(&q->lock);
+  pthread_mutex_lock(&q->lock);
   for (size_t i = 0; i < q->waiting.len; ++i) {
     if (q->waiting.data[i] == c){
       q->waiting.data[i] = q->waiting.data[0]; /* save old [0] */
@@ -263,7 +260,7 @@ static void queue_unsub(queue_t *q, pn_connection_t *c) {
       break;
     }
   }
-  uv_mutex_unlock(&q->lock);
+  pthread_mutex_unlock(&q->lock);
 }
 
 /* Unsubscribe from the queue of interest to this link. */
@@ -416,7 +413,7 @@ static void handle(broker_t* b, pn_event_t* e) {
   }
 }
 
-static void broker_thread(void *void_broker) {
+static void* broker_thread(void *void_broker) {
   broker_t *b = (broker_t*)void_broker;
   do {
     pn_event_batch_t *events = pn_proactor_wait(b->proactor);
@@ -426,6 +423,7 @@ static void broker_thread(void *void_broker) {
     }
     pn_proactor_done(b->proactor, events);
   } while(!b->finished);
+  return NULL;
 }
 
 static void usage(const char *arg0) {
@@ -474,13 +472,13 @@ int main(int argc, char **argv) {
     exit(1);
   }
   /* Start n-1 threads and use main thread */
-  uv_thread_t* threads = (uv_thread_t*)calloc(sizeof(uv_thread_t), b.threads);
+  pthread_t* threads = (pthread_t*)calloc(sizeof(pthread_t), b.threads);
   for (size_t i = 0; i < b.threads-1; ++i) {
-    check(uv_thread_create(&threads[i], broker_thread, &b), "pthread_create");
+    check(pthread_create(&threads[i], NULL, broker_thread, &b), "pthread_create");
   }
   broker_thread(&b);            /* Use the main thread too. */
   for (size_t i = 0; i < b.threads-1; ++i) {
-    check(uv_thread_join(&threads[i]), "pthread_join");
+    check(pthread_join(threads[i], NULL), "pthread_join");
   }
   pn_proactor_free(b.proactor);
   free(threads);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/examples/c/proactor/libuv_proactor.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/libuv_proactor.c b/examples/c/proactor/libuv_proactor.c
deleted file mode 100644
index 42bbfab..0000000
--- a/examples/c/proactor/libuv_proactor.c
+++ /dev/null
@@ -1,873 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <uv.h>
-
-#include <proton/condition.h>
-#include <proton/connection_driver.h>
-#include <proton/engine.h>
-#include <proton/message.h>
-#include <proton/object.h>
-#include <proton/proactor.h>
-#include <proton/transport.h>
-#include <proton/url.h>
-
-#include <assert.h>
-#include <stddef.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-/*
-  libuv loop functions are thread unsafe. The only exception is uv_async_send()
-  which is a thread safe "wakeup" that can wake the uv_loop from another thread.
-
-  To provide concurrency the proactor uses a "leader-worker-follower" model,
-  threads take turns at the roles:
-
-  - a single "leader" calls libuv functions and runs the uv_loop in short bursts
-    to generate work. When there is work available it gives up leadership and
-    becomes a "worker"
-
-  - "workers" handle events concurrently for distinct connections/listeners
-    They do as much work as they can get, when none is left they become "followers"
-
-  - "followers" wait for the leader to generate work and become workers.
-    When the leader itself becomes a worker, one of the followers takes over.
-
-  This model is symmetric: any thread can take on any role based on run-time
-  requirements. It also allows the IO and non-IO work associated with an IO
-  wake-up to be processed in a single thread with no context switches.
-
-  Function naming:
-  - on_ - called in leader thread via uv_run().
-  - leader_ - called in leader thread, while processing the leader_q.
-  - owner_ - called in owning thread, leader or worker but not concurrently.
-
-  Note on_ and leader_ functions can call each other, the prefix indicates the
-  path they are most often called on.
-*/
-
-const char *COND_NAME = "proactor";
-const char *AMQP_PORT = "5672";
-const char *AMQP_PORT_NAME = "amqp";
-const char *AMQPS_PORT = "5671";
-const char *AMQPS_PORT_NAME = "amqps";
-
-PN_HANDLE(PN_PROACTOR)
-
-/* pn_proactor_t and pn_listener_t are plain C structs with normal memory management.
-   Class definitions are for identification as pn_event_t context only.
-*/
-PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
-PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
-
-/* common to connection and listener */
-typedef struct psocket_t {
-  /* Immutable */
-  pn_proactor_t *proactor;
-
-  /* Protected by proactor.lock */
-  struct psocket_t* next;
-  void (*wakeup)(struct psocket_t*); /* interrupting action for leader */
-
-  /* Only used by leader */
-  uv_tcp_t tcp;
-  void (*action)(struct psocket_t*); /* deferred action for leader */
-  bool is_conn:1;
-  char host[NI_MAXHOST];
-  char port[NI_MAXSERV];
-} psocket_t;
-
-/* Special value for psocket.next pointer when socket is not on any any list. */
-psocket_t UNLISTED;
-
-static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const char *host, const char *port) {
-  ps->proactor = p;
-  ps->next = &UNLISTED;
-  ps->is_conn = is_conn;
-  ps->tcp.data = ps;
-
-  /* For platforms that don't know about "amqp" and "amqps" service names. */
-  if (strcmp(port, AMQP_PORT_NAME) == 0)
-    port = AMQP_PORT;
-  else if (strcmp(port, AMQPS_PORT_NAME) == 0)
-    port = AMQPS_PORT;
-  /* Set to "\001" to indicate a NULL as opposed to an empty string "" */
-  strncpy(ps->host, host ? host : "\001", sizeof(ps->host));
-  strncpy(ps->port, port ? port : "\001", sizeof(ps->port));
-}
-
-/* Turn "\001" back to NULL */
-static inline const char* fixstr(const char* str) {
-  return str[0] == '\001' ? NULL : str;
-}
-
-typedef struct pconnection_t {
-  psocket_t psocket;
-
-  /* Only used by owner thread */
-  pn_connection_driver_t driver;
-
-  /* Only used by leader */
-  uv_connect_t connect;
-  uv_timer_t timer;
-  uv_write_t write;
-  uv_shutdown_t shutdown;
-  size_t writing;
-  bool reading:1;
-  bool server:1;                /* accept, not connect */
-} pconnection_t;
-
-struct pn_listener_t {
-  psocket_t psocket;
-
-  /* Only used by owner thread */
-  pconnection_t *accepting;     /* accept in progress */
-  pn_condition_t *condition;
-  pn_collector_t *collector;
-  pn_event_batch_t batch;
-  pn_record_t *attachments;
-  void *context;
-  size_t backlog;
-};
-
-
-typedef struct queue { psocket_t *front, *back; } queue;
-
-struct pn_proactor_t {
-  /* Leader thread  */
-  uv_cond_t cond;
-  uv_loop_t loop;
-  uv_async_t async;
-  uv_timer_t timer;
-
-  /* Owner thread: proactor collector and batch can belong to leader or a worker */
-  pn_collector_t *collector;
-  pn_event_batch_t batch;
-
-  /* Protected by lock */
-  uv_mutex_t lock;
-  queue start_q;
-  queue worker_q;
-  queue leader_q;
-  size_t interrupt;             /* pending interrupts */
-  pn_millis_t timeout;
-  size_t count;                 /* psocket count */
-  bool inactive:1;
-  bool timeout_request:1;
-  bool timeout_elapsed:1;
-  bool has_leader:1;
-  bool batch_working:1;          /* batch belongs to a worker.  */
-};
-
-static bool push_lh(queue *q, psocket_t *ps) {
-  if (ps->next != &UNLISTED)  /* Don't move if already listed. */
-    return false;
-  ps->next = NULL;
-  if (!q->front) {
-    q->front = q->back = ps;
-  } else {
-    q->back->next = ps;
-    q->back =  ps;
-  }
-  return true;
-}
-
-static psocket_t* pop_lh(queue *q) {
-  psocket_t *ps = q->front;
-  if (ps) {
-    q->front = ps->next;
-    ps->next = &UNLISTED;
-  }
-  return ps;
-}
-
-static inline pconnection_t *as_pconnection_t(psocket_t* ps) {
-  return ps->is_conn ? (pconnection_t*)ps : NULL;
-}
-
-static inline pn_listener_t *as_listener(psocket_t* ps) {
-  return ps->is_conn ? NULL: (pn_listener_t*)ps;
-}
-
-/* Put ps on the leader queue for processing. Thread safe. */
-static void to_leader_lh(psocket_t *ps) {
-  push_lh(&ps->proactor->leader_q, ps);
-  uv_async_send(&ps->proactor->async); /* Wake leader */
-}
-
-static void to_leader(psocket_t *ps) {
-  uv_mutex_lock(&ps->proactor->lock);
-  to_leader_lh(ps);
-  uv_mutex_unlock(&ps->proactor->lock);
-}
-
-/* Detach from IO and put ps on the worker queue */
-static void leader_to_worker(psocket_t *ps) {
-  if (ps->is_conn) {
-    pconnection_t *pc = as_pconnection_t(ps);
-    /* Don't detach if there are no events yet. */
-    if (pn_connection_driver_has_event(&pc->driver)) {
-      if (pc->writing) {
-        pc->writing  = 0;
-        uv_cancel((uv_req_t*)&pc->write);
-      }
-      if (pc->reading) {
-        pc->reading = false;
-        uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
-      }
-      if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
-        uv_timer_stop(&pc->timer);
-      }
-    }
-  } else {
-    pn_listener_t *l = as_listener(ps);
-    uv_read_stop((uv_stream_t*)&l->psocket.tcp);
-  }
-  uv_mutex_lock(&ps->proactor->lock);
-  push_lh(&ps->proactor->worker_q, ps);
-  uv_mutex_unlock(&ps->proactor->lock);
-}
-
-/* Set a deferred action for leader, if not already set. */
-static void owner_to_leader(psocket_t *ps, void (*action)(psocket_t*)) {
-  uv_mutex_lock(&ps->proactor->lock);
-  if (!ps->action) {
-    ps->action = action;
-  }
-  to_leader_lh(ps);
-  uv_mutex_unlock(&ps->proactor->lock);
-}
-
-/* Owner thread send to worker thread. Set deferred action if not already set. */
-static void owner_to_worker(psocket_t *ps, void (*action)(psocket_t*)) {
-  uv_mutex_lock(&ps->proactor->lock);
-  if (!ps->action) {
-    ps->action = action;
-  }
-  push_lh(&ps->proactor->worker_q, ps);
-  uv_async_send(&ps->proactor->async); /* Wake leader */
-  uv_mutex_unlock(&ps->proactor->lock);
-}
-
-
-/* Re-queue for further work */
-static void worker_requeue(psocket_t* ps) {
-  uv_mutex_lock(&ps->proactor->lock);
-  push_lh(&ps->proactor->worker_q, ps);
-  uv_async_send(&ps->proactor->async); /* Wake leader */
-  uv_mutex_unlock(&ps->proactor->lock);
-}
-
-static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) {
-  pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc));
-  if (!pc) return NULL;
-  if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
-    return NULL;
-  }
-  psocket_init(&pc->psocket, p,  true, host, port);
-  if (server) {
-    pn_transport_set_server(pc->driver.transport);
-  }
-  pn_record_t *r = pn_connection_attachments(pc->driver.connection);
-  pn_record_def(r, PN_PROACTOR, PN_VOID);
-  pn_record_set(r, PN_PROACTOR, pc);
-  return pc;
-}
-
-static pn_event_t *listener_batch_next(pn_event_batch_t *batch);
-static pn_event_t *proactor_batch_next(pn_event_batch_t *batch);
-
-static inline pn_proactor_t *batch_proactor(pn_event_batch_t *batch) {
-  return (batch->next_event == proactor_batch_next) ?
-    (pn_proactor_t*)((char*)batch - offsetof(pn_proactor_t, batch)) : NULL;
-}
-
-static inline pn_listener_t *batch_listener(pn_event_batch_t *batch) {
-  return (batch->next_event == listener_batch_next) ?
-    (pn_listener_t*)((char*)batch - offsetof(pn_listener_t, batch)) : NULL;
-}
-
-static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
-  pn_connection_driver_t *d = pn_event_batch_connection_driver(batch);
-  return d ? (pconnection_t*)((char*)d - offsetof(pconnection_t, driver)) : NULL;
-}
-
-static void leader_count(pn_proactor_t *p, int change) {
-  uv_mutex_lock(&p->lock);
-  p->count += change;
-  p->inactive = (p->count == 0);
-  uv_mutex_unlock(&p->lock);
-}
-
-/* Free if there are no uv callbacks pending and no events */
-static void leader_pconnection_t_maybe_free(pconnection_t *pc) {
-    if (pn_connection_driver_has_event(&pc->driver)) {
-      leader_to_worker(&pc->psocket);         /* Return to worker */
-    } else if (!(pc->psocket.tcp.data || pc->write.data || pc->shutdown.data || pc->timer.data)) {
-      /* All UV requests are finished */
-      pn_connection_driver_destroy(&pc->driver);
-      leader_count(pc->psocket.proactor, -1);
-      free(pc);
-    }
-}
-
-/* Free if there are no uv callbacks pending and no events */
-static void leader_listener_maybe_free(pn_listener_t *l) {
-    if (pn_collector_peek(l->collector)) {
-      leader_to_worker(&l->psocket);         /* Return to worker */
-    } else if (!l->psocket.tcp.data) {
-      pn_condition_free(l->condition);
-      leader_count(l->psocket.proactor, -1);
-      free(l);
-    }
-}
-
-/* Free if there are no uv callbacks pending and no events */
-static void leader_maybe_free(psocket_t *ps) {
-  if (ps->is_conn) {
-    leader_pconnection_t_maybe_free(as_pconnection_t(ps));
-  } else {
-    leader_listener_maybe_free(as_listener(ps));
-  }
-}
-
-static void on_close(uv_handle_t *h) {
-  psocket_t *ps = (psocket_t*)h->data;
-  h->data = NULL;               /* Mark closed */
-  leader_maybe_free(ps);
-}
-
-static void on_shutdown(uv_shutdown_t *shutdown, int err) {
-  psocket_t *ps = (psocket_t*)shutdown->data;
-  shutdown->data = NULL;        /* Mark closed */
-  leader_maybe_free(ps);
-}
-
-static inline void leader_close(psocket_t *ps) {
-  if (ps->tcp.data && !uv_is_closing((uv_handle_t*)&ps->tcp)) {
-    uv_close((uv_handle_t*)&ps->tcp, on_close);
-  }
-  pconnection_t *pc = as_pconnection_t(ps);
-  if (pc) {
-    pn_connection_driver_close(&pc->driver);
-    if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
-      uv_timer_stop(&pc->timer);
-      uv_close((uv_handle_t*)&pc->timer, on_close);
-    }
-  }
-  leader_maybe_free(ps);
-}
-
-static pconnection_t *get_pconnection_t(pn_connection_t* c) {
-  if (!c) return NULL;
-  pn_record_t *r = pn_connection_attachments(c);
-  return (pconnection_t*) pn_record_get(r, PN_PROACTOR);
-}
-
-static void leader_error(psocket_t *ps, int err, const char* what) {
-  if (ps->is_conn) {
-    pn_connection_driver_t *driver = &as_pconnection_t(ps)->driver;
-    pn_connection_driver_bind(driver); /* Bind so errors will be reported */
-    pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s",
-                                what, fixstr(ps->host), fixstr(ps->port),
-                                uv_strerror(err));
-    pn_connection_driver_close(driver);
-  } else {
-    pn_listener_t *l = as_listener(ps);
-    pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
-                        what, fixstr(ps->host), fixstr(ps->port),
-                        uv_strerror(err));
-    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
-  }
-  leader_to_worker(ps);               /* Worker to handle the error */
-}
-
-/* uv-initialization */
-static int leader_init(psocket_t *ps) {
-  leader_count(ps->proactor, +1);
-  int err = uv_tcp_init(&ps->proactor->loop, &ps->tcp);
-  if (!err) {
-    pconnection_t *pc = as_pconnection_t(ps);
-    if (pc) {
-      pc->connect.data = ps;
-      int err = uv_timer_init(&ps->proactor->loop, &pc->timer);
-      if (!err) {
-        pc->timer.data = pc;
-      }
-    }
-  }
-  if (err) {
-    leader_error(ps, err, "initialization");
-  }
-  return err;
-}
-
-/* Common logic for on_connect and on_accept */
-static void leader_connect_accept(pconnection_t *pc, int err, const char *what) {
-  if (!err) {
-    leader_to_worker(&pc->psocket);
-  } else {
-    leader_error(&pc->psocket, err, what);
-  }
-}
-
-static void on_connect(uv_connect_t *connect, int err) {
-  leader_connect_accept((pconnection_t*)connect->data, err, "on connect");
-}
-
-static void on_accept(uv_stream_t* server, int err) {
-  pn_listener_t *l = (pn_listener_t*) server->data;
-  if (err) {
-    leader_error(&l->psocket, err, "on accept");
-  }
-  pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
-  leader_to_worker(&l->psocket); /* Let user call pn_listener_accept */
-}
-
-static void leader_accept(psocket_t *ps) {
-  pn_listener_t * l = as_listener(ps);
-  pconnection_t *pc = l->accepting;
-  l->accepting = NULL;
-  if (pc) {
-    int err = leader_init(&pc->psocket);
-    if (!err) err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
-    leader_connect_accept(pc, err, "on accept");
-  }
-}
-
-static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) {
-  int err = leader_init(ps);
-  struct addrinfo hints = { 0 };
-  if (server) hints.ai_flags = AI_PASSIVE;
-  if (!err) {
-    err = uv_getaddrinfo(&ps->proactor->loop, info, NULL, fixstr(ps->host), fixstr(ps->port), &hints);
-  }
-  return err;
-}
-
-static void leader_connect(psocket_t *ps) {
-  pconnection_t *pc = as_pconnection_t(ps);
-  uv_getaddrinfo_t info;
-  int err = leader_resolve(ps, &info, false);
-  if (!err) {
-    err = uv_tcp_connect(&pc->connect, &pc->psocket.tcp, info.addrinfo->ai_addr, on_connect);
-    uv_freeaddrinfo(info.addrinfo);
-  }
-  if (err) {
-    leader_error(ps, err, "connect to");
-  }
-}
-
-static void leader_listen(psocket_t *ps) {
-  pn_listener_t *l = as_listener(ps);
-   uv_getaddrinfo_t info;
-  int err = leader_resolve(ps, &info, true);
-  if (!err) {
-    err = uv_tcp_bind(&l->psocket.tcp, info.addrinfo->ai_addr, 0);
-    uv_freeaddrinfo(info.addrinfo);
-  }
-  if (!err) err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept);
-  if (err) {
-    leader_error(ps, err, "listen on ");
-  }
-}
-
-static void on_tick(uv_timer_t *timer) {
-  pconnection_t *pc = (pconnection_t*)timer->data;
-  pn_transport_t *t = pc->driver.transport;
-  if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) {
-    uv_timer_stop(&pc->timer);
-    uint64_t now = uv_now(pc->timer.loop);
-    uint64_t next = pn_transport_tick(t, now);
-    if (next) {
-      uv_timer_start(&pc->timer, on_tick, next - now, 0);
-    }
-  }
-}
-
-static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
-  pconnection_t *pc = (pconnection_t*)stream->data;
-  if (nread >= 0) {
-    pn_connection_driver_read_done(&pc->driver, nread);
-    on_tick(&pc->timer);         /* check for tick changes. */
-    leader_to_worker(&pc->psocket);
-    /* Reading continues automatically until stopped. */
-  } else if (nread == UV_EOF) { /* hangup */
-    pn_connection_driver_read_close(&pc->driver);
-    leader_maybe_free(&pc->psocket);
-  } else {
-    leader_error(&pc->psocket, nread, "on read from");
-  }
-}
-
-static void on_write(uv_write_t* write, int err) {
-  pconnection_t *pc = (pconnection_t*)write->data;
-  write->data = NULL;
-  if (err == 0) {
-    pn_connection_driver_write_done(&pc->driver, pc->writing);
-    leader_to_worker(&pc->psocket);
-  } else if (err == UV_ECANCELED) {
-    leader_maybe_free(&pc->psocket);
-  } else {
-    leader_error(&pc->psocket, err, "on write to");
-  }
-  pc->writing = 0;              /* Need to send a new write request */
-}
-
-static void on_timeout(uv_timer_t *timer) {
-  pn_proactor_t *p = (pn_proactor_t*)timer->data;
-  uv_mutex_lock(&p->lock);
-  p->timeout_elapsed = true;
-  uv_mutex_unlock(&p->lock);
-}
-
-// Read buffer allocation function for uv, just returns the transports read buffer.
-static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
-  pconnection_t *pc = (pconnection_t*)stream->data;
-  pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
-  *buf = uv_buf_init(rbuf.start, rbuf.size);
-}
-
-static void leader_rewatch(psocket_t *ps) {
-  int err = 0;
-  if (ps->is_conn) {
-    pconnection_t *pc = as_pconnection_t(ps);
-    if (pc->timer.data) {         /* uv-initialized */
-      on_tick(&pc->timer);        /* Re-enable ticks if required */
-    }
-    pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
-    pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
-
-    /* Ticks and checking buffers can generate events, process before proceeding */
-    if (pn_connection_driver_has_event(&pc->driver)) {
-      leader_to_worker(ps);
-    } else {                      /* Re-watch for IO */
-      if (wbuf.size > 0 && !pc->writing) {
-        pc->writing = wbuf.size;
-        uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
-        pc->write.data = ps;
-        uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
-      } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) {
-        pc->shutdown.data = ps;
-        uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, on_shutdown);
-      }
-      if (rbuf.size > 0 && !pc->reading) {
-        pc->reading = true;
-        err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read);
-      }
-    }
-  } else {
-    pn_listener_t *l = as_listener(ps);
-    err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept);
-  }
-  if (err) {
-    leader_error(ps, err, "rewatch");
-  }
-}
-
-/* Set the event in the proactor's batch  */
-static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) {
-  pn_collector_put(p->collector, pn_proactor__class(), p, t);
-  p->batch_working = true;
-  return &p->batch;
-}
-
-/* Return the next event batch or 0 if no events are ready */
-static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
-  if (!p->batch_working) {       /* Can generate proactor events */
-    if (p->inactive) {
-      p->inactive = false;
-      return proactor_batch_lh(p, PN_PROACTOR_INACTIVE);
-    }
-    if (p->interrupt > 0) {
-      --p->interrupt;
-      return proactor_batch_lh(p, PN_PROACTOR_INTERRUPT);
-    }
-    if (p->timeout_elapsed) {
-      p->timeout_elapsed = false;
-      return proactor_batch_lh(p, PN_PROACTOR_TIMEOUT);
-    }
-  }
-  for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) {
-    if (ps->is_conn) {
-      pconnection_t *pc = as_pconnection_t(ps);
-      return &pc->driver.batch;
-    } else {                    /* Listener */
-      pn_listener_t *l = as_listener(ps);
-      return &l->batch;
-    }
-    to_leader(ps);      /* No event, back to leader */
-  }
-  return 0;
-}
-
-/* Called in any thread to set a wakeup action. Replaces any previous wakeup action. */
-static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) {
-  uv_mutex_lock(&ps->proactor->lock);
-  ps->wakeup = action;
-  to_leader_lh(ps);
-  uv_mutex_unlock(&ps->proactor->lock);
-}
-
-pn_listener_t *pn_event_listener(pn_event_t *e) {
-  return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL;
-}
-
-pn_proactor_t *pn_event_proactor(pn_event_t *e) {
-  if (pn_event_class(e) == pn_proactor__class()) return (pn_proactor_t*)pn_event_context(e);
-  pn_listener_t *l = pn_event_listener(e);
-  if (l) return l->psocket.proactor;
-  pn_connection_t *c = pn_event_connection(e);
-  if (c) return pn_connection_proactor(pn_event_connection(e));
-  return NULL;
-}
-
-void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
-  pconnection_t *pc = batch_pconnection(batch);
-  if (pc) {
-    if (pn_connection_driver_has_event(&pc->driver)) {
-      /* Process all events before going back to IO. */
-      worker_requeue(&pc->psocket);
-    } else if (pn_connection_driver_finished(&pc->driver)) {
-      owner_to_leader(&pc->psocket, leader_close);
-    } else {
-      owner_to_leader(&pc->psocket, leader_rewatch);
-    }
-    return;
-  }
-  pn_listener_t *l = batch_listener(batch);
-  if (l) {
-    owner_to_leader(&l->psocket, leader_rewatch);
-    return;
-  }
-  pn_proactor_t *bp = batch_proactor(batch);
-  if (bp == p) {
-    uv_mutex_lock(&p->lock);
-    p->batch_working = false;
-    uv_async_send(&p->async); /* Wake leader */
-    uv_mutex_unlock(&p->lock);
-    return;
-  }
-}
-
-/* Run follower/leader loop till we can return an event and be a worker */
-pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
-  uv_mutex_lock(&p->lock);
-  /* Try to grab work immediately. */
-  pn_event_batch_t *batch = get_batch_lh(p);
-  if (batch == NULL) {
-    /* No work available, follow the leader */
-    while (p->has_leader) {
-      uv_cond_wait(&p->cond, &p->lock);
-    }
-    /* Lead till there is work to do. */
-    p->has_leader = true;
-    while (batch == NULL) {
-      if (p->timeout_request) {
-        p->timeout_request = false;
-        if (p->timeout) {
-          uv_timer_start(&p->timer, on_timeout, p->timeout, 0);
-        } else {
-          uv_timer_stop(&p->timer);
-        }
-      }
-      for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
-        void (*action)(psocket_t*) = ps->action;
-        void (*wakeup)(psocket_t*) = ps->wakeup;
-        ps->action = NULL;
-        ps->wakeup = NULL;
-        if (action || wakeup) {
-          uv_mutex_unlock(&p->lock);
-          if (action) action(ps);
-          if (wakeup) wakeup(ps);
-          uv_mutex_lock(&p->lock);
-        }
-      }
-      batch = get_batch_lh(p);
-      if (batch == NULL) {
-        uv_mutex_unlock(&p->lock);
-        uv_run(&p->loop, UV_RUN_ONCE);
-        uv_mutex_lock(&p->lock);
-      }
-    }
-    /* Signal the next leader and return to work */
-    p->has_leader = false;
-    uv_cond_signal(&p->cond);
-  }
-  uv_mutex_unlock(&p->lock);
-  return batch;
-}
-
-void pn_proactor_interrupt(pn_proactor_t *p) {
-  uv_mutex_lock(&p->lock);
-  ++p->interrupt;
-  uv_async_send(&p->async);   /* Interrupt the UV loop */
-  uv_mutex_unlock(&p->lock);
-}
-
-void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
-  uv_mutex_lock(&p->lock);
-  p->timeout = t;
-  p->timeout_request = true;
-  uv_async_send(&p->async);   /* Interrupt the UV loop */
-  uv_mutex_unlock(&p->lock);
-}
-
-int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) {
-  pconnection_t *pc = new_pconnection_t(p, c, false, host, port);
-  if (!pc) {
-    return PN_OUT_OF_MEMORY;
-  }
-  /* Process PN_CONNECTION_INIT before binding */
-  owner_to_worker(&pc->psocket, leader_connect);
-  return 0;
-}
-
-int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *host, const char *port, int backlog)
-{
-  psocket_init(&l->psocket, p, false, host, port);
-  l->backlog = backlog;
-  owner_to_leader(&l->psocket, leader_listen);
-  return 0;
-}
-
-pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
-  pconnection_t *pc = get_pconnection_t(c);
-  return pc ? pc->psocket.proactor : NULL;
-}
-
-void leader_wake_connection(psocket_t *ps) {
-  pconnection_t *pc = as_pconnection_t(ps);
-  pn_connection_t *c = pc->driver.connection;
-  pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
-  leader_to_worker(ps);
-}
-
-void pn_connection_wake(pn_connection_t* c) {
-  wakeup(&get_pconnection_t(c)->psocket, leader_wake_connection);
-}
-
-pn_proactor_t *pn_proactor() {
-  pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
-  p->collector = pn_collector();
-  p->batch.next_event = &proactor_batch_next;
-  if (!p->collector) return NULL;
-  uv_loop_init(&p->loop);
-  uv_mutex_init(&p->lock);
-  uv_cond_init(&p->cond);
-  uv_async_init(&p->loop, &p->async, NULL);
-  uv_timer_init(&p->loop, &p->timer); /* Just wake the loop */
-  p->timer.data = p;
-  return p;
-}
-
-static void on_stopping(uv_handle_t* h, void* v) {
-  uv_close(h, NULL);           /* Close this handle */
-  if (!uv_loop_alive(h->loop)) /* Everything closed */
-    uv_stop(h->loop);        /* Stop the loop, pn_proactor_destroy() can return */
-}
-
-void pn_proactor_free(pn_proactor_t *p) {
-  uv_walk(&p->loop, on_stopping, NULL); /* Close all handles */
-  uv_run(&p->loop, UV_RUN_DEFAULT);     /* Run till stop, all handles closed */
-  uv_loop_close(&p->loop);
-  uv_mutex_destroy(&p->lock);
-  uv_cond_destroy(&p->cond);
-  pn_collector_free(p->collector);
-  free(p);
-}
-
-static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
-  pn_listener_t *l = batch_listener(batch);
-  pn_event_t *handled = pn_collector_prev(l->collector);
-  if (handled && pn_event_type(handled) == PN_LISTENER_CLOSE) {
-    owner_to_leader(&l->psocket, leader_close); /* Close event handled, do close */
-  }
-  return pn_collector_next(l->collector);
-}
-
-static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
-  return pn_collector_next(batch_proactor(batch)->collector);
-}
-
-static void pn_listener_free(pn_listener_t *l) {
-  if (l) {
-    if (!l->collector) pn_collector_free(l->collector);
-    if (!l->condition) pn_condition_free(l->condition);
-    if (!l->attachments) pn_free(l->attachments);
-    free(l);
-  }
-}
-
-pn_listener_t *pn_listener() {
-  pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t));
-  if (l) {
-    l->batch.next_event = listener_batch_next;
-    l->collector = pn_collector();
-    l->condition = pn_condition();
-    l->attachments = pn_record();
-    if (!l->condition || !l->collector || !l->attachments) {
-      pn_listener_free(l);
-      return NULL;
-    }
-  }
-  return l;
-}
-
-void pn_listener_close(pn_listener_t* l) {
-  wakeup(&l->psocket, leader_close);
-}
-
-pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
-  return l ? l->psocket.proactor : NULL;
-}
-
-pn_condition_t* pn_listener_condition(pn_listener_t* l) {
-  return l->condition;
-}
-
-void *pn_listener_get_context(pn_listener_t *l) {
-  return l->context;
-}
-
-void pn_listener_set_context(pn_listener_t *l, void *context) {
-  l->context = context;
-}
-
-pn_record_t *pn_listener_attachments(pn_listener_t *l) {
-  return l->attachments;
-}
-
-int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
-  if (l->accepting) {
-    return PN_STATE_ERR;        /* Only one at a time */
-  }
-  l->accepting = new_pconnection_t(
-      l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
-  if (!l->accepting) {
-    return UV_ENOMEM;
-  }
-  owner_to_leader(&l->psocket, leader_accept);
-  return 0;
-}
-

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/examples/c/proactor/test.py
----------------------------------------------------------------------
diff --git a/examples/c/proactor/test.py b/examples/c/proactor/test.py
index a86425d..29aa327 100644
--- a/examples/c/proactor/test.py
+++ b/examples/c/proactor/test.py
@@ -32,27 +32,27 @@ def receive_expect(n):
     return ''.join('{"sequence"=%s}\n'%i for i in xrange(1, n+1)) + "%s messages received\n"%n
 
 class CExampleTest(BrokerTestCase):
-    broker_exe = ["libuv_broker"]
+    broker_exe = ["broker"]
 
     def test_send_receive(self):
         """Send first then receive"""
-        s = self.proc(["libuv_send", "-a", self.addr])
+        s = self.proc(["send", "-a", self.addr])
         self.assertEqual("100 messages sent and acknowledged\n", s.wait_out())
-        r = self.proc(["libuv_receive", "-a", self.addr])
+        r = self.proc(["receive", "-a", self.addr])
         self.assertEqual(receive_expect(100), r.wait_out())
 
     def test_receive_send(self):
         """Start receiving  first, then send."""
-        r = self.proc(["libuv_receive", "-a", self.addr]);
-        s = self.proc(["libuv_send", "-a", self.addr]);
+        r = self.proc(["receive", "-a", self.addr]);
+        s = self.proc(["send", "-a", self.addr]);
         self.assertEqual("100 messages sent and acknowledged\n", s.wait_out())
         self.assertEqual(receive_expect(100), r.wait_out())
 
     def test_timed_send(self):
         """Send with timed delay"""
-        s = self.proc(["libuv_send", "-a", self.addr, "-d100", "-m3"])
+        s = self.proc(["send", "-a", self.addr, "-d100", "-m3"])
         self.assertEqual("3 messages sent and acknowledged\n", s.wait_out())
-        r = self.proc(["libuv_receive", "-a", self.addr, "-m3"])
+        r = self.proc(["receive", "-a", self.addr, "-m3"])
         self.assertEqual(receive_expect(3), r.wait_out())
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/examples/c/proactor/thread.h
----------------------------------------------------------------------
diff --git a/examples/c/proactor/thread.h b/examples/c/proactor/thread.h
new file mode 100644
index 0000000..3b9f19e
--- /dev/null
+++ b/examples/c/proactor/thread.h
@@ -0,0 +1,49 @@
+#ifndef _PROTON_EXAMPLES_C_PROACTOR_THREADS_H
+#define _PROTON_EXAMPLES_C_PROACTOR_THREADS_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/* EXAMPLE USE ONLY. Simulate the subset of POSIX threads used by examples for windows */
+
+#ifdef _WIN32
+
+#include <windows.h>
+#include <time.h>
+#define _WIN32_WINNT 0x500 /* WINBASE.H - Enable SignalObjectAndWait */
+#include <process.h>
+#include <windows.h>
+
+#define pthread_function DWORD WINAPI
+#define pthread_function_return DWORD
+#define pthread_t HANDLE
+#define pthread_create(thhandle,attr,thfunc,tharg) (int)((*thhandle=(HANDLE)_beginthreadex(NULL,0,(DWORD WINAPI(*)())thfunc,tharg,0,NULL))==NULL)
+#define pthread_join(thread, result) ((WaitForSingleObject((thread),INFINITE)!=WAIT_OBJECT_0) || !CloseHandle(thread))
+#define pthread_mutex_T HANDLE
+#define pthread_mutex_init(pobject,pattr) (*pobject=CreateMutex(NULL,FALSE,NULL))
+#define pthread_mutex_destroy(pobject) CloseHandle(*pobject)
+#define pthread_mutex_lock(pobject) WaitForSingleObject(*pobject,INFINITE)
+#define pthread_mutex_unlock(pobject) ReleaseMutex(*pobject)
+
+#else
+
+#include <pthread.h>
+
+#endif
+
+#endif

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/proton-c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt
index 8edb661..e5552c5 100644
--- a/proton-c/CMakeLists.txt
+++ b/proton-c/CMakeLists.txt
@@ -105,6 +105,13 @@ else(PN_WINAPI)
   set (pn_selector_impl src/reactor/io/posix/selector.c)
 endif(PN_WINAPI)
 
+# Select proactor impl
+find_package(Libuv)
+if (Libuv_FOUND)
+  set (qpid-proton-proactor src/proactor/libuv.c)
+  set (PROACTOR_LIBS ${Libuv_LIBRARIES})
+endif()
+
 # Link in SASL if present
 if (SASL_IMPL STREQUAL cyrus)
   set(pn_sasl_impl src/sasl/sasl.c src/sasl/cyrus_sasl.c)
@@ -116,7 +123,7 @@ endif ()
 
 # Set Compiler extra flags for Solaris when using SunStudio
 if(CMAKE_CXX_COMPILER_ID STREQUAL "SunPro" )
-    set( CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mt" )
+  set( CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mt" )
 endif()
 
 if(CMAKE_C_COMPILER_ID STREQUAL "SunPro" )
@@ -327,6 +334,7 @@ set (qpid-proton-platform-all
   src/reactor/io/windows/selector.c
   src/reactor/io/posix/io.c
   src/reactor/io/posix/selector.c
+  src/proactor/libuv.c
   )
 
 # platform specific library build:
@@ -379,7 +387,7 @@ set (qpid-proton-core
   src/core/autodetect.c
   src/core/transport.c
   src/core/message.c
-  )
+)
 
 set (qpid-proton-include-generated
   ${CMAKE_CURRENT_BINARY_DIR}/src/encodings.h
@@ -455,6 +463,7 @@ set (qpid-proton-include
   include/proton/log.h
   include/proton/message.h
   include/proton/object.h
+  include/proton/proactor.h
   include/proton/sasl.h
   include/proton/session.h
   include/proton/ssl.h
@@ -495,9 +504,15 @@ set_source_files_properties (
   COMPILE_DEFINITIONS "${PLATFORM_DEFINITIONS}"
   )
 
+set_source_files_properties (${qpid-proton-proactor} PROPERTIES
+  # Skip COMPILE_LANGUAGE_FLAGS, libuv.h won't compile with --std=c99
+  COMPILE_FLAGS "${COMPILE_WARNING_FLAGS} ${LTO} "
+  )
+
 if (BUILD_WITH_CXX)
   set_source_files_properties (
     ${qpid-proton-core}
+    ${qpid-proton-proactor}
     ${qpid-proton-layers}
     ${qpid-proton-extra}
     ${qpid-proton-platform}
@@ -526,6 +541,12 @@ set_target_properties (
   LINK_FLAGS "${CATCH_UNDEFINED} ${LTO}"
   )
 
+add_library (
+  qpid-proton-proactor SHARED
+  ${qpid-proton-proactor}
+  )
+target_link_libraries (qpid-proton-proactor qpid-proton-core ${PROACTOR_LIBS})
+
 add_library(
   qpid-proton SHARED
   # Proton Core
@@ -534,7 +555,8 @@ add_library(
   ${qpid-proton-platform}
   ${qpid-proton-include}
   ${qpid-proton-include-generated}
-
+  # Proactor
+  ${qpid-proton-proactor}
   # Proton Reactor/Messenger
   ${qpid-proton-extra}
   ${qpid-proton-platform-io}
@@ -550,7 +572,7 @@ if (MSVC)
   add_dependencies(qpid-proton qpid-proton-core)
 endif (MSVC)
 
-target_link_libraries (qpid-proton ${UUID_LIB} ${SSL_LIB} ${SASL_LIB} ${TIME_LIB} ${PLATFORM_LIBS})
+target_link_libraries (qpid-proton ${UUID_LIB} ${SSL_LIB} ${SASL_LIB} ${TIME_LIB} ${PLATFORM_LIBS} ${PROACTOR_LIBS})
 
 set_target_properties (
   qpid-proton
@@ -586,32 +608,26 @@ install (FILES ${headers} DESTINATION ${INCLUDE_INSTALL_DIR}/proton)
 install (FILES  ${CMAKE_CURRENT_BINARY_DIR}/include/proton/version.h
          DESTINATION ${INCLUDE_INSTALL_DIR}/proton)
 
-# Pkg config file
-configure_file(
-  ${CMAKE_CURRENT_SOURCE_DIR}/src/libqpid-proton.pc.in
-  ${CMAKE_CURRENT_BINARY_DIR}/libqpid-proton.pc @ONLY)
-install (FILES
-  ${CMAKE_CURRENT_BINARY_DIR}/libqpid-proton.pc
-  DESTINATION ${LIB_INSTALL_DIR}/pkgconfig)
-configure_file(
-  ${CMAKE_CURRENT_SOURCE_DIR}/src/libqpid-proton-core.pc.in
-  ${CMAKE_CURRENT_BINARY_DIR}/libqpid-proton-core.pc @ONLY)
-install (FILES
-  ${CMAKE_CURRENT_BINARY_DIR}/libqpid-proton-core.pc
-  DESTINATION ${LIB_INSTALL_DIR}/pkgconfig)
-
+# Set ${VAR}/${VAR}DEBUG variables, configure and install the packageconf files for LIB
+macro(configure_lib VAR LIB)
+  if(DEFINED CMAKE_IMPORT_LIBRARY_PREFIX)
+    set(LIB_PREFIX ${CMAKE_IMPORT_LIBRARY_PREFIX})
+    set(LIB_SUFFIX ${CMAKE_IMPORT_LIBRARY_SUFFIX})
+  else()
+    set(LIB_PREFIX ${CMAKE_SHARED_LIBRARY_PREFIX})
+    set(LIB_SUFFIX ${CMAKE_SHARED_LIBRARY_SUFFIX})
+  endif()
+  set(${VAR} ${LIB_PREFIX}${LIB}${LIB_SUFFIX})
+  set("${VAR}DEBUG" ${LIB_PREFIX}${LIB}${CMAKE_DEBUG_POSTFIX}${LIB_SUFFIX})
+  configure_file(
+    ${CMAKE_CURRENT_SOURCE_DIR}/src/lib${LIB}.pc.in
+    ${CMAKE_CURRENT_BINARY_DIR}/lib${LIB}.pc @ONLY)
+  install (FILES ${CMAKE_CURRENT_BINARY_DIR}/lib${LIB}.pc DESTINATION ${LIB_INSTALL_DIR}/pkgconfig)
+endmacro()
 
-if (DEFINED CMAKE_IMPORT_LIBRARY_PREFIX)
-set(PROTONLIB ${CMAKE_IMPORT_LIBRARY_PREFIX}qpid-proton${CMAKE_IMPORT_LIBRARY_SUFFIX})
-set(PROTONLIBDEBUG ${CMAKE_IMPORT_LIBRARY_PREFIX}qpid-proton${CMAKE_DEBUG_POSTFIX}${CMAKE_IMPORT_LIBRARY_SUFFIX})
-set(PROTONCORELIB ${CMAKE_IMPORT_LIBRARY_PREFIX}qpid-proton-core${CMAKE_IMPORT_LIBRARY_SUFFIX})
-set(PROTONCORELIBDEBUG ${CMAKE_IMPORT_LIBRARY_PREFIX}qpid-proton-core${CMAKE_DEBUG_POSTFIX}${CMAKE_IMPORT_LIBRARY_SUFFIX})
-else ()
-set(PROTONLIB ${CMAKE_SHARED_LIBRARY_PREFIX}qpid-proton${CMAKE_SHARED_LIBRARY_SUFFIX})
-set(PROTONLIBDEBUG ${CMAKE_SHARED_LIBRARY_PREFIX}qpid-proton${CMAKE_DEBUG_POSTFIX}${CMAKE_SHARED_LIBRARY_SUFFIX})
-set(PROTONCORELIB ${CMAKE_SHARED_LIBRARY_PREFIX}qpid-proton-core${CMAKE_SHARED_LIBRARY_SUFFIX})
-set(PROTONCORELIBDEBUG ${CMAKE_SHARED_LIBRARY_PREFIX}qpid-proton-core${CMAKE_DEBUG_POSTFIX}${CMAKE_SHARED_LIBRARY_SUFFIX})
-endif ()
+configure_lib(PROTONLIB qpid-proton)
+configure_lib(PROTONCORELIB qpid-proton-core)
+configure_lib(PROTONPROACTORLIB qpid-proton-proactor)
 
 include(WriteBasicConfigVersionFile)
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/proton-c/include/proton/import_export.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/import_export.h b/proton-c/include/proton/import_export.h
index 6547a07..86776cd 100644
--- a/proton-c/include/proton/import_export.h
+++ b/proton-c/include/proton/import_export.h
@@ -56,6 +56,13 @@
 #  define PN_EXTERN PN_IMPORT
 #endif
 
+// For proactor proton symbols
+#if defined(qpid_proton_proactor_EXPORTS) || defined(qpid_proton_EXPORTS)
+#  define PNP_EXTERN PN_EXPORT
+#else
+#  define PNP_EXTERN PN_IMPORT
+#endif
+
 // For extra proton symbols
 #if defined(qpid_proton_EXPORTS)
 #  define PNX_EXTERN PN_EXPORT

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/proton-c/include/proton/listener.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/listener.h b/proton-c/include/proton/listener.h
index 729c095..4656ee4 100644
--- a/proton-c/include/proton/listener.h
+++ b/proton-c/include/proton/listener.h
@@ -63,7 +63,7 @@ PN_EXTERN pn_condition_t *pn_listener_condition(pn_listener_t *l);
 /**
  * @cond INTERNAL
  */
-    
+
 /**
  * @deprecated
  *
@@ -81,7 +81,7 @@ PN_EXTERN void pn_listener_set_context(pn_listener_t *listener, void *context);
 /**
  * @endcond
  */
-    
+
 /**
  * Get the attachments that are associated with a listener object.
  */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/proton-c/include/proton/proactor.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h
index 695bbb1..71a7dda 100644
--- a/proton-c/include/proton/proactor.h
+++ b/proton-c/include/proton/proactor.h
@@ -53,13 +53,13 @@ extern "C" {
 /**
  * Create a proactor. Must be freed with pn_proactor_free()
  */
-pn_proactor_t *pn_proactor(void);
+PNP_EXTERN pn_proactor_t *pn_proactor(void);
 
 /**
  * Free the proactor. Abort any open network connections and clean up all
  * associated resources.
  */
-void pn_proactor_free(pn_proactor_t *proactor);
+PNP_EXTERN void pn_proactor_free(pn_proactor_t *proactor);
 
 /**
  * Connect connection to host/port. Connection and transport events will be
@@ -72,9 +72,9 @@ void pn_proactor_free(pn_proactor_t *proactor);
  *
  * @return error on immediate error, e.g. an allocation failure.
  * Other errors are indicated by connection or transport events via
- * pn_proactor_wait()
+PNP_EXTERN  * pn_proactor_wait()
  */
-int pn_proactor_connect(pn_proactor_t *proactor, pn_connection_t *connection,
+PNP_EXTERN int pn_proactor_connect(pn_proactor_t *proactor, pn_connection_t *connection,
                         const char *host, const char *port);
 
 /**
@@ -91,7 +91,7 @@ int pn_proactor_connect(pn_proactor_t *proactor, pn_connection_t *connection,
  * Other errors are indicated by pn_listener_condition() on the
  * PN_LISTENER_CLOSE event.
  */
-int pn_proactor_listen(pn_proactor_t *proactor, pn_listener_t *listener,
+PNP_EXTERN int pn_proactor_listen(pn_proactor_t *proactor, pn_listener_t *listener,
                        const char *host, const char *port, int backlog);
 
 /**
@@ -111,7 +111,7 @@ int pn_proactor_listen(pn_proactor_t *proactor, pn_listener_t *listener,
  * batch must be handled in sequence, but batches returned by separate
  * calls to pn_proactor_wait() can be handled concurrently.
  */
-pn_event_batch_t *pn_proactor_wait(pn_proactor_t *proactor);
+PNP_EXTERN pn_event_batch_t *pn_proactor_wait(pn_proactor_t *proactor);
 
 /**
  * Call when done handling a batch of events.
@@ -122,7 +122,7 @@ pn_event_batch_t *pn_proactor_wait(pn_proactor_t *proactor);
  * @note Thread-safe: may be called from any thread provided the
  * exactly once rule is respected.
  */
-void pn_proactor_done(pn_proactor_t *proactor, pn_event_batch_t *events);
+PNP_EXTERN void pn_proactor_done(pn_proactor_t *proactor, pn_event_batch_t *events);
 
 /**
  * Cause PN_PROACTOR_INTERRUPT to be returned to exactly one call of
@@ -136,7 +136,7 @@ void pn_proactor_done(pn_proactor_t *proactor, pn_event_batch_t *events);
  *
  * @note Thread-safe.
  */
-void pn_proactor_interrupt(pn_proactor_t *proactor);
+PNP_EXTERN void pn_proactor_interrupt(pn_proactor_t *proactor);
 
 /**
  * Cause PN_PROACTOR_TIMEOUT to be returned to a thread calling wait()
@@ -148,7 +148,7 @@ void pn_proactor_interrupt(pn_proactor_t *proactor);
  * timeout. `pn_proactor_set_timeout(0)` will cancel the timeout
  * without setting a new one.
  */
-void pn_proactor_set_timeout(pn_proactor_t *proactor, pn_millis_t timeout);
+PNP_EXTERN void pn_proactor_set_timeout(pn_proactor_t *proactor, pn_millis_t timeout);
 
 /**
  * Cause a PN_CONNECTION_WAKE event to be returned by the proactor, even if
@@ -160,22 +160,22 @@ void pn_proactor_set_timeout(pn_proactor_t *proactor, pn_millis_t timeout);
  * Wakes can be "coalesced" - if several pn_connection_wake() calls happen
  * concurrently, there may be only one PN_CONNECTION_WAKE event.
  */
-void pn_connection_wake(pn_connection_t *connection);
+PNP_EXTERN void pn_connection_wake(pn_connection_t *connection);
 
 /**
  * Return the proactor associated with a connection or NULL.
  */
-pn_proactor_t *pn_connection_proactor(pn_connection_t *connection);
+PNP_EXTERN pn_proactor_t *pn_connection_proactor(pn_connection_t *connection);
 
 /**
  * Return the proactor associated with an event or NULL.
  */
-pn_proactor_t *pn_event_proactor(pn_event_t *event);
+PNP_EXTERN pn_proactor_t *pn_event_proactor(pn_event_t *event);
 
 /**
  * Return the listener associated with an event or NULL.
  */
-pn_listener_t *pn_event_listener(pn_event_t *event);
+PNP_EXTERN pn_listener_t *pn_event_listener(pn_event_t *event);
 
 /**
  * @}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/proton-c/include/proton/types.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/types.h b/proton-c/include/proton/types.h
index 4400393..1abe9e6 100644
--- a/proton-c/include/proton/types.h
+++ b/proton-c/include/proton/types.h
@@ -407,6 +407,12 @@ typedef struct pn_delivery_t pn_delivery_t;
 typedef struct pn_collector_t pn_collector_t;
 
 /**
+ * A listener accepts connections.
+ * @ingroup listener
+ */
+typedef struct pn_listener_t pn_listener_t;
+
+/**
  * An AMQP Transport object.
  *
  * A pn_transport_t encapsulates the transport related state of all
@@ -419,6 +425,11 @@ typedef struct pn_collector_t pn_collector_t;
 typedef struct pn_transport_t pn_transport_t;
 
 /**
+ * The proactor, see pn_proactor()
+ */
+typedef struct pn_proactor_t pn_proactor_t;
+
+/**
  * @cond INTERNAL
  *
  * An event handler
@@ -426,12 +437,6 @@ typedef struct pn_transport_t pn_transport_t;
  * A pn_handler_t is target of ::pn_event_t dispatched by the pn_reactor_t
  */
 typedef struct pn_handler_t pn_handler_t;
-
-/**
- *
- */
-typedef struct pn_proactor_t pn_proactor_t;
-typedef struct pn_listener_t pn_listener_t;
 /**
  * @endcond
  */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/proton-c/src/libqpid-proton-proactor.pc.in
----------------------------------------------------------------------
diff --git a/proton-c/src/libqpid-proton-proactor.pc.in b/proton-c/src/libqpid-proton-proactor.pc.in
new file mode 100644
index 0000000..19007a8
--- /dev/null
+++ b/proton-c/src/libqpid-proton-proactor.pc.in
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+prefix=@PREFIX@
+exec_prefix=@EXEC_PREFIX@
+libdir=@LIBDIR@
+includedir=@INCLUDEDIR@
+
+Name: Proton Proactor
+Description: Qpid Proton C proative IO library
+Version: @PN_VERSION@
+URL: http://qpid.apache.org/proton/
+Libs: -L${libdir} -lqpid-proton-proactor
+Cflags: -I${includedir}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/proton-c/src/proactor/libuv.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c
new file mode 100644
index 0000000..42bbfab
--- /dev/null
+++ b/proton-c/src/proactor/libuv.c
@@ -0,0 +1,873 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <uv.h>
+
+#include <proton/condition.h>
+#include <proton/connection_driver.h>
+#include <proton/engine.h>
+#include <proton/message.h>
+#include <proton/object.h>
+#include <proton/proactor.h>
+#include <proton/transport.h>
+#include <proton/url.h>
+
+#include <assert.h>
+#include <stddef.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+/*
+  libuv loop functions are thread unsafe. The only exception is uv_async_send()
+  which is a thread safe "wakeup" that can wake the uv_loop from another thread.
+
+  To provide concurrency the proactor uses a "leader-worker-follower" model,
+  threads take turns at the roles:
+
+  - a single "leader" calls libuv functions and runs the uv_loop in short bursts
+    to generate work. When there is work available it gives up leadership and
+    becomes a "worker"
+
+  - "workers" handle events concurrently for distinct connections/listeners
+    They do as much work as they can get, when none is left they become "followers"
+
+  - "followers" wait for the leader to generate work and become workers.
+    When the leader itself becomes a worker, one of the followers takes over.
+
+  This model is symmetric: any thread can take on any role based on run-time
+  requirements. It also allows the IO and non-IO work associated with an IO
+  wake-up to be processed in a single thread with no context switches.
+
+  Function naming:
+  - on_ - called in leader thread via uv_run().
+  - leader_ - called in leader thread, while processing the leader_q.
+  - owner_ - called in owning thread, leader or worker but not concurrently.
+
+  Note on_ and leader_ functions can call each other, the prefix indicates the
+  path they are most often called on.
+*/
+
+const char *COND_NAME = "proactor";
+const char *AMQP_PORT = "5672";
+const char *AMQP_PORT_NAME = "amqp";
+const char *AMQPS_PORT = "5671";
+const char *AMQPS_PORT_NAME = "amqps";
+
+PN_HANDLE(PN_PROACTOR)
+
+/* pn_proactor_t and pn_listener_t are plain C structs with normal memory management.
+   Class definitions are for identification as pn_event_t context only.
+*/
+PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
+PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
+
+/* common to connection and listener */
+typedef struct psocket_t {
+  /* Immutable */
+  pn_proactor_t *proactor;
+
+  /* Protected by proactor.lock */
+  struct psocket_t* next;
+  void (*wakeup)(struct psocket_t*); /* interrupting action for leader */
+
+  /* Only used by leader */
+  uv_tcp_t tcp;
+  void (*action)(struct psocket_t*); /* deferred action for leader */
+  bool is_conn:1;
+  char host[NI_MAXHOST];
+  char port[NI_MAXSERV];
+} psocket_t;
+
+/* Special value for psocket.next pointer when socket is not on any any list. */
+psocket_t UNLISTED;
+
+static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const char *host, const char *port) {
+  ps->proactor = p;
+  ps->next = &UNLISTED;
+  ps->is_conn = is_conn;
+  ps->tcp.data = ps;
+
+  /* For platforms that don't know about "amqp" and "amqps" service names. */
+  if (strcmp(port, AMQP_PORT_NAME) == 0)
+    port = AMQP_PORT;
+  else if (strcmp(port, AMQPS_PORT_NAME) == 0)
+    port = AMQPS_PORT;
+  /* Set to "\001" to indicate a NULL as opposed to an empty string "" */
+  strncpy(ps->host, host ? host : "\001", sizeof(ps->host));
+  strncpy(ps->port, port ? port : "\001", sizeof(ps->port));
+}
+
+/* Turn "\001" back to NULL */
+static inline const char* fixstr(const char* str) {
+  return str[0] == '\001' ? NULL : str;
+}
+
+typedef struct pconnection_t {
+  psocket_t psocket;
+
+  /* Only used by owner thread */
+  pn_connection_driver_t driver;
+
+  /* Only used by leader */
+  uv_connect_t connect;
+  uv_timer_t timer;
+  uv_write_t write;
+  uv_shutdown_t shutdown;
+  size_t writing;
+  bool reading:1;
+  bool server:1;                /* accept, not connect */
+} pconnection_t;
+
+struct pn_listener_t {
+  psocket_t psocket;
+
+  /* Only used by owner thread */
+  pconnection_t *accepting;     /* accept in progress */
+  pn_condition_t *condition;
+  pn_collector_t *collector;
+  pn_event_batch_t batch;
+  pn_record_t *attachments;
+  void *context;
+  size_t backlog;
+};
+
+
+typedef struct queue { psocket_t *front, *back; } queue;
+
+struct pn_proactor_t {
+  /* Leader thread  */
+  uv_cond_t cond;
+  uv_loop_t loop;
+  uv_async_t async;
+  uv_timer_t timer;
+
+  /* Owner thread: proactor collector and batch can belong to leader or a worker */
+  pn_collector_t *collector;
+  pn_event_batch_t batch;
+
+  /* Protected by lock */
+  uv_mutex_t lock;
+  queue start_q;
+  queue worker_q;
+  queue leader_q;
+  size_t interrupt;             /* pending interrupts */
+  pn_millis_t timeout;
+  size_t count;                 /* psocket count */
+  bool inactive:1;
+  bool timeout_request:1;
+  bool timeout_elapsed:1;
+  bool has_leader:1;
+  bool batch_working:1;          /* batch belongs to a worker.  */
+};
+
+static bool push_lh(queue *q, psocket_t *ps) {
+  if (ps->next != &UNLISTED)  /* Don't move if already listed. */
+    return false;
+  ps->next = NULL;
+  if (!q->front) {
+    q->front = q->back = ps;
+  } else {
+    q->back->next = ps;
+    q->back =  ps;
+  }
+  return true;
+}
+
+static psocket_t* pop_lh(queue *q) {
+  psocket_t *ps = q->front;
+  if (ps) {
+    q->front = ps->next;
+    ps->next = &UNLISTED;
+  }
+  return ps;
+}
+
+static inline pconnection_t *as_pconnection_t(psocket_t* ps) {
+  return ps->is_conn ? (pconnection_t*)ps : NULL;
+}
+
+static inline pn_listener_t *as_listener(psocket_t* ps) {
+  return ps->is_conn ? NULL: (pn_listener_t*)ps;
+}
+
+/* Put ps on the leader queue for processing. Thread safe. */
+static void to_leader_lh(psocket_t *ps) {
+  push_lh(&ps->proactor->leader_q, ps);
+  uv_async_send(&ps->proactor->async); /* Wake leader */
+}
+
+static void to_leader(psocket_t *ps) {
+  uv_mutex_lock(&ps->proactor->lock);
+  to_leader_lh(ps);
+  uv_mutex_unlock(&ps->proactor->lock);
+}
+
+/* Detach from IO and put ps on the worker queue */
+static void leader_to_worker(psocket_t *ps) {
+  if (ps->is_conn) {
+    pconnection_t *pc = as_pconnection_t(ps);
+    /* Don't detach if there are no events yet. */
+    if (pn_connection_driver_has_event(&pc->driver)) {
+      if (pc->writing) {
+        pc->writing  = 0;
+        uv_cancel((uv_req_t*)&pc->write);
+      }
+      if (pc->reading) {
+        pc->reading = false;
+        uv_read_stop((uv_stream_t*)&pc->psocket.tcp);
+      }
+      if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
+        uv_timer_stop(&pc->timer);
+      }
+    }
+  } else {
+    pn_listener_t *l = as_listener(ps);
+    uv_read_stop((uv_stream_t*)&l->psocket.tcp);
+  }
+  uv_mutex_lock(&ps->proactor->lock);
+  push_lh(&ps->proactor->worker_q, ps);
+  uv_mutex_unlock(&ps->proactor->lock);
+}
+
+/* Set a deferred action for leader, if not already set. */
+static void owner_to_leader(psocket_t *ps, void (*action)(psocket_t*)) {
+  uv_mutex_lock(&ps->proactor->lock);
+  if (!ps->action) {
+    ps->action = action;
+  }
+  to_leader_lh(ps);
+  uv_mutex_unlock(&ps->proactor->lock);
+}
+
+/* Owner thread send to worker thread. Set deferred action if not already set. */
+static void owner_to_worker(psocket_t *ps, void (*action)(psocket_t*)) {
+  uv_mutex_lock(&ps->proactor->lock);
+  if (!ps->action) {
+    ps->action = action;
+  }
+  push_lh(&ps->proactor->worker_q, ps);
+  uv_async_send(&ps->proactor->async); /* Wake leader */
+  uv_mutex_unlock(&ps->proactor->lock);
+}
+
+
+/* Re-queue for further work */
+static void worker_requeue(psocket_t* ps) {
+  uv_mutex_lock(&ps->proactor->lock);
+  push_lh(&ps->proactor->worker_q, ps);
+  uv_async_send(&ps->proactor->async); /* Wake leader */
+  uv_mutex_unlock(&ps->proactor->lock);
+}
+
+static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) {
+  pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc));
+  if (!pc) return NULL;
+  if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) {
+    return NULL;
+  }
+  psocket_init(&pc->psocket, p,  true, host, port);
+  if (server) {
+    pn_transport_set_server(pc->driver.transport);
+  }
+  pn_record_t *r = pn_connection_attachments(pc->driver.connection);
+  pn_record_def(r, PN_PROACTOR, PN_VOID);
+  pn_record_set(r, PN_PROACTOR, pc);
+  return pc;
+}
+
+static pn_event_t *listener_batch_next(pn_event_batch_t *batch);
+static pn_event_t *proactor_batch_next(pn_event_batch_t *batch);
+
+static inline pn_proactor_t *batch_proactor(pn_event_batch_t *batch) {
+  return (batch->next_event == proactor_batch_next) ?
+    (pn_proactor_t*)((char*)batch - offsetof(pn_proactor_t, batch)) : NULL;
+}
+
+static inline pn_listener_t *batch_listener(pn_event_batch_t *batch) {
+  return (batch->next_event == listener_batch_next) ?
+    (pn_listener_t*)((char*)batch - offsetof(pn_listener_t, batch)) : NULL;
+}
+
+static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
+  pn_connection_driver_t *d = pn_event_batch_connection_driver(batch);
+  return d ? (pconnection_t*)((char*)d - offsetof(pconnection_t, driver)) : NULL;
+}
+
+static void leader_count(pn_proactor_t *p, int change) {
+  uv_mutex_lock(&p->lock);
+  p->count += change;
+  p->inactive = (p->count == 0);
+  uv_mutex_unlock(&p->lock);
+}
+
+/* Free if there are no uv callbacks pending and no events */
+static void leader_pconnection_t_maybe_free(pconnection_t *pc) {
+    if (pn_connection_driver_has_event(&pc->driver)) {
+      leader_to_worker(&pc->psocket);         /* Return to worker */
+    } else if (!(pc->psocket.tcp.data || pc->write.data || pc->shutdown.data || pc->timer.data)) {
+      /* All UV requests are finished */
+      pn_connection_driver_destroy(&pc->driver);
+      leader_count(pc->psocket.proactor, -1);
+      free(pc);
+    }
+}
+
+/* Free if there are no uv callbacks pending and no events */
+static void leader_listener_maybe_free(pn_listener_t *l) {
+    if (pn_collector_peek(l->collector)) {
+      leader_to_worker(&l->psocket);         /* Return to worker */
+    } else if (!l->psocket.tcp.data) {
+      pn_condition_free(l->condition);
+      leader_count(l->psocket.proactor, -1);
+      free(l);
+    }
+}
+
+/* Free if there are no uv callbacks pending and no events */
+static void leader_maybe_free(psocket_t *ps) {
+  if (ps->is_conn) {
+    leader_pconnection_t_maybe_free(as_pconnection_t(ps));
+  } else {
+    leader_listener_maybe_free(as_listener(ps));
+  }
+}
+
+static void on_close(uv_handle_t *h) {
+  psocket_t *ps = (psocket_t*)h->data;
+  h->data = NULL;               /* Mark closed */
+  leader_maybe_free(ps);
+}
+
+static void on_shutdown(uv_shutdown_t *shutdown, int err) {
+  psocket_t *ps = (psocket_t*)shutdown->data;
+  shutdown->data = NULL;        /* Mark closed */
+  leader_maybe_free(ps);
+}
+
+static inline void leader_close(psocket_t *ps) {
+  if (ps->tcp.data && !uv_is_closing((uv_handle_t*)&ps->tcp)) {
+    uv_close((uv_handle_t*)&ps->tcp, on_close);
+  }
+  pconnection_t *pc = as_pconnection_t(ps);
+  if (pc) {
+    pn_connection_driver_close(&pc->driver);
+    if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) {
+      uv_timer_stop(&pc->timer);
+      uv_close((uv_handle_t*)&pc->timer, on_close);
+    }
+  }
+  leader_maybe_free(ps);
+}
+
+static pconnection_t *get_pconnection_t(pn_connection_t* c) {
+  if (!c) return NULL;
+  pn_record_t *r = pn_connection_attachments(c);
+  return (pconnection_t*) pn_record_get(r, PN_PROACTOR);
+}
+
+static void leader_error(psocket_t *ps, int err, const char* what) {
+  if (ps->is_conn) {
+    pn_connection_driver_t *driver = &as_pconnection_t(ps)->driver;
+    pn_connection_driver_bind(driver); /* Bind so errors will be reported */
+    pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s",
+                                what, fixstr(ps->host), fixstr(ps->port),
+                                uv_strerror(err));
+    pn_connection_driver_close(driver);
+  } else {
+    pn_listener_t *l = as_listener(ps);
+    pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s",
+                        what, fixstr(ps->host), fixstr(ps->port),
+                        uv_strerror(err));
+    pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE);
+  }
+  leader_to_worker(ps);               /* Worker to handle the error */
+}
+
+/* uv-initialization */
+static int leader_init(psocket_t *ps) {
+  leader_count(ps->proactor, +1);
+  int err = uv_tcp_init(&ps->proactor->loop, &ps->tcp);
+  if (!err) {
+    pconnection_t *pc = as_pconnection_t(ps);
+    if (pc) {
+      pc->connect.data = ps;
+      int err = uv_timer_init(&ps->proactor->loop, &pc->timer);
+      if (!err) {
+        pc->timer.data = pc;
+      }
+    }
+  }
+  if (err) {
+    leader_error(ps, err, "initialization");
+  }
+  return err;
+}
+
+/* Common logic for on_connect and on_accept */
+static void leader_connect_accept(pconnection_t *pc, int err, const char *what) {
+  if (!err) {
+    leader_to_worker(&pc->psocket);
+  } else {
+    leader_error(&pc->psocket, err, what);
+  }
+}
+
+static void on_connect(uv_connect_t *connect, int err) {
+  leader_connect_accept((pconnection_t*)connect->data, err, "on connect");
+}
+
+static void on_accept(uv_stream_t* server, int err) {
+  pn_listener_t *l = (pn_listener_t*) server->data;
+  if (err) {
+    leader_error(&l->psocket, err, "on accept");
+  }
+  pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT);
+  leader_to_worker(&l->psocket); /* Let user call pn_listener_accept */
+}
+
+static void leader_accept(psocket_t *ps) {
+  pn_listener_t * l = as_listener(ps);
+  pconnection_t *pc = l->accepting;
+  l->accepting = NULL;
+  if (pc) {
+    int err = leader_init(&pc->psocket);
+    if (!err) err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp);
+    leader_connect_accept(pc, err, "on accept");
+  }
+}
+
+static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) {
+  int err = leader_init(ps);
+  struct addrinfo hints = { 0 };
+  if (server) hints.ai_flags = AI_PASSIVE;
+  if (!err) {
+    err = uv_getaddrinfo(&ps->proactor->loop, info, NULL, fixstr(ps->host), fixstr(ps->port), &hints);
+  }
+  return err;
+}
+
+static void leader_connect(psocket_t *ps) {
+  pconnection_t *pc = as_pconnection_t(ps);
+  uv_getaddrinfo_t info;
+  int err = leader_resolve(ps, &info, false);
+  if (!err) {
+    err = uv_tcp_connect(&pc->connect, &pc->psocket.tcp, info.addrinfo->ai_addr, on_connect);
+    uv_freeaddrinfo(info.addrinfo);
+  }
+  if (err) {
+    leader_error(ps, err, "connect to");
+  }
+}
+
+static void leader_listen(psocket_t *ps) {
+  pn_listener_t *l = as_listener(ps);
+   uv_getaddrinfo_t info;
+  int err = leader_resolve(ps, &info, true);
+  if (!err) {
+    err = uv_tcp_bind(&l->psocket.tcp, info.addrinfo->ai_addr, 0);
+    uv_freeaddrinfo(info.addrinfo);
+  }
+  if (!err) err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept);
+  if (err) {
+    leader_error(ps, err, "listen on ");
+  }
+}
+
+static void on_tick(uv_timer_t *timer) {
+  pconnection_t *pc = (pconnection_t*)timer->data;
+  pn_transport_t *t = pc->driver.transport;
+  if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) {
+    uv_timer_stop(&pc->timer);
+    uint64_t now = uv_now(pc->timer.loop);
+    uint64_t next = pn_transport_tick(t, now);
+    if (next) {
+      uv_timer_start(&pc->timer, on_tick, next - now, 0);
+    }
+  }
+}
+
+static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
+  pconnection_t *pc = (pconnection_t*)stream->data;
+  if (nread >= 0) {
+    pn_connection_driver_read_done(&pc->driver, nread);
+    on_tick(&pc->timer);         /* check for tick changes. */
+    leader_to_worker(&pc->psocket);
+    /* Reading continues automatically until stopped. */
+  } else if (nread == UV_EOF) { /* hangup */
+    pn_connection_driver_read_close(&pc->driver);
+    leader_maybe_free(&pc->psocket);
+  } else {
+    leader_error(&pc->psocket, nread, "on read from");
+  }
+}
+
+static void on_write(uv_write_t* write, int err) {
+  pconnection_t *pc = (pconnection_t*)write->data;
+  write->data = NULL;
+  if (err == 0) {
+    pn_connection_driver_write_done(&pc->driver, pc->writing);
+    leader_to_worker(&pc->psocket);
+  } else if (err == UV_ECANCELED) {
+    leader_maybe_free(&pc->psocket);
+  } else {
+    leader_error(&pc->psocket, err, "on write to");
+  }
+  pc->writing = 0;              /* Need to send a new write request */
+}
+
+static void on_timeout(uv_timer_t *timer) {
+  pn_proactor_t *p = (pn_proactor_t*)timer->data;
+  uv_mutex_lock(&p->lock);
+  p->timeout_elapsed = true;
+  uv_mutex_unlock(&p->lock);
+}
+
+// Read buffer allocation function for uv, just returns the transports read buffer.
+static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
+  pconnection_t *pc = (pconnection_t*)stream->data;
+  pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
+  *buf = uv_buf_init(rbuf.start, rbuf.size);
+}
+
+static void leader_rewatch(psocket_t *ps) {
+  int err = 0;
+  if (ps->is_conn) {
+    pconnection_t *pc = as_pconnection_t(ps);
+    if (pc->timer.data) {         /* uv-initialized */
+      on_tick(&pc->timer);        /* Re-enable ticks if required */
+    }
+    pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
+    pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
+
+    /* Ticks and checking buffers can generate events, process before proceeding */
+    if (pn_connection_driver_has_event(&pc->driver)) {
+      leader_to_worker(ps);
+    } else {                      /* Re-watch for IO */
+      if (wbuf.size > 0 && !pc->writing) {
+        pc->writing = wbuf.size;
+        uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
+        pc->write.data = ps;
+        uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write);
+      } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) {
+        pc->shutdown.data = ps;
+        uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, on_shutdown);
+      }
+      if (rbuf.size > 0 && !pc->reading) {
+        pc->reading = true;
+        err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read);
+      }
+    }
+  } else {
+    pn_listener_t *l = as_listener(ps);
+    err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept);
+  }
+  if (err) {
+    leader_error(ps, err, "rewatch");
+  }
+}
+
+/* Set the event in the proactor's batch  */
+static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) {
+  pn_collector_put(p->collector, pn_proactor__class(), p, t);
+  p->batch_working = true;
+  return &p->batch;
+}
+
+/* Return the next event batch or 0 if no events are ready */
+static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) {
+  if (!p->batch_working) {       /* Can generate proactor events */
+    if (p->inactive) {
+      p->inactive = false;
+      return proactor_batch_lh(p, PN_PROACTOR_INACTIVE);
+    }
+    if (p->interrupt > 0) {
+      --p->interrupt;
+      return proactor_batch_lh(p, PN_PROACTOR_INTERRUPT);
+    }
+    if (p->timeout_elapsed) {
+      p->timeout_elapsed = false;
+      return proactor_batch_lh(p, PN_PROACTOR_TIMEOUT);
+    }
+  }
+  for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) {
+    if (ps->is_conn) {
+      pconnection_t *pc = as_pconnection_t(ps);
+      return &pc->driver.batch;
+    } else {                    /* Listener */
+      pn_listener_t *l = as_listener(ps);
+      return &l->batch;
+    }
+    to_leader(ps);      /* No event, back to leader */
+  }
+  return 0;
+}
+
+/* Called in any thread to set a wakeup action. Replaces any previous wakeup action. */
+static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) {
+  uv_mutex_lock(&ps->proactor->lock);
+  ps->wakeup = action;
+  to_leader_lh(ps);
+  uv_mutex_unlock(&ps->proactor->lock);
+}
+
+pn_listener_t *pn_event_listener(pn_event_t *e) {
+  return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL;
+}
+
+pn_proactor_t *pn_event_proactor(pn_event_t *e) {
+  if (pn_event_class(e) == pn_proactor__class()) return (pn_proactor_t*)pn_event_context(e);
+  pn_listener_t *l = pn_event_listener(e);
+  if (l) return l->psocket.proactor;
+  pn_connection_t *c = pn_event_connection(e);
+  if (c) return pn_connection_proactor(pn_event_connection(e));
+  return NULL;
+}
+
+void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
+  pconnection_t *pc = batch_pconnection(batch);
+  if (pc) {
+    if (pn_connection_driver_has_event(&pc->driver)) {
+      /* Process all events before going back to IO. */
+      worker_requeue(&pc->psocket);
+    } else if (pn_connection_driver_finished(&pc->driver)) {
+      owner_to_leader(&pc->psocket, leader_close);
+    } else {
+      owner_to_leader(&pc->psocket, leader_rewatch);
+    }
+    return;
+  }
+  pn_listener_t *l = batch_listener(batch);
+  if (l) {
+    owner_to_leader(&l->psocket, leader_rewatch);
+    return;
+  }
+  pn_proactor_t *bp = batch_proactor(batch);
+  if (bp == p) {
+    uv_mutex_lock(&p->lock);
+    p->batch_working = false;
+    uv_async_send(&p->async); /* Wake leader */
+    uv_mutex_unlock(&p->lock);
+    return;
+  }
+}
+
+/* Run follower/leader loop till we can return an event and be a worker */
+pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
+  uv_mutex_lock(&p->lock);
+  /* Try to grab work immediately. */
+  pn_event_batch_t *batch = get_batch_lh(p);
+  if (batch == NULL) {
+    /* No work available, follow the leader */
+    while (p->has_leader) {
+      uv_cond_wait(&p->cond, &p->lock);
+    }
+    /* Lead till there is work to do. */
+    p->has_leader = true;
+    while (batch == NULL) {
+      if (p->timeout_request) {
+        p->timeout_request = false;
+        if (p->timeout) {
+          uv_timer_start(&p->timer, on_timeout, p->timeout, 0);
+        } else {
+          uv_timer_stop(&p->timer);
+        }
+      }
+      for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) {
+        void (*action)(psocket_t*) = ps->action;
+        void (*wakeup)(psocket_t*) = ps->wakeup;
+        ps->action = NULL;
+        ps->wakeup = NULL;
+        if (action || wakeup) {
+          uv_mutex_unlock(&p->lock);
+          if (action) action(ps);
+          if (wakeup) wakeup(ps);
+          uv_mutex_lock(&p->lock);
+        }
+      }
+      batch = get_batch_lh(p);
+      if (batch == NULL) {
+        uv_mutex_unlock(&p->lock);
+        uv_run(&p->loop, UV_RUN_ONCE);
+        uv_mutex_lock(&p->lock);
+      }
+    }
+    /* Signal the next leader and return to work */
+    p->has_leader = false;
+    uv_cond_signal(&p->cond);
+  }
+  uv_mutex_unlock(&p->lock);
+  return batch;
+}
+
+void pn_proactor_interrupt(pn_proactor_t *p) {
+  uv_mutex_lock(&p->lock);
+  ++p->interrupt;
+  uv_async_send(&p->async);   /* Interrupt the UV loop */
+  uv_mutex_unlock(&p->lock);
+}
+
+void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
+  uv_mutex_lock(&p->lock);
+  p->timeout = t;
+  p->timeout_request = true;
+  uv_async_send(&p->async);   /* Interrupt the UV loop */
+  uv_mutex_unlock(&p->lock);
+}
+
+int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) {
+  pconnection_t *pc = new_pconnection_t(p, c, false, host, port);
+  if (!pc) {
+    return PN_OUT_OF_MEMORY;
+  }
+  /* Process PN_CONNECTION_INIT before binding */
+  owner_to_worker(&pc->psocket, leader_connect);
+  return 0;
+}
+
+int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *host, const char *port, int backlog)
+{
+  psocket_init(&l->psocket, p, false, host, port);
+  l->backlog = backlog;
+  owner_to_leader(&l->psocket, leader_listen);
+  return 0;
+}
+
+pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
+  pconnection_t *pc = get_pconnection_t(c);
+  return pc ? pc->psocket.proactor : NULL;
+}
+
+void leader_wake_connection(psocket_t *ps) {
+  pconnection_t *pc = as_pconnection_t(ps);
+  pn_connection_t *c = pc->driver.connection;
+  pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE);
+  leader_to_worker(ps);
+}
+
+void pn_connection_wake(pn_connection_t* c) {
+  wakeup(&get_pconnection_t(c)->psocket, leader_wake_connection);
+}
+
+pn_proactor_t *pn_proactor() {
+  pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
+  p->collector = pn_collector();
+  p->batch.next_event = &proactor_batch_next;
+  if (!p->collector) return NULL;
+  uv_loop_init(&p->loop);
+  uv_mutex_init(&p->lock);
+  uv_cond_init(&p->cond);
+  uv_async_init(&p->loop, &p->async, NULL);
+  uv_timer_init(&p->loop, &p->timer); /* Just wake the loop */
+  p->timer.data = p;
+  return p;
+}
+
+static void on_stopping(uv_handle_t* h, void* v) {
+  uv_close(h, NULL);           /* Close this handle */
+  if (!uv_loop_alive(h->loop)) /* Everything closed */
+    uv_stop(h->loop);        /* Stop the loop, pn_proactor_destroy() can return */
+}
+
+void pn_proactor_free(pn_proactor_t *p) {
+  uv_walk(&p->loop, on_stopping, NULL); /* Close all handles */
+  uv_run(&p->loop, UV_RUN_DEFAULT);     /* Run till stop, all handles closed */
+  uv_loop_close(&p->loop);
+  uv_mutex_destroy(&p->lock);
+  uv_cond_destroy(&p->cond);
+  pn_collector_free(p->collector);
+  free(p);
+}
+
+static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
+  pn_listener_t *l = batch_listener(batch);
+  pn_event_t *handled = pn_collector_prev(l->collector);
+  if (handled && pn_event_type(handled) == PN_LISTENER_CLOSE) {
+    owner_to_leader(&l->psocket, leader_close); /* Close event handled, do close */
+  }
+  return pn_collector_next(l->collector);
+}
+
+static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
+  return pn_collector_next(batch_proactor(batch)->collector);
+}
+
+static void pn_listener_free(pn_listener_t *l) {
+  if (l) {
+    if (!l->collector) pn_collector_free(l->collector);
+    if (!l->condition) pn_condition_free(l->condition);
+    if (!l->attachments) pn_free(l->attachments);
+    free(l);
+  }
+}
+
+pn_listener_t *pn_listener() {
+  pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t));
+  if (l) {
+    l->batch.next_event = listener_batch_next;
+    l->collector = pn_collector();
+    l->condition = pn_condition();
+    l->attachments = pn_record();
+    if (!l->condition || !l->collector || !l->attachments) {
+      pn_listener_free(l);
+      return NULL;
+    }
+  }
+  return l;
+}
+
+void pn_listener_close(pn_listener_t* l) {
+  wakeup(&l->psocket, leader_close);
+}
+
+pn_proactor_t *pn_listener_proactor(pn_listener_t* l) {
+  return l ? l->psocket.proactor : NULL;
+}
+
+pn_condition_t* pn_listener_condition(pn_listener_t* l) {
+  return l->condition;
+}
+
+void *pn_listener_get_context(pn_listener_t *l) {
+  return l->context;
+}
+
+void pn_listener_set_context(pn_listener_t *l, void *context) {
+  l->context = context;
+}
+
+pn_record_t *pn_listener_attachments(pn_listener_t *l) {
+  return l->attachments;
+}
+
+int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) {
+  if (l->accepting) {
+    return PN_STATE_ERR;        /* Only one at a time */
+  }
+  l->accepting = new_pconnection_t(
+      l->psocket.proactor, c, true, l->psocket.host, l->psocket.port);
+  if (!l->accepting) {
+    return UV_ENOMEM;
+  }
+  owner_to_leader(&l->psocket, leader_accept);
+  return 0;
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message