qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [2/2] qpid-proton git commit: C driver for C/C++ bindings and direct C users.
Date Mon, 24 Oct 2016 16:08:20 GMT
C driver for C/C++ bindings and direct C users.

driver.h is an SPI so an IO integration can be shared between C and C++
applications and bindings. Provides a common connect/listen/wakeup API to drive
the pn_connection_engine.

Examples show C sender, receiver and broker using a libuv driver.

The driver API is source compatible so an app can be re-compiled to use a
different driver implementation by changing the PN_DRIVER_INCLUDE setting.

This is not intended for use by non-C/C++ bindings where the binding language
has its own IO and concurrency framework. Such bindings should implement their
own driver in the binding language, using native IO/threading for the
language. This driver can be a structural example, as is the Go binding.

NOTE: preview only, not finished. Issues to address include:

   - handle transport ticks
   - support for scheduled wakeup (leave task queueing outside like conn wakeup)
   - check when driver is "empty" - not monitoring anything. For clean shutdown.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/99222efc
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/99222efc
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/99222efc

Branch: refs/heads/aconway-libuv-driver
Commit: 99222efc1f8ade21bbe56e1f8ad3317fc1facd6c
Parents: d280f8f
Author: Alan Conway <aconway@redhat.com>
Authored: Fri Sep 30 17:03:02 2016 -0400
Committer: Alan Conway <aconway@redhat.com>
Committed: Mon Oct 24 12:05:03 2016 -0400

----------------------------------------------------------------------
 config.sh.in                                |   2 +-
 examples/CMakeLists.txt                     |  13 +
 examples/c/CMakeLists.txt                   |   2 +
 examples/c/driver/CMakeLists.txt            |  42 ++
 examples/c/driver/broker.c                  | 462 ++++++++++++++++++
 examples/c/driver/libuv_driver.c            | 566 +++++++++++++++++++++++
 examples/c/driver/libuv_driver.h            | 111 +++++
 examples/c/driver/receive.c                 | 175 +++++++
 examples/c/driver/send.c                    | 199 ++++++++
 examples/c/driver/test.py                   |  49 ++
 examples/cpp/README.dox                     |   2 +
 examples/cpp/mt/broker.cpp                  |   1 +
 examples/exampletest.py                     | 183 ++++++++
 proton-c/docs/api/index.md                  |  49 +-
 proton-c/include/proton/connection_engine.h | 243 +++++-----
 proton-c/include/proton/driver.h            | 282 +++++++++++
 proton-c/include/proton/event.h             |   2 +-
 proton-c/src/driver/driver.c                |  18 +
 proton-c/src/engine/connection_engine.c     |  17 +-
 tools/cmake/Modules/FindLibuv.cmake         |  37 ++
 20 files changed, 2331 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/99222efc/config.sh.in
----------------------------------------------------------------------
diff --git a/config.sh.in b/config.sh.in
index 4902b61..edb77e6 100755
--- a/config.sh.in
+++ b/config.sh.in
@@ -40,7 +40,7 @@ RUBY_BINDINGS=$PROTON_BINDINGS/ruby
 PERL_BINDINGS=$PROTON_BINDINGS/perl
 
 # Python & Jython
-COMMON_PYPATH=$PROTON_HOME/tests/python:$PROTON_HOME/proton-c/bindings/python
+COMMON_PYPATH=$PROTON_HOME/tests/python:$PROTON_HOME/proton-c/bindings/python:$PROTON_HOME/examples
 export PYTHONPATH=$COMMON_PYPATH:$PYTHON_BINDINGS
 export JYTHONPATH=$COMMON_PYPATH:$PROTON_HOME/proton-j/src/main/resources:$PROTON_JARS
 export CLASSPATH=$PROTON_JARS

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/99222efc/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt
index 99e8315..4d744d2 100644
--- a/examples/CMakeLists.txt
+++ b/examples/CMakeLists.txt
@@ -20,6 +20,19 @@
 set (Proton_DIR ${CMAKE_CURRENT_SOURCE_DIR})
 set (ProtonCpp_DIR ${CMAKE_CURRENT_SOURCE_DIR})
 
+# Set result to a native search path
+macro(set_search_path result)  # args after result are directories or search paths.
+  set(${result} ${ARGN})
+  if (UNIX)
+    string(REPLACE ";" ":" ${result} "${${result}}") # native search path separators.
+  endif()
+  file(TO_NATIVE_PATH "${${result}}" ${result}) # native slash separators
+endmacro()
+
+# Some non-python examples use exampletest.py to drive their self-tests.
+# Put the examples directory in front of the caller's existing PYTHONPATH
+# (read at configure time) so the test scripts can import exampletest.
+set_search_path(EXAMPLE_PYTHONPATH "${CMAKE_CURRENT_SOURCE_DIR}" "$ENV{PYTHONPATH}")
+set(EXAMPLE_ENV "PYTHONPATH=${EXAMPLE_PYTHONPATH}")
+
 add_subdirectory(c)
 add_subdirectory(go)
 if (BUILD_CPP)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/99222efc/examples/c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/CMakeLists.txt b/examples/c/CMakeLists.txt
index 1612a86..2fbfffb 100644
--- a/examples/c/CMakeLists.txt
+++ b/examples/c/CMakeLists.txt
@@ -17,6 +17,8 @@
 # under the License.
 #
 
+find_package(Proton REQUIRED)
 include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include)
+add_subdirectory(driver)
 add_subdirectory(messenger)
 add_subdirectory(reactor)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/99222efc/examples/c/driver/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/driver/CMakeLists.txt b/examples/c/driver/CMakeLists.txt
