qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jr...@apache.org
Subject [2/2] qpid-proton git commit: PROTON-1585: Remove old reactor and messenger examples; promote the proactor examples to the top level
Date Mon, 11 Sep 2017 23:18:17 GMT
PROTON-1585: Remove old reactor and messenger examples; promote the proactor examples to the top level


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/564e0ca4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/564e0ca4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/564e0ca4

Branch: refs/heads/master
Commit: 564e0ca4c3367f66824e5ecf417afe70c41fb2d9
Parents: 6888ab5
Author: Justin Ross <jross@apache.org>
Authored: Mon Sep 11 15:31:15 2017 -0700
Committer: Justin Ross <jross@apache.org>
Committed: Mon Sep 11 15:44:30 2017 -0700

----------------------------------------------------------------------
 examples/c/CMakeLists.txt           |  35 ++-
 examples/c/README.dox               |  21 ++
 examples/c/broker.c                 | 424 +++++++++++++++++++++++++++++++
 examples/c/direct.c                 | 326 ++++++++++++++++++++++++
 examples/c/example_test.py          |  88 +++++++
 examples/c/messenger/CMakeLists.txt |  52 ----
 examples/c/messenger/recv-async.c   | 193 --------------
 examples/c/messenger/recv.c         | 154 -----------
 examples/c/messenger/send-async.c   | 170 -------------
 examples/c/messenger/send.c         | 111 --------
 examples/c/proactor/CMakeLists.txt  |  44 ----
 examples/c/proactor/README.dox      |  21 --
 examples/c/proactor/broker.c        | 424 -------------------------------
 examples/c/proactor/direct.c        | 326 ------------------------
 examples/c/proactor/example_test.py |  88 -------
 examples/c/proactor/receive.c       | 188 --------------
 examples/c/proactor/send.c          | 196 --------------
 examples/c/proactor/thread.h        |  49 ----
 examples/c/reactor/CMakeLists.txt   |  45 ----
 examples/c/reactor/README           |  30 ---
 examples/c/reactor/receiver.c       | 286 ---------------------
 examples/c/reactor/sender.c         | 329 ------------------------
 examples/c/receive.c                | 188 ++++++++++++++
 examples/c/send.c                   | 196 ++++++++++++++
 examples/c/thread.h                 |  49 ++++
 proton-c/src/tests/CMakeLists.txt   |   4 +-
 26 files changed, 1320 insertions(+), 2717 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/CMakeLists.txt b/examples/c/CMakeLists.txt
index 12867be..c300b00 100644
--- a/examples/c/CMakeLists.txt
+++ b/examples/c/CMakeLists.txt
@@ -17,16 +17,33 @@
 # under the License.
 #
 
-find_package(Proton REQUIRED)
+find_package(Proton REQUIRED Core Proactor)
+set(CMAKE_THREAD_PREFER_PTHREAD TRUE)  # Hint for the FindThreads module used on the next line
+find_package(Threads REQUIRED)
+
 include(CheckCCompilerFlag)
 include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include)
+include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${Proton_INCLUDE_DIRS})
+
+add_definitions(${COMPILE_LANGUAGE_FLAGS} ${COMPILE_WARNING_FLAGS} ${WERROR} ${COMPILE_PLATFORM_FLAGS} ${LINK_TIME_OPTIMIZATION})
+
+# Add a test with the correct environment to find test executables and valgrind.
+if(WIN32)
+  set(test_path "$<TARGET_FILE_DIR:broker>;$<TARGET_FILE_DIR:qpid-proton>")  # NOTE(review): targets created below are named c-<name> and only exist on non-Windows; confirm that "broker" resolves
+else()
+  set(test_path "${CMAKE_CURRENT_BINARY_DIR}:$ENV{PATH}")  # $ENV{PATH} is read when CMake configures, not when the test runs
+endif()
 
