qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [40/89] [abbrv] [partial] qpid-proton git commit: PROTON-1728: Reorganize the source tree
Date Tue, 03 Jul 2018 22:13:29 GMT
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/core/engine.c
----------------------------------------------------------------------
diff --git a/c/src/core/engine.c b/c/src/core/engine.c
new file mode 100644
index 0000000..f49886d
--- /dev/null
+++ b/c/src/core/engine.c
@@ -0,0 +1,2343 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "engine-internal.h"
+#include <stdlib.h>
+#include <string.h>
+#include "protocol.h"
+
+#include <assert.h>
+#include <stdarg.h>
+#include <stdio.h>
+
+#include "platform/platform.h"
+#include "platform/platform_fmt.h"
+#include "transport.h"
+
+static void pni_session_bound(pn_session_t *ssn);
+static void pni_link_bound(pn_link_t *link);
+
+
+// endpoints
+
+static pn_connection_t *pni_ep_get_connection(pn_endpoint_t *endpoint)
+{
+  switch (endpoint->type) {
+  case CONNECTION:
+    return (pn_connection_t *) endpoint;
+  case SESSION:
+    return ((pn_session_t *) endpoint)->connection;
+  case SENDER:
+  case RECEIVER:
+    return ((pn_link_t *) endpoint)->session->connection;
+  }
+
+  return NULL;
+}
+
+static pn_event_type_t endpoint_event(pn_endpoint_type_t type, bool open) {
+  switch (type) {
+  case CONNECTION:
+    return open ? PN_CONNECTION_LOCAL_OPEN : PN_CONNECTION_LOCAL_CLOSE;
+  case SESSION:
+    return open ? PN_SESSION_LOCAL_OPEN : PN_SESSION_LOCAL_CLOSE;
+  case SENDER:
+  case RECEIVER:
+    return open ? PN_LINK_LOCAL_OPEN : PN_LINK_LOCAL_CLOSE;
+  default:
+    assert(false);
+    return PN_EVENT_NONE;
+  }
+}
+
+static void pn_endpoint_open(pn_endpoint_t *endpoint)
+{
+  if (!(endpoint->state & PN_LOCAL_ACTIVE)) {
+    PN_SET_LOCAL(endpoint->state, PN_LOCAL_ACTIVE);
+    pn_connection_t *conn = pni_ep_get_connection(endpoint);
+    pn_collector_put(conn->collector, PN_OBJECT, endpoint,
+                     endpoint_event(endpoint->type, true));
+    pn_modified(conn, endpoint, true);
+  }
+}
+
+static void pn_endpoint_close(pn_endpoint_t *endpoint)
+{
+  if (!(endpoint->state & PN_LOCAL_CLOSED)) {
+    PN_SET_LOCAL(endpoint->state, PN_LOCAL_CLOSED);
+    pn_connection_t *conn = pni_ep_get_connection(endpoint);
+    pn_collector_put(conn->collector, PN_OBJECT, endpoint,
+                     endpoint_event(endpoint->type, false));
+    pn_modified(conn, endpoint, true);
+  }
+}
+
+void pn_connection_reset(pn_connection_t *connection)
+{
+  assert(connection);
+  pn_endpoint_t *endpoint = &connection->endpoint;
+  endpoint->state = PN_LOCAL_UNINIT | PN_REMOTE_UNINIT;
+}
+
+void pn_connection_open(pn_connection_t *connection)
+{
+  assert(connection);
+  pn_endpoint_open(&connection->endpoint);
+}
+
+void pn_connection_close(pn_connection_t *connection)
+{
+  assert(connection);
+  pn_endpoint_close(&connection->endpoint);
+}
+
+static void pni_endpoint_tini(pn_endpoint_t *endpoint);
+
+void pn_connection_release(pn_connection_t *connection)
+{
+  assert(!connection->endpoint.freed);
+  // free those endpoints that haven't been freed by the application
+  LL_REMOVE(connection, endpoint, &connection->endpoint);
+  while (connection->endpoint_head) {
+    pn_endpoint_t *ep = connection->endpoint_head;
+    switch (ep->type) {
+    case SESSION:
+      // note: this will free all child links:
+      pn_session_free((pn_session_t *)ep);
+      break;
+    case SENDER:
+    case RECEIVER:
+      pn_link_free((pn_link_t *)ep);
+      break;
+    default:
+      assert(false);
+    }
+  }
+  connection->endpoint.freed = true;
+  if (!connection->transport) {
+    // no transport available to consume transport work items,
+    // so manually clear them:
+    pn_ep_incref(&connection->endpoint);
+    pn_connection_unbound(connection);
+  }
+  pn_ep_decref(&connection->endpoint);
+}
+
+void pn_connection_free(pn_connection_t *connection) {
+  pn_connection_release(connection);
+  pn_decref(connection);
+}
+
+void pn_connection_bound(pn_connection_t *connection)
+{
+  pn_collector_put(connection->collector, PN_OBJECT, connection, PN_CONNECTION_BOUND);
+  pn_ep_incref(&connection->endpoint);
+
+  size_t nsessions = pn_list_size(connection->sessions);
+  for (size_t i = 0; i < nsessions; i++) {
+    pni_session_bound((pn_session_t *) pn_list_get(connection->sessions, i));
+  }
+}
+
+// invoked when transport has been removed:
+void pn_connection_unbound(pn_connection_t *connection)
+{
+  connection->transport = NULL;
+  if (connection->endpoint.freed) {
+    // connection has been freed prior to unbinding, thus it
+    // cannot be re-assigned to a new transport.  Clear the
+    // transport work lists to allow the connection to be freed.
+    while (connection->transport_head) {
+        pn_clear_modified(connection, connection->transport_head);
+    }
+    while (connection->tpwork_head) {
+      pn_clear_tpwork(connection->tpwork_head);
+    }
+  }
+  pn_ep_decref(&connection->endpoint);
+}
+
+pn_record_t *pn_connection_attachments(pn_connection_t *connection)
+{
+  assert(connection);
+  return connection->context;
+}
+
+void *pn_connection_get_context(pn_connection_t *conn)
+{
+  // XXX: we should really assert on conn here, but this causes
+  // messenger tests to fail
+  return conn ? pn_record_get(conn->context, PN_LEGCTX) : NULL;
+}
+
+void pn_connection_set_context(pn_connection_t *conn, void *context)
+{
+  assert(conn);
+  pn_record_set(conn->context, PN_LEGCTX, context);
+}
+
+pn_transport_t *pn_connection_transport(pn_connection_t *connection)
+{
+  assert(connection);
+  return connection->transport;
+}
+
+void pn_condition_init(pn_condition_t *condition)
+{
+  condition->name = pn_string(NULL);
+  condition->description = pn_string(NULL);
+  condition->info = pn_data(0);
+}
+
+pn_condition_t *pn_condition() {
+  pn_condition_t *c = (pn_condition_t*)malloc(sizeof(pn_condition_t));
+  pn_condition_init(c);
+  return c;
+}
+
+void pn_condition_tini(pn_condition_t *condition)
+{
+  pn_data_free(condition->info);
+  pn_free(condition->description);
+  pn_free(condition->name);
+}
+
+void pn_condition_free(pn_condition_t *c) {
+  if (c) {
+    pn_condition_clear(c);
+    pn_condition_tini(c);
+    free(c);
+  }
+}
+
+static void pni_add_session(pn_connection_t *conn, pn_session_t *ssn)
+{
+  pn_list_add(conn->sessions, ssn);
+  ssn->connection = conn;
+  pn_incref(conn);  // keep around until finalized
+  pn_ep_incref(&conn->endpoint);
+}
+
+static void pni_remove_session(pn_connection_t *conn, pn_session_t *ssn)
+{
+  if (pn_list_remove(conn->sessions, ssn)) {
+    pn_ep_decref(&conn->endpoint);
+    LL_REMOVE(conn, endpoint, &ssn->endpoint);
+  }
+}
+
+pn_connection_t *pn_session_connection(pn_session_t *session)
+{
+  if (!session) return NULL;
+  return session->connection;
+}
+
+void pn_session_open(pn_session_t *session)
+{
+  assert(session);
+  pn_endpoint_open(&session->endpoint);
+}
+
+void pn_session_close(pn_session_t *session)
+{
+  assert(session);
+  pn_endpoint_close(&session->endpoint);
+}
+
+void pn_session_free(pn_session_t *session)
+{
+  assert(!session->endpoint.freed);
+  while(pn_list_size(session->links)) {
+    pn_link_t *link = (pn_link_t *)pn_list_get(session->links, 0);
+    pn_link_free(link);
+  }
+  pni_remove_session(session->connection, session);
+  pn_list_add(session->connection->freed, session);
+  session->endpoint.freed = true;
+  pn_ep_decref(&session->endpoint);
+
+  // the finalize logic depends on endpoint.freed, so we incref/decref
+  // to give it a chance to rerun
+  pn_incref(session);
+  pn_decref(session);
+}
+
+pn_record_t *pn_session_attachments(pn_session_t *session)
+{
+  assert(session);
+  return session->context;
+}
+
+void *pn_session_get_context(pn_session_t *session)
+{
+  return session ? pn_record_get(session->context, PN_LEGCTX) : 0;
+}
+
+void pn_session_set_context(pn_session_t *session, void *context)
+{
+  assert(context);
+  pn_record_set(session->context, PN_LEGCTX, context);
+}
+
+
+static void pni_add_link(pn_session_t *ssn, pn_link_t *link)
+{
+  pn_list_add(ssn->links, link);
+  link->session = ssn;
+  pn_ep_incref(&ssn->endpoint);
+}
+
+static void pni_remove_link(pn_session_t *ssn, pn_link_t *link)
+{
+  if (pn_list_remove(ssn->links, link)) {
+    pn_ep_decref(&ssn->endpoint);
+    LL_REMOVE(ssn->connection, endpoint, &link->endpoint);
+  }
+}
+
+void pn_link_open(pn_link_t *link)
+{
+  assert(link);
+  pn_endpoint_open(&link->endpoint);
+}
+
+void pn_link_close(pn_link_t *link)
+{
+  assert(link);
+  pn_endpoint_close(&link->endpoint);
+}
+
+void pn_link_detach(pn_link_t *link)
+{
+  assert(link);
+  if (link->detached) return;
+
+  link->detached = true;
+  pn_collector_put(link->session->connection->collector, PN_OBJECT, link, PN_LINK_LOCAL_DETACH);
+  pn_modified(link->session->connection, &link->endpoint, true);
+
+}
+
+static void pni_terminus_free(pn_terminus_t *terminus)
+{
+  pn_free(terminus->address);
+  pn_free(terminus->properties);
+  pn_free(terminus->capabilities);
+  pn_free(terminus->outcomes);
+  pn_free(terminus->filter);
+}
+
+void pn_link_free(pn_link_t *link)
+{
+  assert(!link->endpoint.freed);
+  pni_remove_link(link->session, link);
+  pn_list_add(link->session->freed, link);
+  pn_delivery_t *delivery = link->unsettled_head;
+  while (delivery) {
+    pn_delivery_t *next = delivery->unsettled_next;
+    pn_delivery_settle(delivery);
+    delivery = next;
+  }
+  link->endpoint.freed = true;
+  pn_ep_decref(&link->endpoint);
+
+  // the finalize logic depends on endpoint.freed (modified above), so
+  // we incref/decref to give it a chance to rerun
+  pn_incref(link);
+  pn_decref(link);
+}
+
+void *pn_link_get_context(pn_link_t *link)
+{
+  assert(link);
+  return pn_record_get(link->context, PN_LEGCTX);
+}
+
+void pn_link_set_context(pn_link_t *link, void *context)
+{
+  assert(link);
+  pn_record_set(link->context, PN_LEGCTX, context);
+}
+
+pn_record_t *pn_link_attachments(pn_link_t *link)
+{
+  assert(link);
+  return link->context;
+}
+
+void pn_endpoint_init(pn_endpoint_t *endpoint, int type, pn_connection_t *conn)
+{
+  endpoint->type = (pn_endpoint_type_t) type;
+  endpoint->referenced = true;
+  endpoint->state = PN_LOCAL_UNINIT | PN_REMOTE_UNINIT;
+  endpoint->error = pn_error();
+  pn_condition_init(&endpoint->condition);
+  pn_condition_init(&endpoint->remote_condition);
+  endpoint->endpoint_next = NULL;
+  endpoint->endpoint_prev = NULL;
+  endpoint->transport_next = NULL;
+  endpoint->transport_prev = NULL;
+  endpoint->modified = false;
+  endpoint->freed = false;
+  endpoint->refcount = 1;
+  //fprintf(stderr, "initting 0x%lx\n", (uintptr_t) endpoint);
+
+  LL_ADD(conn, endpoint, endpoint);
+}
+
+void pn_ep_incref(pn_endpoint_t *endpoint)
+{
+  endpoint->refcount++;
+}
+
+static pn_event_type_t pn_final_type(pn_endpoint_type_t type) {
+  switch (type) {
+  case CONNECTION:
+    return PN_CONNECTION_FINAL;
+  case SESSION:
+    return PN_SESSION_FINAL;
+  case SENDER:
+  case RECEIVER:
+    return PN_LINK_FINAL;
+  default:
+    assert(false);
+    return PN_EVENT_NONE;
+  }
+}
+
+static pn_endpoint_t *pn_ep_parent(pn_endpoint_t *endpoint) {
+  switch (endpoint->type) {
+  case CONNECTION:
+    return NULL;
+  case SESSION:
+    return &((pn_session_t *) endpoint)->connection->endpoint;
+  case SENDER:
+  case RECEIVER:
+    return &((pn_link_t *) endpoint)->session->endpoint;
+  default:
+    assert(false);
+    return NULL;
+  }
+}
+
+void pn_ep_decref(pn_endpoint_t *endpoint)
+{
+  assert(endpoint->refcount > 0);
+  endpoint->refcount--;
+  if (endpoint->refcount == 0) {
+    pn_connection_t *conn = pni_ep_get_connection(endpoint);
+    pn_collector_put(conn->collector, PN_OBJECT, endpoint, pn_final_type(endpoint->type));
+  }
+}
+
+static void pni_endpoint_tini(pn_endpoint_t *endpoint)
+{
+  pn_error_free(endpoint->error);
+  pn_condition_tini(&endpoint->remote_condition);
+  pn_condition_tini(&endpoint->condition);
+}
+
+static void pni_free_children(pn_list_t *children, pn_list_t *freed)
+{
+  while (pn_list_size(children) > 0) {
+    pn_endpoint_t *endpoint = (pn_endpoint_t *) pn_list_get(children, 0);
+    assert(!endpoint->referenced);
+    pn_free(endpoint);
+  }
+
+  while (pn_list_size(freed) > 0) {
+    pn_endpoint_t *endpoint = (pn_endpoint_t *) pn_list_get(freed, 0);
+    assert(!endpoint->referenced);
+    pn_free(endpoint);
+  }
+
+  pn_free(children);
+  pn_free(freed);
+}
+
+static void pn_connection_finalize(void *object)
+{
+  pn_connection_t *conn = (pn_connection_t *) object;
+  pn_endpoint_t *endpoint = &conn->endpoint;
+
+  if (conn->transport) {
+    assert(!conn->transport->referenced);
+    pn_free(conn->transport);
+  }
+
+  // freeing the transport could post events
+  if (pn_refcount(conn) > 0) {
+    return;
+  }
+
+  pni_free_children(conn->sessions, conn->freed);
+  pn_free(conn->context);
+  pn_decref(conn->collector);
+
+  pn_free(conn->container);
+  pn_free(conn->hostname);
+  pn_free(conn->auth_user);
+  pn_free(conn->auth_password);
+  pn_free(conn->offered_capabilities);
+  pn_free(conn->desired_capabilities);
+  pn_free(conn->properties);
+  pni_endpoint_tini(endpoint);
+  pn_free(conn->delivery_pool);
+}
+
+#define pn_connection_initialize NULL
+#define pn_connection_hashcode NULL
+#define pn_connection_compare NULL
+#define pn_connection_inspect NULL
+
+pn_connection_t *pn_connection()
+{
+  static const pn_class_t clazz = PN_CLASS(pn_connection);
+  pn_connection_t *conn = (pn_connection_t *) pn_class_new(&clazz, sizeof(pn_connection_t));
+  if (!conn) return NULL;
+
+  conn->endpoint_head = NULL;
+  conn->endpoint_tail = NULL;
+  pn_endpoint_init(&conn->endpoint, CONNECTION, conn);
+  conn->transport_head = NULL;
+  conn->transport_tail = NULL;
+  conn->sessions = pn_list(PN_WEAKREF, 0);
+  conn->freed = pn_list(PN_WEAKREF, 0);
+  conn->transport = NULL;
+  conn->work_head = NULL;
+  conn->work_tail = NULL;
+  conn->tpwork_head = NULL;
+  conn->tpwork_tail = NULL;
+  conn->container = pn_string(NULL);
+  conn->hostname = pn_string(NULL);
+  conn->auth_user = pn_string(NULL);
+  conn->auth_password = pn_string(NULL);
+  conn->offered_capabilities = pn_data(0);
+  conn->desired_capabilities = pn_data(0);
+  conn->properties = pn_data(0);
+  conn->collector = NULL;
+  conn->context = pn_record();
+  conn->delivery_pool = pn_list(PN_OBJECT, 0);
+  conn->driver = NULL;
+
+  return conn;
+}
+
+static const pn_event_type_t endpoint_init_event_map[] = {
+  PN_CONNECTION_INIT,  /* CONNECTION */
+  PN_SESSION_INIT,     /* SESSION */
+  PN_LINK_INIT,        /* SENDER */
+  PN_LINK_INIT};       /* RECEIVER */
+
+void pn_connection_collect(pn_connection_t *connection, pn_collector_t *collector)
+{
+  pn_decref(connection->collector);
+  connection->collector = collector;
+  pn_incref(connection->collector);
+  pn_endpoint_t *endpoint = connection->endpoint_head;
+  while (endpoint) {
+    pn_collector_put(connection->collector, PN_OBJECT, endpoint, endpoint_init_event_map[endpoint->type]);
+    endpoint = endpoint->endpoint_next;
+  }
+}
+
+pn_collector_t* pn_connection_collector(pn_connection_t *connection) {
+  return connection->collector;
+}
+
+pn_state_t pn_connection_state(pn_connection_t *connection)
+{
+  return connection ? connection->endpoint.state : 0;
+}
+
+pn_error_t *pn_connection_error(pn_connection_t *connection)
+{
+  return connection ? connection->endpoint.error : NULL;
+}
+
+const char *pn_connection_get_container(pn_connection_t *connection)
+{
+  assert(connection);
+  return pn_string_get(connection->container);
+}
+
+void pn_connection_set_container(pn_connection_t *connection, const char *container)
+{
+  assert(connection);
+  pn_string_set(connection->container, container);
+}
+
+const char *pn_connection_get_hostname(pn_connection_t *connection)
+{
+  assert(connection);
+  return pn_string_get(connection->hostname);
+}
+
+void pn_connection_set_hostname(pn_connection_t *connection, const char *hostname)
+{
+  assert(connection);
+  pn_string_set(connection->hostname, hostname);
+}
+
+const char *pn_connection_get_user(pn_connection_t *connection)
+{
+    assert(connection);
+    return pn_string_get(connection->auth_user);
+}
+
+void pn_connection_set_user(pn_connection_t *connection, const char *user)
+{
+    assert(connection);
+    pn_string_set(connection->auth_user, user);
+}
+
+void pn_connection_set_password(pn_connection_t *connection, const char *password)
+{
+    assert(connection);
+    // Make sure the previous password is erased, if there was one.
+    size_t n = pn_string_size(connection->auth_password);
+    const char* s = pn_string_get(connection->auth_password);
+    if (n > 0 && s) memset((void*)s, 0, n);
+    pn_string_set(connection->auth_password, password);
+}
+
+pn_data_t *pn_connection_offered_capabilities(pn_connection_t *connection)
+{
+  assert(connection);
+  return connection->offered_capabilities;
+}
+
+pn_data_t *pn_connection_desired_capabilities(pn_connection_t *connection)
+{
+  assert(connection);
+  return connection->desired_capabilities;
+}
+
+pn_data_t *pn_connection_properties(pn_connection_t *connection)
+{
+  assert(connection);
+  return connection->properties;
+}
+
+pn_data_t *pn_connection_remote_offered_capabilities(pn_connection_t *connection)
+{
+  assert(connection);
+  return connection->transport ? connection->transport->remote_offered_capabilities : NULL;
+}
+
+pn_data_t *pn_connection_remote_desired_capabilities(pn_connection_t *connection)
+{
+  assert(connection);
+  return connection->transport ? connection->transport->remote_desired_capabilities : NULL;
+}
+
+pn_data_t *pn_connection_remote_properties(pn_connection_t *connection)
+{
+  assert(connection);
+  return connection->transport ? connection->transport->remote_properties : NULL;
+}
+
+const char *pn_connection_remote_container(pn_connection_t *connection)
+{
+  assert(connection);
+  return connection->transport ? connection->transport->remote_container : NULL;
+}
+
+const char *pn_connection_remote_hostname(pn_connection_t *connection)
+{
+  assert(connection);
+  return connection->transport ? connection->transport->remote_hostname : NULL;
+}
+
+pn_delivery_t *pn_work_head(pn_connection_t *connection)
+{
+  assert(connection);
+  return connection->work_head;
+}
+
+pn_delivery_t *pn_work_next(pn_delivery_t *delivery)
+{
+  assert(delivery);
+
+  if (delivery->work)
+    return delivery->work_next;
+  else
+    return pn_work_head(delivery->link->session->connection);
+}
+
+static void pni_add_work(pn_connection_t *connection, pn_delivery_t *delivery)
+{
+  if (!delivery->work)
+  {
+    assert(!delivery->local.settled);   // never allow settled deliveries
+    LL_ADD(connection, work, delivery);
+    delivery->work = true;
+  }
+}
+
+static void pni_clear_work(pn_connection_t *connection, pn_delivery_t *delivery)
+{
+  if (delivery->work)
+  {
+    LL_REMOVE(connection, work, delivery);
+    delivery->work = false;
+  }
+}
+
+void pn_work_update(pn_connection_t *connection, pn_delivery_t *delivery)
+{
+  pn_link_t *link = pn_delivery_link(delivery);
+  pn_delivery_t *current = pn_link_current(link);
+  if (delivery->updated && !delivery->local.settled) {
+    pni_add_work(connection, delivery);
+  } else if (delivery == current) {
+    if (link->endpoint.type == SENDER) {
+      if (pn_link_credit(link) > 0) {
+        pni_add_work(connection, delivery);
+      } else {
+        pni_clear_work(connection, delivery);
+      }
+    } else {
+      pni_add_work(connection, delivery);
+    }
+  } else {
+    pni_clear_work(connection, delivery);
+  }
+}
+
+static void pni_add_tpwork(pn_delivery_t *delivery)
+{
+  pn_connection_t *connection = delivery->link->session->connection;
+  if (!delivery->tpwork)
+  {
+    LL_ADD(connection, tpwork, delivery);
+    delivery->tpwork = true;
+  }
+  pn_modified(connection, &connection->endpoint, true);
+}
+
+void pn_clear_tpwork(pn_delivery_t *delivery)
+{
+  pn_connection_t *connection = delivery->link->session->connection;
+  if (delivery->tpwork)
+  {
+    LL_REMOVE(connection, tpwork, delivery);
+    delivery->tpwork = false;
+    if (pn_refcount(delivery) > 0) {
+      pn_incref(delivery);
+      pn_decref(delivery);
+    }
+  }
+}
+
+void pn_dump(pn_connection_t *conn)
+{
+  pn_endpoint_t *endpoint = conn->transport_head;
+  while (endpoint)
+  {
+    printf("%p", (void *) endpoint);
+    endpoint = endpoint->transport_next;
+    if (endpoint)
+      printf(" -> ");
+  }
+  printf("\n");
+}
+
+void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint, bool emit)
+{
+  if (!endpoint->modified) {
+    LL_ADD(connection, transport, endpoint);
+    endpoint->modified = true;
+  }
+
+  if (emit && connection->transport) {
+    pn_collector_put(connection->collector, PN_OBJECT, connection->transport,
+                     PN_TRANSPORT);
+  }
+}
+
+void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint)
+{
+  if (endpoint->modified) {
+    LL_REMOVE(connection, transport, endpoint);
+    endpoint->transport_next = NULL;
+    endpoint->transport_prev = NULL;
+    endpoint->modified = false;
+  }
+}
+
+static bool pni_matches(pn_endpoint_t *endpoint, pn_endpoint_type_t type, pn_state_t state)
+{
+  if (endpoint->type != type) return false;
+
+  if (!state) return true;
+
+  int st = endpoint->state;
+  if ((state & PN_REMOTE_MASK) == 0 || (state & PN_LOCAL_MASK) == 0)
+    return st & state;
+  else
+    return st == state;
+}
+
+pn_endpoint_t *pn_find(pn_endpoint_t *endpoint, pn_endpoint_type_t type, pn_state_t state)
+{
+  while (endpoint)
+  {
+    if (pni_matches(endpoint, type, state))
+      return endpoint;
+    endpoint = endpoint->endpoint_next;
+  }
+  return NULL;
+}
+
+pn_session_t *pn_session_head(pn_connection_t *conn, pn_state_t state)
+{
+  if (conn)
+    return (pn_session_t *) pn_find(conn->endpoint_head, SESSION, state);
+  else
+    return NULL;
+}
+
+pn_session_t *pn_session_next(pn_session_t *ssn, pn_state_t state)
+{
+  if (ssn)
+    return (pn_session_t *) pn_find(ssn->endpoint.endpoint_next, SESSION, state);
+  else
+    return NULL;
+}
+
+pn_link_t *pn_link_head(pn_connection_t *conn, pn_state_t state)
+{
+  if (!conn) return NULL;
+
+  pn_endpoint_t *endpoint = conn->endpoint_head;
+
+  while (endpoint)
+  {
+    if (pni_matches(endpoint, SENDER, state) || pni_matches(endpoint, RECEIVER, state))
+      return (pn_link_t *) endpoint;
+    endpoint = endpoint->endpoint_next;
+  }
+
+  return NULL;
+}
+
+pn_link_t *pn_link_next(pn_link_t *link, pn_state_t state)
+{
+  if (!link) return NULL;
+
+  pn_endpoint_t *endpoint = link->endpoint.endpoint_next;
+
+  while (endpoint)
+  {
+    if (pni_matches(endpoint, SENDER, state) || pni_matches(endpoint, RECEIVER, state))
+      return (pn_link_t *) endpoint;
+    endpoint = endpoint->endpoint_next;
+  }
+
+  return NULL;
+}
+
+static void pn_session_incref(void *object)
+{
+  pn_session_t *session = (pn_session_t *) object;
+  if (!session->endpoint.referenced) {
+    session->endpoint.referenced = true;
+    pn_incref(session->connection);
+  } else {
+    pn_object_incref(object);
+  }
+}
+
+static bool pn_ep_bound(pn_endpoint_t *endpoint)
+{
+  pn_connection_t *conn = pni_ep_get_connection(endpoint);
+  pn_session_t *ssn;
+  pn_link_t *lnk;
+
+  if (!conn->transport) return false;
+  if (endpoint->modified) return true;
+
+  switch (endpoint->type) {
+  case CONNECTION:
+    return ((pn_connection_t *)endpoint)->transport;
+  case SESSION:
+    ssn = (pn_session_t *) endpoint;
+    return (((int16_t) ssn->state.local_channel) >= 0 || ((int16_t) ssn->state.remote_channel) >= 0);
+  case SENDER:
+  case RECEIVER:
+    lnk = (pn_link_t *) endpoint;
+    return ((int32_t) lnk->state.local_handle) >= 0 || ((int32_t) lnk->state.remote_handle) >= 0;
+  default:
+    assert(false);
+    return false;
+  }
+}
+
+static bool pni_connection_live(pn_connection_t *conn) {
+  return pn_refcount(conn) > 1;
+}
+
+static bool pni_session_live(pn_session_t *ssn) {
+  return pni_connection_live(ssn->connection) || pn_refcount(ssn) > 1;
+}
+
+static bool pni_link_live(pn_link_t *link) {
+  return pni_session_live(link->session) || pn_refcount(link) > 1;
+}
+
+static bool pni_endpoint_live(pn_endpoint_t *endpoint) {
+  switch (endpoint->type) {
+  case CONNECTION:
+    return pni_connection_live((pn_connection_t *)endpoint);
+  case SESSION:
+    return pni_session_live((pn_session_t *) endpoint);
+  case SENDER:
+  case RECEIVER:
+    return pni_link_live((pn_link_t *) endpoint);
+  default:
+    assert(false);
+    return false;
+  }
+}
+
+static bool pni_preserve_child(pn_endpoint_t *endpoint)
+{
+  pn_connection_t *conn = pni_ep_get_connection(endpoint);
+  pn_endpoint_t *parent = pn_ep_parent(endpoint);
+  if (pni_endpoint_live(parent) && (!endpoint->freed || (pn_ep_bound(endpoint)))
+      && endpoint->referenced) {
+    pn_object_incref(endpoint);
+    endpoint->referenced = false;
+    pn_decref(parent);
+    return true;
+  } else {
+    LL_REMOVE(conn, transport, endpoint);
+    return false;
+  }
+}
+
+static void pn_session_finalize(void *object)
+{
+  pn_session_t *session = (pn_session_t *) object;
+  pn_endpoint_t *endpoint = &session->endpoint;
+
+  if (pni_preserve_child(endpoint)) {
+    return;
+  }
+
+  pn_free(session->context);
+  pni_free_children(session->links, session->freed);
+  pni_endpoint_tini(endpoint);
+  pn_delivery_map_free(&session->state.incoming);
+  pn_delivery_map_free(&session->state.outgoing);
+  pn_free(session->state.local_handles);
+  pn_free(session->state.remote_handles);
+  pni_remove_session(session->connection, session);
+  pn_list_remove(session->connection->freed, session);
+
+  if (session->connection->transport) {
+    pn_transport_t *transport = session->connection->transport;
+    pn_hash_del(transport->local_channels, session->state.local_channel);
+    pn_hash_del(transport->remote_channels, session->state.remote_channel);
+  }
+
+  if (endpoint->referenced) {
+    pn_decref(session->connection);
+  }
+}
+
+#define pn_session_new pn_object_new
+#define pn_session_refcount pn_object_refcount
+#define pn_session_decref pn_object_decref
+#define pn_session_reify pn_object_reify
+#define pn_session_initialize NULL
+#define pn_session_hashcode NULL
+#define pn_session_compare NULL
+#define pn_session_inspect NULL
+
+pn_session_t *pn_session(pn_connection_t *conn)
+{
+  assert(conn);
+#define pn_session_free pn_object_free
+  static const pn_class_t clazz = PN_METACLASS(pn_session);
+#undef pn_session_free
+  pn_session_t *ssn = (pn_session_t *) pn_class_new(&clazz, sizeof(pn_session_t));
+  if (!ssn) return NULL;
+  pn_endpoint_init(&ssn->endpoint, SESSION, conn);
+  pni_add_session(conn, ssn);
+  ssn->links = pn_list(PN_WEAKREF, 0);
+  ssn->freed = pn_list(PN_WEAKREF, 0);
+  ssn->context = pn_record();
+  ssn->incoming_capacity = 1024*1024;
+  ssn->incoming_bytes = 0;
+  ssn->outgoing_bytes = 0;
+  ssn->incoming_deliveries = 0;
+  ssn->outgoing_deliveries = 0;
+  ssn->outgoing_window = 2147483647;
+
+  // begin transport state
+  memset(&ssn->state, 0, sizeof(ssn->state));
+  ssn->state.local_channel = (uint16_t)-1;
+  ssn->state.remote_channel = (uint16_t)-1;
+  pn_delivery_map_init(&ssn->state.incoming, 0);
+  pn_delivery_map_init(&ssn->state.outgoing, 0);
+  ssn->state.local_handles = pn_hash(PN_WEAKREF, 0, 0.75);
+  ssn->state.remote_handles = pn_hash(PN_WEAKREF, 0, 0.75);
+  // end transport state
+
+  pn_collector_put(conn->collector, PN_OBJECT, ssn, PN_SESSION_INIT);
+  if (conn->transport) {
+    pni_session_bound(ssn);
+  }
+  pn_decref(ssn);
+  return ssn;
+}
+
+static void pni_session_bound(pn_session_t *ssn)
+{
+  assert(ssn);
+  size_t nlinks = pn_list_size(ssn->links);
+  for (size_t i = 0; i < nlinks; i++) {
+    pni_link_bound((pn_link_t *) pn_list_get(ssn->links, i));
+  }
+}
+
+void pn_session_unbound(pn_session_t* ssn)
+{
+  assert(ssn);
+  ssn->state.local_channel = (uint16_t)-1;
+  ssn->state.remote_channel = (uint16_t)-1;
+  ssn->incoming_bytes = 0;
+  ssn->outgoing_bytes = 0;
+  ssn->incoming_deliveries = 0;
+  ssn->outgoing_deliveries = 0;
+}
+
+size_t pn_session_get_incoming_capacity(pn_session_t *ssn)
+{
+  assert(ssn);
+  return ssn->incoming_capacity;
+}
+
+void pn_session_set_incoming_capacity(pn_session_t *ssn, size_t capacity)
+{
+  assert(ssn);
+  // XXX: should this trigger a flow?
+  ssn->incoming_capacity = capacity;
+}
+
+size_t pn_session_get_outgoing_window(pn_session_t *ssn)
+{
+  assert(ssn);
+  return ssn->outgoing_window;
+}
+
+void pn_session_set_outgoing_window(pn_session_t *ssn, size_t window)
+{
+  assert(ssn);
+  ssn->outgoing_window = window;
+}
+
+size_t pn_session_outgoing_bytes(pn_session_t *ssn)
+{
+  assert(ssn);
+  return ssn->outgoing_bytes;
+}
+
+size_t pn_session_incoming_bytes(pn_session_t *ssn)
+{
+  assert(ssn);
+  return ssn->incoming_bytes;
+}
+
+pn_state_t pn_session_state(pn_session_t *session)
+{
+  return session->endpoint.state;
+}
+
+pn_error_t *pn_session_error(pn_session_t *session)
+{
+  return session->endpoint.error;
+}
+
+static void pni_terminus_init(pn_terminus_t *terminus, pn_terminus_type_t type)
+{
+  terminus->type = type;
+  terminus->address = pn_string(NULL);
+  terminus->durability = PN_NONDURABLE;
+  terminus->expiry_policy = PN_EXPIRE_WITH_SESSION;
+  terminus->timeout = 0;
+  terminus->dynamic = false;
+  terminus->distribution_mode = PN_DIST_MODE_UNSPECIFIED;
+  terminus->properties = pn_data(0);
+  terminus->capabilities = pn_data(0);
+  terminus->outcomes = pn_data(0);
+  terminus->filter = pn_data(0);
+}
+
+static void pn_link_incref(void *object)
+{
+  pn_link_t *link = (pn_link_t *) object;
+  if (!link->endpoint.referenced) {
+    link->endpoint.referenced = true;
+    pn_incref(link->session);
+  } else {
+    pn_object_incref(object);
+  }
+}
+
+static void pn_link_finalize(void *object)
+{
+  pn_link_t *link = (pn_link_t *) object;
+  pn_endpoint_t *endpoint = &link->endpoint;
+
+  if (pni_preserve_child(endpoint)) {
+    return;
+  }
+
+  while (link->unsettled_head) {
+    assert(!link->unsettled_head->referenced);
+    pn_free(link->unsettled_head);
+  }
+
+  pn_free(link->context);
+  pni_terminus_free(&link->source);
+  pni_terminus_free(&link->target);
+  pni_terminus_free(&link->remote_source);
+  pni_terminus_free(&link->remote_target);
+  pn_free(link->name);
+  pni_endpoint_tini(endpoint);
+  pni_remove_link(link->session, link);
+  pn_hash_del(link->session->state.local_handles, link->state.local_handle);
+  pn_hash_del(link->session->state.remote_handles, link->state.remote_handle);
+  pn_list_remove(link->session->freed, link);
+  if (endpoint->referenced) {
+    pn_decref(link->session);
+  }
+}
+
+#define pn_link_refcount pn_object_refcount
+#define pn_link_decref pn_object_decref
+#define pn_link_reify pn_object_reify
+#define pn_link_initialize NULL
+#define pn_link_hashcode NULL
+#define pn_link_compare NULL
+#define pn_link_inspect NULL
+
+pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name)
+{
+#define pn_link_new pn_object_new
+#define pn_link_free pn_object_free
+  static const pn_class_t clazz = PN_METACLASS(pn_link);
+#undef pn_link_new
+#undef pn_link_free
+  pn_link_t *link = (pn_link_t *) pn_class_new(&clazz, sizeof(pn_link_t));
+
+  pn_endpoint_init(&link->endpoint, type, session->connection);
+  pni_add_link(session, link);
+  pn_incref(session);  // keep session until link finalized
+  link->name = pn_string(name);
+  pni_terminus_init(&link->source, PN_SOURCE);
+  pni_terminus_init(&link->target, PN_TARGET);
+  pni_terminus_init(&link->remote_source, PN_UNSPECIFIED);
+  pni_terminus_init(&link->remote_target, PN_UNSPECIFIED);
+  link->unsettled_head = link->unsettled_tail = link->current = NULL;
+  link->unsettled_count = 0;
+  link->max_message_size = 0;
+  link->remote_max_message_size = 0;
+  link->available = 0;
+  link->credit = 0;
+  link->queued = 0;
+  link->drain = false;
+  link->drain_flag_mode = true;
+  link->drained = 0;
+  link->context = pn_record();
+  link->snd_settle_mode = PN_SND_MIXED;
+  link->rcv_settle_mode = PN_RCV_FIRST;
+  link->remote_snd_settle_mode = PN_SND_MIXED;
+  link->remote_rcv_settle_mode = PN_RCV_FIRST;
+  link->detached = false;
+
+  // begin transport state
+  link->state.local_handle = -1;
+  link->state.remote_handle = -1;
+  link->state.delivery_count = 0;
+  link->state.link_credit = 0;
+  // end transport state
+
+  pn_collector_put(session->connection->collector, PN_OBJECT, link, PN_LINK_INIT);
+  if (session->connection->transport) {
+    pni_link_bound(link);
+  }
+  pn_decref(link);
+  return link;
+}
+
+static void pni_link_bound(pn_link_t *link)
+{
+}
+
+void pn_link_unbound(pn_link_t* link)
+{
+  assert(link);
+  link->state.local_handle = -1;
+  link->state.remote_handle = -1;
+  link->state.delivery_count = 0;
+  link->state.link_credit = 0;
+}
+
+pn_terminus_t *pn_link_source(pn_link_t *link)
+{
+  return link ? &link->source : NULL;
+}
+
+pn_terminus_t *pn_link_target(pn_link_t *link)
+{
+  return link ? &link->target : NULL;
+}
+
+pn_terminus_t *pn_link_remote_source(pn_link_t *link)
+{
+  return link ? &link->remote_source : NULL;
+}
+
+pn_terminus_t *pn_link_remote_target(pn_link_t *link)
+{
+  return link ? &link->remote_target : NULL;
+}
+
+int pn_terminus_set_type(pn_terminus_t *terminus, pn_terminus_type_t type)
+{
+  if (!terminus) return PN_ARG_ERR;
+  terminus->type = type;
+  return 0;
+}
+
+pn_terminus_type_t pn_terminus_get_type(pn_terminus_t *terminus)
+{
+  return terminus ? terminus->type : (pn_terminus_type_t) 0;
+}
+
+const char *pn_terminus_get_address(pn_terminus_t *terminus)
+{
+  assert(terminus);
+  return pn_string_get(terminus->address);
+}
+
+int pn_terminus_set_address(pn_terminus_t *terminus, const char *address)
+{
+  assert(terminus);
+  return pn_string_set(terminus->address, address);
+}
+
+pn_durability_t pn_terminus_get_durability(pn_terminus_t *terminus)
+{
+  return terminus ? terminus->durability : (pn_durability_t) 0;
+}
+
+int pn_terminus_set_durability(pn_terminus_t *terminus, pn_durability_t durability)
+{
+  if (!terminus) return PN_ARG_ERR;
+  terminus->durability = durability;
+  return 0;
+}
+
+pn_expiry_policy_t pn_terminus_get_expiry_policy(pn_terminus_t *terminus)
+{
+  return terminus ? terminus->expiry_policy : (pn_expiry_policy_t) 0;
+}
+
+int pn_terminus_set_expiry_policy(pn_terminus_t *terminus, pn_expiry_policy_t expiry_policy)
+{
+  if (!terminus) return PN_ARG_ERR;
+  terminus->expiry_policy = expiry_policy;
+  return 0;
+}
+
+pn_seconds_t pn_terminus_get_timeout(pn_terminus_t *terminus)
+{
+  return terminus ? terminus->timeout : 0;
+}
+
+int pn_terminus_set_timeout(pn_terminus_t *terminus, pn_seconds_t timeout)
+{
+  if (!terminus) return PN_ARG_ERR;
+  terminus->timeout = timeout;
+  return 0;
+}
+
+bool pn_terminus_is_dynamic(pn_terminus_t *terminus)
+{
+  return terminus ? terminus->dynamic : false;
+}
+
+int pn_terminus_set_dynamic(pn_terminus_t *terminus, bool dynamic)
+{
+  if (!terminus) return PN_ARG_ERR;
+  terminus->dynamic = dynamic;
+  return 0;
+}
+
+pn_data_t *pn_terminus_properties(pn_terminus_t *terminus)
+{
+  return terminus ? terminus->properties : NULL;
+}
+
+pn_data_t *pn_terminus_capabilities(pn_terminus_t *terminus)
+{
+  return terminus ? terminus->capabilities : NULL;
+}
+
+pn_data_t *pn_terminus_outcomes(pn_terminus_t *terminus)
+{
+  return terminus ? terminus->outcomes : NULL;
+}
+
+pn_data_t *pn_terminus_filter(pn_terminus_t *terminus)
+{
+  return terminus ? terminus->filter : NULL;
+}
+
+pn_distribution_mode_t pn_terminus_get_distribution_mode(const pn_terminus_t *terminus)
+{
+  return terminus ? terminus->distribution_mode : PN_DIST_MODE_UNSPECIFIED;
+}
+
+int pn_terminus_set_distribution_mode(pn_terminus_t *terminus, pn_distribution_mode_t m)
+{
+  if (!terminus) return PN_ARG_ERR;
+  terminus->distribution_mode = m;
+  return 0;
+}
+
+int pn_terminus_copy(pn_terminus_t *terminus, pn_terminus_t *src)
+{
+  if (!terminus || !src) {
+    return PN_ARG_ERR;
+  }
+
+  terminus->type = src->type;
+  int err = pn_terminus_set_address(terminus, pn_terminus_get_address(src));
+  if (err) return err;
+  terminus->durability = src->durability;
+  terminus->expiry_policy = src->expiry_policy;
+  terminus->timeout = src->timeout;
+  terminus->dynamic = src->dynamic;
+  terminus->distribution_mode = src->distribution_mode;
+  err = pn_data_copy(terminus->properties, src->properties);
+  if (err) return err;
+  err = pn_data_copy(terminus->capabilities, src->capabilities);
+  if (err) return err;
+  err = pn_data_copy(terminus->outcomes, src->outcomes);
+  if (err) return err;
+  err = pn_data_copy(terminus->filter, src->filter);
+  if (err) return err;
+  return 0;
+}
+
+pn_link_t *pn_sender(pn_session_t *session, const char *name)
+{
+  return pn_link_new(SENDER, session, name);
+}
+
+pn_link_t *pn_receiver(pn_session_t *session, const char *name)
+{
+  return pn_link_new(RECEIVER, session, name);
+}
+
+pn_state_t pn_link_state(pn_link_t *link)
+{
+  return link->endpoint.state;
+}
+
+pn_error_t *pn_link_error(pn_link_t *link)
+{
+  return link->endpoint.error;
+}
+
+const char *pn_link_name(pn_link_t *link)
+{
+  assert(link);
+  return pn_string_get(link->name);
+}
+
+bool pn_link_is_sender(pn_link_t *link)
+{
+  return link->endpoint.type == SENDER;
+}
+
+bool pn_link_is_receiver(pn_link_t *link)
+{
+  return link->endpoint.type == RECEIVER;
+}
+
+pn_session_t *pn_link_session(pn_link_t *link)
+{
+  assert(link);
+  return link->session;
+}
+
+static void pn_disposition_finalize(pn_disposition_t *ds)
+{
+  pn_free(ds->data);
+  pn_free(ds->annotations);
+  pn_condition_tini(&ds->condition);
+}
+
+static void pn_delivery_incref(void *object)
+{
+  pn_delivery_t *delivery = (pn_delivery_t *) object;
+  if (delivery->link && !delivery->referenced) {
+    delivery->referenced = true;
+    pn_incref(delivery->link);
+  } else {
+    pn_object_incref(object);
+  }
+}
+
+static bool pni_preserve_delivery(pn_delivery_t *delivery)
+{
+  pn_connection_t *conn = delivery->link->session->connection;
+  return !delivery->local.settled || (conn->transport && (delivery->state.init || delivery->tpwork));
+}
+
+static void pn_delivery_finalize(void *object)
+{
+  pn_delivery_t *delivery = (pn_delivery_t *) object;
+  pn_link_t *link = delivery->link;
+  //  assert(!delivery->state.init);
+
+  bool pooled = false;
+  bool referenced = true;
+  if (link) {
+    if (pni_link_live(link) && pni_preserve_delivery(delivery) && delivery->referenced) {
+      delivery->referenced = false;
+      pn_object_incref(delivery);
+      pn_decref(link);
+      return;
+    }
+    referenced = delivery->referenced;
+
+    pn_clear_tpwork(delivery);
+    LL_REMOVE(link, unsettled, delivery);
+    pn_delivery_map_del(pn_link_is_sender(link)
+                        ? &link->session->state.outgoing
+                        : &link->session->state.incoming,
+                        delivery);
+    pn_buffer_clear(delivery->tag);
+    pn_buffer_clear(delivery->bytes);
+    pn_record_clear(delivery->context);
+    delivery->settled = true;
+    pn_connection_t *conn = link->session->connection;
+    assert(pn_refcount(delivery) == 0);
+    if (pni_connection_live(conn)) {
+      pn_list_t *pool = link->session->connection->delivery_pool;
+      delivery->link = NULL;
+      pn_list_add(pool, delivery);
+      pooled = true;
+      assert(pn_refcount(delivery) == 1);
+    }
+  }
+
+  if (!pooled) {
+    pn_free(delivery->context);
+    pn_buffer_free(delivery->tag);
+    pn_buffer_free(delivery->bytes);
+    pn_disposition_finalize(&delivery->local);
+    pn_disposition_finalize(&delivery->remote);
+  }
+
+  if (referenced) {
+    pn_decref(link);
+  }
+}
+
+static void pn_disposition_init(pn_disposition_t *ds)
+{
+  ds->data = pn_data(0);
+  ds->annotations = pn_data(0);
+  pn_condition_init(&ds->condition);
+}
+
+static void pn_disposition_clear(pn_disposition_t *ds)
+{
+  ds->type = 0;
+  ds->section_number = 0;
+  ds->section_offset = 0;
+  ds->failed = false;
+  ds->undeliverable = false;
+  ds->settled = false;
+  pn_data_clear(ds->data);
+  pn_data_clear(ds->annotations);
+  pn_condition_clear(&ds->condition);
+}
+
+#define pn_delivery_new pn_object_new
+#define pn_delivery_refcount pn_object_refcount
+#define pn_delivery_decref pn_object_decref
+#define pn_delivery_free pn_object_free
+#define pn_delivery_reify pn_object_reify
+#define pn_delivery_initialize NULL
+#define pn_delivery_hashcode NULL
+#define pn_delivery_compare NULL
+
+int pn_delivery_inspect(void *obj, pn_string_t *dst) {
+  pn_delivery_t *d = (pn_delivery_t*)obj;
+  const char* dir = pn_link_is_sender(d->link) ? "sending" : "receiving";
+  pn_bytes_t bytes = pn_buffer_bytes(d->tag);
+  int err =
+    pn_string_addf(dst, "pn_delivery<%p>{%s, tag=b\"", obj, dir) ||
+    pn_quote(dst, bytes.start, bytes.size) ||
+    pn_string_addf(dst, "\", local=%s, remote=%s}",
+                   pn_disposition_type_name(d->local.type),
+                   pn_disposition_type_name(d->remote.type));
+  return err;
+}
+
+pn_delivery_tag_t pn_dtag(const char *bytes, size_t size) {
+  pn_delivery_tag_t dtag = {size, bytes};
+  return dtag;
+}
+
+pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag)
+{
+  assert(link);
+  pn_list_t *pool = link->session->connection->delivery_pool;
+  pn_delivery_t *delivery = (pn_delivery_t *) pn_list_pop(pool);
+  if (!delivery) {
+    static const pn_class_t clazz = PN_METACLASS(pn_delivery);
+    delivery = (pn_delivery_t *) pn_class_new(&clazz, sizeof(pn_delivery_t));
+    if (!delivery) return NULL;
+    delivery->tag = pn_buffer(16);
+    delivery->bytes = pn_buffer(64);
+    pn_disposition_init(&delivery->local);
+    pn_disposition_init(&delivery->remote);
+    delivery->context = pn_record();
+  } else {
+    assert(!delivery->state.init);
+  }
+  delivery->link = link;
+  pn_incref(delivery->link);  // keep link until finalized
+  pn_buffer_clear(delivery->tag);
+  pn_buffer_append(delivery->tag, tag.start, tag.size);
+  pn_disposition_clear(&delivery->local);
+  pn_disposition_clear(&delivery->remote);
+  delivery->updated = false;
+  delivery->settled = false;
+  LL_ADD(link, unsettled, delivery);
+  delivery->referenced = true;
+  delivery->work_next = NULL;
+  delivery->work_prev = NULL;
+  delivery->work = false;
+  delivery->tpwork_next = NULL;
+  delivery->tpwork_prev = NULL;
+  delivery->tpwork = false;
+  pn_buffer_clear(delivery->bytes);
+  delivery->done = false;
+  delivery->aborted = false;
+  pn_record_clear(delivery->context);
+
+  // begin delivery state
+  delivery->state.init = false;
+  delivery->state.sending = false; /* True if we have sent at least 1 frame */
+  delivery->state.sent = false;    /* True if we have sent the entire delivery */
+  // end delivery state
+
+  if (!link->current)
+    link->current = delivery;
+
+  link->unsettled_count++;
+
+  pn_work_update(link->session->connection, delivery);
+
+  // XXX: could just remove incref above
+  pn_decref(delivery);
+
+  return delivery;
+}
+
+bool pn_delivery_buffered(pn_delivery_t *delivery)
+{
+  assert(delivery);
+  if (delivery->settled) return false;
+  if (pn_link_is_sender(delivery->link)) {
+    pn_delivery_state_t *state = &delivery->state;
+    if (state->sent) {
+      return false;
+    } else {
+      return delivery->done || (pn_buffer_size(delivery->bytes) > 0);
+    }
+  } else {
+    return false;
+  }
+}
+
+int pn_link_unsettled(pn_link_t *link)
+{
+  return link->unsettled_count;
+}
+
+pn_delivery_t *pn_unsettled_head(pn_link_t *link)
+{
+  pn_delivery_t *d = link->unsettled_head;
+  while (d && d->local.settled) {
+    d = d->unsettled_next;
+  }
+  return d;
+}
+
+pn_delivery_t *pn_unsettled_next(pn_delivery_t *delivery)
+{
+  pn_delivery_t *d = delivery->unsettled_next;
+  while (d && d->local.settled) {
+    d = d->unsettled_next;
+  }
+  return d;
+}
+
+bool pn_delivery_current(pn_delivery_t *delivery)
+{
+  pn_link_t *link = delivery->link;
+  return pn_link_current(link) == delivery;
+}
+
+void pn_delivery_dump(pn_delivery_t *d)
+{
+  char tag[1024];
+  pn_bytes_t bytes = pn_buffer_bytes(d->tag);
+  pn_quote_data(tag, 1024, bytes.start, bytes.size);
+  printf("{tag=%s, local.type=%" PRIu64 ", remote.type=%" PRIu64 ", local.settled=%u, "
+         "remote.settled=%u, updated=%u, current=%u, writable=%u, readable=%u, "
+         "work=%u}",
+         tag, d->local.type, d->remote.type, d->local.settled,
+         d->remote.settled, d->updated, pn_delivery_current(d),
+         pn_delivery_writable(d), pn_delivery_readable(d), d->work);
+}
+
+void *pn_delivery_get_context(pn_delivery_t *delivery)
+{
+  assert(delivery);
+  return pn_record_get(delivery->context, PN_LEGCTX);
+}
+
+void pn_delivery_set_context(pn_delivery_t *delivery, void *context)
+{
+  assert(delivery);
+  pn_record_set(delivery->context, PN_LEGCTX, context);
+}
+
+pn_record_t *pn_delivery_attachments(pn_delivery_t *delivery)
+{
+  assert(delivery);
+  return delivery->context;
+}
+
+uint64_t pn_disposition_type(pn_disposition_t *disposition)
+{
+  assert(disposition);
+  return disposition->type;
+}
+
+pn_data_t *pn_disposition_data(pn_disposition_t *disposition)
+{
+  assert(disposition);
+  return disposition->data;
+}
+
+uint32_t pn_disposition_get_section_number(pn_disposition_t *disposition)
+{
+  assert(disposition);
+  return disposition->section_number;
+}
+
+void pn_disposition_set_section_number(pn_disposition_t *disposition, uint32_t section_number)
+{
+  assert(disposition);
+  disposition->section_number = section_number;
+}
+
+uint64_t pn_disposition_get_section_offset(pn_disposition_t *disposition)
+{
+  assert(disposition);
+  return disposition->section_offset;
+}
+
+void pn_disposition_set_section_offset(pn_disposition_t *disposition, uint64_t section_offset)
+{
+  assert(disposition);
+  disposition->section_offset = section_offset;
+}
+
+bool pn_disposition_is_failed(pn_disposition_t *disposition)
+{
+  assert(disposition);
+  return disposition->failed;
+}
+
+void pn_disposition_set_failed(pn_disposition_t *disposition, bool failed)
+{
+  assert(disposition);
+  disposition->failed = failed;
+}
+
+bool pn_disposition_is_undeliverable(pn_disposition_t *disposition)
+{
+  assert(disposition);
+  return disposition->undeliverable;
+}
+
+void pn_disposition_set_undeliverable(pn_disposition_t *disposition, bool undeliverable)
+{
+  assert(disposition);
+  disposition->undeliverable = undeliverable;
+}
+
+pn_data_t *pn_disposition_annotations(pn_disposition_t *disposition)
+{
+  assert(disposition);
+  return disposition->annotations;
+}
+
+pn_condition_t *pn_disposition_condition(pn_disposition_t *disposition)
+{
+  assert(disposition);
+  return &disposition->condition;
+}
+
+pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery)
+{
+  if (delivery) {
+    pn_bytes_t tag = pn_buffer_bytes(delivery->tag);
+    return pn_dtag(tag.start, tag.size);
+  } else {
+    return pn_dtag(0, 0);
+  }
+}
+
+pn_delivery_t *pn_link_current(pn_link_t *link)
+{
+  if (!link) return NULL;
+  return link->current;
+}
+
+static void pni_advance_sender(pn_link_t *link)
+{
+  link->current->done = true;
+  /* Skip accounting if the link is aborted and has not sent any frames.
+     A delivery that was aborted before sending the first frame was not accounted
+     for in pni_process_tpwork_sender() so we don't need to account for it being sent here.
+  */
+  bool skip = link->current->aborted && !link->current->state.sending;
+  if (!skip) {
+    link->queued++;
+    link->credit--;
+    link->session->outgoing_deliveries++;
+  }
+  pni_add_tpwork(link->current);
+  link->current = link->current->unsettled_next;
+}
+
+static void pni_advance_receiver(pn_link_t *link)
+{
+  link->credit--;
+  link->queued--;
+  link->session->incoming_deliveries--;
+
+  pn_delivery_t *current = link->current;
+  link->session->incoming_bytes -= pn_buffer_size(current->bytes);
+  pn_buffer_clear(current->bytes);
+
+  if (!link->session->state.incoming_window) {
+    pni_add_tpwork(current);
+  }
+
+  link->current = link->current->unsettled_next;
+}
+
+bool pn_link_advance(pn_link_t *link)
+{
+  if (link && link->current) {
+    pn_delivery_t *prev = link->current;
+    if (link->endpoint.type == SENDER) {
+      pni_advance_sender(link);
+    } else {
+      pni_advance_receiver(link);
+    }
+    pn_delivery_t *next = link->current;
+    pn_work_update(link->session->connection, prev);
+    if (next) pn_work_update(link->session->connection, next);
+    return prev != next;
+  } else {
+    return false;
+  }
+}
+
+int pn_link_credit(pn_link_t *link)
+{
+  return link ? link->credit : 0;
+}
+
+int pn_link_available(pn_link_t *link)
+{
+  return link ? link->available : 0;
+}
+
+int pn_link_queued(pn_link_t *link)
+{
+  return link ? link->queued : 0;
+}
+
+int pn_link_remote_credit(pn_link_t *link)
+{
+  assert(link);
+  return link->credit - link->queued;
+}
+
+bool pn_link_get_drain(pn_link_t *link)
+{
+  assert(link);
+  return link->drain;
+}
+
+pn_snd_settle_mode_t pn_link_snd_settle_mode(pn_link_t *link)
+{
+  return link ? (pn_snd_settle_mode_t)link->snd_settle_mode
+      : PN_SND_MIXED;
+}
+
+pn_rcv_settle_mode_t pn_link_rcv_settle_mode(pn_link_t *link)
+{
+  return link ? (pn_rcv_settle_mode_t)link->rcv_settle_mode
+      : PN_RCV_FIRST;
+}
+
+pn_snd_settle_mode_t pn_link_remote_snd_settle_mode(pn_link_t *link)
+{
+  return link ? (pn_snd_settle_mode_t)link->remote_snd_settle_mode
+      : PN_SND_MIXED;
+}
+
+pn_rcv_settle_mode_t pn_link_remote_rcv_settle_mode(pn_link_t *link)
+{
+  return link ? (pn_rcv_settle_mode_t)link->remote_rcv_settle_mode
+      : PN_RCV_FIRST;
+}
+void pn_link_set_snd_settle_mode(pn_link_t *link, pn_snd_settle_mode_t mode)
+{
+  if (link)
+    link->snd_settle_mode = (uint8_t)mode;
+}
+void pn_link_set_rcv_settle_mode(pn_link_t *link, pn_rcv_settle_mode_t mode)
+{
+  if (link)
+    link->rcv_settle_mode = (uint8_t)mode;
+}
+
+void pn_delivery_settle(pn_delivery_t *delivery)
+{
+  assert(delivery);
+  if (!delivery->local.settled) {
+    pn_link_t *link = delivery->link;
+    if (pn_delivery_current(delivery)) {
+      pn_link_advance(link);
+    }
+
+    link->unsettled_count--;
+    delivery->local.settled = true;
+    pni_add_tpwork(delivery);
+    pn_work_update(delivery->link->session->connection, delivery);
+    pn_incref(delivery);
+    pn_decref(delivery);
+  }
+}
+
+void pn_link_offered(pn_link_t *sender, int credit)
+{
+  sender->available = credit;
+}
+
+ssize_t pn_link_send(pn_link_t *sender, const char *bytes, size_t n)
+{
+  pn_delivery_t *current = pn_link_current(sender);
+  if (!current) return PN_EOS;
+  if (!bytes || !n) return 0;
+  pn_buffer_append(current->bytes, bytes, n);
+  sender->session->outgoing_bytes += n;
+  pni_add_tpwork(current);
+  return n;
+}
+
+int pn_link_drained(pn_link_t *link)
+{
+  assert(link);
+  int drained = 0;
+
+  if (pn_link_is_sender(link)) {
+    if (link->drain && link->credit > 0) {
+      link->drained = link->credit;
+      link->credit = 0;
+      pn_modified(link->session->connection, &link->endpoint, true);
+      drained = link->drained;
+    }
+  } else {
+    drained = link->drained;
+    link->drained = 0;
+  }
+
+  return drained;
+}
+
+ssize_t pn_link_recv(pn_link_t *receiver, char *bytes, size_t n)
+{
+  if (!receiver) return PN_ARG_ERR;
+  pn_delivery_t *delivery = receiver->current;
+  if (!delivery) return PN_STATE_ERR;
+  if (delivery->aborted) return PN_ABORTED;
+  size_t size = pn_buffer_get(delivery->bytes, 0, n, bytes);
+  pn_buffer_trim(delivery->bytes, size, 0);
+  if (size) {
+    receiver->session->incoming_bytes -= size;
+    if (!receiver->session->state.incoming_window) {
+      pni_add_tpwork(delivery);
+    }
+    return size;
+  } else {
+    return delivery->done ? PN_EOS : 0;
+  }
+}
+
+
+void pn_link_flow(pn_link_t *receiver, int credit)
+{
+  assert(receiver);
+  assert(pn_link_is_receiver(receiver));
+  receiver->credit += credit;
+  pn_modified(receiver->session->connection, &receiver->endpoint, true);
+  if (!receiver->drain_flag_mode) {
+    pn_link_set_drain(receiver, false);
+    receiver->drain_flag_mode = false;
+  }
+}
+
+void pn_link_drain(pn_link_t *receiver, int credit)
+{
+  assert(receiver);
+  assert(pn_link_is_receiver(receiver));
+  pn_link_set_drain(receiver, true);
+  pn_link_flow(receiver, credit);
+  receiver->drain_flag_mode = false;
+}
+
+void pn_link_set_drain(pn_link_t *receiver, bool drain)
+{
+  assert(receiver);
+  assert(pn_link_is_receiver(receiver));
+  receiver->drain = drain;
+  pn_modified(receiver->session->connection, &receiver->endpoint, true);
+  receiver->drain_flag_mode = true;
+}
+
+bool pn_link_draining(pn_link_t *receiver)
+{
+  assert(receiver);
+  assert(pn_link_is_receiver(receiver));
+  return receiver->drain && (pn_link_credit(receiver) > pn_link_queued(receiver));
+}
+
+uint64_t pn_link_max_message_size(pn_link_t *link)
+{
+  return link->max_message_size;
+}
+
+void pn_link_set_max_message_size(pn_link_t *link, uint64_t size)
+{
+  link->max_message_size = size;
+}
+
+uint64_t pn_link_remote_max_message_size(pn_link_t *link)
+{
+  return link->remote_max_message_size;
+}
+
+pn_link_t *pn_delivery_link(pn_delivery_t *delivery)
+{
+  assert(delivery);
+  return delivery->link;
+}
+
+pn_disposition_t *pn_delivery_local(pn_delivery_t *delivery)
+{
+  assert(delivery);
+  return &delivery->local;
+}
+
+uint64_t pn_delivery_local_state(pn_delivery_t *delivery)
+{
+  assert(delivery);
+  return delivery->local.type;
+}
+
+pn_disposition_t *pn_delivery_remote(pn_delivery_t *delivery)
+{
+  assert(delivery);
+  return &delivery->remote;
+}
+
+uint64_t pn_delivery_remote_state(pn_delivery_t *delivery)
+{
+  assert(delivery);
+  return delivery->remote.type;
+}
+
+bool pn_delivery_settled(pn_delivery_t *delivery)
+{
+  return delivery ? delivery->remote.settled : false;
+}
+
+bool pn_delivery_updated(pn_delivery_t *delivery)
+{
+  return delivery ? delivery->updated : false;
+}
+
+void pn_delivery_clear(pn_delivery_t *delivery)
+{
+  delivery->updated = false;
+  pn_work_update(delivery->link->session->connection, delivery);
+}
+
+void pn_delivery_update(pn_delivery_t *delivery, uint64_t state)
+{
+  if (!delivery) return;
+  delivery->local.type = state;
+  pni_add_tpwork(delivery);
+}
+
+bool pn_delivery_writable(pn_delivery_t *delivery)
+{
+  if (!delivery) return false;
+
+  pn_link_t *link = delivery->link;
+  return pn_link_is_sender(link) && pn_delivery_current(delivery) && pn_link_credit(link) > 0;
+}
+
+bool pn_delivery_readable(pn_delivery_t *delivery)
+{
+  if (delivery) {
+    pn_link_t *link = delivery->link;
+    return pn_link_is_receiver(link) && pn_delivery_current(delivery);
+  } else {
+    return false;
+  }
+}
+
+size_t pn_delivery_pending(pn_delivery_t *delivery)
+{
+  /* Aborted deliveries: for clients that don't check pn_delivery_aborted(),
+     return 1 rather than 0. This will force them to call pn_link_recv() and get
+     the PN_ABORTED error return code.
+  */
+  if (delivery->aborted) return 1;
+  return pn_buffer_size(delivery->bytes);
+}
+
+bool pn_delivery_partial(pn_delivery_t *delivery)
+{
+  return !delivery->done;
+}
+
+void pn_delivery_abort(pn_delivery_t *delivery) {
+  if (!delivery->local.settled) { /* Can't abort a settled delivery */
+    delivery->aborted = true;
+    pn_delivery_settle(delivery);
+  }
+}
+
+bool pn_delivery_aborted(pn_delivery_t *delivery) {
+  return delivery->aborted;
+}
+
+pn_condition_t *pn_connection_condition(pn_connection_t *connection)
+{
+  assert(connection);
+  return &connection->endpoint.condition;
+}
+
+pn_condition_t *pn_connection_remote_condition(pn_connection_t *connection)
+{
+  assert(connection);
+  pn_transport_t *transport = connection->transport;
+  return transport ? &transport->remote_condition : NULL;
+}
+
+pn_condition_t *pn_session_condition(pn_session_t *session)
+{
+  assert(session);
+  return &session->endpoint.condition;
+}
+
+pn_condition_t *pn_session_remote_condition(pn_session_t *session)
+{
+  assert(session);
+  return &session->endpoint.remote_condition;
+}
+
+pn_condition_t *pn_link_condition(pn_link_t *link)
+{
+  assert(link);
+  return &link->endpoint.condition;
+}
+
+pn_condition_t *pn_link_remote_condition(pn_link_t *link)
+{
+  assert(link);
+  return &link->endpoint.remote_condition;
+}
+
+bool pn_condition_is_set(pn_condition_t *condition)
+{
+  return condition && pn_string_get(condition->name);
+}
+
+void pn_condition_clear(pn_condition_t *condition)
+{
+  assert(condition);
+  pn_string_clear(condition->name);
+  pn_string_clear(condition->description);
+  pn_data_clear(condition->info);
+}
+
+const char *pn_condition_get_name(pn_condition_t *condition)
+{
+  assert(condition);
+  return pn_string_get(condition->name);
+}
+
+int pn_condition_set_name(pn_condition_t *condition, const char *name)
+{
+  assert(condition);
+  return pn_string_set(condition->name, name);
+}
+
+const char *pn_condition_get_description(pn_condition_t *condition)
+{
+  assert(condition);
+  return pn_string_get(condition->description);
+}
+
+int pn_condition_set_description(pn_condition_t *condition, const char *description)
+{
+  assert(condition);
+  return pn_string_set(condition->description, description);
+}
+
+int pn_condition_vformat(pn_condition_t *condition, const char *name, const char *fmt, va_list ap)
+{
+  assert(condition);
+  int err = pn_condition_set_name(condition, name);
+  if (err)
+      return err;
+
+  char text[1024];
+  size_t n = pni_vsnprintf(text, 1024, fmt, ap);
+  if (n >= sizeof(text))
+      text[sizeof(text)-1] = '\0';
+  err = pn_condition_set_description(condition, text);
+  return err;
+}
+
+int pn_condition_format(pn_condition_t *condition, const char *name, const char *fmt, ...)
+{
+  assert(condition);
+  va_list ap;
+  va_start(ap, fmt);
+  int err = pn_condition_vformat(condition, name, fmt, ap);
+  va_end(ap);
+  return err;
+}
+
+pn_data_t *pn_condition_info(pn_condition_t *condition)
+{
+  assert(condition);
+  return condition->info;
+}
+
+bool pn_condition_is_redirect(pn_condition_t *condition)
+{
+  const char *name = pn_condition_get_name(condition);
+  return name && (!strcmp(name, "amqp:connection:redirect") ||
+                  !strcmp(name, "amqp:link:redirect"));
+}
+
+const char *pn_condition_redirect_host(pn_condition_t *condition)
+{
+  pn_data_t *data = pn_condition_info(condition);
+  pn_data_rewind(data);
+  pn_data_next(data);
+  pn_data_enter(data);
+  pn_data_lookup(data, "network-host");
+  pn_bytes_t host = pn_data_get_bytes(data);
+  pn_data_rewind(data);
+  return host.start;
+}
+
+int pn_condition_redirect_port(pn_condition_t *condition)
+{
+  pn_data_t *data = pn_condition_info(condition);
+  pn_data_rewind(data);
+  pn_data_next(data);
+  pn_data_enter(data);
+  pn_data_lookup(data, "port");
+  int port = pn_data_get_int(data);
+  pn_data_rewind(data);
+  return port;
+}
+
+pn_connection_t *pn_event_connection(pn_event_t *event)
+{
+  pn_session_t *ssn;
+  pn_transport_t *transport;
+
+  switch (pn_class_id(pn_event_class(event))) {
+  case CID_pn_connection:
+    return (pn_connection_t *) pn_event_context(event);
+  case CID_pn_transport:
+    transport = pn_event_transport(event);
+    if (transport)
+      return transport->connection;
+    return NULL;
+  default:
+    ssn = pn_event_session(event);
+    if (ssn)
+     return pn_session_connection(ssn);
+  }
+  return NULL;
+}
+
+pn_session_t *pn_event_session(pn_event_t *event)
+{
+  pn_link_t *link;
+  switch (pn_class_id(pn_event_class(event))) {
+  case CID_pn_session:
+    return (pn_session_t *) pn_event_context(event);
+  default:
+    link = pn_event_link(event);
+    if (link)
+      return pn_link_session(link);
+  }
+  return NULL;
+}
+
+pn_link_t *pn_event_link(pn_event_t *event)
+{
+  pn_delivery_t *dlv;
+  switch (pn_class_id(pn_event_class(event))) {
+  case CID_pn_link:
+    return (pn_link_t *) pn_event_context(event);
+  default:
+    dlv = pn_event_delivery(event);
+    if (dlv)
+      return pn_delivery_link(dlv);
+  }
+  return NULL;
+}
+
+pn_delivery_t *pn_event_delivery(pn_event_t *event)
+{
+  switch (pn_class_id(pn_event_class(event))) {
+  case CID_pn_delivery:
+    return (pn_delivery_t *) pn_event_context(event);
+  default:
+    return NULL;
+  }
+}
+
+pn_transport_t *pn_event_transport(pn_event_t *event)
+{
+  switch (pn_class_id(pn_event_class(event))) {
+  case CID_pn_transport:
+    return (pn_transport_t *) pn_event_context(event);
+  default:
+    {
+      pn_connection_t *conn = pn_event_connection(event);
+      if (conn)
+        return pn_connection_transport(conn);
+      return NULL;
+    }
+  }
+}
+
+int pn_condition_copy(pn_condition_t *dest, pn_condition_t *src) {
+  assert(dest);
+  assert(src);
+  int err = 0;
+  if (src != dest) {
+    err = pn_string_copy(dest->name, src->name);
+    if (!err) err = pn_string_copy(dest->description, src->description);
+    if (!err) err = pn_data_copy(dest->info, src->info);
+  }
+  return err;
+}
+
+
+static pn_condition_t *cond_set(pn_condition_t *cond) {
+  return cond && pn_condition_is_set(cond) ? cond : NULL;
+}
+
+static pn_condition_t *cond2_set(pn_condition_t *cond1, pn_condition_t *cond2) {
+  pn_condition_t *cond = cond_set(cond1);
+  if (!cond) cond = cond_set(cond2);
+  return cond;
+}
+
+pn_condition_t *pn_event_condition(pn_event_t *e) {
+  void *ctx = pn_event_context(e);
+  switch (pn_class_id(pn_event_class(e))) {
+   case CID_pn_connection: {
+     pn_connection_t *c = (pn_connection_t*)ctx;
+     return cond2_set(pn_connection_remote_condition(c), pn_connection_condition(c));
+   }
+   case CID_pn_session: {
+     pn_session_t *s = (pn_session_t*)ctx;
+     return cond2_set(pn_session_remote_condition(s), pn_session_condition(s));
+   }
+   case CID_pn_link: {
+     pn_link_t *l = (pn_link_t*)ctx;
+     return cond2_set(pn_link_remote_condition(l), pn_link_condition(l));
+   }
+   case CID_pn_transport:
+    return cond_set(pn_transport_condition((pn_transport_t*)ctx));
+
+   default:
+    return NULL;
+  }
+}
+
+const char *pn_disposition_type_name(uint64_t d) {
+  switch(d) {
+   case PN_RECEIVED: return "received";
+   case PN_ACCEPTED: return "accepted";
+   case PN_REJECTED: return "rejected";
+   case PN_RELEASED: return "released";
+   case PN_MODIFIED: return "modified";
+   default: return "unknown";
+  }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/core/error.c
----------------------------------------------------------------------
diff --git a/c/src/core/error.c b/c/src/core/error.c
new file mode 100644
index 0000000..070144a
--- /dev/null
+++ b/c/src/core/error.c
@@ -0,0 +1,138 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "platform/platform.h"
+#include "util.h"
+
+#include <proton/error.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+
+struct pn_error_t {
+  char *text;
+  pn_error_t *root;
+  int code;
+};
+
+pn_error_t *pn_error()
+{
+  pn_error_t *error = (pn_error_t *) malloc(sizeof(pn_error_t));
+  if (error != NULL) {
+    error->code = 0;
+    error->text = NULL;
+    error->root = NULL;
+  }
+  return error;
+}
+
+void pn_error_free(pn_error_t *error)
+{
+  if (error) {
+    free(error->text);
+    free(error);
+  }
+}
+
+void pn_error_clear(pn_error_t *error)
+{
+  if (error) {
+    error->code = 0;
+    free(error->text);
+    error->text = NULL;
+    error->root = NULL;
+  }
+}
+
+int pn_error_set(pn_error_t *error, int code, const char *text)
+{
+  assert(error);
+  pn_error_clear(error);
+  if (code) {
+    error->code = code;
+    error->text = pn_strdup(text);
+  }
+  return code;
+}
+
+int pn_error_vformat(pn_error_t *error, int code, const char *fmt, va_list ap)
+{
+  assert(error);
+  char text[1024];
+  int n = pni_vsnprintf(text, 1024, fmt, ap);
+  if (n >= 1024) {
+    text[1023] = '\0';
+  }
+  return pn_error_set(error, code, text);
+}
+
+int pn_error_format(pn_error_t *error, int code, const char *fmt, ...)
+{
+  assert(error);
+  va_list ap;
+  va_start(ap, fmt);
+  int rcode = pn_error_vformat(error, code, fmt, ap);
+  va_end(ap);
+  return rcode;
+}
+
+int pn_error_code(pn_error_t *error)
+{
+  assert(error);
+  return error->code;
+}
+
+const char *pn_error_text(pn_error_t *error)
+{
+  assert(error);
+  return error->text;
+}
+
+int pn_error_copy(pn_error_t *error, pn_error_t *src)
+{
+  assert(error);
+  if (src) {
+    return pn_error_set(error, pn_error_code(src), pn_error_text(src));
+  } else {
+    pn_error_clear(error);
+    return 0;
+  }
+}
+
+const char *pn_code(int code)
+{
+  switch (code)
+  {
+  case 0: return "<ok>";
+  case PN_EOS: return "PN_EOS";
+  case PN_ERR: return "PN_ERR";
+  case PN_OVERFLOW: return "PN_OVERFLOW";
+  case PN_UNDERFLOW: return "PN_UNDERFLOW";
+  case PN_STATE_ERR: return "PN_STATE_ERR";
+  case PN_ARG_ERR: return "PN_ARG_ERR";
+  case PN_TIMEOUT: return "PN_TIMEOUT";
+  case PN_INTR: return "PN_INTR";
+  case PN_INPROGRESS: return "PN_INPROGRESS";
+  case PN_OUT_OF_MEMORY: return "PN_OUT_OF_MEMORY";
+  case PN_ABORTED: return "PN_ABORTED";
+  default: return "<unknown>";
+  }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/core/event.c
----------------------------------------------------------------------
diff --git a/c/src/core/event.c b/c/src/core/event.c
new file mode 100644
index 0000000..c24131d
--- /dev/null
+++ b/c/src/core/event.c
@@ -0,0 +1,410 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <stdio.h>
+#include <proton/object.h>
+#include <proton/event.h>
+#include <proton/reactor.h>
+#include <assert.h>
+
+struct pn_collector_t {
+  pn_list_t *pool;
+  pn_event_t *head;
+  pn_event_t *tail;
+  pn_event_t *prev;         /* event returned by previous call to pn_collector_next() */
+  bool freed;
+};
+
+struct pn_event_t {
+  pn_list_t *pool;
+  const pn_class_t *clazz;
+  void *context;    // depends on clazz
+  pn_record_t *attachments;
+  pn_event_t *next;
+  pn_event_type_t type;
+};
+
+static void pn_collector_initialize(pn_collector_t *collector)
+{
+  collector->pool = pn_list(PN_OBJECT, 0);
+  collector->head = NULL;
+  collector->tail = NULL;
+  collector->prev = NULL;
+  collector->freed = false;
+}
+
+void pn_collector_drain(pn_collector_t *collector)
+{
+  assert(collector);
+  while (pn_collector_next(collector))
+    ;
+  assert(!collector->head);
+  assert(!collector->tail);
+}
+
+static void pn_collector_shrink(pn_collector_t *collector)
+{
+  assert(collector);
+  pn_list_clear(collector->pool);
+}
+
+static void pn_collector_finalize(pn_collector_t *collector)
+{
+  pn_collector_drain(collector);
+  pn_decref(collector->pool);
+}
+
+static int pn_collector_inspect(pn_collector_t *collector, pn_string_t *dst)
+{
+  assert(collector);
+  int err = pn_string_addf(dst, "EVENTS[");
+  if (err) return err;
+  pn_event_t *event = collector->head;
+  bool first = true;
+  while (event) {
+    if (first) {
+      first = false;
+    } else {
+      err = pn_string_addf(dst, ", ");
+      if (err) return err;
+    }
+    err = pn_inspect(event, dst);
+    if (err) return err;
+    event = event->next;
+  }
+  return pn_string_addf(dst, "]");
+}
+
+#define pn_collector_hashcode NULL
+#define pn_collector_compare NULL
+
+PN_CLASSDEF(pn_collector)
+
+pn_collector_t *pn_collector(void)
+{
+  return pn_collector_new();
+}
+
+void pn_collector_free(pn_collector_t *collector)
+{
+  assert(collector);
+  pn_collector_release(collector);
+  pn_decref(collector);
+}
+
+void pn_collector_release(pn_collector_t *collector)
+{
+  assert(collector);
+  if (!collector->freed) {
+    collector->freed = true;
+    pn_collector_drain(collector);
+    pn_collector_shrink(collector);
+  }
+}
+
+pn_event_t *pn_event(void);
+
+pn_event_t *pn_collector_put(pn_collector_t *collector,
+                             const pn_class_t *clazz, void *context,
+                             pn_event_type_t type)
+{
+  if (!collector) {
+    return NULL;
+  }
+
+  assert(context);
+
+  if (collector->freed) {
+    return NULL;
+  }
+
+  pn_event_t *tail = collector->tail;
+  if (tail && tail->type == type && tail->context == context) {
+    return NULL;
+  }
+
+  clazz = clazz->reify(context);
+
+  pn_event_t *event = (pn_event_t *) pn_list_pop(collector->pool);
+
+  if (!event) {
+    event = pn_event();
+  }
+
+  event->pool = collector->pool;
+  pn_incref(event->pool);
+
+  if (tail) {
+    tail->next = event;
+    collector->tail = event;
+  } else {
+    collector->tail = event;
+    collector->head = event;
+  }
+
+  event->clazz = clazz;
+  event->context = context;
+  event->type = type;
+  pn_class_incref(clazz, event->context);
+
+  return event;
+}
+
+pn_event_t *pn_collector_peek(pn_collector_t *collector)
+{
+  return collector->head;
+}
+
+// Advance head pointer for pop or next, return the old head.
+static pn_event_t *pop_internal(pn_collector_t *collector) {
+  pn_event_t *event = collector->head;
+  if (event) {
+    collector->head = event->next;
+    if (!collector->head) {
+      collector->tail = NULL;
+    }
+  }
+  return event;
+}
+
+bool pn_collector_pop(pn_collector_t *collector) {
+  pn_event_t *event = pop_internal(collector);
+  if (event) {
+    pn_decref(event);
+  }
+  return event;
+}
+
+pn_event_t *pn_collector_next(pn_collector_t *collector) {
+  if (collector->prev) {
+    pn_decref(collector->prev);
+  }
+  collector->prev = pop_internal(collector);
+  return collector->prev;
+}
+
+pn_event_t *pn_collector_prev(pn_collector_t *collector) {
+  return collector->prev;
+}
+
+bool pn_collector_more(pn_collector_t *collector)
+{
+  assert(collector);
+  return collector->head && collector->head->next;
+}
+
+static void pn_event_initialize(pn_event_t *event)
+{
+  event->pool = NULL;
+  event->type = PN_EVENT_NONE;
+  event->clazz = NULL;
+  event->context = NULL;
+  event->next = NULL;
+  event->attachments = pn_record();
+}
+
+static void pn_event_finalize(pn_event_t *event) {
+  // decref before adding to the free list
+  if (event->clazz && event->context) {
+    pn_class_decref(event->clazz, event->context);
+  }
+
+  pn_list_t *pool = event->pool;
+
+  if (pool && pn_refcount(pool) > 1) {
+    event->pool = NULL;
+    event->type = PN_EVENT_NONE;
+    event->clazz = NULL;
+    event->context = NULL;
+    event->next = NULL;
+    pn_record_clear(event->attachments);
+    pn_list_add(pool, event);
+  } else {
+    pn_decref(event->attachments);
+  }
+
+  pn_decref(pool);
+}
+
+static int pn_event_inspect(pn_event_t *event, pn_string_t *dst)
+{
+  assert(event);
+  assert(dst);
+  const char *name = pn_event_type_name(event->type);
+  int err;
+  if (name) {
+    err = pn_string_addf(dst, "(%s", pn_event_type_name(event->type));
+  } else {
+    err = pn_string_addf(dst, "(<%u>", (unsigned int) event->type);
+  }
+  if (err) return err;
+  if (event->context) {
+    err = pn_string_addf(dst, ", ");
+    if (err) return err;
+    err = pn_class_inspect(event->clazz, event->context, dst);
+    if (err) return err;
+  }
+
+  return pn_string_addf(dst, ")");
+}
+
+#define pn_event_hashcode NULL
+#define pn_event_compare NULL
+
+PN_CLASSDEF(pn_event)
+
+pn_event_t *pn_event(void)
+{
+  return pn_event_new();
+}
+
+pn_event_type_t pn_event_type(pn_event_t *event)
+{
+  return event ? event->type : PN_EVENT_NONE;
+}
+
+const pn_class_t *pn_event_class(pn_event_t *event)
+{
+  assert(event);
+  return event->clazz;
+}
+
+void *pn_event_context(pn_event_t *event)
+{
+  assert(event);
+  return event->context;
+}
+
+pn_record_t *pn_event_attachments(pn_event_t *event)
+{
+  assert(event);
+  return event->attachments;
+}
+
+const char *pn_event_type_name(pn_event_type_t type)
+{
+  switch (type) {
+  case PN_EVENT_NONE:
+    return "PN_EVENT_NONE";
+  case PN_REACTOR_INIT:
+    return "PN_REACTOR_INIT";
+  case PN_REACTOR_QUIESCED:
+    return "PN_REACTOR_QUIESCED";
+  case PN_REACTOR_FINAL:
+    return "PN_REACTOR_FINAL";
+  case PN_TIMER_TASK:
+    return "PN_TIMER_TASK";
+  case PN_CONNECTION_INIT:
+    return "PN_CONNECTION_INIT";
+  case PN_CONNECTION_BOUND:
+    return "PN_CONNECTION_BOUND";
+  case PN_CONNECTION_UNBOUND:
+    return "PN_CONNECTION_UNBOUND";
+  case PN_CONNECTION_REMOTE_OPEN:
+    return "PN_CONNECTION_REMOTE_OPEN";
+  case PN_CONNECTION_LOCAL_OPEN:
+    return "PN_CONNECTION_LOCAL_OPEN";
+  case PN_CONNECTION_REMOTE_CLOSE:
+    return "PN_CONNECTION_REMOTE_CLOSE";
+  case PN_CONNECTION_LOCAL_CLOSE:
+    return "PN_CONNECTION_LOCAL_CLOSE";
+  case PN_CONNECTION_FINAL:
+    return "PN_CONNECTION_FINAL";
+  case PN_SESSION_INIT:
+    return "PN_SESSION_INIT";
+  case PN_SESSION_REMOTE_OPEN:
+    return "PN_SESSION_REMOTE_OPEN";
+  case PN_SESSION_LOCAL_OPEN:
+    return "PN_SESSION_LOCAL_OPEN";
+  case PN_SESSION_REMOTE_CLOSE:
+    return "PN_SESSION_REMOTE_CLOSE";
+  case PN_SESSION_LOCAL_CLOSE:
+    return "PN_SESSION_LOCAL_CLOSE";
+  case PN_SESSION_FINAL:
+    return "PN_SESSION_FINAL";
+  case PN_LINK_INIT:
+    return "PN_LINK_INIT";
+  case PN_LINK_REMOTE_OPEN:
+    return "PN_LINK_REMOTE_OPEN";
+  case PN_LINK_LOCAL_OPEN:
+    return "PN_LINK_LOCAL_OPEN";
+  case PN_LINK_REMOTE_CLOSE:
+    return "PN_LINK_REMOTE_CLOSE";
+  case PN_LINK_LOCAL_DETACH:
+    return "PN_LINK_LOCAL_DETACH";
+  case PN_LINK_REMOTE_DETACH:
+    return "PN_LINK_REMOTE_DETACH";
+  case PN_LINK_LOCAL_CLOSE:
+    return "PN_LINK_LOCAL_CLOSE";
+  case PN_LINK_FLOW:
+    return "PN_LINK_FLOW";
+  case PN_LINK_FINAL:
+    return "PN_LINK_FINAL";
+  case PN_DELIVERY:
+    return "PN_DELIVERY";
+  case PN_TRANSPORT:
+    return "PN_TRANSPORT";
+  case PN_TRANSPORT_AUTHENTICATED:
+    return "PN_TRANSPORT_AUTHENTICATED";
+  case PN_TRANSPORT_ERROR:
+    return "PN_TRANSPORT_ERROR";
+  case PN_TRANSPORT_HEAD_CLOSED:
+    return "PN_TRANSPORT_HEAD_CLOSED";
+  case PN_TRANSPORT_TAIL_CLOSED:
+    return "PN_TRANSPORT_TAIL_CLOSED";
+  case PN_TRANSPORT_CLOSED:
+    return "PN_TRANSPORT_CLOSED";
+  case PN_SELECTABLE_INIT:
+    return "PN_SELECTABLE_INIT";
+  case PN_SELECTABLE_UPDATED:
+    return "PN_SELECTABLE_UPDATED";
+  case PN_SELECTABLE_READABLE:
+    return "PN_SELECTABLE_READABLE";
+  case PN_SELECTABLE_WRITABLE:
+    return "PN_SELECTABLE_WRITABLE";
+  case PN_SELECTABLE_ERROR:
+    return "PN_SELECTABLE_ERROR";
+  case PN_SELECTABLE_EXPIRED:
+    return "PN_SELECTABLE_EXPIRED";
+  case PN_SELECTABLE_FINAL:
+    return "PN_SELECTABLE_FINAL";
+   case PN_CONNECTION_WAKE:
+    return "PN_CONNECTION_WAKE";
+   case PN_LISTENER_ACCEPT:
+    return "PN_LISTENER_ACCEPT";
+   case PN_LISTENER_CLOSE:
+    return "PN_LISTENER_CLOSE";
+   case PN_PROACTOR_INTERRUPT:
+    return "PN_PROACTOR_INTERRUPT";
+   case PN_PROACTOR_TIMEOUT:
+    return "PN_PROACTOR_TIMEOUT";
+   case PN_PROACTOR_INACTIVE:
+    return "PN_PROACTOR_INACTIVE";
+   case PN_LISTENER_OPEN:
+    return "PN_LISTENER_OPEN";
+   default:
+    return "PN_UNKNOWN";
+  }
+  return NULL;
+}
+
+pn_event_t *pn_event_batch_next(pn_event_batch_t *batch) {
+  return batch->next_event(batch);
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/core/framing.c
----------------------------------------------------------------------
diff --git a/c/src/core/framing.c b/c/src/core/framing.c
new file mode 100644
index 0000000..9f78666
--- /dev/null
+++ b/c/src/core/framing.c
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <stdio.h>
+#include <string.h>
+
+#include "framing.h"
+
+// TODO: These are near duplicates of code in codec.c - they should be
+// deduplicated.
+static inline void pn_i_write16(char *bytes, uint16_t value)
+{
+    bytes[0] = 0xFF & (value >> 8);
+    bytes[1] = 0xFF & (value     );
+}
+
+
+static inline void pn_i_write32(char *bytes, uint32_t value)
+{
+    bytes[0] = 0xFF & (value >> 24);
+    bytes[1] = 0xFF & (value >> 16);
+    bytes[2] = 0xFF & (value >>  8);
+    bytes[3] = 0xFF & (value      );
+}
+
+static inline uint16_t pn_i_read16(const char *bytes)
+{
+    uint16_t a = (uint8_t) bytes[0];
+    uint16_t b = (uint8_t) bytes[1];
+    uint16_t r = a << 8
+    | b;
+    return r;
+}
+
+static inline uint32_t pn_i_read32(const char *bytes)
+{
+    uint32_t a = (uint8_t) bytes[0];
+    uint32_t b = (uint8_t) bytes[1];
+    uint32_t c = (uint8_t) bytes[2];
+    uint32_t d = (uint8_t) bytes[3];
+    uint32_t r = a << 24
+    | b << 16
+    | c <<  8
+    | d;
+    return r;
+}
+
+
+ssize_t pn_read_frame(pn_frame_t *frame, const char *bytes, size_t available, uint32_t max)
+{
+  if (available < AMQP_HEADER_SIZE) return 0;
+  uint32_t size = pn_i_read32(&bytes[0]);
+  if (max && size > max) return PN_ERR;
+  if (available < size) return 0;
+  unsigned int doff = 4 * (uint8_t)bytes[4];
+  if (doff < AMQP_HEADER_SIZE || doff > size) return PN_ERR;
+
+  frame->size = size - doff;
+  frame->ex_size = doff - AMQP_HEADER_SIZE;
+  frame->type = bytes[5];
+  frame->channel = pn_i_read16(&bytes[6]);
+  frame->extended = bytes + AMQP_HEADER_SIZE;
+  frame->payload = bytes + doff;
+
+  return size;
+}
+
+size_t pn_write_frame(pn_buffer_t* buffer, pn_frame_t frame)
+{
+  size_t size = AMQP_HEADER_SIZE + frame.ex_size + frame.size;
+  if (size <= pn_buffer_available(buffer))
+  {
+    // Prepare header
+    char bytes[8];
+    pn_i_write32(&bytes[0], size);
+    int doff = (frame.ex_size + AMQP_HEADER_SIZE - 1)/4 + 1;
+    bytes[4] = doff;
+    bytes[5] = frame.type;
+    pn_i_write16(&bytes[6], frame.channel);
+
+    // Write header then rest of frame
+    pn_buffer_append(buffer, bytes, 8);
+    if (frame.extended)
+        pn_buffer_append(buffer, frame.extended, frame.ex_size);
+    pn_buffer_append(buffer, frame.payload, frame.size);
+    return size;
+  } else {
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/core/framing.h
----------------------------------------------------------------------
diff --git a/c/src/core/framing.h b/c/src/core/framing.h
new file mode 100644
index 0000000..792d664
--- /dev/null
+++ b/c/src/core/framing.h
@@ -0,0 +1,46 @@
+#ifndef PROTON_FRAMING_H
+#define PROTON_FRAMING_H 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "buffer.h"
+
+#include <proton/import_export.h>
+#include <proton/type_compat.h>
+#include <proton/error.h>
+
+#define AMQP_HEADER_SIZE (8)
+#define AMQP_MIN_MAX_FRAME_SIZE ((uint32_t)512) // minimum allowable max-frame
+
+typedef struct {
+  uint8_t type;
+  uint16_t channel;
+  size_t ex_size;
+  const char *extended;
+  size_t size;
+  const char *payload;
+} pn_frame_t;
+
+ssize_t pn_read_frame(pn_frame_t *frame, const char *bytes, size_t available, uint32_t max);
+size_t pn_write_frame(pn_buffer_t* buffer, pn_frame_t frame);
+
+#endif /* framing.h */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/core/log.c
----------------------------------------------------------------------
diff --git a/c/src/core/log.c b/c/src/core/log.c
new file mode 100644
index 0000000..754eed3
--- /dev/null
+++ b/c/src/core/log.c
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/log.h>
+#include <proton/object.h>
+#include <stdio.h>
+#include "log_private.h"
+#include "util.h"
+
+
+static void stderr_logger(const char *message) {
+    fprintf(stderr, "%s\n", message);
+}
+
+static pn_logger_t logger = stderr_logger;
+static int enabled_env  = -1;   /* Set from environment variable. */
+static int enabled_call = -1;   /* set by pn_log_enable */
+
+void pn_log_enable(bool value) {
+    enabled_call = value;
+}
+
+bool pni_log_enabled(void) {
+    if (enabled_call != -1) return enabled_call; /* Takes precedence */
+    if (enabled_env == -1) 
+        enabled_env = pn_env_bool("PN_TRACE_LOG");
+    return enabled_env;
+}
+
+void pn_log_logger(pn_logger_t new_logger) {
+    logger = new_logger;
+    if (!logger) pn_log_enable(false);
+}
+
+void pni_vlogf_impl(const char *fmt, va_list ap) {
+    vfprintf(stderr, fmt, ap);
+    fprintf(stderr, "\n");
+}
+
+/**@internal
+ *
+ * Note: We check pni_log_enabled() in the pn_logf macro *before* calling
+ * pni_logf_impl because evaluating the arguments to that call could have
+ * side-effects with performance impact (e.g. calling functions to construct
+ * complicated messages.) It is important that a disabled log statement results
+ * in nothing more than a call to pni_log_enabled().
+ */
+void pni_logf_impl(const char *fmt, ...) {
+  va_list ap;
+  va_start(ap, fmt);
+  pni_vlogf_impl(fmt, ap);
+  va_end(ap);
+}
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/core/log_private.h
----------------------------------------------------------------------
diff --git a/c/src/core/log_private.h b/c/src/core/log_private.h
new file mode 100644
index 0000000..75044ed
--- /dev/null
+++ b/c/src/core/log_private.h
@@ -0,0 +1,54 @@
+#ifndef LOG_PRIVATE_H
+#define LOG_PRIVATE_H
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**@file
+ *
+ * Log messages that are not associated with a transport.
+ */
+
+#include <proton/log.h>
+#include <stdarg.h>
+
+/** Log a printf style message */
+#define pn_logf(...)                            \
+    do {                                        \
+        if (pni_log_enabled())                   \
+            pni_logf_impl(__VA_ARGS__);          \
+    } while(0)
+
+/** va_list version of pn_logf */
+#define pn_vlogf(fmt, ap)                       \
+    do {                                        \
+        if (pni_log_enabled())                   \
+            pni_vlogf_impl(fmt, ap);             \
+    } while(0)
+
+/** Return true if logging is enabled. */
+PN_EXTERN bool pni_log_enabled(void);
+
+/**@internal*/
+PN_EXTERN void pni_logf_impl(const char* fmt, ...);
+/**@internal*/
+PN_EXTERN void pni_vlogf_impl(const char *fmt, va_list ap);
+
+
+
+#endif

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/37136940/c/src/core/max_align.h
----------------------------------------------------------------------
diff --git a/c/src/core/max_align.h b/c/src/core/max_align.h
new file mode 100644
index 0000000..ff9a6e5
--- /dev/null
+++ b/c/src/core/max_align.h
@@ -0,0 +1,47 @@
+#ifndef MAX_ALIGN_H
+#define MAX_ALIGN_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/type_compat.h>
+#include <stddef.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#if __STDC_VERSION__ >= 201112
+/* Use standard max_align_t for alignment on c11 */
+typedef max_align_t pn_max_align_t;
+#else
+/* Align on a union of likely largest types for older compilers */
+typedef union pn_max_align_t {
+  uint64_t i;
+  long double d;
+  void *v;
+  void (*fp)(void);
+} pn_max_align_t;
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // MAX_ALIGN_H


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message