new file mode 100644
index 0000000..63d8777
--- /dev/null
+++ b/examples/c/driver/CMakeLists.txt
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+find_package(Proton REQUIRED)
+
+include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${Proton_INCLUDE_DIRS})
+add_definitions(${COMPILE_WARNING_FLAGS} ${WERROR} ${COMPILE_PLATFORM_FLAGS} ${LINK_TIME_OPTIMIZATION})
+
+# The libuv examples are optional: they are only built and tested when
+# libuv is available.
+find_package(Libuv)
+if (Libuv_FOUND)
+  # Build each example together with the libuv driver implementation.
+  # PN_DRIVER_INCLUDE names the driver header the example is compiled against.
+  foreach(name broker send receive)
+    add_executable(libuv_${name} ${name}.c libuv_driver.c)
+    target_link_libraries(libuv_${name} ${Proton_LIBRARIES} ${Libuv_LIBRARIES})
+    set_target_properties(libuv_${name} PROPERTIES
+      COMPILE_DEFINITIONS  "PN_DRIVER_INCLUDE=\"libuv_driver.h\"")
+  endforeach()
+
+  # Add a test with the correct environment to find test executables and valgrind.
+  if(WIN32)
+    set(test_path "$<TARGET_FILE_DIR:libuv_broker>;$<TARGET_FILE_DIR:qpid-proton>")
+  else()
+    set(test_path "${CMAKE_CURRENT_BINARY_DIR}:$ENV{PATH}")
+  endif()
+  set(run_env ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV})
+  # Use the NAME/COMMAND signature: the generator expressions in test_path are
+  # not evaluated by the legacy positional form of add_test.
+  add_test(NAME c-driver-libuv COMMAND ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/test.py)
+endif()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/99222efc/examples/c/driver/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/driver/broker.c b/examples/c/driver/broker.c
new file mode 100644
index 0000000..3b40f6e
--- /dev/null
+++ b/examples/c/driver/broker.c
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/connection_engine.h>
+#include <proton/driver.h>
+#include <proton/engine.h>
+#include <proton/sasl.h>
+#include <proton/transport.h>
+#include <proton/url.h>
+
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+/* TODO aconway 2016-10-14: this example does not require libuv IO,
+   it uses uv.h only for portable mutex and thread functions.
+*/
+#include <uv.h>
+
+bool verbose = false;
+
+typedef struct broker_connection_t {
+    pn_driver_connection_t dc;
+    bool check_queues;
+} broker_connection_t;
+
+
+/* Print a printf-style message followed by a newline to stderr, but only
+   when the global verbose flag is set. Does nothing otherwise.
+*/
+void debug(const char* fmt, ...) {
+    if (verbose) {
+        va_list ap;
+        va_start(ap, fmt);
+        vfprintf(stderr, fmt, ap);
+        va_end(ap);             /* Every va_start needs a matching va_end */
+        fputc('\n', stderr);
+        fflush(stderr);
+    }
+}
+
+void check(int err, const char* s) {
+    if (err != 0) {
+        perror(s);
+        exit(1);
+    }
+}
+
+void pcheck(int err, const char* s) {
+    if (err != 0) {
+        fprintf(stderr, "%s: %s", s, pn_code(err));
+        exit(1);
+    }
+}
+
+/* Simple re-sizable vector that acts as a queue */
+#define VEC(T) struct { T* data; size_t len, cap; }
+
+#define VEC_INIT(V)                                     \
+    do {                                                \
+        V.len = 0;                                      \
+        V.cap = 16;                                     \
+        void **vp = (void**)&V.data;                    \
+        *vp = malloc(V.cap * sizeof(*V.data));          \
+    } while(0)
+
+#define VEC_FINAL(V) free(V.data)
+
+/* Append X to vector V, doubling the capacity first when V is full.
+   Aborts if the vector cannot be grown.
+*/
+#define VEC_PUSH(V, X)                                          \
+    do {                                                        \
+        if (V.len == V.cap) {                                   \
+            V.cap *= 2;                                         \
+            void **vp = (void**)&V.data;                        \
+            *vp = realloc(V.data, V.cap * sizeof(*V.data));     \
+            if (!V.data) abort(); /* Out of memory */           \
+        }                                                       \
+        V.data[V.len++] = X;                                    \
+    } while(0)
+
+#define VEC_POP(V)                                                      \
+    do {                                                                \
+        if (V.len > 0)                                                  \
+            memmove(V.data, V.data+1, (--V.len)*sizeof(*V.data));       \
+    } while(0)
+
+/* Simple thread-safe queue implementation */
+typedef struct queue {
+    uv_mutex_t lock;
+    char* name;
+    VEC(pn_rwbytes_t) messages;   /* Messages on the queue */
+    VEC(pn_connection_t*) waiting; /* Connections waiting to send messages from this queue */
+    struct queue *next;            /* Next queue in chain */
+} queue;
+
+static void queue_init(queue *q, const char* name, queue *next) {
+    debug("created queue %s", name);
+    uv_mutex_init(&q->lock);
+    q->name = strdup(name);
+    VEC_INIT(q->messages);
+    VEC_INIT(q->waiting);
+    q->next = next;
+}
+
+/* Release everything owned by q: its lock, its name, any messages still
+   queued and the two vectors. Does not free q itself.
+*/
+static void queue_final(queue *q) {
+    uv_mutex_destroy(&q->lock);
+    free(q->name);
+    for (size_t i = 0; i < q->messages.len; ++i)
+        free(q->messages.data[i].start);
+    VEC_FINAL(q->messages);
+    /* Entries in q->waiting are borrowed pointers: queue_send adds them
+       without taking a reference, so no reference may be dropped here. */
+    VEC_FINAL(q->waiting);
+}
+
+/* Send a message on s, or record s's connection as waiting if no messages.
+   Called in s dispatch loop, assumes s has credit.
+*/
+static void queue_send(queue *q, pn_link_t *s) {
+    pn_rwbytes_t m = { 0 };
+    uv_mutex_lock(&q->lock);
+    if (q->messages.len == 0) { /* Empty, record connection as waiting */
+        debug("queue is empty %s", q->name);
+        /* Record connection for wake-up if not already on the list. */
+        pn_connection_t *c = pn_session_connection(pn_link_session(s));
+        size_t i = 0;
+        for (; i < q->waiting.len && q->waiting.data[i] != c; ++i)
+            ;
+        if (i == q->waiting.len) {
+            VEC_PUSH(q->waiting, c);
+        }
+    } else {
+        debug("sending from queue %s", q->name);
+        m = q->messages.data[0];
+        VEC_POP(q->messages);
+    }
+    uv_mutex_unlock(&q->lock);
+    if (m.start) {
+        /* FIXME aconway 2016-10-13: unique tags, see send.c */
+        pn_delivery_t *d = pn_delivery(s, pn_dtag("tag", 4));
+        pn_link_send(s, m.start, m.size);
+        pn_link_advance(s);
+        pn_delivery_settle(d);  /* Pre-settled */
+        free(m.start);
+    }
+}
+
+/* Put a message on the queue, called in receiver dispatch loop.
+   If the queue was previously empty, notify waiting senders.
+*/
+static void queue_receive(pn_driver_t *d, queue *q, pn_rwbytes_t m) {
+    debug("received to queue %s", q->name);
+    uv_mutex_lock(&q->lock);
+    VEC_PUSH(q->messages, m);
+    if (q->messages.len == 1) { /* Was empty, notify waiting connections */
+        for (size_t i = 0; i < q->waiting.len; ++i) {
+            pn_connection_t *c = q->waiting.data[i];
+            broker_connection_t *bc = (broker_connection_t*)pn_driver_connection_get(c);
+            if (bc) {
+                bc->check_queues = true;
+                pn_driver_wake(&bc->dc); /* Signal that the connection should check queues */
+            }
+        }
+        q->waiting.len = 0;
+    }
+    uv_mutex_unlock(&q->lock);
+}
+
+/* Thread safe set of queues */
+typedef struct queues_t {
+    uv_mutex_t lock;
+    queue *queues;
+} queues_t;
+
+void queues_init(queues_t *qs) {
+    uv_mutex_init(&qs->lock);
+    qs->queues = NULL;
+}
+
+/* Finalize and free every queue in the set, then destroy the set's lock.
+   The set is left empty.
+*/
+void queues_final(queues_t *qs) {
+    queue *q = qs->queues;
+    while (q) {
+        queue *next = q->next;  /* Read the link before q is freed */
+        queue_final(q);
+        free(q);
+        q = next;
+    }
+    qs->queues = NULL;
+    uv_mutex_destroy(&qs->lock);
+}
+
+/** Get or create the named queue. */
+queue* queues_get(queues_t *qs, const char* name) {
+    uv_mutex_lock(&qs->lock);
+    queue *q;
+    for (q = qs->queues; q && strcmp(q->name, name) != 0; q = q->next)
+        ;
+    if (!q) {
+        q = (queue*)malloc(sizeof(queue));
+        queue_init(q, name, qs->queues);
+        qs->queues = q;
+    }
+    uv_mutex_unlock(&qs->lock);
+    return q;
+}
+
+/* The broker implementation */
+typedef struct broker {
+    pn_driver_t driver;
+    pn_driver_listener_t listener;
+    queues_t queues;
+    char container_id[256];     /* AMQP container-id */
+    int threads;
+} broker;
+
+void broker_stop(broker *b) {
+    for (int i = 0; i < b->threads; ++i)
+        pn_driver_interrupt(&b->driver);
+}
+
+/* Try to send if link is sender and has credit.
+   The queue is named by the link's source address; a link without a source
+   address is ignored.
+*/
+static void link_send(broker *b, pn_link_t *s) {
+    if (pn_link_is_sender(s) && pn_link_credit(s) > 0) {
+        const char *qname = pn_terminus_get_address(pn_link_source(s));
+        if (qname) {            /* queues_get cannot look up a NULL name */
+            queue *q = queues_get(&b->queues, qname);
+            queue_send(q, s);
+        }
+    }
+}
+
+static void queue_unsub(queue *q, pn_connection_t *c) {
+    uv_mutex_lock(&q->lock);
+    for (size_t i = 0; i < q->waiting.len; ++i) {
+        if (q->waiting.data[i] == c){
+            q->waiting.data[i] = q->waiting.data[0]; /* save old [0] */
+            VEC_POP(q->waiting);
+            break;
+        }
+    }
+    uv_mutex_unlock(&q->lock);
+}
+
+/* Unsubscribe from the queue of interest to this link. */
+static void link_unsub(broker *b, pn_link_t *s) {
+    const char *qname = pn_terminus_get_address(pn_link_source(s));
+    if (qname) {
+        queue *q = queues_get(&b->queues, qname);
+        queue_unsub(q, pn_session_connection(pn_link_session(s)));
+    }
+}
+
+/* Remove connection c from the waiting list of every queue named by one of
+   its links. Called from broker_dispatch when the remote end closes the
+   connection.
+*/
+static void broker_unsub(broker *b, pn_connection_t *c) {
+    /* The state mask is deliberately 0, which visits every link whatever its
+       state. Do not use PN_LOCAL_ACTIVE|PN_REMOTE_ACTIVE here: links that are
+       already half closed must be unsubscribed too.
+    */
+    for (pn_link_t *l = pn_link_head(c, 0); l != NULL; l = pn_link_next(l, 0))
+        link_unsub(b, l);
+}
+
+static void broker_close_error(pn_event_t *e, pn_condition_t *cond) {
+    if (pn_condition_is_set(cond)) {
+        fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
+                pn_condition_get_name(cond), pn_condition_get_description(cond));
+    }
+}
+
+const int WINDOW=10;            /* Incoming credit window */
+
+broker_connection_t *broker_connection(broker* b) {
+    broker_connection_t *bc = (broker_connection_t*)calloc(1, sizeof(broker_connection_t));
+    pn_driver_connection_init(&b->driver, &bc->dc);
+    bc->check_queues = false;
+    pn_connection_engine_t *eng = pn_driver_engine(&bc->dc);
+    pn_connection_set_container(eng->connection, b->container_id);
+    pn_transport_set_server(eng->transport);
+    /* No security */
+    pn_transport_require_auth(eng->transport, false);
+    pn_sasl_allowed_mechs(pn_sasl(eng->transport), "ANONYMOUS");
+    return bc;
+}
+
+/* Dispatch events for a broker connection.
+
+   If the connection was woken because a queue it was waiting on received a
+   message (bc->check_queues is set), first try to send on each of its open
+   links. Then handle proton events until the engine has none left.
+*/
+static void broker_dispatch(broker* b, broker_connection_t* bc) {
+    pn_connection_engine_t *eng = pn_driver_engine(&bc->dc);
+    pn_connection_t *c = pn_connection_engine_connection(eng);
+
+    if (bc->check_queues) {
+        /* Clear the flag before scanning, so a wake-up that arrives during
+           the scan is not wiped out afterwards. */
+        bc->check_queues = false;
+        /* The flags must be OR-ed to select links that are open at both
+           ends; AND-ing them gives 0, which selects every link. */
+        int flags = PN_LOCAL_ACTIVE|PN_REMOTE_ACTIVE;
+        for (pn_link_t *l = pn_link_head(c, flags); l != NULL; l = pn_link_next(l, flags))
+            link_send(b, l);
+    }
+
+    pn_event_t *e;
+    while ((e = pn_connection_engine_dispatch(eng)) != NULL) {
+        switch (pn_event_type(e)) {
+          case PN_CONNECTION_REMOTE_OPEN: {
+              pn_connection_open(pn_event_connection(e)); /* Complete the open */
+              break;
+          }
+          case PN_SESSION_REMOTE_OPEN: {
+              pn_session_open(pn_event_session(e));
+              break;
+          }
+          case PN_LINK_REMOTE_OPEN: {
+              /* Mirror the address requested by the peer, then open our end */
+              pn_link_t *l = pn_event_link(e);
+              if (pn_link_is_sender(l)) {
+                  const char *source = pn_terminus_get_address(pn_link_remote_source(l));
+                  pn_terminus_set_address(pn_link_source(l), source);
+              } else {
+                  const char* target = pn_terminus_get_address(pn_link_remote_target(l));
+                  pn_terminus_set_address(pn_link_target(l), target);
+                  pn_link_flow(l, WINDOW);
+              }
+              pn_link_open(l);
+              break;
+          }
+          case PN_LINK_FLOW: {
+              link_send(b, pn_event_link(e));
+              break;
+          }
+          case PN_DELIVERY: {
+              pn_delivery_t *d = pn_event_delivery(e);
+              pn_link_t *r = pn_delivery_link(d);
+              if (pn_link_is_receiver(r) &&
+                  pn_delivery_readable(d) && !pn_delivery_partial(d))
+              {
+                  size_t size = pn_delivery_pending(d);
+                  /* The broker does not decode the message, just forwards it. */
+                  pn_rwbytes_t m = { size, (char*)malloc(size) };
+                  pn_link_recv(r, m.start, m.size);
+                  const char *qname = pn_terminus_get_address(pn_link_target(r));
+                  if (qname) {
+                      /* The queue takes ownership of m.start */
+                      queue_receive(&b->driver, queues_get(&b->queues, qname), m);
+                      pn_delivery_update(d, PN_ACCEPTED);
+                  } else {
+                      /* No target address: there is no queue to put it on */
+                      free(m.start);
+                      pn_delivery_update(d, PN_REJECTED);
+                  }
+                  pn_delivery_settle(d);
+                  pn_link_flow(r, WINDOW - pn_link_credit(r)); /* Top up credit */
+              }
+              /* FIXME aconway 2016-09-30: settlement... */
+              break;
+          }
+
+            /* Close and error cases: report any error condition, then
+               complete the close from our side. */
+          case PN_TRANSPORT_CLOSED:
+            broker_close_error(e, pn_transport_condition(pn_event_transport(e)));
+            /* Will be freed by connection engine */
+            break;
+          case PN_CONNECTION_REMOTE_CLOSE:
+            broker_unsub(b, pn_event_connection(e));
+            broker_close_error(e, pn_connection_condition(pn_event_connection(e)));
+            pn_connection_close(pn_event_connection(e));
+            /* Don't call pn_connection_free, will be done by connection engine. */
+            break;
+          case PN_SESSION_REMOTE_CLOSE:
+            broker_close_error(e, pn_session_condition(pn_event_session(e)));
+            pn_session_close(pn_event_session(e));
+            pn_session_free(pn_event_session(e));
+            break;
+          case PN_LINK_REMOTE_CLOSE:
+            broker_close_error(e, pn_link_condition(pn_event_link(e)));
+            pn_link_close(pn_event_link(e));
+            pn_link_free(pn_event_link(e));
+            break;
+          default:
+            break;
+        }
+    }
+}
+
+static void broker_thread(void *void_broker) {
+    broker *b = (broker*)void_broker;
+    pn_driver_t *d = (pn_driver_t*)&b->driver;
+    while (true) {
+        pn_driver_event_t e = pn_driver_wait(d);
+        switch (e.type) {
+          case PN_DRIVER_LISTENER_READY: {
+              broker_connection_t *bc = broker_connection(b);
+              pn_driver_accept(e.listener, &bc->dc);
+              break;
+          }
+          case PN_DRIVER_CONNECTION_READY:
+            broker_dispatch(b, (broker_connection_t*)e.connection);
+            pn_driver_watch(e.connection);
+            break;
+          case PN_DRIVER_INTERRUPT:
+            return;
+          case PN_DRIVER_CONNECTION_FINISHED:
+            pn_driver_connection_final(e.connection);
+            free(e.connection);
+            break;
+          case PN_DRIVER_LISTENER_FINISHED: {
+              const char *errstr = pn_driver_listener_error(e.listener);
+              if (errstr) {
+                  fprintf(stderr, "listener error: %s\n", errstr);
+              }
+              pn_driver_listener_final(e.listener);
+              broker_stop(b);
+              /* FIXME aconway 2016-10-22: this will leak connections
+                 if the listener fails after accepting some connections
+              */
+              break;
+          }
+        }
+    }
+}
+
+static void usage(const char *arg0) {
+    fprintf(stderr, "Usage: %s [-d] [-a url] [-t thread-count]\n", arg0);
+    exit(1);
+}
+
+/* Broker entry point.
+
+   Options: -a url (address to listen on), -t thread-count (default 4, must
+   be at least 1), -d (verbose debug output). Listens on the given address
+   and runs the broker loop in thread-count threads, one of which is the
+   main thread. Returns 0 once every thread has been interrupted.
+*/
+int main(int argc, char **argv) {
+    /* Command line options */
+    const char *urlstr = NULL;
+    size_t nthreads = 4;        /* Thread count. */
+    int opt;
+    while ((opt = getopt(argc, argv, "a:t:d")) != -1) {
+        switch (opt) {
+          case 'a': urlstr = optarg; break;
+          case 't': {
+              /* Reject 0 and negative counts: nthreads-1 below is unsigned
+                 and would wrap around to a huge value. */
+              int n = atoi(optarg);
+              if (n < 1)
+                  usage(argv[0]);
+              nthreads = (size_t)n;
+              break;
+          }
+          case 'd': verbose = true; break;
+          default: usage(argv[0]); break;
+        }
+    }
+    if (optind < argc)
+        usage(argv[0]);
+
+    /* Run the broker */
+    broker b;
+    b.threads = (int)nthreads;
+    pn_driver_init(&b.driver);
+    pn_driver_listener_init(&b.driver, &b.listener);
+    snprintf(b.container_id, sizeof(b.container_id), "%s:%d", argv[0], (int)getpid());
+    queues_init(&b.queues);
+
+    /* Parse the URL or use default values */
+    pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
+    const char *host = url ? pn_url_get_host(url) : NULL;
+    const char *port = url ? pn_url_get_port(url) : NULL;
+    pn_driver_listen(&b.listener, host, port, NULL, 16);
+    /* host and port may be NULL, which must not be passed to %s */
+    printf("listening on '%s:%s' %zu threads\n",
+           host ? host : "", port ? port : "", nthreads);
+
+    if (url) pn_url_free(url);  /* host and port are not used after this */
+    uv_thread_t* threads = (uv_thread_t*)calloc(nthreads, sizeof(uv_thread_t));
+    if (!threads) {
+        perror("calloc");
+        exit(1);
+    }
+    for (size_t i = 0; i < nthreads-1; ++i) {
+        check(uv_thread_create(&threads[i], broker_thread, &b), "pthread_create");
+    }
+    broker_thread(&b);          /* Use the main thread too. */
+    for (size_t i = 0; i < nthreads-1; ++i) {
+        check(uv_thread_join(&threads[i]), "pthread_join");
+    }
+    pn_driver_final(&b.driver);
+    free(threads);
+    return 0;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/99222efc/examples/c/driver/libuv_driver.c
----------------------------------------------------------------------
diff --git a/examples/c/driver/libuv_driver.c b/examples/c/driver/libuv_driver.c
new file mode 100644
index 0000000..c57d14e
--- /dev/null
+++ b/examples/c/driver/libuv_driver.c
@@ -0,0 +1,566 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ libuv loop functions are thread unsafe. The only exception is uv_async_send()
+ which is a thread safe "wakeup" that can wake the uv_loop from another thread.
+
+ To provide concurrency this driver uses a "leader-worker-follower" model.
+
+ - Multiple threads can be "workers" and concurrently process distinct driver connections
+   or listeners that have been woken by IO and now have non-IO work to do.
+
+ - Only one thread, the "leader", is allowed to call unsafe libuv functions and
+   run the uv_loop to wait for IO events.
+
+ - Threads with no work to do are "followers", they wait on the leader.
+
+ - The leader runs the uv_loop for one iteration, then gives up leadership and
+   becomes a worker. One of the followers becomes the next leader.
+
+ This model is symmetric: any thread can take on any role based on run-time
+ requirements. It also allows the IO and non-IO work associated with an IO
+ wake-up to be processed in a single thread with no context switches.
+
+ Connections and listeners both contain a "dsocket". Requests to modify a
+ dsocket are queued on the driver.leader_q to be handled by the leader. Sockets
+ that are ready for user processing are queued on driver.user_q.
+
+ Connections can be closed by IO (read EOF, read/write error) or by proton
+ events (SASL failures, application closing the connection etc.) Once a
+ connection is fully closed (uv_socket closed + pn_connection_engine_finished)
+ it is queued on driver.user_q and returned to the user in a FINISHED event.
+
+ Listeners can similarly be closed by IO or by pn_driver_listener_close.
+*/
+
+#include <proton/driver.h>
+
+#include <proton/engine.h>
+#include <proton/message.h>
+#include <proton/object.h>
+#include <proton/transport.h>
+#include <proton/url.h>
+
+#include <stddef.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <uv.h>
+
+const char COND_NAME[] = "driver";
+const char DEFAULT_SERVICE[] = "5672";
+
+/* Short aliases */
+typedef pn_uv_socket_t dsocket;
+typedef pn_uv_queue_t queue;
+typedef pn_driver_t driver;
+
+/* State of a driver socket */
+typedef enum {
+    INACTIVE,                   /* Initialized but not yet called connect/listen/accept */
+    RUNNING,                    /* Normal operation */
+    CLOSING,                    /* uv_close request pending */
+    CLOSED                      /* UV close completed */
+}  dsocket_state;
+
+/* Special value for dsocket.next pointer when socket is not on any list. */
+dsocket UNLISTED;
+
+static void push(queue *q, dsocket *ds) {
+    if (ds->next != &UNLISTED)  /* Don't move if already listed. */
+        return;
+    ds->next = NULL;
+    if (!q->front) {
+        q->front = q->back = ds;
+    } else {
+        q->back->next = ds;
+        q->back =  ds;
+    }
+}
+
+static dsocket* pop(queue *q) {
+    dsocket *ds = q->front;
+    if (ds) {
+        q->front = ds->next;
+        ds->next = &UNLISTED;
+    }
+    return ds;
+}
+
+static void dsocket_init(dsocket* ds, driver* d, bool is_conn) {
+    ds->next = &UNLISTED;
+    ds->state = INACTIVE;
+    ds->driver = d;
+    ds->user = true;
+    ds->is_conn = is_conn;
+    ds->socket.data = ds;
+}
+
+/* Store host and service in addr for later retrieval with get_addr.
+   If both are NULL the service defaults to DEFAULT_SERVICE. Values too long
+   for the fields of addr are truncated.
+*/
+static void set_addr(pn_uv_addr_t* addr, const char *host, const char *service) {
+    if (!host && !service)
+        service = DEFAULT_SERVICE;
+    /* Set to "\001" to indicate a NULL host/service as opposed to an empty string "" host/service */
+    /* strncpy does not terminate the destination when the source fills it,
+       so copy one byte less and terminate explicitly. */
+    strncpy(addr->host, host ? host : "\001", sizeof(addr->host) - 1);
+    addr->host[sizeof(addr->host) - 1] = '\0';
+    strncpy(addr->service, service ? service : "\001", sizeof(addr->service) - 1);
+    addr->service[sizeof(addr->service) - 1] = '\0';
+}
+
+static void get_addr(pn_uv_addr_t* addr, const char **host, const char **service) {
+    *host = addr->host[0] == '\001' ? NULL : addr->host;
+    *service = addr->service[0] == '\001' ? NULL : addr->service;
+}
+
+static void to_leader_lh(dsocket* ds) {
+    ds->user = false;
+    push(&ds->driver->leader_q, ds);
+    uv_async_send(&ds->driver->async); /* wake up the uv_loop */
+}
+
+static void to_leader(dsocket* ds) {
+    uv_mutex_lock(&ds->driver->lock);
+    to_leader_lh(ds);
+    uv_mutex_unlock(&ds->driver->lock);
+}
+
+static void to_user(dsocket* ds) {
+    uv_mutex_lock(&ds->driver->lock);
+    ds->user = true;
+    push(&ds->driver->user_q, ds);
+    uv_mutex_unlock(&ds->driver->lock);
+}
+
+static void on_close(uv_handle_t *socket) {
+    dsocket *ds = (dsocket*)socket->data;
+    uv_mutex_lock(&ds->driver->lock);
+    ds->state = CLOSED;
+    push(&ds->driver->user_q, ds); /* Return in FINISHED event */
+    uv_mutex_unlock(&ds->driver->lock);
+}
+
+static void do_close(dsocket *ds) {
+    switch ((dsocket_state)ds->state) {
+      case INACTIVE:
+        ds->state = CLOSED;
+        to_user(ds);
+        break;
+      case RUNNING:
+        ds->state = CLOSING;
+        uv_close((uv_handle_t*)&ds->socket, on_close);
+        break;
+      case CLOSING:
+        break;
+      case CLOSED:
+        to_user(ds);
+        break;
+    }
+}
+
+static void set_error(dsocket* ds, const char* fmt, ...) {
+    if (ds->is_conn) {
+        pn_connection_engine_t *eng = &((pn_driver_connection_t*)ds)->engine;
+        pn_condition_t* cond = pn_connection_engine_condition(eng);
+        if (!pn_condition_is_set(cond)) { /* Don't overwrite an existing error */
+            va_list ap;
+            va_start(ap, fmt);
+            pn_condition_vformat(cond, COND_NAME, fmt, ap);
+            va_end(ap);
+            pn_connection_engine_disconnected(eng);
+        }
+    } else {                    /* Listener */
+        pn_driver_listener_t *dl = (pn_driver_listener_t*)ds;
+        va_list ap;
+        va_start(ap, fmt);
+        vsnprintf(dl->error, sizeof(dl->error), fmt, ap);
+        va_end(ap);
+    }
+    do_close(ds);
+}
+
+static int set_uv_error(dsocket *ds, int err, const char* prefix) {
+    if (err < 0)
+        set_error(ds, "%s: %s", prefix, uv_strerror(err));
+    return err;
+}
+
+static void on_connect(uv_connect_t* connect, int status) {
+    pn_driver_connection_t* dc = (pn_driver_connection_t*)connect->data;
+    if (set_uv_error(&dc->dsocket, status, "cannot connect") == 0) {
+        pn_connection_engine_start(&dc->engine);
+        to_user(&dc->dsocket);    /* Process initial events before doing IO */
+    }
+}
+
+/* Callback given to uv_listen(): count the incoming connection and notify the
+   user, or record the error on the listener. */
+static void on_connection(uv_stream_t* server, int status) {
+    pn_driver_listener_t* listener = (pn_driver_listener_t*)server->data;
+    if (status != 0) {
+        set_uv_error(&listener->dsocket, status, "incoming connection error");
+        return;
+    }
+    ++listener->pending;
+    to_user(&listener->dsocket);
+}
+
+/* Resolve the connection's stored address and start an outgoing TCP connect.
+   Any failure is recorded with set_error(), which also requests close. */
+static void do_connect(pn_driver_connection_t *dc) {
+    const char *host, *service;
+    get_addr(&dc->addr, &host, &service);
+    uv_getaddrinfo_t info;
+    /* No callback is passed, so the result is expected in `info` on return. */
+    int err = uv_getaddrinfo(&dc->dsocket.driver->loop, &info, NULL, host, service, NULL);
+    if (!err) {
+        /* Only the first resolved address is tried; on_connect() gets the outcome. */
+        err = uv_tcp_connect(&dc->connect, &dc->dsocket.socket, info.addrinfo->ai_addr, on_connect);
+        uv_freeaddrinfo(info.addrinfo);
+    }
+    if (err)
+        set_error(&dc->dsocket, "connect to %s:%s: %s", host, service, uv_strerror(err));
+}
+
+/* Resolve the listener's stored address, bind to it and start listening.
+   Any failure is recorded with set_error(), which also requests close. */
+static void do_listen(pn_driver_listener_t *dl) {
+    const char *host, *service;
+    get_addr(&dl->addr, &host, &service);
+    uv_getaddrinfo_t info;
+    /* No callback is passed, so the result is expected in `info` on return. */
+    int err = uv_getaddrinfo(&dl->dsocket.driver->loop, &info, NULL, host, service, NULL);
+    if (!err) {
+        /* Only the first resolved address is bound. */
+        err = uv_tcp_bind(&dl->dsocket.socket, info.addrinfo->ai_addr, 0);
+        if (!err)
+            err = uv_listen((uv_stream_t*)&dl->dsocket.socket, dl->backlog, on_connection);
+        uv_freeaddrinfo(info.addrinfo);
+    }
+    if (err)
+        set_error(&dl->dsocket, "listen on %s:%s: %s", host, service, uv_strerror(err));
+}
+
+/* Accept one pending connection from dc->listener onto dc's socket.
+   Accepting with no pending connection is reported as a listener error.
+   A failed uv_accept() is reported on both the listener and the connection. */
+static void do_accept(pn_driver_connection_t *dc) {
+    pn_driver_listener_t *dl = dc->listener;
+    const char *host, *service;
+    get_addr(&dl->addr, &host, &service);
+
+    if (dl->pending == 0) {
+        set_error(&dl->dsocket, "accept from %s:%s: %s", host, service,
+                  "no connection available");
+        return;
+    }
+    /* One pending connection is consumed whether or not uv_accept() succeeds. */
+    --dl->pending;
+    int err = uv_accept((uv_stream_t*)&dl->dsocket.socket, (uv_stream_t*)&dc->dsocket.socket);
+    if (err) {
+        set_error(&dl->dsocket, "accept from %s:%s: %s", host, service, uv_strerror(err));
+        set_error(&dc->dsocket, "accept from %s:%s: %s", host, service, uv_strerror(err));
+    }
+}
+
+/* First-time setup of a socket: initialise its TCP handle on the driver's
+   loop, then accept/connect (connections) or listen (listeners). */
+static void do_activate(dsocket *ds) {
+    int err = uv_tcp_init(&ds->driver->loop, &ds->socket);
+    if (err) {
+        set_uv_error(ds, err, "tcp socket init");
+        return;
+    }
+    if (ds->is_conn) {
+        pn_driver_connection_t *dc = (pn_driver_connection_t*)ds;
+        /* is_accept is set by pn_driver_accept() and cleared by pn_driver_connect(). */
+        if (dc->is_accept)
+            do_accept(dc);
+        else
+            do_connect(dc);
+        to_user(ds);              /* Process initial events */
+    } else {
+        do_listen((pn_driver_listener_t*)ds);
+    }
+}
+
+/* Read callback given to uv_read_start().
+   nread >= 0:       bytes were placed in the engine's read buffer (see
+                     alloc_read_buffer); tell the engine and, unless a write
+                     is still in flight, stop reading and hand the connection
+                     to the user.
+   nread == UV_EOF:  the peer hung up; close the engine's read side.
+   otherwise:        nread is a libuv error code; record it. */
+static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
+    pn_driver_connection_t *dc = (pn_driver_connection_t*)stream->data;
+    if (nread >= 0) {
+        pn_connection_engine_read_done(&dc->engine, nread);
+        if (!dc->writing) {     /* Don't go ready if write is pending */
+            uv_read_stop(stream);
+            dc->reading = false;
+            to_user(&dc->dsocket);
+        }
+    } else if (nread == UV_EOF) { /* hangup */
+        /* Note: dc->reading is left set and uv_read_stop() is not called on this path. */
+        pn_connection_engine_read_close(&dc->engine);
+        to_user(&dc->dsocket);
+    } else {
+        set_uv_error(&dc->dsocket, nread, "read");
+    }
+}
+
+/* Write callback given to uv_write(). A cancelled write is ignored, an error
+   is recorded. On success the engine is told how many bytes were written
+   (dc->writing), any active read is stopped and the connection is handed to
+   the user. */
+static void on_write(uv_write_t* request, int status) {
+    if (status == UV_ECANCELED)
+        return;                 /* Nothing to do */
+    pn_driver_connection_t *dc = (pn_driver_connection_t*)request->data;
+    if (set_uv_error(&dc->dsocket, status, "write") == 0) {
+        /* dc->writing holds the size of the buffer passed to uv_write(). */
+        pn_connection_engine_write_done(&dc->engine, dc->writing);
+        dc->writing = 0;
+        if (dc->reading) {         /* Cancel the read request before going ready. */
+            uv_read_stop((uv_stream_t*)&dc->dsocket.socket);
+            dc->reading = false;
+        }
+        to_user(&dc->dsocket);
+    }
+}
+
+// Allocation callback given to uv_read_start(): instead of allocating, hand
+// libuv the engine's own read buffer. The suggested size is ignored.
+static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) {
+    pn_driver_connection_t *conn = (pn_driver_connection_t*)stream->data;
+    pn_rwbytes_t space = pn_connection_engine_read_buffer(&conn->engine);
+    uv_buf_t engine_buf = uv_buf_init(space.start, space.size);
+    *buf = engine_buf;
+}
+
+/* Leader-side handling of a RUNNING connection that was queued for the leader.
+ *
+ * In order of priority:
+ *  - a pending wake request detaches the connection from IO and hands it
+ *    straight to the user;
+ *  - a finished engine triggers close of the socket;
+ *  - otherwise IO is resumed: a write and/or a read is started, unless the
+ *    engine has unprocessed events, in which case the user gets the
+ *    connection first.
+ *
+ * A failure to start the write or the read is reported through
+ * set_uv_error(), the same way on_write()/on_read() report later failures.
+ * Previously the return codes were ignored, which left `writing`/`reading`
+ * set with no callback ever coming to clear them.
+ */
+static void do_connection(pn_driver_connection_t *dc) {
+    uv_mutex_lock(&dc->dsocket.driver->lock);
+    bool do_wake = dc->dsocket.wake;
+    dc->dsocket.wake = false;
+    uv_mutex_unlock(&dc->dsocket.driver->lock);
+
+    if (do_wake) {
+        /* Detach from the IO loop before sending to user. */
+        if (dc->writing) {
+            /* NOTE(review): libuv documents uv_cancel() for fs, getaddrinfo,
+               getnameinfo and work requests only, so this call presumably
+               fails for a uv_write_t and the write stays in flight - confirm. */
+            uv_cancel((uv_req_t*)&dc->write);
+            dc->writing  = 0;
+        }
+        if (dc->reading) {
+            uv_read_stop((uv_stream_t*)&dc->dsocket.socket);
+            dc->reading = false;
+        }
+        to_user(&dc->dsocket);
+    } else if (pn_connection_engine_finished(&dc->engine)) {
+        do_close(&dc->dsocket); /* Request close */
+    } else {                    /* Check for IO */
+        pn_bytes_t wbuf = pn_connection_engine_write_buffer(&dc->engine);
+        pn_rwbytes_t rbuf = pn_connection_engine_read_buffer(&dc->engine);
+        /* Calling write_buffer can generate events.
+           Make sure all events are processed before we resume IO, since
+           events may close the transport.
+        */
+        if (pn_collector_peek(dc->engine.collector)) {
+            to_user(&dc->dsocket);
+        } else {                    /* Really resume IO */
+            if (wbuf.size > 0 && !dc->writing) {
+                uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size);
+                int err = uv_write(&dc->write, (uv_stream_t*)&dc->dsocket.socket, &buf, 1, on_write);
+                /* If the write could not be started on_write() never runs:
+                   report the error now instead of waiting forever. */
+                if (set_uv_error(&dc->dsocket, err, "write") != 0)
+                    return;
+                dc->writing = wbuf.size;
+            }
+            if (rbuf.size > 0 && !dc->reading) {
+                int err = uv_read_start((uv_stream_t*)&dc->dsocket.socket, alloc_read_buffer, on_read);
+                if (set_uv_error(&dc->dsocket, err, "read") != 0)
+                    return;
+                dc->reading = true;
+            }
+        }
+    }
+}
+
+/* Leader-side handling of a RUNNING listener: close it if a close was
+   requested (wake flag), otherwise notify the user while connections are
+   still waiting to be accepted. */
+static void do_listener(pn_driver_listener_t *dl) {
+    uv_mutex_t *lock = &dl->dsocket.driver->lock;
+    uv_mutex_lock(lock);
+    bool close_requested = dl->dsocket.wake;
+    uv_mutex_unlock(lock);
+    if (close_requested) {
+        do_close(&dl->dsocket);
+        return;
+    }
+    if (dl->pending > 0)             /* Ready for another accept */
+        to_user(&dl->dsocket);
+    /* Don't need to resume IO, the uv_listen call remains in force all the time. */
+}
+
+/* Called by leader to handle queued requests.
+   Dispatches on the socket state:
+     INACTIVE - first request: activate the socket, then mark it RUNNING
+                unless activation already moved it to another state
+                (do_close() does this on error);
+     RUNNING  - normal connection or listener processing;
+     CLOSING  - nothing to do until on_close() runs;
+     CLOSED   - hand the socket to the user. */
+static void do_socket(dsocket *ds) {
+    switch ((dsocket_state)ds->state) {
+      case INACTIVE:
+        do_activate(ds);
+        if (ds->state == INACTIVE)
+            ds->state = RUNNING;
+        break;
+      case RUNNING:
+        if (ds->is_conn) {
+            do_connection((pn_driver_connection_t*)ds);
+        } else {
+            do_listener((pn_driver_listener_t*)ds);
+        }
+        break;
+      case CLOSING:
+        break;
+      case CLOSED:
+        to_user(ds);              /* Return FINISHED event to user */
+        break;
+    }
+}
+
+/* Fill in event and return true or return false if no events are available.
+ *
+ * Must be called with the driver lock held (pn_driver_wait() does so).
+ * Pending interrupts are reported first, one per call. Otherwise sockets are
+ * popped from the user queue until one yields an event:
+ *  - a connection is FINISHED when its engine is finished and its socket is
+ *    CLOSED, and READY otherwise;
+ *  - a listener is FINISHED when CLOSED, READY when connections are pending.
+ * A listener with nothing to report is dropped from the queue and the search
+ * continues; previously it made the function return false even though other
+ * sockets were still queued behind it.
+ */
+bool get_event_lh(driver *d, pn_driver_event_t *event) {
+    if (d->interrupt > 0) {
+        --d->interrupt;
+        event->type = PN_DRIVER_INTERRUPT;
+        event->connection = NULL;
+        return true;
+    }
+    for (dsocket *ds = pop(&d->user_q); ds; ds = pop(&d->user_q)) {
+        if (ds->is_conn) {
+            pn_driver_connection_t *dc = event->connection = (pn_driver_connection_t*)ds;
+            if (pn_connection_engine_finished(&dc->engine) && ds->state == CLOSED)
+                event->type = PN_DRIVER_CONNECTION_FINISHED;
+            else
+                event->type = PN_DRIVER_CONNECTION_READY;
+            return true;
+        }
+        /* Listener */
+        pn_driver_listener_t *dl = event->listener = (pn_driver_listener_t*)ds;
+        if (dl->dsocket.state == CLOSED) {
+            event->type = PN_DRIVER_LISTENER_FINISHED;
+            return true;
+        }
+        if (dl->pending > 0) {
+            event->type = PN_DRIVER_LISTENER_READY;
+            return true;
+        }
+        /* Idle listener: nothing to report, look at the next queued socket. */
+    }
+    return false;
+}
+
+
+/* Block until an event is available and return it.
+ *
+ * If an event is already queued it is returned at once. Otherwise the calling
+ * thread waits until no other thread is leader, becomes the leader, and then
+ * alternates between running the libuv loop once and processing the sockets
+ * on the leader queue, until an event turns up. It then gives up leadership
+ * and signals the condition so a waiting thread can take over.
+ *
+ * The driver lock is held except while uv_run() and do_socket() execute.
+ */
+pn_driver_event_t pn_driver_wait(struct pn_driver_t* d) {
+    uv_mutex_lock(&d->lock);
+    pn_driver_event_t event;
+    /* Try to grab work immediately. */
+    if (!get_event_lh(d, &event)) {
+        /* No work available, follow the leader */
+        while (d->has_leader)
+            uv_cond_wait(&d->cond, &d->lock);
+        d->has_leader = true;       /* I am the leader */
+        while (!get_event_lh(d, &event)) { /* Lead till there is work to do. */
+            /* Run IO outside the lock */
+            uv_mutex_unlock(&d->lock);
+            uv_run(&d->loop, UV_RUN_ONCE);
+            uv_mutex_lock(&d->lock);
+            /* Process leader requests outside the lock */
+            for (dsocket* ds = pop(&d->leader_q); ds; ds = pop(&d->leader_q)) {
+                uv_mutex_unlock(&d->lock);
+                do_socket(ds);
+                uv_mutex_lock(&d->lock);
+            }
+        }
+        d->has_leader = false;
+        uv_cond_signal(&d->cond); /* Signal the next leader */
+    }
+    uv_mutex_unlock(&d->lock);
+    return event;
+}
+
+/* Queue one PN_DRIVER_INTERRUPT event and wake the libuv loop so that the
+   leader notices it. */
+void pn_driver_interrupt(pn_driver_t *d) {
+    uv_mutex_t *lock = &d->lock;
+    uv_mutex_lock(lock);
+    d->interrupt += 1;
+    uv_async_send(&d->async);   /* Interrupt the UV loop */
+    uv_mutex_unlock(lock);
+}
+
+/* Start an outgoing connection to host:service. The address is stored on the
+   connection, the engine is started and the connection is queued for the
+   leader. The network argument is not used by this implementation. */
+void pn_driver_connect(pn_driver_connection_t* dc, const char *host, const char *service, const char *network) {
+    (void)network;
+    set_addr(&dc->addr, host, service);
+    dc->is_accept = false;
+    pn_connection_engine_start(&dc->engine);
+    to_leader(&dc->dsocket);
+}
+
+/* Give the connection back to the driver by queueing it for the leader. */
+void pn_driver_watch(pn_driver_connection_t *dc) {
+    dsocket *ds = &dc->dsocket;
+    to_leader(ds);
+}
+
+static void on_connect(uv_connect_t* connect, int status);
+
+/* Initialise a driver connection belonging to driver d: zero the struct, set
+   up its socket and engine, and point the engine's connection context and the
+   libuv request data fields back at the driver connection. */
+void pn_driver_connection_init(pn_driver_t *d, pn_driver_connection_t *dc) {
+    memset(dc, 0, sizeof(*dc));
+    dsocket_init(&dc->dsocket, d,  true);
+    pn_connection_engine_init(&dc->engine);
+    pn_connection_set_context(dc->engine.connection, dc);
+    /* Lets on_connect() and on_write() recover dc from the request. */
+    dc->write.data = dc;
+    dc->connect.data = dc;
+}
+
+/* Finalise a driver connection by finalising its engine. */
+void pn_driver_connection_final(pn_driver_connection_t* dc) {
+    pn_connection_engine_t *engine = &dc->engine;
+    pn_connection_engine_final(engine);
+}
+
+/* Request a wake-up of the connection: set its wake flag and, unless the
+   socket is marked as in use by a user thread, queue it for the leader. */
+void pn_driver_wake(pn_driver_connection_t* dc) {
+    dsocket *ds = &dc->dsocket;
+    uv_mutex_lock(&ds->driver->lock);
+    ds->wake = true;
+    if (!ds->user)
+        to_leader_lh(ds);
+    uv_mutex_unlock(&ds->driver->lock);
+}
+
+/* Initialise a listener belonging to driver d: zero the struct and set up its
+   socket as a non-connection socket. */
+void pn_driver_listener_init(pn_driver_t *d, pn_driver_listener_t *dl) {
+    memset(dl, 0, sizeof(pn_driver_listener_t));
+    dsocket_init(&dl->dsocket, d, false);
+}
+
+/* Start listening on host:service with the given backlog. The settings are
+   stored on the listener, which is then queued for the leader. The network
+   argument is not used by this implementation. */
+void pn_driver_listen(pn_driver_listener_t *dl, const char *host, const char *service, const char *network, int backlog) {
+    (void)network;
+    set_addr(&dl->addr, host, service);
+    dl->pending = 0;
+    dl->backlog = backlog;
+    to_leader(&dl->dsocket);
+}
+
+/* Accept a connection from listener dl onto dc: mark dc as an accepted
+   connection of dl, start its engine and queue both the connection and the
+   listener for the leader. */
+void pn_driver_accept(pn_driver_listener_t* dl, pn_driver_connection_t* dc) {
+    dc->listener = dl;
+    dc->is_accept = true;
+    pn_connection_engine_start(&dc->engine);
+    to_leader(&dc->dsocket);
+    to_leader(&dl->dsocket);            /* Re-activate the listener */
+}
+
+/* Request close of the listener: set its wake flag (do_listener() treats it
+   as a close request) and, unless the socket is marked as in use by a user
+   thread, queue it for the leader. */
+void pn_driver_listener_close(pn_driver_listener_t* dl) {
+    dsocket *ds = &dl->dsocket;
+    uv_mutex_lock(&ds->driver->lock);
+    ds->wake = true;
+    if (!ds->user)
+        to_leader_lh(ds);
+    uv_mutex_unlock(&ds->driver->lock);
+}
+
+const char* pn_driver_listener_error(pn_driver_listener_t* dl) {
+    return dl->error[0] ? dl->error : NULL;
+}
+
+/* Finalise a listener. It holds no resources of its own, so this does nothing. */
+void pn_driver_listener_final(pn_driver_listener_t* l) {
+    (void)l;                    /* Nothing to do. */
+}
+
+/* Initialise a driver: zero the struct, then create its lock, condition
+   variable, libuv loop and the async handle used to wake the loop. */
+void pn_driver_init(pn_driver_t *d) {
+    memset(d, 0, sizeof(pn_driver_t));
+    uv_mutex_init(&d->lock);
+    uv_cond_init(&d->cond);
+    uv_loop_init(&d->loop);
+    /* No callback: the handle exists only so uv_async_send() can wake the loop */
+    uv_async_init(&d->loop, &d->async, NULL);
+}
+
+/* uv_walk() callback used by pn_driver_final(): close the handle, and stop
+   the loop once it has nothing left to do.
+   Handles that are already closing (for example a socket in the CLOSING
+   state) are skipped, because uv_close() must not be called twice on the
+   same handle. The walk argument v is unused. */
+static void on_stopping(uv_handle_t* h, void* v) {
+    (void)v;
+    if (!uv_is_closing(h))
+        uv_close(h, NULL);
+    if (!uv_loop_alive(h->loop))
+        uv_stop(h->loop);
+}
+
+/* Shut the driver down: close every handle registered with the loop, run the
+   loop so the closes can complete, then release the loop, the lock and the
+   condition variable. */
+void pn_driver_final(pn_driver_t *pd) {
+    driver *d = (driver*)pd;
+    uv_walk(&d->loop, on_stopping, NULL); /* Close all handles */
+    uv_run(&d->loop, UV_RUN_DEFAULT);     /* Run till stop, all handles closed */
+    /* The result of uv_loop_close() is not checked. */
+    uv_loop_close(&d->loop);
+    uv_mutex_destroy(&d->lock);
+    uv_cond_destroy(&d->cond);
+}
+
+/* Return the driver connection stored as the context of proton connection c
+   (see pn_driver_connection_init), or NULL if c is NULL. */
+pn_driver_connection_t *pn_driver_connection_get(pn_connection_t* c) {
+    if (!c)
+        return NULL;
+    return (pn_driver_connection_t*)pn_connection_get_context(c);
+}
+
+/* Return the connection engine embedded in dc, or NULL if dc is NULL. */
+pn_connection_engine_t *pn_driver_engine(pn_driver_connection_t *dc) {
+    if (!dc)
+        return NULL;
+    return &dc->engine;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/99222efc/examples/c/driver/libuv_driver.h
----------------------------------------------------------------------
diff --git a/examples/c/driver/libuv_driver.h b/examples/c/driver/libuv_driver.h
new file mode 100644
index 0000000..f00629c
--- /dev/null
+++ b/examples/c/driver/libuv_driver.h
@@ -0,0 +1,111 @@
+#ifndef LIBUV_DRIVER_H
+#define LIBUV_DRIVER_H
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**@cond INTERNAL  */
+
+/*
+  Definitions for libuv driver implementation.
+  Included as part of proton/driver.h if libuv implementation is selected.
+
+  Defines structs specific to libuv driver, see proton/driver.h for public API.
+*/
+
+#include <proton/connection_engine.h>
+#include <uv.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct pn_driver_t pn_driver_t;
+
+/* Host and service (port) strings of a network address, stored by value. */
+typedef struct pn_uv_addr_t {
+    char host[NI_MAXHOST];
+    char service[NI_MAXSERV];
+} pn_uv_addr_t;
+
+/* State common to connections and listeners. It is the first member of both
+   pn_driver_connection_t and pn_driver_listener_t. */
+typedef struct pn_uv_socket_t {
+    /* Protected by driver.lock */
+    struct pn_uv_socket_t* next; /* Link to another socket; presumably the queue link for pn_uv_queue_t - confirm */
+    bool user:1;                /* In use by user thread */
+    bool wake:1;                /* Wake or close requested */
+
+    /* Remaining members only used in leader thread */
+    int state;                  /* Socket state, stored as an int */
+    uv_tcp_t socket;            /* libuv TCP handle */
+    pn_driver_t *driver;        /* Owning driver */
+    bool is_conn:1;             /* True for connection, false for listener. */
+} pn_uv_socket_t;
+
+/* A connection managed by the libuv driver: the common socket state plus the
+   proton connection engine and the libuv requests used to drive it.
+   NOTE(review): pn_driver_listener_t is used below before this header
+   declares it; presumably proton/driver.h forward-declares it before
+   including this file - confirm. */
+typedef struct pn_driver_connection_t {
+    pn_uv_socket_t dsocket;
+
+    /* Members only used by user or leader thread, never both at the same time. */
+    pn_connection_engine_t engine;
+    /* Outgoing and accepted connections need different data, so it shares storage. */
+    union {
+        struct {
+            pn_uv_addr_t addr;  /* for connect() */
+            uv_connect_t connect;
+        };
+        pn_driver_listener_t *listener; /* for accept() */
+    };
+    uv_write_t write;           /* The single write request, reused for each write */
+    size_t writing;             /* Size of the write in progress, 0 if none */
+    bool reading:1;             /* True while a read is started on the socket */
+    bool is_accept:1;           /* True for accepted, false for outgoing connections */
+} pn_driver_connection_t;
+
+#define PN_UV_MAX_ERR_LEN 128
+
+/* A listener managed by the libuv driver. */
+typedef struct pn_driver_listener_t {
+    pn_uv_socket_t dsocket;
+
+    /* Members only used by leader thread */
+    pn_uv_addr_t addr;          /* Address to listen on */
+    size_t backlog;             /* Backlog passed to uv_listen() */
+    size_t pending;             /* Incoming connections not yet accepted */
+    char error[PN_UV_MAX_ERR_LEN]; /* Error text, empty string if no error */
+} pn_driver_listener_t;
+
+/* A queue of sockets, identified by its front and back elements. */
+typedef struct pn_uv_queue_t { pn_uv_socket_t *front, *back; } pn_uv_queue_t;
+
+/* The libuv driver: a libuv loop plus the queues and synchronisation used to
+   pass sockets between user threads and the leader thread that runs the loop. */
+typedef struct pn_driver_t {
+    uv_mutex_t lock;            /* Protects the queues and flags below */
+    pn_uv_queue_t user_q;       /* Sockets to be returned to the user as events */
+    pn_uv_queue_t leader_q;     /* Sockets waiting to be processed by the leader */
+    size_t interrupt;           /* Number of interrupt events not yet returned */
+
+    uv_cond_t cond;             /* Signalled when the leader steps down */
+    uv_loop_t loop;             /* The libuv event loop */
+    uv_async_t async;           /* Used to wake the loop from other threads */
+
+    bool has_leader:1;          /* True while some thread is acting as leader */
+} pn_driver_t;
+
+#ifdef __cplusplus
+}
+#endif
+
+/**@endcond INTERNAL  */
+
+#endif // LIBUV_DRIVER_H

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/99222efc/examples/c/driver/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/driver/receive.c b/examples/c/driver/receive.c
new file mode 100644
index 0000000..2d24bbb
--- /dev/null
+++ b/examples/c/driver/receive.c
@@ -0,0 +1,175 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.h>
+#include <proton/connection_engine.h>
+#include <proton/delivery.h>
+#include <proton/driver.h>
+#include <proton/link.h>
+#include <proton/message.h>
+#include <proton/session.h>
+#include <proton/transport.h>
+#include <proton/url.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+/* Fixed-size string buffer used for the application's settings. */
+typedef char str[1024];
+
+/* Application state for the receive example. */
+typedef struct app_data_t {
+    str address;                /* Source address to receive from */
+    str container_id;           /* AMQP container id of this process */
+    pn_rwbytes_t message_buffer; /* Not used by the code in this file */
+    int message_count;          /* Messages to receive, 0 to receive forever */
+    int received;               /* Messages received so far */
+} app_data_t;
+
+static const int BATCH = 100; /* Batch size for unlimited receive */
+
+// Dispatch proton events.
+//
+// Handles every event currently available from the engine:
+//  - PN_CONNECTION_INIT: open the connection, a session and a receiver link
+//    on app->address, and grant initial credit;
+//  - PN_DELIVERY: accept and settle each complete message, top up credit when
+//    receiving forever, or close everything once message_count is reached;
+//  - PN_TRANSPORT_CLOSED: print the transport error, if any.
+// All other events are ignored.
+static void dispatch(app_data_t* app, pn_connection_engine_t* eng) {
+    for (pn_event_t *event = pn_connection_engine_dispatch(eng);
+         event != NULL;
+         event = pn_connection_engine_dispatch(eng)
+    ) {
+        switch (pn_event_type(event)) {
+
+          case PN_CONNECTION_INIT: {
+              pn_connection_t* c = pn_event_connection(event);
+              pn_connection_set_container(c, app->container_id);
+              pn_connection_open(c);
+              pn_session_t* s = pn_session(c);
+              pn_session_open(s);
+              pn_link_t* l = pn_receiver(s, "my_receiver");
+              pn_terminus_set_address(pn_link_source(l), app->address);
+              pn_link_open(l);
+              // cannot receive without granting credit:
+              pn_link_flow(l, app->message_count ? app->message_count : BATCH);
+          } break;
+
+          case PN_DELIVERY: {
+              // A message has been received
+              pn_link_t *link = NULL;
+              pn_delivery_t *dlv = pn_event_delivery(event);
+              // Only act once the whole message has arrived.
+              if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
+                  /* TODO aconway 2016-10-14: decode and print message content */
+                  link = pn_delivery_link(dlv);
+                  if (!pn_delivery_settled(dlv)) {
+                      // remote has not settled, so it is tracking the delivery.  Ack
+                      // it.
+                      pn_delivery_update(dlv, PN_ACCEPTED);
+                  }
+
+                  // done with the delivery, move to the next and free it
+                  pn_link_advance(link);
+                  pn_delivery_settle(dlv);  // dlv is now freed
+
+                  if (app->message_count == 0) {
+                      // receive forever - see if more credit is needed
+                      if (pn_link_credit(link) < BATCH/2) {
+                          // Grant enough credit to bring it up to BATCH:
+                          pn_link_flow(link, BATCH - pn_link_credit(link));
+                      }
+                  } else if (++app->received >= app->message_count) {
+                      // done receiving, close the endpoints
+                      printf("%d messages received\n", app->received);
+                      pn_session_t *ssn = pn_link_session(link);
+                      pn_link_close(link);
+                      pn_session_close(ssn);
+                      pn_connection_close(pn_session_connection(ssn));
+                  }
+              }
+          } break;
+
+          case PN_TRANSPORT_CLOSED: {
+              pn_transport_t *tport = pn_event_transport(event);
+              pn_condition_t *cond = pn_transport_condition(tport);
+              if (pn_condition_is_set(cond)) {
+                  fprintf(stderr, "transport error: %s: %s\n",
+                          pn_condition_get_name(cond), pn_condition_get_description(cond));
+              }
+          } break;
+
+          default: break;
+        }
+    }
+}
+
+/* Print the command line synopsis on stderr and exit with status 1. */
+static void usage(const char *arg0) {
+    const char *program = arg0;
+    fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", program);
+    exit(1);
+}
+
+/* Receive example: connect to the URL given with -a (default address
+ * "example"), receive the number of messages given with -m (default 100,
+ * 0 means receive forever) and exit once the connection has finished.
+ */
+int main(int argc, char **argv) {
+    /* Default values for application and connection. */
+    app_data_t app = {0};
+    app.message_count = 100;
+    const char* urlstr = NULL;
+
+    int opt;
+    while((opt = getopt(argc, argv, "a:m:")) != -1) {
+        switch(opt) {
+          case 'a': urlstr = optarg; break;
+          case 'm': app.message_count = atoi(optarg); break;
+          default: usage(argv[0]); break;
+        }
+    }
+    if (optind < argc)
+        usage(argv[0]);
+
+    snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
+
+    /* Parse the URL or use default values */
+    pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
+    const char *host = url ? pn_url_get_host(url) : NULL;
+    const char *port = url ? pn_url_get_port(url) : NULL;
+    const char *path = url ? pn_url_get_path(url) : NULL;
+    /* strncpy() does not terminate the destination when the source fills it,
+       so copy at most size-1 bytes and terminate explicitly. */
+    strncpy(app.address, path ? path : "example", sizeof(app.address) - 1);
+    app.address[sizeof(app.address) - 1] = '\0';
+
+    /* Create the driver and connect */
+    pn_driver_t driver;
+    pn_driver_init(&driver);
+    pn_driver_connection_t dc;
+    pn_driver_connection_init(&driver, &dc);
+    pn_driver_connect(&dc, host, port, NULL);
+    if (url) pn_url_free(url);
+
+    /* Handle driver events until the interrupt requested below arrives. */
+    for (pn_driver_event_t e = pn_driver_wait(&driver);
+         e.type != PN_DRIVER_INTERRUPT;
+         e = pn_driver_wait(&driver))
+    {
+        switch (e.type) {
+          case PN_DRIVER_CONNECTION_READY:
+            /* Process the engine's events, then give the connection back to the driver. */
+            dispatch(&app, pn_driver_engine(e.connection));
+            pn_driver_watch(e.connection);
+            break;
+          case PN_DRIVER_CONNECTION_FINISHED:
+            /* The only connection is done: interrupt the driver to end the loop. */
+            pn_driver_connection_final(e.connection);
+            pn_driver_interrupt(&driver);
+            break;
+          default:
+            break;
+        }
+    }
+    pn_driver_final(&driver);
+    return 0;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/99222efc/examples/c/driver/send.c
----------------------------------------------------------------------
diff --git a/examples/c/driver/send.c b/examples/c/driver/send.c
new file mode 100644
index 0000000..9b52a0f
--- /dev/null
+++ b/examples/c/driver/send.c
@@ -0,0 +1,199 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.h>
+#include <proton/connection_engine.h>
+#include <proton/delivery.h>
+#include <proton/driver.h>
+#include <proton/link.h>
+#include <proton/message.h>
+#include <proton/session.h>
+#include <proton/transport.h>
+#include <proton/url.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+/* Fixed-size string buffer used for the application's settings. */
+typedef char str[1024];
+
+/* Application state for the send example. */
+typedef struct app_data_t {
+    str address;                /* Target address to send to */
+    str container_id;           /* AMQP container id of this process */
+    pn_rwbytes_t message_buffer; /* Reusable buffer for encoded messages, grown on demand */
+    int message_count;          /* Number of messages to send */
+    int sent;                   /* Messages sent so far, also the sequence number */
+    int acknowledged;           /* Messages accepted by the peer so far */
+} app_data_t;
+
+// Create a message with a map { "sequence" : number } encode it and return the
+// encoded buffer.
+//
+// The message id and the sequence number are both app->sent. The message is
+// encoded into app->message_buffer, which is allocated on first use and
+// doubled until the message fits. The returned bytes point into that buffer,
+// so they are only valid until the next call. Exits the process if memory
+// runs out or encoding fails.
+static pn_bytes_t encode_message(app_data_t* app) {
+    /* Construct a message with the map { "sequence": app.sent } */
+    pn_message_t* message = pn_message();
+    pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
+    pn_data_t* body = pn_message_body(message);
+    pn_data_put_map(body);
+    pn_data_enter(body);
+    pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
+    pn_data_put_int(body, app->sent); /* The sequence number */
+    pn_data_exit(body);
+
+    // encode the message, expanding the encode buffer as needed
+    //
+    if (app->message_buffer.start == NULL) {
+        static const size_t initial_size = 128;
+        char *initial = (char*)malloc(initial_size);
+        if (initial == NULL) {
+            fprintf(stderr, "error encoding message: out of memory\n");
+            exit(1);
+        }
+        app->message_buffer = pn_rwbytes(initial_size, initial);
+    }
+    /* app->message_buffer is the total buffer space available. */
+    /* mbuf will point at just the portion used by the encoded message */
+    pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
+    int status = 0;
+    while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
+        size_t bigger = app->message_buffer.size * 2;
+        /* Keep the old pointer until realloc() is known to have succeeded. */
+        char *grown = (char*)realloc(app->message_buffer.start, bigger);
+        if (grown == NULL) {
+            fprintf(stderr, "error encoding message: out of memory\n");
+            exit(1);
+        }
+        app->message_buffer = pn_rwbytes(bigger, grown);
+        /* realloc() may have moved the buffer: refresh the pointer as well as
+           the size, otherwise the next attempt writes through a stale pointer. */
+        mbuf = pn_rwbytes(bigger, grown);
+    }
+    if (status != 0) {
+        fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
+        exit(1);
+    }
+    pn_message_free(message);
+    return pn_bytes(mbuf.size, mbuf.start);
+}
+
+// Dispatch proton events.
+//
+// Handles every event currently available from the engine:
+//  - PN_CONNECTION_INIT: open the connection, a session and a sender link to
+//    app->address;
+//  - PN_LINK_FLOW: send messages while there is credit and messages remain;
+//  - PN_DELIVERY: count accepted messages and close the connection once all
+//    of them have been acknowledged;
+//  - PN_TRANSPORT_CLOSED: print the transport error, if any.
+// All other events are ignored.
+static void dispatch(app_data_t* app, pn_connection_engine_t* eng) {
+    for (pn_event_t *event = pn_connection_engine_dispatch(eng);
+         event != NULL;
+         event = pn_connection_engine_dispatch(eng)
+    ) {
+        switch (pn_event_type(event)) {
+
+          case PN_CONNECTION_INIT: {
+              pn_connection_t* c = pn_event_connection(event);
+              pn_connection_set_container(c, app->container_id);
+              pn_connection_open(c);
+              pn_session_t* s = pn_session(c);
+              pn_session_open(s);
+              pn_link_t* l = pn_sender(s, "my_sender");
+              pn_terminus_set_address(pn_link_target(l), app->address);
+              pn_link_open(l);
+          } break;
+
+          case PN_LINK_FLOW: {
+              // The peer has given us some credit, now we can send messages
+              pn_link_t *sender = pn_event_link(event);
+              while (pn_link_credit(sender) > 0 && app->sent < app->message_count) {
+                  ++app->sent;
+                  /* TODO aconway 2016-06-30: explain delivery tag */
+                  // The delivery tag is the raw bytes of the sequence number.
+                  pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
+                  pn_bytes_t msgbuf = encode_message(app);
+                  pn_link_send(sender, msgbuf.start, msgbuf.size);
+                  pn_link_advance(sender);
+              }
+          } break;
+
+          case PN_DELIVERY: {
+              // We received acknowledgement from the peer that a message was delivered.
+              pn_delivery_t* d = pn_event_delivery(event);
+              if (pn_delivery_remote_state(d) == PN_ACCEPTED) {
+                  if (++app->acknowledged == app->message_count) {
+                      printf("%d messages sent and acknowledged\n", app->acknowledged);
+                      pn_connection_close(pn_event_connection(event));
+                  }
+              }
+          } break;
+
+          case PN_TRANSPORT_CLOSED: {
+              pn_transport_t *tport = pn_event_transport(event);
+              pn_condition_t *cond = pn_transport_condition(tport);
+              if (pn_condition_is_set(cond)) {
+                  fprintf(stderr, "transport error: %s: %s\n",
+                          pn_condition_get_name(cond), pn_condition_get_description(cond));
+              }
+          } break;
+
+          default: break;
+        }
+    }
+}
+
+/* Print the command line synopsis on stderr and exit with status 1. */
+static void usage(const char *arg0) {
+    const char *program = arg0;
+    fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", program);
+    exit(1);
+}
+
+/* Send example: connect to the URL given with -a (default address
+ * "example"), send the number of messages given with -m (default 100) and
+ * exit once the connection has finished.
+ */
+int main(int argc, char **argv) {
+    /* Default values for application and connection. */
+    app_data_t app = {0};
+    app.message_count = 100;
+    const char* urlstr = NULL;
+
+    int opt;
+    while((opt = getopt(argc, argv, "a:m:")) != -1) {
+        switch(opt) {
+          case 'a': urlstr = optarg; break;
+          case 'm': app.message_count = atoi(optarg); break;
+          default: usage(argv[0]); break;
+        }
+    }
+    if (optind < argc)
+        usage(argv[0]);
+
+    snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid());
+
+    /* Parse the URL or use default values */
+    pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
+    const char *host = url ? pn_url_get_host(url) : NULL;
+    const char *port = url ? pn_url_get_port(url) : NULL;
+    const char *path = url ? pn_url_get_path(url) : NULL;
+    /* strncpy() does not terminate the destination when the source fills it,
+       so copy at most size-1 bytes and terminate explicitly. */
+    strncpy(app.address, path ? path : "example", sizeof(app.address) - 1);
+    app.address[sizeof(app.address) - 1] = '\0';
+
+    /* Create the driver and connect */
+    pn_driver_t driver;
+    pn_driver_init(&driver);
+    pn_driver_connection_t dc;
+    pn_driver_connection_init(&driver, &dc);
+    pn_driver_connect(&dc, host, port, NULL);
+    if (url) pn_url_free(url);
+
+    /* Handle driver events until the interrupt requested below arrives. */
+    for (pn_driver_event_t e = pn_driver_wait(&driver);
+         e.type != PN_DRIVER_INTERRUPT;
+         e = pn_driver_wait(&driver))
+    {
+        switch (e.type) {
+          case PN_DRIVER_CONNECTION_READY:
+            /* Process the engine's events, then give the connection back to the driver. */
+            dispatch(&app, pn_driver_engine(e.connection));
+            pn_driver_watch(e.connection);
+            break;
+          case PN_DRIVER_CONNECTION_FINISHED:
+            /* The only connection is done: interrupt the driver to end the loop. */
+            pn_driver_connection_final(e.connection);
+            pn_driver_interrupt(&driver);
+            break;
+          default:
+            break;
+        }
+    }
+    pn_driver_final(&driver);
+    free(app.message_buffer.start);
+    return 0;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/99222efc/examples/c/driver/test.py
----------------------------------------------------------------------
diff --git a/examples/c/driver/test.py b/examples/c/driver/test.py
new file mode 100644
index 0000000..bb4d7eb
--- /dev/null
+++ b/examples/c/driver/test.py
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License
+#
+
+# This is a test script to run the examples and verify that they behave as expected.
+
+from exampletest import *
+
+import unittest
+import sys
+
+def python_cmd(name):
+    dir = os.path.dirname(__file__)
+    return [sys.executable, os.path.join(dir, "..", "..", "python", name)]
+
+class CExampleTest(BrokerTestCase):
+    broker_exe = ["libuv_broker"]
+
+    def test_send_receive(self):
+        """Send first then receive"""
+        s = self.proc(["libuv_send", "-a", self.addr])
+        self.assertEqual("100 messages sent and acknowledged\n", s.wait_out())
+        r = self.proc(["libuv_receive", "-a", self.addr])
+        self.assertEqual("100 messages received\n", r.wait_out())
+
+    def test_receive_send(self):
+        """Start receiving  first, then send."""
+        r = self.proc(["libuv_receive", "-a", self.addr]);
+        s = self.proc(["libuv_send", "-a", self.addr]);
+        self.assertEqual("100 messages sent and acknowledged\n", s.wait_out())
+        self.assertEqual("100 messages received\n", r.wait_out())
+
+if __name__ == "__main__":
+    unittest.main()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/99222efc/examples/cpp/README.dox
----------------------------------------------------------------------
diff --git a/examples/cpp/README.dox b/examples/cpp/README.dox
index 1d46ec8..421dd34 100644
--- a/examples/cpp/README.dox
+++ b/examples/cpp/README.dox
@@ -161,3 +161,5 @@ A working example for accessing Service Bus session-enabled queues.
 Also provides some general notes on Service Bus usage.
 
 */