-if(Proton_Proactor_FOUND)
-  if(WIN32)
-    message(STATUS "Windows IOCP proactor examples temporarily disabled for build")
-  else(WIN32)
-    add_subdirectory(proactor)
-  endif(WIN32)
+if(WIN32)
+  message(STATUS "Windows IOCP proactor examples temporarily disabled for build")
+else()
+  foreach (name broker send receive direct)
+    add_executable(c-${name} ${name}.c)
+    target_link_libraries(c-${name} ${Proton_Proactor_LIBRARIES} ${Proton_Core_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
+    set_target_properties(c-${name} PROPERTIES OUTPUT_NAME ${name})  # Target is c-<name>, the produced file is <name>
+  endforeach()
 endif()
-add_subdirectory(messenger)
-add_subdirectory(reactor)
+
+set(run_env ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV})
+
+add_test(c-example-tests ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v)  # NOTE(review): registered on WIN32 as well, where the executables above are not built - confirm intended

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/README.dox
----------------------------------------------------------------------
diff --git a/examples/c/README.dox b/examples/c/README.dox
new file mode 100644
index 0000000..a548d35
--- /dev/null
+++ b/examples/c/README.dox
@@ -0,0 +1,21 @@
+/**
+ * @example send.c
+ *
+ * Send a fixed number of messages to the "examples" node.
+ * Can be used with @ref broker.c, @ref direct.c or an external AMQP broker.
+ *
+ * @example receive.c
+ *
+ * Subscribes to the "examples" node and prints the message bodies received.
+ * Can be used with @ref broker.c, @ref direct.c or an external AMQP broker.
+ *
+ * @example direct.c
+ *
+ * A server that can be used to demonstrate direct (no broker) peer-to-peer communication.
+ * It can accept an incoming connection from either the @ref send.c or @ref receive.c examples
+ * and will act as the directly-connected counterpart (receive or send).
+ *
+ * @example broker.c
+ *
+ * A simple multithreaded broker that works with the @ref send.c and @ref receive.c examples.
+ */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/broker.c b/examples/c/broker.c
new file mode 100644
index 0000000..e0d9672
--- /dev/null
+++ b/examples/c/broker.c
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "thread.h"
+
+#include <proton/engine.h>
+#include <proton/listener.h>
+#include <proton/proactor.h>
+#include <proton/sasl.h>
+#include <proton/transport.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+/* Simple re-sizable vector that acts as a queue.
+   VEC(T) declares an anonymous struct holding a heap array of T (data), the
+   number of elements in use (len) and the allocated capacity (cap).
+   The macros below take the vector lvalue itself, not a pointer to it.
+   Results of malloc/realloc are not checked. */
+#define VEC(T) struct { T* data; size_t len, cap; }
+
+/* Start empty with room for 16 elements. The allocation is stored through a
+   void** alias of the data member. */
+#define VEC_INIT(V)                                 \
+  do {                                              \
+    (V).len = 0;                                    \
+    (V).cap = 16;                                   \
+    void **vp = (void**)&(V).data;                  \
+    *vp = malloc((V).cap * sizeof(*(V).data));      \
+  } while(0)
+
+/* Release the element array; whatever the elements point at is not freed. */
+#define VEC_FINAL(V) free((V).data)
+
+/* Append X at the end, doubling the capacity first when the array is full. */
+#define VEC_PUSH(V, X)                                          \
+  do {                                                          \
+    if ((V).len == (V).cap) {                                   \
+      (V).cap *= 2;                                             \
+      void **vp = (void**)&(V).data;                            \
+      *vp = realloc((V).data, (V).cap * sizeof(*(V).data));     \
+    }                                                           \
+    (V).data[(V).len++] = (X);                                  \
+  } while(0)
+
+/* Remove the first element, if any, by shifting the remaining ones down. */
+#define VEC_POP(V)                                                      \
+  do {                                                                  \
+    if ((V).len > 0)                                                    \
+      memmove((V).data, (V).data+1, (--(V).len)*sizeof(*(V).data));     \
+  } while(0)
+
+/* Simple thread-safe queue implementation: a named FIFO of raw message
+   buffers plus the connections waiting for a message to arrive. Queues are
+   chained into a singly linked list through next. */
+typedef struct queue_t {
+  pthread_mutex_t lock;         /* Held while messages, waiting or sent are touched */
+  char name[256];               /* Queue name, compared with strcmp() in queues_get() */
+  VEC(pn_rwbytes_t) messages;   /* Messages on the queue_t */
+  VEC(pn_connection_t*) waiting; /* Connections waiting to send messages from this queue */
+  struct queue_t *next;            /* Next queue in chain */
+  size_t sent;                     /* Count of messages sent, used as delivery tag */
+} queue_t;
+
+/* Initialize q as an empty queue called name and chain it in front of next.
+   name is copied into the fixed-size q->name buffer; names that do not fit
+   are truncated. */
+static void queue_init(queue_t *q, const char* name, queue_t *next) {
+  pthread_mutex_init(&q->lock, NULL);
+  /* strncpy() leaves the destination unterminated when the source fills it,
+     so copy at most size-1 bytes and terminate explicitly. */
+  strncpy(q->name, name, sizeof(q->name) - 1);
+  q->name[sizeof(q->name) - 1] = '\0';
+  VEC_INIT(q->messages);
+  VEC_INIT(q->waiting);
+  q->next = next;
+  q->sent = 0;
+}
+
+/* Release everything owned by q: the mutex, any undelivered message buffers
+   and the two vectors. q itself is not freed here.
+   q->name is an array embedded in queue_t, so it must not be passed to free(). */
+static void queue_destroy(queue_t *q) {
+  pthread_mutex_destroy(&q->lock);
+  for (size_t i = 0; i < q->messages.len; ++i)
+    free(q->messages.data[i].start);
+  VEC_FINAL(q->messages);
+  for (size_t i = 0; i < q->waiting.len; ++i)
+    pn_decref(q->waiting.data[i]);
+  VEC_FINAL(q->waiting);
+}
+
+/* Send a message on s, or record the connection of s as waiting if the queue
+   has no messages.
+   Called in s dispatch loop, assumes s has credit.
+   The queue lock is held only while the queue state is inspected or changed;
+   the message itself is sent after the lock has been released.
+*/
+static void queue_send(queue_t *q, pn_link_t *s) {
+  pn_rwbytes_t m = { 0 };
+  size_t tag = 0;
+  pthread_mutex_lock(&q->lock);
+  if (q->messages.len == 0) { /* Empty, record connection as waiting */
+    /* Record connection for wake-up if not already on the list. */
+    pn_connection_t *c = pn_session_connection(pn_link_session(s));
+    size_t i = 0;
+    for (; i < q->waiting.len && q->waiting.data[i] != c; ++i)
+      ;
+    if (i == q->waiting.len) {
+      VEC_PUSH(q->waiting, c);
+    }
+  } else {
+    m = q->messages.data[0]; /* Take over the oldest message buffer */
+    VEC_POP(q->messages);
+    tag = ++q->sent;
+  }
+  pthread_mutex_unlock(&q->lock);
+  if (m.start) { /* Only set when a message was taken from the queue */
+    pn_delivery_t *d = pn_delivery(s, pn_dtag((char*)&tag, sizeof(tag)));
+    pn_link_send(s, m.start, m.size);
+    pn_link_advance(s);
+    pn_delivery_settle(d);  /* Pre-settled: unreliable, there will be no ack */
+    free(m.start);
+  }
+}
+
+/* Data associated with each broker connection.
+   NOTE(review): no code visible in this file refers to this type; the
+   check-queues flag is kept in the connection context pointer instead -
+   confirm whether the type is still needed. */
+typedef struct broker_data_t {
+  bool check_queues;          /* Check senders on the connection for available data in queues. */
+} broker_data_t;
+
+/* Remember on connection c whether its senders should be checked against the
+   queues. The boolean is stored directly in the connection's context pointer. */
+void pn_connection_set_check_queues(pn_connection_t *c, bool check) {
+  void *flag = (void*)check;
+  pn_connection_set_context(c, flag);
+}
+
+/* Read the boolean kept in the context pointer of connection c. */
+bool pn_connection_get_check_queues(pn_connection_t *c) {
+  void *flag = pn_connection_get_context(c);
+  return (bool)flag;
+}
+
+/* Append message m to queue q; called in the receiver's dispatch loop.
+   When the push makes the queue non-empty, every connection on the waiting
+   list is flagged to check its queues and woken, and the list is cleared.
+   The proactor argument d is not used.
+*/
+static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) {
+  pthread_mutex_lock(&q->lock);
+  VEC_PUSH(q->messages, m);
+  bool was_empty = (q->messages.len == 1);
+  if (was_empty) {
+    size_t idx = 0;
+    while (idx < q->waiting.len) {
+      pn_connection_t *conn = q->waiting.data[idx];
+      pn_connection_set_check_queues(conn, true);
+      pn_connection_wake(conn); /* Wake the connection */
+      ++idx;
+    }
+    q->waiting.len = 0;
+  }
+  pthread_mutex_unlock(&q->lock);
+}
+
+/* Thread safe set of queues: a mutex-protected, singly linked list of queue_t. */
+typedef struct queues_t {
+  pthread_mutex_t lock;         /* Taken by queues_get() while searching or extending the list */
+  queue_t *queues;              /* Head of the list, NULL when there are no queues */
+  size_t sent;                  /* NOTE(review): not read or written by any code visible in this file */
+} queues_t;
+
+/* Initialize qs as an empty set of queues. Every field is set here so the
+   result does not depend on the caller having zeroed the structure. */
+void queues_init(queues_t *qs) {
+  pthread_mutex_init(&qs->lock, NULL);
+  qs->queues = NULL;
+  qs->sent = 0;
+}
+
+/* Destroy and free every queue in qs, then destroy the set's mutex.
+   The list is walked without taking qs->lock, so no other thread may be
+   using qs while this runs. */
+void queues_destroy(queues_t *qs) {
+  queue_t *q = qs->queues;
+  while (q) {
+    queue_t *next = q->next;    /* Read the link before q is freed */
+    queue_destroy(q);
+    free(q);
+    q = next;
+  }
+  qs->queues = NULL;
+  pthread_mutex_destroy(&qs->lock);
+}
+
+/** Look up the queue called name, creating it and linking it at the head of
+    the list when it does not exist yet. The whole operation runs under
+    qs->lock. */
+queue_t* queues_get(queues_t *qs, const char* name) {
+  pthread_mutex_lock(&qs->lock);
+  queue_t *found = qs->queues;
+  while (found && strcmp(found->name, name) != 0) {
+    found = found->next;
+  }
+  if (found == NULL) {
+    found = (queue_t*)malloc(sizeof(queue_t));
+    queue_init(found, name, qs->queues);
+    qs->queues = found;
+  }
+  pthread_mutex_unlock(&qs->lock);
+  return found;
+}
+
+/* The broker implementation: state shared by every thread running broker_thread(). */
+typedef struct broker_t {
+  pn_proactor_t *proactor;      /* Source of the event batches handled in broker_thread() */
+  size_t threads;               /* Number of threads running broker_thread(), main thread included */
+  const char *container_id;     /* AMQP container-id */
+  queues_t queues;              /* Queues looked up by address in link_send()/link_unsub()/handle() */
+  bool finished;                /* Set on PN_PROACTOR_INTERRUPT; ends the loop in broker_thread() */
+} broker_t;
+
+/* Ask the broker to shut down by interrupting its proactor; the working
+   threads react to the resulting interrupt event. */
+void broker_stop(broker_t *b) {
+  pn_proactor_t *p = b->proactor;
+  pn_proactor_interrupt(p);
+}
+
+/* Try to send if link is sender and has credit.
+   The queue is chosen by the link's source address. A link without a source
+   address is ignored, as in link_unsub(): queues_get() compares names with
+   strcmp() and must not be handed NULL. */
+static void link_send(broker_t *b, pn_link_t *s) {
+  if (pn_link_is_sender(s) && pn_link_credit(s) > 0) {
+    const char *qname = pn_terminus_get_address(pn_link_source(s));
+    if (qname) {
+      queue_t *q = queues_get(&b->queues, qname);
+      queue_send(q, s);
+    }
+  }
+}
+
+/* Remove connection c from q's waiting list, if present.
+   The matching slot is overwritten with the first entry and the first entry
+   is then popped, so the order of the remaining entries may change. */
+static void queue_unsub(queue_t *q, pn_connection_t *c) {
+  pthread_mutex_lock(&q->lock);
+  size_t pos = 0;
+  while (pos < q->waiting.len && q->waiting.data[pos] != c) {
+    ++pos;
+  }
+  if (pos < q->waiting.len) {
+    q->waiting.data[pos] = q->waiting.data[0]; /* keep the old front entry */
+    VEC_POP(q->waiting);
+  }
+  pthread_mutex_unlock(&q->lock);
+}
+
+/* Stop waiting on the queue named by the source address of sender link s.
+   Receiver links and links without a source address are left alone. */
+static void link_unsub(broker_t *b, pn_link_t *s) {
+  if (!pn_link_is_sender(s)) {
+    return;
+  }
+  const char *address = pn_terminus_get_address(pn_link_source(s));
+  if (!address) {
+    return;
+  }
+  pn_connection_t *conn = pn_session_connection(pn_link_session(s));
+  queue_unsub(queues_get(&b->queues, address), conn);
+}
+
+/* Unsubscribe every link of connection c from the queue it reads from. */
+static void connection_unsub(broker_t *b, pn_connection_t *c) {
+  pn_link_t *link = pn_link_head(c, 0);
+  while (link != NULL) {
+    link_unsub(b, link);
+    link = pn_link_next(link, 0);
+  }
+}
+
+/* Unsubscribe the links that belong to session ssn. All links of the owning
+   connection are visited and those from other sessions are skipped. */
+static void session_unsub(broker_t *b, pn_session_t *ssn) {
+  pn_connection_t *conn = pn_session_connection(ssn);
+  pn_link_t *link = pn_link_head(conn, 0);
+  while (link != NULL) {
+    if (pn_link_session(link) == ssn) {
+      link_unsub(b, link);
+    }
+    link = pn_link_next(link, 0);
+  }
+}
+
+static int exit_code = 0;
+
+static void check_condition(pn_event_t *e, pn_condition_t *cond) {
+  if (pn_condition_is_set(cond)) {
+    fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
+            pn_condition_get_name(cond), pn_condition_get_description(cond));
+    exit_code = 1;              /* Remeber there was an unexpected error */
+  }
+}
+
+const int WINDOW=10;            /* Incoming credit window */
+
+/* Handle one proactor event on behalf of broker b.
+   Listener events report readiness, accept connections or stop the broker;
+   connection, session and link events complete the peer's open/close
+   handshakes; PN_LINK_FLOW and PN_CONNECTION_WAKE try to send queued
+   messages; PN_DELIVERY stores a complete incoming message on the queue named
+   by the link's target address. Event types not listed are ignored. */
+static void handle(broker_t* b, pn_event_t* e) {
+  pn_connection_t *c = pn_event_connection(e);
+
+  switch (pn_event_type(e)) {
+
+   case PN_LISTENER_OPEN:
+    printf("listening\n");
+    fflush(stdout);
+    break;
+
+   case PN_LISTENER_ACCEPT:
+    pn_listener_accept(pn_event_listener(e), pn_connection());
+    break;
+
+   case PN_CONNECTION_INIT:
+     pn_connection_set_container(c, b->container_id);
+     break;
+
+   case PN_CONNECTION_BOUND: {
+     /* Turn off security */
+     pn_transport_t *t = pn_connection_transport(c);
+     pn_transport_require_auth(t, false);
+     pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS");
+     break;
+   }
+   case PN_CONNECTION_REMOTE_OPEN: {
+     pn_connection_open(pn_event_connection(e)); /* Complete the open */
+     break;
+   }
+   case PN_CONNECTION_WAKE: {
+     if (pn_connection_get_check_queues(c)) {
+       pn_connection_set_check_queues(c, false);
+       /* The two state bits are combined with OR to select links that are
+          active at both ends; AND-ing the two distinct flags would give 0. */
+       int flags = PN_LOCAL_ACTIVE|PN_REMOTE_ACTIVE;
+       for (pn_link_t *l = pn_link_head(c, flags); l != NULL; l = pn_link_next(l, flags))
+         link_send(b, l);
+     }
+     break;
+   }
+   case PN_SESSION_REMOTE_OPEN: {
+     pn_session_open(pn_event_session(e));
+     break;
+   }
+   case PN_LINK_REMOTE_OPEN: {
+     /* Mirror the address requested by the peer and open our end. */
+     pn_link_t *l = pn_event_link(e);
+     if (pn_link_is_sender(l)) {
+       const char *source = pn_terminus_get_address(pn_link_remote_source(l));
+       pn_terminus_set_address(pn_link_source(l), source);
+     } else {
+       const char* target = pn_terminus_get_address(pn_link_remote_target(l));
+       pn_terminus_set_address(pn_link_target(l), target);
+       pn_link_flow(l, WINDOW);
+     }
+     pn_link_open(l);
+     break;
+   }
+   case PN_LINK_FLOW: {
+     link_send(b, pn_event_link(e));
+     break;
+   }
+   case PN_DELIVERY: {
+     pn_delivery_t *d = pn_event_delivery(e);
+     pn_link_t *r = pn_delivery_link(d);
+     if (pn_link_is_receiver(r) &&
+         pn_delivery_readable(d) && !pn_delivery_partial(d))
+     {
+       size_t size = pn_delivery_pending(d);
+       /* The broker does not decode the message, just forwards it. */
+       pn_rwbytes_t m = { size, (char*)malloc(size) };
+       pn_link_recv(r, m.start, m.size);
+       const char *qname = pn_terminus_get_address(pn_link_target(r));
+       if (qname) {
+         /* The queue takes over the buffer. */
+         queue_receive(b->proactor, queues_get(&b->queues, qname), m);
+         pn_delivery_update(d, PN_ACCEPTED);
+       } else {
+         /* No target address, so there is no queue to store the message on
+            (queues_get() must not be handed a NULL name). */
+         free(m.start);
+         pn_delivery_update(d, PN_REJECTED);
+       }
+       pn_delivery_settle(d);
+       pn_link_flow(r, WINDOW - pn_link_credit(r)); /* Top the credit back up to WINDOW */
+     }
+     break;
+   }
+
+   case PN_TRANSPORT_CLOSED:
+    check_condition(e, pn_transport_condition(pn_event_transport(e)));
+    connection_unsub(b, pn_event_connection(e));
+    break;
+
+   case PN_CONNECTION_REMOTE_CLOSE:
+    check_condition(e, pn_connection_remote_condition(pn_event_connection(e)));
+    pn_connection_close(pn_event_connection(e));
+    break;
+
+   case PN_SESSION_REMOTE_CLOSE:
+    check_condition(e, pn_session_remote_condition(pn_event_session(e)));
+    session_unsub(b, pn_event_session(e));
+    pn_session_close(pn_event_session(e));
+    pn_session_free(pn_event_session(e));
+    break;
+
+   case PN_LINK_REMOTE_CLOSE:
+    check_condition(e, pn_link_remote_condition(pn_event_link(e)));
+    link_unsub(b, pn_event_link(e));
+    pn_link_close(pn_event_link(e));
+    pn_link_free(pn_event_link(e));
+    break;
+
+   case PN_LISTENER_CLOSE:
+    check_condition(e, pn_listener_condition(pn_event_listener(e)));
+    broker_stop(b);
+    break;
+
+   case PN_PROACTOR_INACTIVE:   /* listener and all connections closed */
+    broker_stop(b);
+    break;
+
+   case PN_PROACTOR_INTERRUPT:
+    b->finished = true;
+    pn_proactor_interrupt(b->proactor); /* Pass along the interrupt to the other threads */
+    break;
+
+   default:
+    break;
+  }
+}
+
+/* Thread body: wait for event batches on the broker's proactor and handle
+   every event in each batch until the broker is marked finished.
+   void_broker is the broker_t to serve; always returns NULL. */
+static void* broker_thread(void *void_broker) {
+  broker_t *broker = (broker_t*)void_broker;
+  for (;;) {
+    pn_event_batch_t *batch = pn_proactor_wait(broker->proactor);
+    for (pn_event_t *ev = pn_event_batch_next(batch); ev != NULL; ev = pn_event_batch_next(batch)) {
+      handle(broker, ev);
+    }
+    pn_proactor_done(broker->proactor, batch);
+    if (broker->finished) {
+      break;
+    }
+  }
+  return NULL;
+}
+
+/* Run the broker.
+   Usage: broker [host [port]]; host defaults to "" and port to "amqp".
+   Serves events on four threads (the main thread plus three more) and
+   returns 0, or 1 if an error condition was reported or the thread table
+   could not be allocated. */
+int main(int argc, char **argv) {
+  broker_t b = {0};
+  b.proactor = pn_proactor();
+  queues_init(&b.queues);
+  b.container_id = argv[0];
+  b.threads = 4;
+  int i = 1;
+  const char *host = (argc > i) ? argv[i++] : "";
+  const char *port = (argc > i) ? argv[i++] : "amqp";
+
+  /* Listen on addr */
+  char addr[PN_MAX_ADDR];
+  pn_proactor_addr(addr, sizeof(addr), host, port);
+  pn_proactor_listen(b.proactor, pn_listener(), addr, 16);
+
+  /* Start n-1 threads. calloc() takes the element count first, then the size. */
+  pthread_t* threads = (pthread_t*)calloc(b.threads, sizeof(pthread_t));
+  if (!threads) {
+    perror("calloc");
+    pn_proactor_free(b.proactor);
+    return 1;
+  }
+  for (size_t t = 0; t < b.threads-1; ++t) {
+    pthread_create(&threads[t], NULL, broker_thread, &b);
+  }
+  broker_thread(&b);            /* Use the main thread too. */
+  /* Join the other threads */
+  for (size_t t = 0; t < b.threads-1; ++t) {
+    pthread_join(threads[t], NULL);
+  }
+  pn_proactor_free(b.proactor);
+  free(threads);
+  return exit_code;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/direct.c
----------------------------------------------------------------------
diff --git a/examples/c/direct.c b/examples/c/direct.c
new file mode 100644
index 0000000..15550e6
--- /dev/null
+++ b/examples/c/direct.c
@@ -0,0 +1,326 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/condition.h>
+#include <proton/connection.h>
+#include <proton/delivery.h>
+#include <proton/link.h>
+#include <proton/listener.h>
+#include <proton/message.h>
+#include <proton/proactor.h>
+#include <proton/sasl.h>
+#include <proton/session.h>
+#include <proton/transport.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+
+typedef struct app_data_t {
+  const char *host, *port;      /* Listening address parts, passed to pn_proactor_addr() in main() */
+  const char *amqp_address;     /* Target address set on sender links in handle_send() */
+  const char *container_id;     /* Set on each connection at PN_CONNECTION_INIT */
+  int message_count;            /* Messages to send or receive; 0 means receive without limit */
+
+  pn_proactor_t *proactor;
+  pn_listener_t *listener;
+  pn_rwbytes_t message_buffer;  /* Reusable encode buffer, grown by encode_message() */
+
+  /* Sender values */
+  int sent;                     /* Messages sent so far; also the delivery tag and sequence number */
+  int acknowledged;             /* Deliveries the peer has reported as PN_ACCEPTED */
+  pn_link_t *sender;            /* NOTE(review): read on PN_PROACTOR_TIMEOUT but never assigned in the visible code */
+
+  /* Receiver values */
+  int received;                 /* Messages received so far */
+} app_data_t;
+
+static const int BATCH = 1000; /* Batch size for unlimited receive */
+
+static int exit_code = 0;
+
+static void check_condition(pn_event_t *e, pn_condition_t *cond) {
+  if (pn_condition_is_set(cond)) {
+    fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
+            pn_condition_get_name(cond), pn_condition_get_description(cond));
+    pn_connection_close(pn_event_connection(e));
+    exit_code = 1;
+  }
+}
+
+/* Create a message with a map { "sequence" : number }, encode it and return the encoded buffer.
+   The number is app->sent, which is also used as the message id.
+   The returned bytes point into app->message_buffer, which is reused by the
+   next call, so the result is only valid until then.
+   Exits the process if encoding fails or the buffer cannot be grown. */
+static pn_bytes_t encode_message(app_data_t* app) {
+  /* Construct a message with the map { "sequence": app.sent } */
+  pn_message_t* message = pn_message();
+  pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
+  pn_data_t* body = pn_message_body(message);
+  pn_data_put_map(body);
+  pn_data_enter(body);
+  pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
+  pn_data_put_int(body, app->sent); /* The sequence number */
+  pn_data_exit(body);
+
+  /* encode the message, expanding the encode buffer as needed */
+  if (app->message_buffer.start == NULL) {
+    static const size_t initial_size = 128;
+    app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size));
+  }
+  /* app->message_buffer is the total buffer space available. */
+  /* mbuf will point at just the portion used by the encoded message */
+  pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
+  int status = 0;
+  while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
+    size_t new_size = app->message_buffer.size * 2;
+    char *new_start = (char*)realloc(app->message_buffer.start, new_size);
+    if (new_start == NULL) {
+      fprintf(stderr, "out of memory encoding message\n");
+      exit(1);
+    }
+    app->message_buffer = pn_rwbytes(new_size, new_start);
+    /* realloc() may have moved the block: retry with the new address as well
+       as the new size. */
+    mbuf = app->message_buffer;
+  }
+  if (status != 0) {
+    fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
+    exit(1);
+  }
+  pn_message_free(message);
+  return pn_bytes(mbuf.size, mbuf.start);
+}
+
+#define MAX_SIZE 1024
+
+/* Print the body of the message carried by dlv on its own line.
+   Nothing is printed when the pending size is MAX_SIZE or more, when the
+   read yields no data, or when the data does not decode as a message. */
+static void decode_message(pn_delivery_t *dlv) {
+  static char buffer[MAX_SIZE];
+  if (pn_delivery_pending(dlv) >= MAX_SIZE) {
+    return;
+  }
+  /* read in the raw data */
+  ssize_t len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
+  if (len <= 0) {
+    return;
+  }
+  /* decode it into a proton message */
+  pn_message_t *msg = pn_message();
+  if (pn_message_decode(msg, buffer, len) == PN_OK) {
+    pn_string_t *text = pn_string(NULL);
+    pn_inspect(pn_message_body(msg), text);
+    printf("%s\n", pn_string_get(text));
+    pn_free(text);
+  }
+  pn_message_free(msg);
+}
+
+/* This function handles events when we are acting as the receiver.
+   On PN_LINK_REMOTE_OPEN it opens the link and grants credit; on PN_DELIVERY
+   it prints, accepts and settles each complete message and either tops up
+   the credit (message_count == 0) or closes everything once message_count
+   messages have arrived. Other events are ignored. */
+static void handle_receive(app_data_t* app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+   case PN_LINK_REMOTE_OPEN: {
+     pn_link_t *l = pn_event_link(event);
+     pn_link_open(l);
+     pn_link_flow(l, app->message_count ? app->message_count : BATCH); /* BATCH credits when there is no limit */
+   } break;
+
+   case PN_DELIVERY: {
+     /* A message has been received */
+     pn_link_t *link = NULL;
+     pn_delivery_t *dlv = pn_event_delivery(event);
+     if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
+       link = pn_delivery_link(dlv);
+       decode_message(dlv);
+       /* Accept the delivery */
+       pn_delivery_update(dlv, PN_ACCEPTED);
+       /* done with the delivery, move to the next and free it */
+       pn_link_advance(link);
+       pn_delivery_settle(dlv);  /* dlv is now freed */
+
+       if (app->message_count == 0) {
+         /* receive forever - see if more credit is needed */
+         if (pn_link_credit(link) < BATCH/2) {
+           /* Grant enough credit to bring it up to BATCH: */
+           pn_link_flow(link, BATCH - pn_link_credit(link));
+         }
+       } else if (++app->received >= app->message_count) {
+         /* done receiving, close the endpoints */
+         printf("%d messages received\n", app->received);
+         pn_session_t *ssn = pn_link_session(link);
+         pn_link_close(link);
+         pn_session_close(ssn);
+         pn_connection_close(pn_session_connection(ssn));
+       }
+     }
+   } break;
+
+   default:
+    break;
+  }
+}
+
+/* This function handles events when we are acting as the sender.
+   On PN_LINK_REMOTE_OPEN it sets the target address and opens the link; on
+   PN_LINK_FLOW it sends messages while there is credit and fewer than
+   message_count have been sent; on PN_DELIVERY it counts accepted deliveries
+   and closes the connection once all are acknowledged. Other events are
+   ignored. */
+static void handle_send(app_data_t* app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+   case PN_LINK_REMOTE_OPEN: {
+     pn_link_t* l = pn_event_link(event);
+     pn_terminus_set_address(pn_link_target(l), app->amqp_address);
+     pn_link_open(l);
+   } break;
+
+   case PN_LINK_FLOW: {
+     /* The peer has given us some credit, now we can send messages */
+     pn_link_t *sender = pn_event_link(event);
+     while (pn_link_credit(sender) > 0 && app->sent < app->message_count) {
+       ++app->sent;
+       // Use sent counter as unique delivery tag.
+       pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
+       pn_bytes_t msgbuf = encode_message(app); /* Encodes app->sent as the sequence number */
+       pn_link_send(sender, msgbuf.start, msgbuf.size);
+       pn_link_advance(sender);
+     }
+     break;
+   }
+
+   case PN_DELIVERY: {
+     /* We received acknowledgement from the peer that a message was delivered. */
+     pn_delivery_t* d = pn_event_delivery(event);
+     if (pn_delivery_remote_state(d) == PN_ACCEPTED) {
+       if (++app->acknowledged == app->message_count) {
+         printf("%d messages sent and acknowledged\n", app->acknowledged);
+         pn_connection_close(pn_event_connection(event));
+         /* Continue handling events till we receive TRANSPORT_CLOSED */
+       }
+     }
+   } break;
+
+   default:
+    break;
+  }
+}
+
+/* Handle all events, delegate to handle_send or handle_receive depending on link mode.
+   Listener, connection, session and close events are handled here; any other
+   event that carries a link is passed to handle_send() for sender links and
+   to handle_receive() otherwise.
+   Return true to continue, false to exit (returned on PN_PROACTOR_INACTIVE).
+*/
+static bool handle(app_data_t* app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+   case PN_LISTENER_OPEN:
+    printf("listening\n");
+    fflush(stdout);
+    break;
+
+   case PN_LISTENER_ACCEPT:
+    pn_listener_accept(pn_event_listener(event), pn_connection());
+    break;
+
+   case PN_CONNECTION_INIT:
+    pn_connection_set_container(pn_event_connection(event), app->container_id);
+    break;
+
+   case PN_CONNECTION_BOUND: {
+     /* Turn off security */
+     pn_transport_t *t = pn_event_transport(event);
+     pn_transport_require_auth(t, false);
+     pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS");
+     break;                     /* Do not fall through: the open is completed on REMOTE_OPEN */
+   }
+   case PN_CONNECTION_REMOTE_OPEN: {
+     pn_connection_open(pn_event_connection(event)); /* Complete the open */
+     break;
+   }
+
+   case PN_SESSION_REMOTE_OPEN: {
+     pn_session_open(pn_event_session(event));
+     break;
+   }
+
+   case PN_TRANSPORT_CLOSED:
+    check_condition(event, pn_transport_condition(pn_event_transport(event)));
+    pn_listener_close(app->listener); /* Finished */
+    break;
+
+   case PN_CONNECTION_REMOTE_CLOSE:
+    check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_SESSION_REMOTE_CLOSE:
+    check_condition(event, pn_session_remote_condition(pn_event_session(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_LINK_REMOTE_CLOSE:
+   case PN_LINK_REMOTE_DETACH:
+    check_condition(event, pn_link_remote_condition(pn_event_link(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_PROACTOR_TIMEOUT:
+    /* Wake the sender's connection, if a sender link has been recorded */
+    if (app->sender) {
+      pn_connection_wake(pn_session_connection(pn_link_session(app->sender)));
+    }
+    break;
+
+   case PN_LISTENER_CLOSE:
+    check_condition(event, pn_listener_condition(pn_event_listener(event)));
+    break;
+
+   case PN_PROACTOR_INACTIVE:
+    return false;
+
+   default: {
+     pn_link_t *l = pn_event_link(event);
+     if (l) {                      /* Only delegate link-related events */
+       if (pn_link_is_sender(l)) {
+         handle_send(app, event);
+       } else {
+         handle_receive(app, event);
+       }
+     }
+   }
+  }
+  return true;
+}
+
+/* Loop and handle events until handle() asks to stop.
+   Events remaining in the batch after the stopping event are not handled,
+   but the batch is still handed back with pn_proactor_done() so that every
+   pn_proactor_wait() is matched before the caller frees the proactor. */
+void run(app_data_t *app) {
+  bool finished = false;
+  do {
+    pn_event_batch_t *events = pn_proactor_wait(app->proactor);
+    for (pn_event_t *e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
+      if (!handle(app, e)) {
+        finished = true;
+        break;
+      }
+    }
+    pn_proactor_done(app->proactor, events);
+  } while(!finished);
+}
+
+/* Run the direct (brokerless) example server.
+   Usage: direct [host [port [amqp-address [message-count]]]]
+   Defaults: host "", port "amqp", address "examples", 10 messages.
+   Returns 0, or 1 if an error condition was reported. */
+int main(int argc, char **argv) {
+  struct app_data_t app = {0};
+  int i = 0;
+  app.container_id = argv[i++];   /* Should be unique */
+  /* Each optional argument is taken only if it is really present; comparing
+     against i (not a constant) keeps a lone host argument from making port
+     read past the end of the arguments. */
+  app.host = (argc > i) ? argv[i++] : "";
+  app.port = (argc > i) ? argv[i++] : "amqp";
+  app.amqp_address = (argc > i) ? argv[i++] : "examples";
+  app.message_count = (argc > i) ? atoi(argv[i++]) : 10;
+
+  /* Create the proactor and listen */
+  app.proactor = pn_proactor();
+  app.listener = pn_listener();
+  char addr[PN_MAX_ADDR];
+  pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
+  pn_proactor_listen(app.proactor, app.listener, addr, 16);
+  run(&app);
+  pn_proactor_free(app.proactor);
+  free(app.message_buffer.start);
+  return exit_code;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/example_test.py
----------------------------------------------------------------------
diff --git a/examples/c/example_test.py b/examples/c/example_test.py
new file mode 100644
index 0000000..02bb1fd
--- /dev/null
+++ b/examples/c/example_test.py
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License
+#
+
+# This is a test script to run the examples and verify that they behave as expected.
+
+import unittest, sys, time
+from proctest import *
+
+def python_cmd(name):
+    dir = os.path.dirname(__file__)
+    return [sys.executable, os.path.join(dir, "..", "..", "python", name)]
+
+def receive_expect(n):
+    return ''.join('{"sequence"=%s}\n'%i for i in xrange(1, n+1)) + "%s messages received\n"%n
+
+class Broker(object):
+    def __init__(self, test):
+        self.test = test
+
+    def __enter__(self):
+        with TestPort() as tp:
+            self.port = tp.port
+            self.host = tp.host
+            self.addr = tp.addr
+            self.proc = self.test.proc(["broker", "", self.port])
+            self.proc.wait_re("listening")
+            return self
+
+    def __exit__(self, *args):
+        b = getattr(self, "proc")
+        if b:
+            if b.poll() !=  None: # Broker crashed
+                raise ProcError(b, "broker crash")
+            b.kill()
+
+class CExampleTest(ProcTestCase):
+
+    def test_send_receive(self):
+        """Send first then receive"""
+        with Broker(self) as b:
+            s = self.proc(["send", "", b.port])
+            self.assertEqual("10 messages sent and acknowledged\n", s.wait_exit())
+            r = self.proc(["receive", "", b.port])
+            self.assertEqual(receive_expect(10), r.wait_exit())
+
+    def test_receive_send(self):
+        """Start receiving  first, then send."""
+        with Broker(self) as b:
+            r = self.proc(["receive", "", b.port]);
+            s = self.proc(["send", "", b.port]);
+            self.assertEqual("10 messages sent and acknowledged\n", s.wait_exit())
+            self.assertEqual(receive_expect(10), r.wait_exit())
+
+    def test_send_direct(self):
+        """Send to direct server"""
+        with TestPort() as tp:
+            d = self.proc(["direct", "", tp.port])
+            d.wait_re("listening")
+            self.assertEqual("10 messages sent and acknowledged\n", self.proc(["send", "", tp.port]).wait_exit())
+            self.assertIn(receive_expect(10), d.wait_exit())
+
+    def test_receive_direct(self):
+        """Receive from direct server"""
+        with TestPort() as tp:
+            d = self.proc(["direct", "", tp.port])
+            d.wait_re("listening")
+            self.assertEqual(receive_expect(10), self.proc(["receive", "", tp.port]).wait_exit())
+            self.assertIn("10 messages sent and acknowledged\n", d.wait_exit())
+
+
+if __name__ == "__main__":
+    unittest.main()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/messenger/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/messenger/CMakeLists.txt b/examples/c/messenger/CMakeLists.txt
deleted file mode 100644
index d4fec71..0000000
--- a/examples/c/messenger/CMakeLists.txt
+++ /dev/null
@@ -1,52 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-find_package(Proton REQUIRED)
-
-set (messenger-examples
-  recv.c
-  send.c
-  recv-async.c
-  send-async.c
-  )
-
-set_source_files_properties (
-  ${messenger-examples}
-  PROPERTIES
-  COMPILE_FLAGS "${COMPILE_WARNING_FLAGS} ${COMPILE_LANGUAGE_FLAGS} ${LINK_TIME_OPTIMIZATION}"
-  )
-
-if (BUILD_WITH_CXX)
-  set_source_files_properties (
-    ${messenger-examples}
-    PROPERTIES LANGUAGE CXX
-    )
-endif (BUILD_WITH_CXX)
-
-add_executable(recv recv.c)
-add_executable(send send.c)
-add_executable(recv-async recv-async.c)
-add_executable(send-async send-async.c)
-
-include_directories(${Proton_INCLUDE_DIRS})
-
-target_link_libraries(recv ${Proton_LIBRARIES})
-target_link_libraries(send ${Proton_LIBRARIES})
-target_link_libraries(recv-async ${Proton_LIBRARIES})
-target_link_libraries(send-async ${Proton_LIBRARIES})

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/messenger/recv-async.c
----------------------------------------------------------------------
diff --git a/examples/c/messenger/recv-async.c b/examples/c/messenger/recv-async.c
deleted file mode 100644
index 1f49166..0000000
--- a/examples/c/messenger/recv-async.c
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-// This is a re-implementation of recv.c using non-blocking/asynchronous calls.
-
-#include "proton/message.h"
-#include "proton/messenger.h"
-
-#include "pncompat/misc_funcs.inc"
-#include <stdio.h>
-#include <stdlib.h>
-#include <ctype.h>
-
-#if EMSCRIPTEN
-#include <emscripten.h>
-#endif
-
-pn_message_t * message;
-pn_messenger_t * messenger;
-
-#define check(messenger)                                                     \
-  {                                                                          \
-    if(pn_messenger_errno(messenger))                                        \
-    {                                                                        \
-      die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger))); \
-    }                                                                        \
-  }                                                                          \
-
-void die(const char *file, int line, const char *message)
-{
-    fprintf(stderr, "%s:%i: %s\n", file, line, message);
-    exit(1);
-}
-
-void usage(void)
-{
-    printf("Usage: recv [options] <addr>\n");
-    printf("-c    \tPath to the certificate file.\n");
-    printf("-k    \tPath to the private key file.\n");
-    printf("-p    \tPassword for the private key.\n");
-    printf("<addr>\tAn address.\n");
-    exit(0);
-}
-
-void process(void) {
-    while(pn_messenger_incoming(messenger))
-    {
-        pn_messenger_get(messenger, message);
-        check(messenger);
-
-        {
-        pn_tracker_t tracker = pn_messenger_incoming_tracker(messenger);
-        char buffer[1024];
-        size_t buffsize = sizeof(buffer);
-        const char* subject = pn_message_get_subject(message);
-        pn_data_t* body = pn_message_body(message);
-        pn_data_format(body, buffer, &buffsize);
-
-        printf("Address: %s\n", pn_message_get_address(message));
-        printf("Subject: %s\n", subject ? subject : "(no subject)");
-        printf("Content: %s\n", buffer);
-
-        pn_messenger_accept(messenger, tracker, 0);
-        }
-    }
-}
-
-#if EMSCRIPTEN // For emscripten C/C++ to JavaScript compiler.
-void pump(int fd, void* userData) {
-    while (pn_messenger_work(messenger, 0) >= 0) {
-        process();
-    }
-}
-
-void onclose(int fd, void* userData) {
-    process();
-}
-
-void onerror(int fd, int errno, const char* msg, void* userData) {
-    printf("error callback fd = %d, errno = %d, msg = %s\n", fd, errno, msg);
-}
-#endif
-
-int main(int argc, char** argv)
-{
-    char* certificate = NULL;
-    char* privatekey = NULL;
-    char* password = NULL;
-    char* address = (char *) "amqp://~0.0.0.0";
-    int c;
-
-    message = pn_message();
-    messenger = pn_messenger(NULL);
-    pn_messenger_set_blocking(messenger, false); // Needs to be set non-blocking to behave asynchronously.
-
-    opterr = 0;
-
-    while((c = getopt(argc, argv, "hc:k:p:")) != -1)
-    {
-        switch(c)
-        {
-            case 'h':
-                usage();
-                break;
-
-            case 'c': certificate = optarg; break;
-            case 'k': privatekey = optarg; break;
-            case 'p': password = optarg; break;
-
-            case '?':
-                if (optopt == 'c' ||
-                    optopt == 'k' ||
-                    optopt == 'p')
-                {
-                    fprintf(stderr, "Option -%c requires an argument.\n", optopt);
-                }
-                else if(isprint(optopt))
-                {
-                    fprintf(stderr, "Unknown option `-%c'.\n", optopt);
-                }
-                else
-                {
-                    fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
-                }
-                return 1;
-            default:
-                abort();
-        }
-    }
-
-    if (optind < argc)
-    {
-        address = argv[optind];
-    }
-
-    /* load the various command line options if they're set */
-    if(certificate)
-    {
-        pn_messenger_set_certificate(messenger, certificate);
-    }
-
-    if(privatekey)
-    {
-        pn_messenger_set_private_key(messenger, privatekey);
-    }
-
-    if(password)
-    {
-        pn_messenger_set_password(messenger, password);
-    }
-
-    pn_messenger_start(messenger);
-    check(messenger);
-
-    pn_messenger_subscribe(messenger, address);
-    check(messenger);
-
-    pn_messenger_recv(messenger, -1); // Set to receive as many messages as messenger can buffer.
-
-#if EMSCRIPTEN // For emscripten C/C++ to JavaScript compiler.
-    emscripten_set_socket_error_callback(NULL, onerror);
-
-    emscripten_set_socket_open_callback(NULL, pump);
-    emscripten_set_socket_connection_callback(NULL, pump);
-    emscripten_set_socket_message_callback(NULL, pump);
-    emscripten_set_socket_close_callback(NULL, onclose);
-#else // For native compiler.
-    while (1) {
-        pn_messenger_work(messenger, -1); // Block indefinitely until there has been socket activity.
-        process();
-    }
-#endif
-
-    return 0;
-}
-

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/messenger/recv.c
----------------------------------------------------------------------
diff --git a/examples/c/messenger/recv.c b/examples/c/messenger/recv.c
deleted file mode 100644
index 16e8321..0000000
--- a/examples/c/messenger/recv.c
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "proton/message.h"
-#include "proton/messenger.h"
-
-#include "pncompat/misc_funcs.inc"
-#include <stdio.h>
-#include <stdlib.h>
-#include <ctype.h>
-
-#define check(messenger)                                                     \
-  {                                                                          \
-    if(pn_messenger_errno(messenger))                                        \
-    {                                                                        \
-      die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger))); \
-    }                                                                        \
-  }                                                                          \
-
-void die(const char *file, int line, const char *message)
-{
-  fprintf(stderr, "%s:%i: %s\n", file, line, message);
-  exit(1);
-}
-
-void usage(void)
-{
-  printf("Usage: recv [options] <addr>\n");
-  printf("-c    \tPath to the certificate file.\n");
-  printf("-k    \tPath to the private key file.\n");
-  printf("-p    \tPassword for the private key.\n");
-  printf("<addr>\tAn address.\n");
-  exit(0);
-}
-
-int main(int argc, char** argv)
-{
-  char* certificate = NULL;
-  char* privatekey = NULL;
-  char* password = NULL;
-  char* address = (char *) "amqp://~0.0.0.0";
-  int c;
-
-  pn_message_t * message;
-  pn_messenger_t * messenger;
-
-  message = pn_message();
-  messenger = pn_messenger(NULL);
-
-  opterr = 0;
-
-  while((c = getopt(argc, argv, "hc:k:p:")) != -1)
-  {
-    switch(c)
-    {
-    case 'h':
-      usage();
-      break;
-
-    case 'c': certificate = optarg; break;
-    case 'k': privatekey = optarg; break;
-    case 'p': password = optarg; break;
-
-    case '?':
-      if(optopt == 'c' ||
-         optopt == 'k' ||
-         optopt == 'p')
-      {
-        fprintf(stderr, "Option -%c requires an argument.\n", optopt);
-      }
-      else if(isprint(optopt))
-      {
-        fprintf(stderr, "Unknown option `-%c'.\n", optopt);
-      }
-      else
-      {
-        fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
-      }
-      return 1;
-    default:
-      abort();
-    }
-  }
-
-  if (optind < argc)
-  {
-    address = argv[optind];
-  }
-
-  /* load the various command line options if they're set */
-  if(certificate)
-  {
-    pn_messenger_set_certificate(messenger, certificate);
-  }
-
-  if(privatekey)
-  {
-    pn_messenger_set_private_key(messenger, privatekey);
-  }
-
-  if(password)
-  {
-    pn_messenger_set_password(messenger, password);
-  }
-
-  pn_messenger_start(messenger);
-  check(messenger);
-
-  pn_messenger_subscribe(messenger, address);
-  check(messenger);
-
-  for(;;)
-  {
-    pn_messenger_recv(messenger, 1024);
-    check(messenger);
-
-    while(pn_messenger_incoming(messenger))
-    {
-      pn_messenger_get(messenger, message);
-      check(messenger);
-
-      {
-      char buffer[1024];
-      size_t buffsize = sizeof(buffer);
-      const char* subject = pn_message_get_subject(message);
-      pn_data_t *body = pn_message_body(message);
-      pn_data_format(body, buffer, &buffsize);
-
-      printf("Address: %s\n", pn_message_get_address(message));
-      printf("Subject: %s\n", subject ? subject : "(no subject)");
-      printf("Content: %s\n", buffer);
-      }
-    }
-  }
-
-  return 0;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/messenger/send-async.c
----------------------------------------------------------------------
diff --git a/examples/c/messenger/send-async.c b/examples/c/messenger/send-async.c
deleted file mode 100644
index de9b023..0000000
--- a/examples/c/messenger/send-async.c
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-// This is a re-implementation of send.c using non-blocking/asynchronous calls.
-
-#include "proton/message.h"
-#include "proton/messenger.h"
-
-#include "pncompat/misc_funcs.inc"
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <ctype.h>
-
-#if EMSCRIPTEN
-#include <emscripten.h>
-#endif
-
-pn_message_t * message;
-pn_messenger_t * messenger;
-pn_tracker_t tracker;
-int running = 1;
-
-#define check(messenger)                                                     \
-  {                                                                          \
-    if(pn_messenger_errno(messenger))                                        \
-    {                                                                        \
-      die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger))); \
-    }                                                                        \
-  }                                                                          \
-
-void die(const char *file, int line, const char *message)
-{
-    fprintf(stderr, "%s:%i: %s\n", file, line, message);
-    exit(1);
-}
-
-void usage(void)
-{
-    printf("Usage: send [-a addr] [message]\n");
-    printf("-a     \tThe target address [amqp[s]://domain[/name]]\n");
-    printf("message\tA text string to send.\n");
-    exit(0);
-}
-
-void process(void) {
-    pn_status_t status = pn_messenger_status(messenger, tracker);
-    if (status != PN_STATUS_PENDING) {
-        if (running) {
-            pn_messenger_stop(messenger);
-            running = 0;
-        } 
-    }
-
-    if (pn_messenger_stopped(messenger)) {
-        pn_message_free(message);
-        pn_messenger_free(messenger);
-        message = NULL;
-        messenger = NULL;
-    }
-}
-
-#if EMSCRIPTEN // For emscripten C/C++ to JavaScript compiler.
-void pump(int fd, void* userData) {
-    while (pn_messenger_work(messenger, 0) >= 0) {
-        process();
-    }
-}
-
-void onclose(int fd, void* userData) {
-    process();
-}
-
-void onerror(int fd, int errno, const char* msg, void* userData) {
-    printf("error callback fd = %d, errno = %d, msg = %s\n", fd, errno, msg);
-}
-#endif
-
-int main(int argc, char** argv)
-{
-    int c;
-    char * address = (char *) "amqp://0.0.0.0";
-    char * msgtext = (char *) "Hello World!";
-    pn_data_t* body;
-
-    opterr = 0;
-
-    while((c = getopt(argc, argv, "ha:b:c:")) != -1)
-    {
-        switch(c)
-        {
-            case 'a': address = optarg; break;
-            case 'h': usage(); break;
-
-            case '?':
-                if(optopt == 'a')
-                {
-                    fprintf(stderr, "Option -%c requires an argument.\n", optopt);
-                }
-                else if(isprint(optopt))
-                {
-                    fprintf(stderr, "Unknown option `-%c'.\n", optopt);
-                }
-                else
-                {
-                    fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
-                }
-                return 1;
-            default:
-                abort();
-        }
-    }
-
-    if (optind < argc) msgtext = argv[optind];
-
-    message = pn_message();
-    messenger = pn_messenger(NULL);
-    pn_messenger_set_blocking(messenger, false); // Needs to be set non-blocking to behave asynchronously.
-    pn_messenger_set_outgoing_window(messenger, 1024); 
-
-    pn_messenger_start(messenger);
-
-    pn_message_set_address(message, address);
-    body = pn_message_body(message);
-    pn_data_put_string(body, pn_bytes(strlen(msgtext), msgtext));
-
-    pn_messenger_put(messenger, message);
-    check(messenger);
-
-    tracker = pn_messenger_outgoing_tracker(messenger);
-
-#if EMSCRIPTEN // For emscripten C/C++ to JavaScript compiler.
-    emscripten_set_socket_error_callback(NULL, onerror);
-
-    emscripten_set_socket_open_callback(NULL, pump);
-    emscripten_set_socket_connection_callback(NULL, pump);
-    emscripten_set_socket_message_callback(NULL, pump);
-    emscripten_set_socket_close_callback(NULL, onclose);
-#else // For native compiler.
-    while (running) {
-        pn_messenger_work(messenger, -1); // Block indefinitely until there has been socket activity.
-        process();
-    }
-
-    while (messenger && !pn_messenger_stopped(messenger)) {
-        pn_messenger_work(messenger, 0);
-        process();
-    }
-#endif
-
-    return 0;
-}
-

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/messenger/send.c
----------------------------------------------------------------------
diff --git a/examples/c/messenger/send.c b/examples/c/messenger/send.c
deleted file mode 100644
index 11b47ff..0000000
--- a/examples/c/messenger/send.c
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "proton/message.h"
-#include "proton/messenger.h"
-
-#include "pncompat/misc_funcs.inc"
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <ctype.h>
-
-#define check(messenger)                                                     \
-  {                                                                          \
-    if(pn_messenger_errno(messenger))                                        \
-    {                                                                        \
-      die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger))); \
-    }                                                                        \
-  }                                                                          \
-
-void die(const char *file, int line, const char *message)
-{
-  fprintf(stderr, "%s:%i: %s\n", file, line, message);
-  exit(1);
-}
-
-void usage(void)
-{
-  printf("Usage: send [-a addr] [message]\n");
-  printf("-a     \tThe target address [amqp[s]://domain[/name]]\n");
-  printf("message\tA text string to send.\n");
-  exit(0);
-}
-
-int main(int argc, char** argv)
-{
-  int c;
-  char * address = (char *) "amqp://0.0.0.0";
-  char * msgtext = (char *) "Hello World!";
-  opterr = 0;
-
-  while((c = getopt(argc, argv, "ha:b:c:")) != -1)
-  {
-    switch(c)
-    {
-    case 'a': address = optarg; break;
-    case 'h': usage(); break;
-
-    case '?':
-      if(optopt == 'a')
-      {
-        fprintf(stderr, "Option -%c requires an argument.\n", optopt);
-      }
-      else if(isprint(optopt))
-      {
-        fprintf(stderr, "Unknown option `-%c'.\n", optopt);
-      }
-      else
-      {
-        fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
-      }
-      return 1;
-    default:
-      abort();
-    }
-  }
-
-  if (optind < argc) msgtext = argv[optind];
-
-  {
-  pn_message_t * message;
-  pn_messenger_t * messenger;
-  pn_data_t * body;
-
-  message = pn_message();
-  messenger = pn_messenger(NULL);
-
-  pn_messenger_start(messenger);
-
-  pn_message_set_address(message, address);
-  body = pn_message_body(message);
-  pn_data_put_string(body, pn_bytes(strlen(msgtext), msgtext));
-  pn_messenger_put(messenger, message);
-  check(messenger);
-  pn_messenger_send(messenger, -1);
-  check(messenger);
-
-  pn_messenger_stop(messenger);
-  pn_messenger_free(messenger);
-  pn_message_free(message);
-  }
-
-  return 0;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/proactor/CMakeLists.txt b/examples/c/proactor/CMakeLists.txt
deleted file mode 100644
index 6ea8aaf..0000000
--- a/examples/c/proactor/CMakeLists.txt
+++ /dev/null
@@ -1,44 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-find_package(Proton REQUIRED Core Proactor)
-set(CMAKE_THREAD_PREFER_PTHREAD TRUE)
-find_package(Threads REQUIRED)
-
-include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include)
-include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${Proton_INCLUDE_DIRS})
-
-add_definitions(${COMPILE_LANGUAGE_FLAGS} ${COMPILE_WARNING_FLAGS} ${WERROR} ${COMPILE_PLATFORM_FLAGS} ${LINK_TIME_OPTIMIZATION})
-
-# Add a test with the correct environment to find test executables and valgrind.
-if(WIN32)
-  set(test_path "$<TARGET_FILE_DIR:broker>;$<TARGET_FILE_DIR:qpid-proton>")
-else(WIN32)
-  set(test_path "${CMAKE_CURRENT_BINARY_DIR}:$ENV{PATH}")
-endif(WIN32)
-
-foreach(name broker send receive direct)
-  add_executable(proactor-${name} ${name}.c)
-  target_link_libraries(proactor-${name} ${Proton_Proactor_LIBRARIES} ${Proton_Core_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
-  set_target_properties(proactor-${name} PROPERTIES OUTPUT_NAME ${name})
-endforeach()
-
-set(run_env ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV})
-
-add_test(c-example-proactor ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/README.dox
----------------------------------------------------------------------
diff --git a/examples/c/proactor/README.dox b/examples/c/proactor/README.dox
deleted file mode 100644
index a548d35..0000000
--- a/examples/c/proactor/README.dox
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * @example send.c
- *
- * Send a fixed number of messages to the "examples" node.
- * Can be used with @ref broker.c, @ref direct.c or an external AMQP broker.
- *
- * @example receive.c
- *
- * Subscribes to the 'example' node and prints the message bodies received.
- * Can be used with @ref broker.c, @ref direct.c or an external AMQP broker.
- *
- * @example direct.c
- *
- * A server that can be used to demonstrate direct (no broker) peer-to-peer communication
- * It can accept an incoming connection from either the @ref send.c or @ref receive.c examples
- * and will act as the directly-connected counterpart (receive or send)
- *
- * @example broker.c
- *
- * A simple multithreaded broker that works with the @ref send.c and @ref receive.c examples.
- */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
deleted file mode 100644
index e0d9672..0000000
--- a/examples/c/proactor/broker.c
+++ /dev/null
@@ -1,424 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "thread.h"
-
-#include <proton/engine.h>
-#include <proton/listener.h>
-#include <proton/proactor.h>
-#include <proton/sasl.h>
-#include <proton/transport.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-/* Simple re-sizable vector that acts as a queue */
-#define VEC(T) struct { T* data; size_t len, cap; }
-
-#define VEC_INIT(V)                             \
-  do {                                          \
-    V.len = 0;                                  \
-    V.cap = 16;                                 \
-    void **vp = (void**)&V.data;                \
-    *vp = malloc(V.cap * sizeof(*V.data));      \
-  } while(0)
-
-#define VEC_FINAL(V) free(V.data)
-
-#define VEC_PUSH(V, X)                                  \
-  do {                                                  \
-    if (V.len == V.cap) {                               \
-      V.cap *= 2;                                       \
-      void **vp = (void**)&V.data;                      \
-      *vp = realloc(V.data, V.cap * sizeof(*V.data));   \
-    }                                                   \
-    V.data[V.len++] = X;                                \
-  } while(0)                                            \
-
-#define VEC_POP(V)                                              \
-  do {                                                          \
-    if (V.len > 0)                                              \
-      memmove(V.data, V.data+1, (--V.len)*sizeof(*V.data));     \
-  } while(0)
-
-/* Simple thread-safe queue implementation */
-typedef struct queue_t {
-  pthread_mutex_t lock;
-  char name[256];
-  VEC(pn_rwbytes_t) messages;   /* Messages on the queue_t */
-  VEC(pn_connection_t*) waiting; /* Connections waiting to send messages from this queue */
-  struct queue_t *next;            /* Next queue in chain */
-  size_t sent;                     /* Count of messages sent, used as delivery tag */
-} queue_t;
-
-/* Initialize queue q with the given name and link it in front of next.
-   Names that do not fit in q->name are truncated.
-*/
-static void queue_init(queue_t *q, const char* name, queue_t *next) {
-  pthread_mutex_init(&q->lock, NULL);
-  /* strncpy() leaves the destination unterminated when the source fills it,
-     so copy at most size-1 bytes and terminate explicitly. */
-  strncpy(q->name, name, sizeof(q->name) - 1);
-  q->name[sizeof(q->name) - 1] = '\0';
-  VEC_INIT(q->messages);
-  VEC_INIT(q->waiting);
-  q->next = next;
-  q->sent = 0;
-}
-
-/* Release the resources owned by q: its mutex, any undelivered message
-   buffers and the two vectors. Does not free q itself.
-*/
-static void queue_destroy(queue_t *q) {
-  pthread_mutex_destroy(&q->lock);
-  /* q->name is an array inside the struct, not a heap allocation, so it must
-     not be passed to free(). */
-  for (size_t i = 0; i < q->messages.len; ++i)
-    free(q->messages.data[i].start);
-  VEC_FINAL(q->messages);
-  /* NOTE(review): no matching pn_incref() is visible where connections are
-     added to the waiting list - confirm this decref is balanced. */
-  for (size_t i = 0; i < q->waiting.len; ++i)
-    pn_decref(q->waiting.data[i]);
-  VEC_FINAL(q->waiting);
-}
-
-/* Send a message on s, or record the connection of s as waiting if the
-   queue has no messages.
-   Called in s dispatch loop, assumes s has credit.
-   The queue lock is held only while the queue itself is examined; the
-   message is sent on the link after the lock has been released.
-*/
-static void queue_send(queue_t *q, pn_link_t *s) {
-  pn_rwbytes_t m = { 0 };
-  size_t tag = 0;
-  pthread_mutex_lock(&q->lock);
-  if (q->messages.len == 0) { /* Empty, record connection as waiting */
-    /* Record connection for wake-up if not already on the list. */
-    pn_connection_t *c = pn_session_connection(pn_link_session(s));
-    size_t i = 0;
-    for (; i < q->waiting.len && q->waiting.data[i] != c; ++i)
-      ;
-    if (i == q->waiting.len) {
-      VEC_PUSH(q->waiting, c);
-    }
-  } else {
-    m = q->messages.data[0];
-    VEC_POP(q->messages);
-    tag = ++q->sent;
-  }
-  pthread_mutex_unlock(&q->lock);
-  if (m.start) {
-    pn_delivery_t *d = pn_delivery(s, pn_dtag((char*)&tag, sizeof(tag)));
-    pn_link_send(s, m.start, m.size);
-    pn_link_advance(s);
-    pn_delivery_settle(d);  /* Pre-settled: unreliable, there will be no ack */
-    free(m.start);
-  }
-}
-
-/* Data associated with each broker connection */
-typedef struct broker_data_t {
-  bool check_queues;          /* Check senders on the connection for available data in queues. */
-} broker_data_t;
-
-/* Use the context pointer as a boolean flag to indicate we need to check queues */
-void pn_connection_set_check_queues(pn_connection_t *c, bool check) {
-  pn_connection_set_context(c, (void*)check);
-}
-
-bool pn_connection_get_check_queues(pn_connection_t *c) {
-  return (bool)pn_connection_get_context(c);
-}
-
-/* Put a message on the queue, called in receiver dispatch loop.
-   If the queue was previously empty, notify waiting senders.
-*/
-static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) {
-  pthread_mutex_lock(&q->lock);
-  VEC_PUSH(q->messages, m);
-  if (q->messages.len == 1) { /* Was empty, notify waiting connections */
-    for (size_t i = 0; i < q->waiting.len; ++i) {
-      pn_connection_t *c = q->waiting.data[i];
-      pn_connection_set_check_queues(c, true);
-      pn_connection_wake(c); /* Wake the connection */
-    }
-    q->waiting.len = 0;
-  }
-  pthread_mutex_unlock(&q->lock);
-}
-
-/* Thread safe set of queues */
-typedef struct queues_t {
-  pthread_mutex_t lock;
-  queue_t *queues;
-  size_t sent;
-} queues_t;
-
-/* Initialize an empty set of queues. Every member is set so the result does
-   not depend on how the caller allocated qs.
-*/
-void queues_init(queues_t *qs) {
-  pthread_mutex_init(&qs->lock, NULL);
-  qs->queues = NULL;
-  qs->sent = 0;
-}
-
-/* Destroy and free every queue in the set, then destroy the set's lock. */
-void queues_destroy(queues_t *qs) {
-  queue_t *q = qs->queues;
-  while (q) {
-    queue_t *next = q->next;    /* Must be read before q is freed */
-    queue_destroy(q);
-    free(q);
-    q = next;
-  }
-  qs->queues = NULL;
-  pthread_mutex_destroy(&qs->lock);
-}
-
-/** Get or create the named queue.
- *
- * A NULL name is treated as the empty string: callers pass terminus
- * addresses straight through and strcmp() must not be handed NULL.
- * The lookup and the insertion of a new queue at the head of the chain both
- * happen under the set's lock.
- */
-queue_t* queues_get(queues_t *qs, const char* name) {
-  if (!name) name = "";
-  pthread_mutex_lock(&qs->lock);
-  queue_t *q;
-  for (q = qs->queues; q && strcmp(q->name, name) != 0; q = q->next)
-    ;
-  if (!q) {
-    q = (queue_t*)malloc(sizeof(queue_t));
-    queue_init(q, name, qs->queues);
-    qs->queues = q;
-  }
-  pthread_mutex_unlock(&qs->lock);
-  return q;
-}
-
-/* The broker implementation */
-typedef struct broker_t {
-  pn_proactor_t *proactor;
-  size_t threads;
-  const char *container_id;     /* AMQP container-id */
-  queues_t queues;
-  bool finished;
-} broker_t;
-
-void broker_stop(broker_t *b) {
-  /* Interrupt the proactor to stop the working threads. */
-  pn_proactor_interrupt(b->proactor);
-}
-
-/* Try to send if link is sender and has credit */
-static void link_send(broker_t *b, pn_link_t *s) {
-  if (pn_link_is_sender(s) && pn_link_credit(s) > 0) {
-    const char *qname = pn_terminus_get_address(pn_link_source(s));
-    queue_t *q = queues_get(&b->queues, qname);
-    queue_send(q, s);
-  }
-}
-
-static void queue_unsub(queue_t *q, pn_connection_t *c) {
-  pthread_mutex_lock(&q->lock);
-  for (size_t i = 0; i < q->waiting.len; ++i) {
-    if (q->waiting.data[i] == c){
-      q->waiting.data[i] = q->waiting.data[0]; /* save old [0] */
-      VEC_POP(q->waiting);
-      break;
-    }
-  }
-  pthread_mutex_unlock(&q->lock);
-}
-
-/* Unsubscribe from the queue of interest to this link. */
-static void link_unsub(broker_t *b, pn_link_t *s) {
-  if (pn_link_is_sender(s)) {
-    const char *qname = pn_terminus_get_address(pn_link_source(s));
-    if (qname) {
-      queue_t *q = queues_get(&b->queues, qname);
-      queue_unsub(q, pn_session_connection(pn_link_session(s)));
-    }
-  }
-}
-
-/* Unsubscribe every link on the connection from its queue; called when the connection's transport closes. */
-static void connection_unsub(broker_t *b, pn_connection_t *c) {
-  for (pn_link_t *l = pn_link_head(c, 0); l != NULL; l = pn_link_next(l, 0))
-    link_unsub(b, l);
-}
-
-static void session_unsub(broker_t *b, pn_session_t *ssn) {
-  pn_connection_t *c = pn_session_connection(ssn);
-  for (pn_link_t *l = pn_link_head(c, 0); l != NULL; l = pn_link_next(l, 0)) {
-    if (pn_link_session(l) == ssn)
-      link_unsub(b, l);
-  }
-}
-
-static int exit_code = 0;
-
-static void check_condition(pn_event_t *e, pn_condition_t *cond) {
-  if (pn_condition_is_set(cond)) {
-    fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
-            pn_condition_get_name(cond), pn_condition_get_description(cond));
-    exit_code = 1;              /* Remember there was an unexpected error; main() returns exit_code */
-  }
-}
-
-const int WINDOW=10;            /* Incoming credit window */
-
-/* Handle a single proactor event for the broker.
-   Opens endpoints in response to the remote peer, moves messages between
-   links and queues, reports error conditions on close events and sets
-   b->finished when the proactor is interrupted.
-*/
-static void handle(broker_t* b, pn_event_t* e) {
-  pn_connection_t *c = pn_event_connection(e);
-
-  switch (pn_event_type(e)) {
-
-   case PN_LISTENER_OPEN:
-    printf("listening\n");
-    fflush(stdout);
-    break;
-
-   case PN_LISTENER_ACCEPT:
-    pn_listener_accept(pn_event_listener(e), pn_connection());
-    break;
-
-   case PN_CONNECTION_INIT:
-     pn_connection_set_container(c, b->container_id);
-     break;
-
-   case PN_CONNECTION_BOUND: {
-     /* Turn off security */
-     pn_transport_t *t = pn_connection_transport(c);
-     pn_transport_require_auth(t, false);
-     pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS");
-     break;
-   }
-   case PN_CONNECTION_REMOTE_OPEN: {
-     pn_connection_open(pn_event_connection(e)); /* Complete the open */
-     break;
-   }
-   case PN_CONNECTION_WAKE: {
-     if (pn_connection_get_check_queues(c)) {
-       pn_connection_set_check_queues(c, false);
-       /* Combine the state bits with '|'. The two flags are distinct bits,
-          so '&' yields 0, which filters nothing and walks every link. */
-       int flags = PN_LOCAL_ACTIVE|PN_REMOTE_ACTIVE;
-       for (pn_link_t *l = pn_link_head(c, flags); l != NULL; l = pn_link_next(l, flags))
-         link_send(b, l);
-     }
-     break;
-   }
-   case PN_SESSION_REMOTE_OPEN: {
-     pn_session_open(pn_event_session(e));
-     break;
-   }
-   case PN_LINK_REMOTE_OPEN: {
-     /* Mirror the address requested by the peer onto the local terminus */
-     pn_link_t *l = pn_event_link(e);
-     if (pn_link_is_sender(l)) {
-       const char *source = pn_terminus_get_address(pn_link_remote_source(l));
-       pn_terminus_set_address(pn_link_source(l), source);
-     } else {
-       const char* target = pn_terminus_get_address(pn_link_remote_target(l));
-       pn_terminus_set_address(pn_link_target(l), target);
-       pn_link_flow(l, WINDOW);
-     }
-     pn_link_open(l);
-     break;
-   }
-   case PN_LINK_FLOW: {
-     link_send(b, pn_event_link(e));
-     break;
-   }
-   case PN_DELIVERY: {
-     pn_delivery_t *d = pn_event_delivery(e);
-     pn_link_t *r = pn_delivery_link(d);
-     if (pn_link_is_receiver(r) &&
-         pn_delivery_readable(d) && !pn_delivery_partial(d))
-     {
-       size_t size = pn_delivery_pending(d);
-       /* The broker does not decode the message, just forwards it. */
-       pn_rwbytes_t m = { size, (char*)malloc(size) };
-       pn_link_recv(r, m.start, m.size);
-       const char *qname = pn_terminus_get_address(pn_link_target(r));
-       queue_receive(b->proactor, queues_get(&b->queues, qname), m);
-       pn_delivery_update(d, PN_ACCEPTED);
-       pn_delivery_settle(d);
-       /* Top the link credit back up to WINDOW */
-       pn_link_flow(r, WINDOW - pn_link_credit(r));
-     }
-     break;
-   }
-
-   case PN_TRANSPORT_CLOSED:
-    check_condition(e, pn_transport_condition(pn_event_transport(e)));
-    connection_unsub(b, pn_event_connection(e));
-    break;
-
-   case PN_CONNECTION_REMOTE_CLOSE:
-    check_condition(e, pn_connection_remote_condition(pn_event_connection(e)));
-    pn_connection_close(pn_event_connection(e));
-    break;
-
-   case PN_SESSION_REMOTE_CLOSE:
-    check_condition(e, pn_session_remote_condition(pn_event_session(e)));
-    session_unsub(b, pn_event_session(e));
-    pn_session_close(pn_event_session(e));
-    pn_session_free(pn_event_session(e));
-    break;
-
-   case PN_LINK_REMOTE_CLOSE:
-    check_condition(e, pn_link_remote_condition(pn_event_link(e)));
-    link_unsub(b, pn_event_link(e));
-    pn_link_close(pn_event_link(e));
-    pn_link_free(pn_event_link(e));
-    break;
-
-   case PN_LISTENER_CLOSE:
-    check_condition(e, pn_listener_condition(pn_event_listener(e)));
-    broker_stop(b);
-    break;
-
-   case PN_PROACTOR_INACTIVE:   /* listener and all connections closed */
-    broker_stop(b);
-    break;
-
-   case PN_PROACTOR_INTERRUPT:
-    b->finished = true;
-    pn_proactor_interrupt(b->proactor); /* Pass along the interrupt to the other threads */
-    break;
-
-   default:
-    break;
-  }
-}
-
-static void* broker_thread(void *void_broker) {
-  broker_t *b = (broker_t*)void_broker;
-  do {
-    pn_event_batch_t *events = pn_proactor_wait(b->proactor);
-    pn_event_t *e;
-    while ((e = pn_event_batch_next(events))) {
-      handle(b, e);
-    }
-    pn_proactor_done(b->proactor, events);
-  } while(!b->finished);
-  return NULL;
-}
-
-int main(int argc, char **argv) {
-  broker_t b = {0};
-  b.proactor = pn_proactor();
-  queues_init(&b.queues);
-  b.container_id = argv[0];
-  b.threads = 4;
-  int i = 1;
-  const char *host = (argc > i) ? argv[i++] : "";
-  const char *port = (argc > i) ? argv[i++] : "amqp";
-
-  /* Listen on addr */
-  char addr[PN_MAX_ADDR];
-  pn_proactor_addr(addr, sizeof(addr), host, port);
-  pn_proactor_listen(b.proactor, pn_listener(), addr, 16);
-
-  /* Start n-1 threads */
-  pthread_t* threads = (pthread_t*)calloc(sizeof(pthread_t), b.threads);
-  for (size_t i = 0; i < b.threads-1; ++i) {
-    pthread_create(&threads[i], NULL, broker_thread, &b);
-  }
-  broker_thread(&b);            /* Use the main thread too. */
-  /* Join the other threads */
-  for (size_t i = 0; i < b.threads-1; ++i) {
-    pthread_join(threads[i], NULL);
-  }
-  pn_proactor_free(b.proactor);
-  free(threads);
-  return exit_code;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/direct.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/direct.c b/examples/c/proactor/direct.c
deleted file mode 100644
index 15550e6..0000000
--- a/examples/c/proactor/direct.c
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <proton/condition.h>
-#include <proton/connection.h>
-#include <proton/delivery.h>
-#include <proton/link.h>
-#include <proton/listener.h>
-#include <proton/message.h>
-#include <proton/proactor.h>
-#include <proton/sasl.h>
-#include <proton/session.h>
-#include <proton/transport.h>
-
-#include <stdio.h>
-#include <stdlib.h>
-
-typedef struct app_data_t {
-  const char *host, *port;
-  const char *amqp_address;
-  const char *container_id;
-  int message_count;
-
-  pn_proactor_t *proactor;
-  pn_listener_t *listener;
-  pn_rwbytes_t message_buffer;
-
-  /* Sender values */
-  int sent;
-  int acknowledged;
-  pn_link_t *sender;
-
-  /* Receiver values */
-  int received;
-} app_data_t;
-
-static const int BATCH = 1000; /* Batch size for unlimited receive */
-
-static int exit_code = 0;
-
-static void check_condition(pn_event_t *e, pn_condition_t *cond) {
-  if (pn_condition_is_set(cond)) {
-    fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
-            pn_condition_get_name(cond), pn_condition_get_description(cond));
-    pn_connection_close(pn_event_connection(e));
-    exit_code = 1;
-  }
-}
-
-/* Create a message with a map { "sequence" : number }, encode it and return the encoded buffer.
-   The returned bytes point into app->message_buffer, which is reused (and
-   grown when needed) on every call.
-*/
-static pn_bytes_t encode_message(app_data_t* app) {
-  /* Construct a message with the map { "sequence": app.sent } */
-  pn_message_t* message = pn_message();
-  pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
-  pn_data_t* body = pn_message_body(message);
-  pn_data_put_map(body);
-  pn_data_enter(body);
-  pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
-  pn_data_put_int(body, app->sent); /* The sequence number */
-  pn_data_exit(body);
-
-  /* encode the message, expanding the encode buffer as needed */
-  if (app->message_buffer.start == NULL) {
-    static const size_t initial_size = 128;
-    app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size));
-  }
-  /* app->message_buffer is the total buffer space available. */
-  /* mbuf will point at just the portion used by the encoded message */
-  pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
-  int status = 0;
-  while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
-    app->message_buffer.size *= 2;
-    app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size);
-    /* realloc() may move the block: refresh both the pointer and the size,
-       otherwise the retry would encode into the old, freed buffer. */
-    mbuf.start = app->message_buffer.start;
-    mbuf.size = app->message_buffer.size;
-  }
-  if (status != 0) {
-    fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
-    exit(1);
-  }
-  pn_message_free(message);
-  return pn_bytes(mbuf.size, mbuf.start);
-}
-
-#define MAX_SIZE 1024
-
-static void decode_message(pn_delivery_t *dlv) {
-  static char buffer[MAX_SIZE];
-  ssize_t len;
-  // try to decode the message body
-  if (pn_delivery_pending(dlv) < MAX_SIZE) {
-    // read in the raw data
-    len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
-    if (len > 0) {
-      // decode it into a proton message
-      pn_message_t *m = pn_message();
-      if (PN_OK == pn_message_decode(m, buffer, len)) {
-        pn_string_t *s = pn_string(NULL);
-        pn_inspect(pn_message_body(m), s);
-        printf("%s\n", pn_string_get(s));
-        pn_free(s);
-      }
-      pn_message_free(m);
-    }
-  }
-}
-
-/* This function handles events when we are acting as the receiver */
-static void handle_receive(app_data_t* app, pn_event_t* event) {
-  switch (pn_event_type(event)) {
-
-   case PN_LINK_REMOTE_OPEN: {
-     pn_link_t *l = pn_event_link(event);
-     pn_link_open(l);
-     pn_link_flow(l, app->message_count ? app->message_count : BATCH);
-   } break;
-
-   case PN_DELIVERY: {
-     /* A message has been received */
-     pn_link_t *link = NULL;
-     pn_delivery_t *dlv = pn_event_delivery(event);
-     if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
-       link = pn_delivery_link(dlv);
-       decode_message(dlv);
-       /* Accept the delivery */
-       pn_delivery_update(dlv, PN_ACCEPTED);
-       /* done with the delivery, move to the next and free it */
-       pn_link_advance(link);
-       pn_delivery_settle(dlv);  /* dlv is now freed */
-
-       if (app->message_count == 0) {
-         /* receive forever - see if more credit is needed */
-         if (pn_link_credit(link) < BATCH/2) {
-           /* Grant enough credit to bring it up to BATCH: */
-           pn_link_flow(link, BATCH - pn_link_credit(link));
-         }
-       } else if (++app->received >= app->message_count) {
-         /* done receiving, close the endpoints */
-         printf("%d messages received\n", app->received);
-         pn_session_t *ssn = pn_link_session(link);
-         pn_link_close(link);
-         pn_session_close(ssn);
-         pn_connection_close(pn_session_connection(ssn));
-       }
-     }
-   } break;
-
-   default:
-    break;
-  }
-}
-
-/* This function handles events when we are acting as the sender.
-   It opens the link, sends messages while the link has credit and
-   app->message_count has not been reached, and closes the connection once
-   every message has been accepted by the peer.
-*/
-static void handle_send(app_data_t* app, pn_event_t* event) {
-  switch (pn_event_type(event)) {
-
-   case PN_LINK_REMOTE_OPEN: {
-     pn_link_t* l = pn_event_link(event);
-     pn_terminus_set_address(pn_link_target(l), app->amqp_address);
-     pn_link_open(l);
-   } break;
-
-   case PN_LINK_FLOW: {
-     /* The peer has given us some credit, now we can send messages */
-     pn_link_t *sender = pn_event_link(event);
-     while (pn_link_credit(sender) > 0 && app->sent < app->message_count) {
-       ++app->sent;
-       // Use sent counter as unique delivery tag.
-       pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
-       pn_bytes_t msgbuf = encode_message(app);
-       pn_link_send(sender, msgbuf.start, msgbuf.size);
-       pn_link_advance(sender);
-     }
-     break;
-   }
-
-   case PN_DELIVERY: {
-     /* We received acknowledgement from the peer that a message was delivered. */
-     pn_delivery_t* d = pn_event_delivery(event);
-     if (pn_delivery_remote_state(d) == PN_ACCEPTED) {
-       if (++app->acknowledged == app->message_count) {
-         printf("%d messages sent and acknowledged\n", app->acknowledged);
-         pn_connection_close(pn_event_connection(event));
-         /* Continue handling events till we receive TRANSPORT_CLOSED */
-       }
-     }
-   } break;
-
-   default:
-    break;
-  }
-}
-
-/* Handle all events, delegate to handle_send or handle_receive depending on link mode.
-   Connection, session, listener and proactor events are handled here;
-   any other event that carries a link is passed on to the link handlers.
-   Return true to continue, false to exit
-*/
-static bool handle(app_data_t* app, pn_event_t* event) {
-  switch (pn_event_type(event)) {
-
-   case PN_LISTENER_OPEN:
-    printf("listening\n");
-    fflush(stdout);
-    break;
-
-   case PN_LISTENER_ACCEPT:
-    pn_listener_accept(pn_event_listener(event), pn_connection());
-    break;
-
-   case PN_CONNECTION_INIT:
-    pn_connection_set_container(pn_event_connection(event), app->container_id);
-    break;
-
-   case PN_CONNECTION_BOUND: {
-     /* Turn off security */
-     pn_transport_t *t = pn_event_transport(event);
-     pn_transport_require_auth(t, false);
-     pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS");
-     /* Do not fall through: the connection is opened only in response to
-        the remote open below. */
-     break;
-   }
-   case PN_CONNECTION_REMOTE_OPEN: {
-     pn_connection_open(pn_event_connection(event)); /* Complete the open */
-     break;
-   }
-
-   case PN_SESSION_REMOTE_OPEN: {
-     pn_session_open(pn_event_session(event));
-     break;
-   }
-
-   case PN_TRANSPORT_CLOSED:
-    check_condition(event, pn_transport_condition(pn_event_transport(event)));
-    pn_listener_close(app->listener); /* Finished */
-    break;
-
-   case PN_CONNECTION_REMOTE_CLOSE:
-    check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
-    pn_connection_close(pn_event_connection(event));
-    break;
-
-   case PN_SESSION_REMOTE_CLOSE:
-    check_condition(event, pn_session_remote_condition(pn_event_session(event)));
-    pn_connection_close(pn_event_connection(event));
-    break;
-
-   case PN_LINK_REMOTE_CLOSE:
-   case PN_LINK_REMOTE_DETACH:
-    check_condition(event, pn_link_remote_condition(pn_event_link(event)));
-    pn_connection_close(pn_event_connection(event));
-    break;
-
-   case PN_PROACTOR_TIMEOUT:
-    /* Wake the sender's connection */
-    pn_connection_wake(pn_session_connection(pn_link_session(app->sender)));
-    break;
-
-   case PN_LISTENER_CLOSE:
-    check_condition(event, pn_listener_condition(pn_event_listener(event)));
-    break;
-
-   case PN_PROACTOR_INACTIVE:
-    return false;
-    break;
-
-   default: {
-     pn_link_t *l = pn_event_link(event);
-     if (l) {                      /* Only delegate link-related events */
-       if (pn_link_is_sender(l)) {
-         handle_send(app, event);
-       } else {
-         handle_receive(app, event);
-       }
-     }
-   }
-  }
-  return true;
-}
-
-void run(app_data_t *app) {
-  /* Loop and handle events */
-  do {
-    pn_event_batch_t *events = pn_proactor_wait(app->proactor);
-    for (pn_event_t *e = pn_event_batch_next(events); e; e = pn_event_batch_next(events)) {
-      if (!handle(app, e)) {
-        return;
-      }
-    }
-    pn_proactor_done(app->proactor, events);
-  } while(true);
-}
-
-/* Usage: direct [host [port [address [message_count]]]]
-   Listens on host:port and runs the event loop until the proactor becomes
-   inactive. Returns non-zero if an error condition was reported.
-*/
-int main(int argc, char **argv) {
-  struct app_data_t app = {0};
-  int i = 0;
-  app.container_id = argv[i++];   /* Should be unique */
-  /* Test each optional argument against the running index i, so a missing
-     argument selects its default instead of reading past the end of argv. */
-  app.host = (argc > i) ? argv[i++] : "";
-  app.port = (argc > i) ? argv[i++] : "amqp";
-  app.amqp_address = (argc > i) ? argv[i++] : "examples";
-  app.message_count = (argc > i) ? atoi(argv[i++]) : 10;
-
-  /* Create the proactor and listen */
-  app.proactor = pn_proactor();
-  app.listener = pn_listener();
-  char addr[PN_MAX_ADDR];
-  pn_proactor_addr(addr, sizeof(addr), app.host, app.port);
-  pn_proactor_listen(app.proactor, app.listener, addr, 16);
-  run(&app);
-  pn_proactor_free(app.proactor);
-  free(app.message_buffer.start);
-  return exit_code;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/564e0ca4/examples/c/proactor/example_test.py
----------------------------------------------------------------------
diff --git a/examples/c/proactor/example_test.py b/examples/c/proactor/example_test.py
deleted file mode 100644
index 02bb1fd..0000000
--- a/examples/c/proactor/example_test.py
+++ /dev/null
@@ -1,88 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License
-#
-
-# This is a test script to run the examples and verify that they behave as expected.
-
-import unittest, sys, time
-from proctest import *
-
-def python_cmd(name):
-    dir = os.path.dirname(__file__)
-    return [sys.executable, os.path.join(dir, "..", "..", "python", name)]
-
-def receive_expect(n):
-    return ''.join('{"sequence"=%s}\n'%i for i in xrange(1, n+1)) + "%s messages received\n"%n
-
-class Broker(object):
-    def __init__(self, test):
-        self.test = test
-
-    def __enter__(self):
-        with TestPort() as tp:
-            self.port = tp.port
-            self.host = tp.host
-            self.addr = tp.addr
-            self.proc = self.test.proc(["broker", "", self.port])
-            self.proc.wait_re("listening")
-            return self
-
-    def __exit__(self, *args):
-        b = getattr(self, "proc")
-        if b:
-            if b.poll() !=  None: # Broker crashed
-                raise ProcError(b, "broker crash")
-            b.kill()
-
-class CExampleTest(ProcTestCase):
-
-    def test_send_receive(self):
-        """Send first then receive"""
-        with Broker(self) as b:
-            s = self.proc(["send", "", b.port])
-            self.assertEqual("10 messages sent and acknowledged\n", s.wait_exit())
-            r = self.proc(["receive", "", b.port])
-            self.assertEqual(receive_expect(10), r.wait_exit())
-
-    def test_receive_send(self):
-        """Start receiving  first, then send."""
-        with Broker(self) as b:
-            r = self.proc(["receive", "", b.port]);
-            s = self.proc(["send", "", b.port]);
-            self.assertEqual("10 messages sent and acknowledged\n", s.wait_exit())
-            self.assertEqual(receive_expect(10), r.wait_exit())
-
-    def test_send_direct(self):
-        """Send to direct server"""
-        with TestPort() as tp:
-            d = self.proc(["direct", "", tp.port])
-            d.wait_re("listening")
-            self.assertEqual("10 messages sent and acknowledged\n", self.proc(["send", "", tp.port]).wait_exit())
-            self.assertIn(receive_expect(10), d.wait_exit())
-
-    def test_receive_direct(self):
-        """Receive from direct server"""
-        with TestPort() as tp:
-            d = self.proc(["direct", "", tp.port])
-            d.wait_re("listening")
-            self.assertEqual(receive_expect(10), self.proc(["receive", "", tp.port]).wait_exit())
-            self.assertIn("10 messages sent and acknowledged\n", d.wait_exit())
-
-
-if __name__ == "__main__":
-    unittest.main()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message