+
+/** FIXME aconway - documentation for driver examples */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/99222efc/examples/cpp/mt/broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/mt/broker.cpp b/examples/cpp/mt/broker.cpp
index 39d7132..0945a07 100644
--- a/examples/cpp/mt/broker.cpp
+++ b/examples/cpp/mt/broker.cpp
@@ -149,6 +149,7 @@ class broker_connection_handler : public proton::messaging_handler {
 
     // A sender sends messages from a queue to a subscriber.
     void on_sender_open(proton::sender &sender) OVERRIDE {
+        // FIXME aconway 2016-09-29: need to set the source address here.
         queue *q = sender.source().dynamic() ?
             queues_.dynamic() : queues_.get(sender.source().address());
         std::cout << "sending from " << q->name() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/99222efc/examples/exampletest.py
----------------------------------------------------------------------
diff --git a/examples/exampletest.py b/examples/exampletest.py
new file mode 100644
index 0000000..d40b9cb
--- /dev/null
+++ b/examples/exampletest.py
@@ -0,0 +1,183 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License
+#
+
+# A test library to make it easy to run unittest tests that start,
+# monitor, and report output from sub-processes. In particular
+# it helps with starting processes that listen on random ports.
+
+import unittest
+import os, sys, socket, time, re, inspect, errno, threading
+from  random import randrange
+from subprocess import Popen, PIPE, STDOUT
+from copy import copy
+import platform
+from os.path import dirname as dirname
+
+def pick_port():
+    """Pick a random port."""
+    p =  randrange(10000, 20000)
+    return p
+
+class ProcError(Exception):
+    """An exception that captures failed process output"""
+    def __init__(self, proc, what="bad exit status"):
+        out = proc.out.strip()
+        if out:
+            out = "\nvvvvvvvvvvvvvvvv\n%s\n^^^^^^^^^^^^^^^^\n" % out
+        else:
+            out = ", no output)"
+        super(Exception, self, ).__init__(
+            "%s %s, code=%s%s" % (proc.args, what, proc.returncode, out))
+
+class NotFoundError(ProcError):
+    pass
+
+class Proc(Popen):
+    """A example process that stores its output, optionally run with valgrind."""
+
+    if "VALGRIND" in os.environ and os.environ["VALGRIND"]:
+        env_args = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet", "--leak-check=full"]
+    else:
+        env_args = []
+
+    @property
+    def out(self):
+        self._out.seek(0)
+        return self._out.read()
+
+    def __init__(self, args, **kwargs):
+        """Start an example process"""
+        args = list(args)
+        self.args = args
+        self._out = os.tmpfile()
+        try:
+            Popen.__init__(self, self.env_args + self.args, stdout=self._out, stderr=STDOUT, **kwargs)
+        except OSError, e:
+            if e.errno == errno.ENOENT:
+                raise NotFoundError(self, str(e))
+            raise ProcError(self, str(e))
+        except Exception, e:
+            raise ProcError(self, str(e))
+
+    def kill(self):
+        try:
+            if self.poll() is None:
+                Popen.kill(self)
+        except:
+            pass                # Already exited.
+        return self.out
+
+    def wait_out(self, timeout=10, expect=0):
+        """Wait for process to exit, return output. Raise ProcError  on failure."""
+        t = threading.Thread(target=self.wait)
+        t.start()
+        t.join(timeout)
+        if self.poll() is None:      # Still running
+            self.kill()
+            raise ProcError(self, "timeout")
+        if expect is not None and self.poll() != expect:
+            raise ProcError(self)
+        return self.out
+
+# Work-around older python unittest that lacks setUpClass.
+if hasattr(unittest.TestCase, 'setUpClass') and  hasattr(unittest.TestCase, 'tearDownClass'):
+    TestCase = unittest.TestCase
+else:
+    class TestCase(unittest.TestCase):
+        """
+        Roughly provides setUpClass and tearDownClass functionality for older python
+        versions in our test scenarios. If subclasses override setUp or tearDown
+        they *must* call the superclass.
+        """
+        def setUp(self):
+            if not hasattr(type(self), '_setup_class_count'):
+                type(self)._setup_class_count = len(
+                    inspect.getmembers(
+                        type(self),
+                        predicate=lambda(m): inspect.ismethod(m) and m.__name__.startswith('test_')))
+                type(self).setUpClass()
+
+        def tearDown(self):
+            self.assertTrue(self._setup_class_count > 0)
+            self._setup_class_count -=  1
+            if self._setup_class_count == 0:
+                type(self).tearDownClass()
+
+class ExampleTestCase(TestCase):
+    """TestCase that manages started processes"""
+    def setUp(self):
+        super(ExampleTestCase, self).setUp()
+        self.procs = []
+
+    def tearDown(self):
+        for p in self.procs:
+            p.kill()
+        super(ExampleTestCase, self).tearDown()
+
+    def proc(self, *args, **kwargs):
+        p = Proc(*args, **kwargs)
+        self.procs.append(p)
+        return p
+
+def wait_port(port, timeout=10):
+    """Wait up to timeout for port to be connectable."""
+    if timeout:
+        deadline = time.time() + timeout
+    while (timeout is None or time.time() < deadline):
+        try:
+            s = socket.create_connection((None, port), timeout) # Works for IPv6 and v4
+            s.close()
+            return
+        except socket.error, e:
+            if e.errno != errno.ECONNREFUSED: # Only retry on connection refused error.
+                raise
+    raise socket.timeout()
+
+
+class BrokerTestCase(ExampleTestCase):
+    """
+    ExampleTest that starts a broker in setUpClass and kills it in tearDownClass.
+    Subclass must set `broker_exe` class variable with the name of the broker executable.
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        cls.port = pick_port()
+        cls.addr = "127.0.0.1:%s/examples" % (cls.port)
+        cls.broker = None       # In case Proc throws, create the attribute.
+        cls.broker = Proc(cls.broker_exe + ["-a", cls.addr])
+        try:
+            wait_port(cls.port)
+        except Exception, e:
+            cls.broker.kill()
+            raise ProcError(cls.broker, "timed out waiting for port")
+
+    @classmethod
+    def tearDownClass(cls):
+        if cls.broker: cls.broker.kill()
+
+    def tearDown(self):
+        b = type(self).broker
+        if b and b.poll() !=  None: # Broker crashed
+            type(self).setUpClass() # Start another for the next test.
+            raise ProcError(b, "broker crash")
+        super(BrokerTestCase, self).tearDown()
+
+# Run unittest's command-line runner when this module is executed as a script.
+if __name__ == "__main__":
+    unittest.main()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/99222efc/proton-c/docs/api/index.md
----------------------------------------------------------------------
diff --git a/proton-c/docs/api/index.md b/proton-c/docs/api/index.md
index 10aea84..4a6dec6 100644
--- a/proton-c/docs/api/index.md
+++ b/proton-c/docs/api/index.md
@@ -1,5 +1,50 @@
 Proton Documentation            {#index}
 ====================
 
-The proton library contains two APIs: The [Engine API](@ref engine),
-and the [Messenger API](@ref messenger).
+## The Protocol Engine
+
+The [Engine API](@ref engine) is a "pure AMQP" toolkit: it decodes AMQP bytes
+into proton [events](@ref event) and generates AMQP bytes from application
+calls.
+
+The [connection engine](@ref connection_engine) provides a simple bytes in/bytes
+out, event-driven interface so you can read AMQP data from any source, process
+the resulting [events](@ref event) and write AMQP output to any destination.
+
+There is no IO or threading code in this part of the library, so it can be
+embedded in many different environments. The proton project provides language
+bindings (Python, Ruby, Go etc.) that embed it into the standard IO and
+threading facilities of the bound language.
+
+## Integrating with IO
+
+The [Driver SPI](@ref driver) is a portable framework to build single or
+multi-threaded Proton C applications with replaceable IO implementations.
+
+The driver can initiate or listen for connections. Application threads wait for
+a [connection engine](@ref connection_engine) to become ready for processing due
+to an IO event. The application thread uses the [Engine API](@ref engine)
+described above, fully decoupled from the IO mechanism.
+
+The driver is the basis for the Proton C++ binding and the
+[Qpid Dispatch Router](http://qpid.apache.org/components/dispatch-router/). The
+Proton project provides drivers for many platforms, and you can implement your
+own.
+
+## Messenger and Reactor APIs
+
+The [Messenger](@ref messenger) and [Reactor](@ref reactor) APIs were intended
+to be simple APIs that included IO support directly out of the box.
+
+They both had good points but were both based on POSIX-style polling
+assumptions, and did not support concurrent or multi-threaded use. This creates
+several problems:
+
+- Difficult to port (e.g. Windows poll() is inefficient, IOCP has a different model)
+- Difficult to integrate: even in python and ruby, foreign C code blocking on IO violates normal IO and threading assumptions and requires some dubious hacking to make it work.
+- Impossible to use in multi-threaded servers or concurrent languages like Go.
+
+Note however that the Reactor API was built around the Proton @ref engine and
+@ref event APIs. For the most part using the @ref driver or @ref connection_engine
+is the same as using the reactor. Connection setup/teardown and pumping the
+event loop are the main differences.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/99222efc/proton-c/include/proton/connection_engine.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/connection_engine.h b/proton-c/include/proton/connection_engine.h
index b1476c7..d923eeb 100644
--- a/proton-c/include/proton/connection_engine.h
+++ b/proton-c/include/proton/connection_engine.h
@@ -20,48 +20,31 @@
  * under the License.
  */
 
-///@file
-///
-/// **Experimental** The Connection Engine API wraps up the proton engine
-/// objects associated with a single connection: pn_connection_t, pn_transport_t
-/// and pn_collector_t. It provides a simple bytes-in/bytes-out interface for IO
-/// and generates pn_event_t events to be handled by the application.
-///
-/// The connection engine can be fed with raw AMQP bytes from any source, and it
-/// generates AMQP byte output to be written to any destination. You can use the
-/// engine to integrate proton AMQP with any IO library, or native IO on any
-/// platform.
-///
-/// The engine is not thread safe but each engine is independent. Separate
-/// engines can be used concurrently. For example a multi-threaded application
-/// can process connections in multiple threads, but serialize work for each
-/// connection to the corresponding engine.
-///
-/// The engine is designed to be thread and IO neutral so it can be integrated with
-/// single or multi-threaded code in reactive or proactive IO frameworks.
-///
-/// Summary of use:
-///
-/// - while !pn_connection_engine_finished()
-///   - Call pn_connection_engine_dispatch() to dispatch events until it returns NULL.
-///   - Read data from your source into pn_connection_engine_read_buffer()
-///   - Call pn_connection_engine_read_done() when complete.
-///   - Write data from pn_connection_engine_write_buffer() to your destination.
-///   - Call pn_connection_engine_write_done() to indicate how much was written.
-///
-/// Note on blocking: the _read/write_buffer and _read/write_done functions can
-/// all generate events that may cause the engine to finish. Before you wait for
-/// IO, always drain pn_connection_engine_dispatch() till it returns NULL and
-/// check pn_connection_engine_finished() in case there is nothing more to do..
-///
-/// Note on error handling: the pn_connection_engine_*() functions do not return
-/// an error code. If an error occurs it will be reported as a
-/// PN_TRANSPORT_ERROR event and pn_connection_engine_finished() will return
-/// true once all final events have been processed.
-///
-/// @defgroup connection_engine The Connection Engine
-/// @{
-///
+
+/**
+ * @file
+ * @defgroup connection_engine Connection Engine
+ * @ingroup engine
+ *
+ * **Experimental**: Converts AMQP input bytes into proton @ref event "events", and
+ * application calls to the @ref engine API into AMQP output bytes.
+ *
+ * Each individual engine is not thread safe but separate engines can be used
+ * concurrently. A multi-threaded application must serialize activity for each
+ * connection but can process separate connections concurrently.
+ *
+ * Note on blocking: the _read/write_buffer and _read/write_done functions can
+ * generate events that may cause the engine to finish. Before you wait for IO,
+ * always drain pn_connection_engine_dispatch() till it returns NULL and check
+ * pn_connection_engine_finished() in case there is nothing more to do.
+ *
+ * Note on error handling: the pn_connection_engine_*() functions do not return
+ * an error code. If an error occurs it will be reported as a
+ * PN_TRANSPORT_ERROR event and pn_connection_engine_finished() will return
+ * true once all final events have been processed.
+ *
+ * @{
+ */
 
 #include <proton/condition.h>
 #include <proton/event.h>
@@ -72,11 +55,11 @@
 extern "C" {
 #endif
 
-/// A connection engine is a trio of pn_connection_t, pn_transport_t and pn_collector_t.
-/// Use the pn_connection_engine_*() functions to operate on it.
-/// It is a plain struct, not a proton object. Use pn_connection_engine_init to set up
-/// the initial objects and pn_connection_engine_final to release them.
-///
+/**
+ * connection_engine is a plain struct, not a proton object.  Use
+ * pn_connection_engine_init to initialize and pn_connection_engine_final before
+ * freeing.
+ */
 typedef struct pn_connection_engine_t {
     pn_connection_t* connection;
     pn_transport_t* transport;
@@ -84,104 +67,134 @@ typedef struct pn_connection_engine_t {
     pn_event_t* event;
 } pn_connection_engine_t;
 
-/// Initialize a pn_connection_engine_t struct with a new connection and
-/// transport.
-///
-/// Configure connection properties and call connection_engine_start() before
-/// using the engine.
-///
-/// Call pn_connection_engine_final to free resources when you are done.
-///
-///@return 0 on success, a proton error code on failure (@see error.h)
-///
+/**
+ * Initialize a pn_connection_engine_t struct, create a pn_connection_t and
+ * pn_transport_t. You can configure security properties on the connection, then
+ * call pn_connection_engine_start() to bind the transport before using the engine.
+ *
+ * Call pn_connection_engine_final to free resources when you are done.
+ *
+ * @return 0 on success, a proton error code on failure (@see error.h)
+ */
 PN_EXTERN int pn_connection_engine_init(pn_connection_engine_t* engine);
 
-/// Start the engine, call after setting security and host properties.
+/**
+ * Start the engine, call after setting security and host properties.
+ * Binds the transport and connection ready for use.
+ */
 PN_EXTERN void pn_connection_engine_start(pn_connection_engine_t* engine);
 
-/// Free resources used by the engine, set the connection and transport pointers
-/// to NULL.
+/**
+ * Free resources used by the engine, set the connection and transport pointers
+ * to NULL.
+ */
 PN_EXTERN void pn_connection_engine_final(pn_connection_engine_t* engine);
 
-/// Get the engine's read buffer. Read data from your IO source to buf.start, up
-/// to a max of buf.size. Then call pn_connection_engine_read_done().
-///
-/// buf.size==0 means the engine cannot read presently, calling
-/// pn_connection_engine_dispatch() may create more buffer space.
-///
+/**
+ * Get the engine's read buffer. Read data from your IO source to buf.start, up
+ * to a max of buf.size. Then call pn_connection_engine_read_done().
+ *
+ * buf.size==0 means the engine cannot read presently, calling
+ * pn_connection_engine_dispatch() may create more buffer space.
+ */
 PN_EXTERN pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t*);
 
-/// Consume the first n bytes of data in pn_connection_engine_read_buffer() and
-/// update the buffer.
+/**
+ * Consume the first n bytes of data in pn_connection_engine_read_buffer() and
+ * update the buffer.
+ */
 PN_EXTERN void pn_connection_engine_read_done(pn_connection_engine_t*, size_t n);
 
-/// Close the read side of the transport when no more data is available.
-/// Note there may still be events for pn_connection_engine_dispatch() or data
-/// in pn_connection_engine_write_buffer()
+/**
+ * Close the read side of the transport when no more data is available.
+ * Note there may still be events for pn_connection_engine_dispatch() or data
+ * in pn_connection_engine_write_buffer()
+ */
 PN_EXTERN void pn_connection_engine_read_close(pn_connection_engine_t*);
 
-/// Get the engine's write buffer. Write data from buf.start to your IO destination,
-/// up to a max of buf.size. Then call pn_connection_engine_write_done().
-///
-/// buf.size==0 means the engine has nothing to write presently.  Calling
-/// pn_connection_engine_dispatch() may generate more data.
+/**
+ * Get the engine's write buffer. Write data from buf.start to your IO destination,
+ * up to a max of buf.size. Then call pn_connection_engine_write_done().
+ *
+ * buf.size==0 means the engine has nothing to write presently.  Calling
+ * pn_connection_engine_dispatch() may generate more data.
+ */
 PN_EXTERN pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t*);
 
-/// Call when the first n bytes of pn_connection_engine_write_buffer() have been
-/// written to IO and can be re-used for new data.  Updates the buffer.
+/**
+ * Call when the first n bytes of pn_connection_engine_write_buffer() have been
+ * written to IO and can be re-used for new data.  Updates the buffer.
+ */
 PN_EXTERN void pn_connection_engine_write_done(pn_connection_engine_t*, size_t n);
 
-/// Call when the write side of IO has closed and no more data can be written.
-/// Note that there may still be events for pn_connection_engine_dispatch() or
-/// data to read into pn_connection_engine_read_buffer().
+/**
+ * Call when the write side of IO has closed and no more data can be written.
+ * Note that there may still be events for pn_connection_engine_dispatch() or
+ * data to read into pn_connection_engine_read_buffer().
+ */
 PN_EXTERN void pn_connection_engine_write_close(pn_connection_engine_t*);
 
-/// Close both sides of the transport, equivalent to
-///     pn_connection_engine_read_close(); pn_connection_engine_write_close()
-///
-/// You must still call pn_connection_engine_dispatch() to process final
-/// events.
-///
-/// To provide transport error information to the handler, set it on
-///     pn_connection_engine_condition()
-/// *before* calling pn_connection_engine_disconnected(). This sets
-/// the error on the pn_transport_t object.
-///
-/// Note this does *not* modify the pn_connection_t, so you can distinguish
-/// between a connection close error sent by the remote peer (which will set the
-/// connection condition) and a transport error (which sets the transport
-/// condition.)
-///
+/**
+ * Close both sides of the transport, equivalent to
+ *
+ *     pn_connection_engine_read_close(); pn_connection_engine_write_close()
+ *
+ * To pass a transport error to the handler, set it on
+ * pn_connection_engine_condition() *before* calling
+ * pn_connection_engine_disconnected().  Note this is different from an AMQP
+ * close sent by the remote peer, which sets the connection condition.
+ *
+ * You must still call pn_connection_engine_dispatch() to process final
+ * events.
+ */
 PN_EXTERN void pn_connection_engine_disconnected(pn_connection_engine_t*);
 
-/// Get the next available event.
-/// Call in a loop until it returns NULL to dispatch all available events.
-/// Note this call may modify the read and write buffers.
-///
-/// @return Pointer to the next event, or NULL if there are none available.
-///
+/**
+ * Get the next available event.
+ * Call in a loop until it returns NULL to dispatch all available events.
+ * Note this call may modify the read and write buffers.
+ *
+ * @return Pointer to the next event, or NULL if there are none available.
+ *
+ */
 PN_EXTERN pn_event_t* pn_connection_engine_dispatch(pn_connection_engine_t*);
 
-/// Return true if the engine is finished - all data has been written, all
-/// events have been handled and the transport is closed.
+/**
+ * Return true if the engine is finished - all data has been written, all
+ * events have been handled and the transport is closed.
+ */
 PN_EXTERN bool pn_connection_engine_finished(pn_connection_engine_t*);
 
-/// Get the AMQP connection, owned by the pn_connection_engine_t.
+/**
+ * Return true if the engine transport is closed. There may still be
+ * outstanding events to process.
+ *
+ * Check if the transport has been closed internally (e.g. by an authentication
+ * failure) before blocking for IO.
+ */
+PN_EXTERN bool pn_connection_engine_closed(pn_connection_engine_t*);
+
+/**
+ * Get the AMQP connection, owned by the pn_connection_engine_t.
+ */
 PN_EXTERN pn_connection_t* pn_connection_engine_connection(pn_connection_engine_t*);
 
-/// Get the proton transport, owned by the pn_connection_engine_t.
+/**
+ * Get the proton transport, owned by the pn_connection_engine_t.
+ */
 PN_EXTERN pn_transport_t* pn_connection_engine_transport(pn_connection_engine_t*);
 
-/// Get the condition object for the engine's transport.
-///
-/// Note that IO errors should be set on this, the transport condition, not on
-/// the pn_connection_t condition. The connection's condition is for errors
-/// received via the AMQP protocol, the transport condition is for errors in the
-/// the IO layer such as a socket read or disconnect errors.
-///
+/**
+ * Get the condition object for the engine's transport.
+ *
+ * Note that IO errors should be set on this, the transport condition, not on
+ * the pn_connection_t condition. The connection's condition is for errors
+ * received via the AMQP protocol, the transport condition is for errors in
+ * the IO layer such as socket read or disconnect errors.
+ */
 PN_EXTERN pn_condition_t* pn_connection_engine_condition(pn_connection_engine_t*);
 
-///@}
+/** @}*/
 
 #ifdef __cplusplus
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message