qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [1/2] qpid-proton git commit: PROTON-1062: c++: proton::connection_engine with client and server examples.
Date Sun, 24 Jan 2016 07:19:16 GMT
Repository: qpid-proton
Updated Branches:
  refs/heads/master e39baba53 -> b0c665448


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/src/connection_engine.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection_engine.cpp b/proton-c/bindings/cpp/src/connection_engine.cpp
index 2c54725..b287db4 100644
--- a/proton-c/bindings/cpp/src/connection_engine.cpp
+++ b/proton-c/bindings/cpp/src/connection_engine.cpp
@@ -21,7 +21,11 @@
 #include "proton/error.hpp"
 #include "proton/handler.hpp"
 
+#include "contexts.hpp"
 #include "messaging_adapter.hpp"
+#include "uuid.hpp"
+#include "msg.hpp"
+#include "proton_bits.hpp"
 #include "messaging_event.hpp"
 #include "proton_bits.hpp"
 #include "uuid.hpp"
@@ -30,106 +34,171 @@
 #include <proton/transport.h>
 #include <proton/event.h>
 
+#include <algorithm>
+
+#include <iosfwd>
+
 namespace proton {
 
-struct connection_engine::impl {
+namespace {
+void set_error(connection_engine_context *ctx_, const std::string& reason) {
+    pn_condition_t *c = pn_transport_condition(ctx_->transport);
+    pn_condition_set_name(c, "io_error");
+    pn_condition_set_description(c, reason.c_str());
+}
 
-    impl(class proton_handler& h, pn_transport_t *t) :
-        handler(h), transport(t), connection(pn_connection()), collector(pn_collector())
-    {}
+void close_transport(connection_engine_context *ctx_) {
+    if (pn_transport_pending(ctx_->transport) >= 0)
+        pn_transport_close_head(ctx_->transport);
+    if (pn_transport_capacity(ctx_->transport) >= 0)
+        pn_transport_close_tail(ctx_->transport);
+}
 
-    ~impl() {
-        pn_transport_free(transport);
-        pn_connection_free(connection);
-        pn_collector_free(collector);
-    }
+std::string  make_id(const std::string s) { return s.empty() ? uuid().str() : s; }
+}
 
-    void check(int err, const std::string& msg) {
-        if (err)
-            throw proton::error(msg + error_str(pn_transport_error(transport), err));
-    }
+connection_engine::container::container(const std::string& s) : id_(make_id(s)) {}
 
-    pn_event_t *peek() { return pn_collector_peek(collector); }
-    void pop() { pn_collector_pop(collector); }
+std::string connection_engine::container::id() const { return id_; }
 
-    class proton_handler& handler;
-    pn_transport_t *transport;
-    pn_connection_t *connection;
-    pn_collector_t * collector;
-};
+connection_options connection_engine::container::make_options() {
+    connection_options opts = options_;
+    opts.container_id(id()).link_prefix(id_gen_.next()+"/");
+    return opts;
+}
 
-connection_engine::connection_engine(handler &h, const std::string& id_) :
-    impl_(new impl(*h.messaging_adapter_.get(), pn_transport())) {
-    if (!impl_->transport || !impl_->connection || !impl_->collector)
-        throw error("connection_engine setup failed");
-    std::string id = id_.empty() ? uuid().str() : id_;
-    pn_connection_set_container(impl_->connection, id.c_str());
-    impl_->check(pn_transport_bind(impl_->transport, impl_->connection), "connection_engine bind: ");
-    pn_connection_collect(impl_->connection, impl_->collector);
+void connection_engine::container::options(const connection_options &opts) {
+    options_ = opts;
 }
 
-connection_engine::~connection_engine() {}
+connection_engine::connection_engine(class handler &h, const connection_options& opts) {
+    connection_ = proton::connection(take_ownership(pn_connection()).get());
+    pn_ptr<pn_transport_t> transport = take_ownership(pn_transport());
+    pn_ptr<pn_collector_t> collector = take_ownership(pn_collector());
+    if (!connection_ || !transport || !collector)
+        throw proton::error("engine create");
+    int err = pn_transport_bind(transport.get(), connection_.pn_object());
+    if (err)
+        throw error(msg() << "transport bind:" << pn_code(err));
+    pn_connection_collect(connection_.pn_object(), collector.get());
+
+    ctx_ = &connection_engine_context::get(connection_); // Creates context
+    ctx_->engine_handler = &h;
+    ctx_->transport = transport.release();
+    ctx_->collector = collector.release();
+    opts.apply(connection_);
+}
 
-buffer<char> connection_engine::input() {
-    ssize_t n = pn_transport_capacity(impl_->transport);
-    if (n <= 0)
-        return buffer<char>();
-    return buffer<char>(pn_transport_tail(impl_->transport), size_t(n));
+connection_engine::~connection_engine() {
+    pn_transport_unbind(ctx_->transport);
+    pn_transport_free(ctx_->transport);
+    pn_ptr<pn_connection_t> c(connection_.pn_object());
+    connection_ = proton::connection();
+    pn_connection_free(c.release());
+    pn_collector_free(ctx_->collector);
 }
 
-void connection_engine::close_input() {
-    pn_transport_close_tail(impl_->transport);
-    run();
+bool connection_engine::process(int flags) {
+    if (closed()) throw closed_error("engine closed");
+    bool ok = process_nothrow(flags);
+    if (!ok && !error_str().empty()) throw io_error(error_str());
+    return ok;
 }
 
-void connection_engine::received(size_t n) {
-    impl_->check(pn_transport_process(impl_->transport, n), "connection_engine process: ");
-    run();
+bool connection_engine::process_nothrow(int flags) {
+    if (closed()) return false;
+    if (flags & WRITE) try_write();
+    dispatch();
+    if (flags & READ) try_read();
+    dispatch();
+
+    if (connection_.closed() && !closed()) {
+        dispatch();
+        while (can_write()) {
+            try_write(); // Flush final data.
+        }
+        // no transport errors.
+        close_transport(ctx_);
+    }
+    if (closed()) {
+        pn_transport_unbind(ctx_->transport);
+        dispatch();
+        try { io_close(); } catch(const io_error&) {} // Tell the IO to close.
+    }
+    return !closed();
 }
 
-void connection_engine::run() {
-    for (pn_event_t *e = impl_->peek(); e; e = impl_->peek()) {
-        switch (pn_event_type(e)) {
-          case PN_CONNECTION_REMOTE_CLOSE:
-            pn_transport_close_tail(impl_->transport);
-            break;
-          case PN_CONNECTION_LOCAL_CLOSE:
-            pn_transport_close_head(impl_->transport);
-            break;
-          default:
-            break;
+void connection_engine::dispatch() {
+    proton_handler& h = *ctx_->engine_handler->messaging_adapter_;
+    pn_collector_t* c = ctx_->collector;
+    for (pn_event_t *e = pn_collector_peek(c); e; e = pn_collector_peek(c)) {
+        if (pn_event_type(e) == PN_CONNECTION_INIT) {
+            // Make the messaging_adapter issue a START event.
+            proton_event(e, PN_REACTOR_INIT, 0).dispatch(h);
         }
-        proton_event pevent(e, pn_event_type(e), 0);
-        pevent.dispatch(impl_->handler);
-        impl_->pop();
+        proton_event(e, pn_event_type(e), 0).dispatch(h);
+        pn_collector_pop(c);
     }
 }
 
-buffer<const char> connection_engine::output() {
-    ssize_t n = pn_transport_pending(impl_->transport);
-    if (n <= 0)
-        return buffer<const char>();
-    return buffer<const char>(pn_transport_head(impl_->transport), size_t(n));
+size_t connection_engine::can_read() const {
+    return std::max(ssize_t(0), pn_transport_capacity(ctx_->transport));
+}
+
+void connection_engine::try_read() {
+    size_t max = can_read();
+    if (max == 0) return;
+    try {
+        size_t n = io_read(pn_transport_tail(ctx_->transport), max);
+        if (n > max)
+            throw io_error(msg() << "read invalid size: " << n << " > " << max);
+        pn_transport_process(ctx_->transport, n);
+    } catch (const closed_error&) {
+        pn_transport_close_tail(ctx_->transport);
+    } catch (const io_error& e) {
+        set_error(ctx_, e.what());
+        pn_transport_close_tail(ctx_->transport);
+    }
 }
 
-void connection_engine::sent(size_t n) {
-    pn_transport_pop(impl_->transport, n);
-    run();
+size_t connection_engine::can_write() const {
+    return std::max(ssize_t(0), pn_transport_pending(ctx_->transport));
 }
 
-void connection_engine::close_output() {
-    pn_transport_close_head(impl_->transport);
-    run();
+void connection_engine::try_write() {
+    size_t max = can_write();
+    if (max == 0) return;
+    try {
+        size_t n = io_write(pn_transport_head(ctx_->transport), max);
+        if (n > max) {
+            throw io_error(msg() << "write invalid size: " << n << " > " << max);
+        }
+        pn_transport_pop(ctx_->transport, n);
+    } catch (const closed_error&) {
+        pn_transport_close_head(ctx_->transport);
+    } catch (const io_error& e) {
+        set_error(ctx_, e.what());
+        pn_transport_close_head(ctx_->transport);
+    }
 }
 
 bool connection_engine::closed() const {
-    return pn_transport_closed(impl_->transport);
+    return pn_transport_closed(ctx_->transport);
 }
 
-class connection connection_engine::connection() const {
-    return impl_->connection;
+std::string connection_engine::error_str() const {
+    pn_condition_t *c = pn_connection_remote_condition(connection_.pn_object());
+    if (!c || !pn_condition_is_set(c)) c = pn_transport_condition(ctx_->transport);
+    if (c && pn_condition_is_set(c)) {
+        std::ostringstream os;
+        os << pn_condition_get_name(c) << ": " << pn_condition_get_description(c);
+        return os.str();
+    }
+    return "";
 }
 
-std::string connection_engine::id() const { return connection().container_id(); }
+connection connection_engine::connection() const { return connection_.pn_object(); }
+
+const connection_options connection_engine::no_opts;
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/src/connection_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection_options.cpp b/proton-c/bindings/cpp/src/connection_options.cpp
index 45afda2..121d101 100644
--- a/proton-c/bindings/cpp/src/connection_options.cpp
+++ b/proton-c/bindings/cpp/src/connection_options.cpp
@@ -51,6 +51,7 @@ class connection_options::impl {
     option<duration> idle_timeout;
     option<duration> heartbeat;
     option<std::string> container_id;
+    option<std::string> link_prefix;
     option<reconnect_timer> reconnect;
     option<class client_domain> client_domain;
     option<class server_domain> server_domain;
@@ -85,11 +86,13 @@ class connection_options::impl {
                         throw error(MSG("error in SSL/TLS peer hostname \"") << peer_hostname.value << '"');
             } else if (!outbound) {
                 pn_acceptor_t *pnp = pn_connection_acceptor(pnc);
-                listener_context &lc(listener_context::get(pnp));
-                if (lc.ssl) {
-                    pn_ssl_t *ssl = pn_ssl(pnt);
-                    if (pn_ssl_init(ssl, server_domain.value.pn_domain(), NULL))
-                        throw error(MSG("server SSL/TLS initialization error"));
+                if (pnp) {
+                    listener_context &lc(listener_context::get(pnp));
+                    if (lc.ssl) {
+                        pn_ssl_t *ssl = pn_ssl(pnt);
+                        if (pn_ssl_init(ssl, server_domain.value.pn_domain(), NULL))
+                            throw error(MSG("server SSL/TLS initialization error"));
+                    }
                 }
             }
 
@@ -121,6 +124,8 @@ class connection_options::impl {
                 outbound->reconnect_timer(reconnect.value);
             if (container_id.set)
                 pn_connection_set_container(pnc, container_id.value.c_str());
+            if (link_prefix.set)
+                connection_context::get(pnc).link_gen.prefix(link_prefix.value);
         }
     }
 
@@ -131,6 +136,7 @@ class connection_options::impl {
         idle_timeout.override(x.idle_timeout);
         heartbeat.override(x.heartbeat);
         container_id.override(x.container_id);
+        link_prefix.override(x.link_prefix);
         reconnect.override(x.reconnect);
         client_domain.override(x.client_domain);
         server_domain.override(x.server_domain);
@@ -164,6 +170,7 @@ connection_options& connection_options::max_channels(uint16_t n) { impl_->max_fr
 connection_options& connection_options::idle_timeout(duration t) { impl_->idle_timeout = t; return *this; }
 connection_options& connection_options::heartbeat(duration t) { impl_->heartbeat = t; return *this; }
 connection_options& connection_options::container_id(const std::string &id) { impl_->container_id = id; return *this; }
+connection_options& connection_options::link_prefix(const std::string &id) { impl_->link_prefix = id; return *this; }
 connection_options& connection_options::reconnect(const reconnect_timer &rc) { impl_->reconnect = rc; return *this; }
 connection_options& connection_options::client_domain(const class client_domain &c) { impl_->client_domain = c; return *this; }
 connection_options& connection_options::server_domain(const class server_domain &c) { impl_->server_domain = c; return *this; }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/src/connector.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connector.cpp b/proton-c/bindings/cpp/src/connector.cpp
index 4e5139c..e69559d 100644
--- a/proton-c/bindings/cpp/src/connector.cpp
+++ b/proton-c/bindings/cpp/src/connector.cpp
@@ -61,7 +61,6 @@ void connector::reconnect_timer(const class reconnect_timer &rt) {
 }
 
 void connector::connect() {
-    connection_.container_id(connection_.container().id());
     connection_.host(address_.host_port());
     pn_transport_t *pnt = pn_transport();
     transport t(pnt);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/src/container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container_impl.cpp b/proton-c/bindings/cpp/src/container_impl.cpp
index 8954441..8527b68 100644
--- a/proton-c/bindings/cpp/src/container_impl.cpp
+++ b/proton-c/bindings/cpp/src/container_impl.cpp
@@ -124,10 +124,9 @@ pn_ptr<pn_handler_t> container_impl::cpp_handler(proton_handler *h) {
 }
 
 container_impl::container_impl(container& c, messaging_adapter *h, const std::string& id) :
-    container_(c), reactor_(reactor::create()), handler_(h), id_(id),
-    link_id_(0)
+    container_(c), reactor_(reactor::create()), handler_(h),
+    id_(id.empty() ? uuid().str() : id), id_gen_()
 {
-    if (id_.empty()) id_ = uuid().str();
     container_context::set(reactor_, container_);
 
     // Set our own global handler that "subclasses" the existing one
@@ -139,7 +138,6 @@ container_impl::container_impl(container& c, messaging_adapter *h, const std::st
         reactor_.pn_handler(cpp_handler(handler_).get());
     }
 
-
     // Note: we have just set up the following handlers that see events in this order:
     // messaging_handler (Proton C events), pn_flowcontroller (optional), messaging_adapter,
     // messaging_handler (Messaging events from the messaging_adapter, i.e. the delegate),
@@ -158,8 +156,10 @@ connection container_impl::connect(const proton::url &url, const connection_opti
     pn_unique_ptr<connector> ctor(new connector(conn, opts));
     ctor->address(url);  // TODO: url vector
     connection_context& cc(connection_context::get(conn));
-    cc.container_impl = this;
     cc.handler.reset(ctor.release());
+    cc.link_gen.prefix(id_gen_.next() + "/");
+    pn_connection_set_container(conn.pn_object(), id_.c_str());
+
     conn.open();
     return conn;
 }
@@ -171,7 +171,7 @@ sender container_impl::open_sender(const proton::url &url, const proton::link_op
     copts.override(o2);
     connection conn = connect(url, copts);
     std::string path = url.path();
-    sender snd = conn.default_session().create_sender(id_ + '-' + path);
+    sender snd = conn.default_session().create_sender();
     snd.local_target().address(path);
     snd.open(lopts);
     return snd;
@@ -184,7 +184,7 @@ receiver container_impl::open_receiver(const proton::url &url, const proton::lin
     copts.override(o2);
     connection conn = connect(url, copts);
     std::string path = url.path();
-    receiver rcv = conn.default_session().create_receiver(id_ + '-' + path);
+    receiver rcv = conn.default_session().create_receiver();
     rcv.local_source().address(path);
     rcv.open(lopts);
     return rcv;
@@ -208,13 +208,6 @@ acceptor container_impl::listen(const proton::url& url, const connection_options
     return acceptor(acptr);
 }
 
-std::string container_impl::next_link_name() {
-    std::ostringstream s;
-    // TODO aconway 2015-09-01: atomic operation
-    s << std::hex << ++link_id_ << "@" << id_;
-    return s.str();
-}
-
 task container_impl::schedule(int delay, proton_handler *h) {
     pn_ptr<pn_handler_t> task_handler;
     if (h)
@@ -237,6 +230,8 @@ void container_impl::link_options(const proton::link_options &opts) {
 void container_impl::configure_server_connection(connection &c) {
     pn_acceptor_t *pnp = pn_connection_acceptor(connection_options::pn_connection(c));
     listener_context &lc(listener_context::get(pnp));
+    connection_context::get(c).link_gen.prefix(id_gen_.next() + "/");
+    pn_connection_set_container(c.pn_object(), id_.c_str());
     lc.connection_options.apply(c);
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/src/container_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container_impl.hpp b/proton-c/bindings/cpp/src/container_impl.hpp
index d46250e..8d4eb9d 100644
--- a/proton-c/bindings/cpp/src/container_impl.hpp
+++ b/proton-c/bindings/cpp/src/container_impl.hpp
@@ -27,6 +27,7 @@
 #include "proton/link.hpp"
 #include "proton/duration.hpp"
 #include "proton/reactor.hpp"
+#include "proton/id_generator.hpp"
 
 #include "proton_handler.hpp"
 
@@ -76,7 +77,7 @@ class container_impl
     pn_unique_ptr<proton_handler> override_handler_;
     pn_unique_ptr<proton_handler> flow_controller_;
     std::string id_;
-    uint64_t link_id_;
+    id_generator id_gen_;
     connection_options client_connection_options_;
     connection_options server_connection_options_;
     proton::link_options link_options_;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/src/contexts.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/contexts.cpp b/proton-c/bindings/cpp/src/contexts.cpp
index fba68c6..4a7c569 100644
--- a/proton-c/bindings/cpp/src/contexts.cpp
+++ b/proton-c/bindings/cpp/src/contexts.cpp
@@ -63,21 +63,15 @@ T* get_context(pn_record_t* record, pn_handle_t handle) {
 }
 
 context::~context() {}
+
 void *context::alloc(size_t n) { return pn_object_new(&cpp_context_class, n); }
+
 pn_class_t* context::pn_class() { return &cpp_context_class; }
 
-connection_context& connection_context::get(pn_connection_t* c) {
-    connection_context* ctx =
-        get_context<connection_context>(pn_connection_attachments(c), CONNECTION_CONTEXT);
-    if (!ctx) {
-        ctx =  context::create<connection_context>();
-        set_context(pn_connection_attachments(c), CONNECTION_CONTEXT, context::pn_class(), ctx);
-        pn_decref(ctx);
-    }
-    return *ctx;
-}
 
-connection_context& connection_context::get(const connection& c) { return get(c.pn_object()); }
+context::id connection_context::id(pn_connection_t* c) {
+    return context::id(pn_connection_attachments(c), CONNECTION_CONTEXT);
+}
 
 void container_context::set(const reactor& r, container& c) {
     set_context(pn_reactor_attachments(r.pn_object()), CONTAINER_CONTEXT, PN_VOID, &c);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/src/contexts.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/contexts.hpp b/proton-c/bindings/cpp/src/contexts.hpp
index 98dd328..e558158 100644
--- a/proton-c/bindings/cpp/src/contexts.hpp
+++ b/proton-c/bindings/cpp/src/contexts.hpp
@@ -26,6 +26,8 @@
 #include "proton/message.hpp"
 #include "proton/connection.hpp"
 #include "proton/container.hpp"
+#include "proton/connection_engine.hpp"
+#include "proton/id_generator.hpp"
 
 #include "proton_handler.hpp"
 
@@ -37,37 +39,75 @@ struct pn_acceptor_t;
 namespace proton {
 
 class proton_handler;
-class container_impl;
 
 // Base class for C++ classes that are used as proton contexts.
-// contexts are pn_objects managed by pn reference counts.
+// Contexts are pn_objects managed by pn reference counts, the C++ value is allocated in-place.
 class context {
   public:
+    // identifies a context, contains a record pointer and a handle.
+    typedef std::pair<pn_record_t*, pn_handle_t> id;
+
     virtual ~context();
 
-    // Allocate a default-constructed T as a proton object. T must be a subclass of context.
+    // Allocate a default-constructed T as a proton object.
+    // T must be a subclass of context.
     template <class T> static T *create() { return new(alloc(sizeof(T))) T(); }
 
-    // Allocate a copy-constructed T as a proton object. T must be a subclass of context.
-    template <class T> static T *create(const T& x) { return new(alloc(sizeof(T))) T(x); }
-
+    // The pn_class for a context
     static pn_class_t* pn_class();
 
+    // Get the context identified by id as a C++ T*, return null pointer if not present.
+    template <class T> static T* ptr(id id_) {
+        return reinterpret_cast<T*>(pn_record_get(id_.first, id_.second));
+    }
+
+    // If the context is not present, create it with value x.
+    template <class T> static T& ref(id id_) {
+        T* ctx = context::ptr<T>(id_);
+        if (!ctx) {
+            ctx = create<T>();
+            pn_record_def(id_.first, id_.second, pn_class());
+            pn_record_set(id_.first, id_.second, ctx);
+            pn_decref(ctx);
+        }
+        return *ctx;
+    }
+
   private:
     static void *alloc(size_t n);
 };
 
+// Connection context used by all connections.
 class connection_context : public context {
   public:
-    static connection_context& get(pn_connection_t*);
-    static connection_context& get(const connection&);
+    connection_context() : default_session(0) {}
 
-    connection_context() : default_session(0), container_impl(0) {}
+    // Used by all connections
+    pn_session_t *default_session; // Owned by connection.
+    message event_message;      // re-used by messaging_adapter for performance.
+    id_generator link_gen;      // Link name generator.
 
     pn_unique_ptr<proton_handler> handler;
-    pn_session_t *default_session;   // Owned by connection
-    class container_impl* container_impl;
-    message event_message;  // re-used by messaging_adapter for performance
+
+    static connection_context& get(pn_connection_t *c) { return ref<connection_context>(id(c)); }
+    static connection_context& get(const connection& c) { return ref<connection_context>(id(c)); }
+
+  protected:
+    static context::id id(pn_connection_t*);
+    static context::id id(const connection& c) { return id(c.pn_object()); }
+};
+
+// Connection context with information used by the connection_engine.
+class connection_engine_context : public connection_context {
+  public:
+    connection_engine_context() :  engine_handler(0), transport(0), collector(0) {}
+
+    class handler *engine_handler;
+    pn_transport_t  *transport;
+    pn_collector_t  *collector;
+    static connection_engine_context& get(const connection &c) {
+        return ref<connection_engine_context>(id(c));
+    }
 };
 
 void container_context(const reactor&, container&);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/src/encoder.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/encoder.cpp b/proton-c/bindings/cpp/src/encoder.cpp
index acf4db9..58b41b3 100644
--- a/proton-c/bindings/cpp/src/encoder.cpp
+++ b/proton-c/bindings/cpp/src/encoder.cpp
@@ -28,6 +28,8 @@
 
 #include <proton/codec.h>
 
+#include <algorithm>
+
 namespace proton {
 
 namespace {
@@ -63,7 +65,8 @@ bool encoder::encode(char* buffer, size_t& size) {
 }
 
 void encoder::encode(std::string& s) {
-    size_t size = s.size();
+    s.resize(std::max(s.capacity(), size_t(1))); // Use full capacity, ensure not empty
+	size_t size = s.size();
     if (!encode(&s[0], size)) {
         s.resize(size);
         encode(&s[0], size);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/src/engine_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/engine_test.cpp b/proton-c/bindings/cpp/src/engine_test.cpp
new file mode 100644
index 0000000..1456331
--- /dev/null
+++ b/proton-c/bindings/cpp/src/engine_test.cpp
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+#include "test_bits.hpp"
+#include "uuid.hpp"
+#include <proton/connection_engine.hpp>
+#include <proton/handler.hpp>
+#include <proton/event.hpp>
+#include <proton/types.hpp>
+#include <deque>
+#include <algorithm>
+
+using namespace std;
+using namespace proton;
+using namespace test;
+
+// One end of an in-memory connection
+struct mem_pipe {
+    mem_pipe(deque<char>& r, deque<char>& w) : read(r), write(w) {}
+    struct deque<char>  &read, &write;
+};
+
+struct mem_queues : public pair<deque<char>, deque<char> > {
+    mem_pipe a() { return mem_pipe(first, second); }
+    mem_pipe b() { return mem_pipe(second, first); }
+};
+
+// In memory connection_engine
+struct mem_engine : public connection_engine {
+    mem_pipe socket;
+    std::string read_error;
+    std::string write_error;
+
+    mem_engine(mem_pipe s, handler &h, const connection_options &opts)
+        : connection_engine(h, opts), socket(s) {}
+
+    size_t io_read(char* buf, size_t size) {
+        if (!read_error.empty()) throw io_error(read_error);
+        size = min(socket.read.size(), size);
+        copy(socket.read.begin(), socket.read.begin()+size, buf);
+        socket.read.erase(socket.read.begin(), socket.read.begin()+size);
+        return size;
+    }
+
+    size_t io_write(const char* buf, size_t size) {
+        if (!write_error.empty()) throw io_error(write_error);
+        socket.write.insert(socket.write.begin(), buf, buf+size);
+        return size;
+    }
+
+    void io_close() {
+        read_error = write_error = "closed";
+    }
+};
+
+struct debug_handler : handler {
+    void on_unhandled(event& e) {
+        std::cout << e.name() << std::endl;
+    }
+};
+
+struct record_handler : handler {
+    vector<string> events;
+    void on_unhandled(event& e) {
+        events.push_back(e.name());
+    }
+};
+
+template <class HA=record_handler, class HB=record_handler> struct engine_pair {
+    connection_engine::container cont;
+    mem_queues queues;
+    HA ha;
+    HB hb;
+    mem_engine a, b;
+    engine_pair() : a(queues.a(), ha, cont.make_options()), b(queues.b(), hb, cont.make_options()) {}
+    engine_pair(const std::string& id)
+        : cont(id), a(queues.a(), ha, cont.make_options()), b(queues.b(), hb, cont.make_options())
+    {}
+    engine_pair(const connection_options &aopts, connection_options &bopts)
+        : a(queues.a(), ha, aopts), b(queues.b(), hb, bopts)
+    {}
+
+    void process() { a.process(); b.process(); }
+};
+
+void test_process_amqp() {
+    engine_pair<> e;
+
+    e.a.process(connection_engine::READ); // Don't write unlesss writable
+    ASSERT(e.a.socket.write.empty());
+    e.a.process(connection_engine::WRITE);
+
+    string wrote(e.a.socket.write.begin(), e.a.socket.write.end());
+    e.a.process(connection_engine::WRITE);
+    ASSERT_EQUAL(8, wrote.size());
+    ASSERT_EQUAL("AMQP", wrote.substr(0,4));
+
+    e.b.process();              // Read and write AMQP
+    ASSERT_EQUAL("AMQP", string(e.b.socket.write.begin(), e.b.socket.write.begin()+4));
+    ASSERT(e.b.socket.read.empty());
+    ASSERT(e.a.socket.write.empty());
+    ASSERT_EQUAL(many<string>() + "START", e.ha.events);
+}
+
+
+struct link_handler : public record_handler {
+    std::deque<link> links;
+    void on_link_open(event& e) {
+        links.push_back(e.link());
+    }
+
+    link pop() {
+        link l;
+        if (!links.empty()) {
+            l = links.front();
+            links.pop_front();
+        }
+        return l;
+    }
+};
+
+void test_engine_prefix() {
+    engine_pair<link_handler, link_handler> e(
+        connection_options().container_id("a").link_prefix("a/"),
+        connection_options().container_id("b").link_prefix("b/"));
+    e.a.connection().open();
+
+    e.a.connection().open_sender("x");
+    while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
+    ASSERT_EQUAL("a/1", e.ha.pop().name());
+    ASSERT_EQUAL("a/1", e.hb.pop().name());
+
+    e.a.connection().open_receiver("y");
+    while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
+    ASSERT_EQUAL("a/2", e.ha.pop().name());
+    ASSERT_EQUAL("a/2", e.hb.pop().name());
+
+    e.b.connection().open_receiver("z");
+    while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
+    ASSERT_EQUAL("b/1", e.ha.pop().name());
+    ASSERT_EQUAL("b/1", e.hb.pop().name());
+}
+
+void test_container_prefix() {
+    engine_pair<link_handler, link_handler> e;
+    e.a.connection().open();
+
+    e.a.connection().open_sender("x");
+    while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
+    ASSERT_EQUAL("1/1", e.ha.pop().name());
+    ASSERT_EQUAL("1/1", e.hb.pop().name());
+
+    e.a.connection().open_receiver("y");
+    while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
+    ASSERT_EQUAL("1/2", e.ha.pop().name());
+    ASSERT_EQUAL("1/2", e.hb.pop().name());
+
+    e.b.connection().open_receiver("z");
+    while (e.ha.links.size() + e.hb.links.size() < 2) e.process();
+    ASSERT_EQUAL("2/1", e.ha.pop().name());
+    ASSERT_EQUAL("2/1", e.hb.pop().name());
+
+    // TODO aconway 2016-01-22: check we respect name set in link-options.
+};
+
+int main(int, char**) {
+    int failed = 0;
+    RUN_TEST(failed, test_process_amqp());
+    RUN_TEST(failed, test_engine_prefix());
+    RUN_TEST(failed, test_container_prefix());
+    return failed;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/src/error.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/error.cpp b/proton-c/bindings/cpp/src/error.cpp
index d33bb72..1dedab1 100644
--- a/proton-c/bindings/cpp/src/error.cpp
+++ b/proton-c/bindings/cpp/src/error.cpp
@@ -21,9 +21,7 @@
 
 namespace proton {
 
-static const std::string prefix("proton: ");
-
-error::error(const std::string& msg) : std::runtime_error(prefix+msg) {}
+error::error(const std::string& msg) : std::runtime_error(msg) {}
 
 timeout_error::timeout_error(const std::string& msg) : error(msg) {}
 
@@ -31,5 +29,9 @@ decode_error::decode_error(const std::string& msg) : error("decode: "+msg) {}
 
 encode_error::encode_error(const std::string& msg) : error("encode: "+msg) {}
 
+io_error::io_error(const std::string& msg) : error(msg) {}
+
+closed_error::closed_error(const std::string& msg) : io_error(msg) {}
+const std::string closed_error::default_msg("closed");
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/src/id_generator.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/id_generator.cpp b/proton-c/bindings/cpp/src/id_generator.cpp
new file mode 100644
index 0000000..0b0917c
--- /dev/null
+++ b/proton-c/bindings/cpp/src/id_generator.cpp
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "proton/id_generator.hpp"
+#include <sstream>
+
+namespace proton {
+
+id_generator::id_generator(const std::string& s) : prefix_(s), count_(0) {}
+
+void id_generator::prefix(const std::string& s) { prefix_ = s; }
+
+std::string id_generator::next() {
+    // TODO aconway 2016-01-19: more efficient conversion, fixed buffer.
+    std::ostringstream o;
+    o << prefix_ << std::hex << ++count_;
+    return o.str();
+}
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/src/interop_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/interop_test.cpp b/proton-c/bindings/cpp/src/interop_test.cpp
index 21a748f..fe2d58c 100644
--- a/proton-c/bindings/cpp/src/interop_test.cpp
+++ b/proton-c/bindings/cpp/src/interop_test.cpp
@@ -29,6 +29,7 @@
 
 using namespace std;
 using namespace proton;
+using namespace test;
 
 std::string tests_dir;
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/src/link.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/link.cpp b/proton-c/bindings/cpp/src/link.cpp
index deab0b2..47bfb09 100644
--- a/proton-c/bindings/cpp/src/link.cpp
+++ b/proton-c/bindings/cpp/src/link.cpp
@@ -81,8 +81,7 @@ class session link::session() const {
 
 void link::handler(proton_handler &h) {
     pn_record_t *record = pn_link_attachments(pn_object());
-    connection_context& cc(connection_context::get(connection()));
-    pn_ptr<pn_handler_t> chandler = cc.container_impl->cpp_handler(&h);
+    pn_ptr<pn_handler_t> chandler = connection().container().impl_->cpp_handler(&h);
     pn_record_set_handler(record, chandler.get());
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/src/message_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/message_test.cpp b/proton-c/bindings/cpp/src/message_test.cpp
index 0e25bb5..988aa34 100644
--- a/proton-c/bindings/cpp/src/message_test.cpp
+++ b/proton-c/bindings/cpp/src/message_test.cpp
@@ -27,6 +27,7 @@
 
 using namespace std;
 using namespace proton;
+using namespace test;
 
 
 #define CHECK_STR(ATTR) \

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/src/messaging_event.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/messaging_event.hpp b/proton-c/bindings/cpp/src/messaging_event.hpp
index 64e8975..381d375 100644
--- a/proton-c/bindings/cpp/src/messaging_event.hpp
+++ b/proton-c/bindings/cpp/src/messaging_event.hpp
@@ -67,6 +67,7 @@ class messaging_event : public event
     };
 
     messaging_event(event_type t, proton_event &parent);
+    messaging_event(event_type t, pn_event_t*);
     ~messaging_event();
 
     PN_CPP_EXTERN class container& container() const;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/src/posix/io.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/posix/io.cpp b/proton-c/bindings/cpp/src/posix/io.cpp
new file mode 100644
index 0000000..9e17fd0
--- /dev/null
+++ b/proton-c/bindings/cpp/src/posix/io.cpp
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "msg.hpp"
+#include <proton/io.hpp>
+#include <proton/url.hpp>
+
+#include <errno.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+namespace proton {
+namespace io {
+
+const descriptor INVALID_DESCRIPTOR = -1;
+
+std::string error_str() {
+#ifdef USE_STRERROR_R
+    char buf[256];
+    strerror_r(errno, buf, sizeof(buf));
+    return buf;
+#elifdef USE_STRERROR_S
+    char buf[256];
+    strerror_s(buf, sizeof(buf), errno);
+    return buf;
+#elifdef USE_OLD_STRERROR
+    char buf[256];
+    strncpy(buf, strerror(errno), sizeof(buf));
+    return buf;
+#else
+    std::ostringstream os;
+    os <<  "system error (" << errno << ")";
+    return os.str();
+#endif
+}
+
+namespace {
+
+template <class T> T check(T result, const std::string& msg=std::string()) {
+    if (result < 0) throw io_error(msg + error_str());
+    return result;
+}
+
+void gai_check(int result, const std::string& msg="") {
+    if (result) throw io_error(msg + gai_strerror(result));
+}
+
+}
+
+void socket_engine::init() {
+    check(fcntl(socket_, F_SETFL, fcntl(socket_, F_GETFL, 0) | O_NONBLOCK), "set nonblock: ");
+}
+
+socket_engine::socket_engine(descriptor fd, handler& h, const connection_options &opts)
+    : connection_engine(h, opts), socket_(fd)
+{
+    init();
+}
+
+socket_engine::socket_engine(const url& u, handler& h, const connection_options& opts)
+    : connection_engine(h, opts), socket_(connect(u))
+{
+    init();
+}
+
+size_t socket_engine::io_read(char *buf, size_t size) {
+    ssize_t n = ::read(socket_, buf, size);
+    if (n > 0)
+        return n;
+    if (n == 0)
+        throw proton::closed_error();
+    if (errno == EAGAIN || errno == EWOULDBLOCK)
+        return 0;
+    check(n, "read: ");
+    return n;
+}
+
+size_t socket_engine::io_write(const char *buf, size_t size) {
+    ssize_t n = ::write(socket_, buf, size);
+    if (n == EAGAIN || n == EWOULDBLOCK) return 0;
+    if (n < 0) check(n, "write: ");
+    return n;
+}
+
+void socket_engine::io_close() { ::close(socket_); }
+
+void socket_engine::run() {
+    fd_set self;
+    FD_ZERO(&self);
+    FD_SET(socket_, &self);
+    while (!closed()) {
+        process();
+        if (!closed()) {
+            int n = select(FD_SETSIZE,
+                           can_read() ? &self : NULL,
+                           can_write() ? &self : NULL,
+                           NULL, NULL);
+            check(n, "select: ");
+        }
+    }
+}
+
+namespace {
+struct auto_addrinfo {
+    struct addrinfo *ptr;
+    auto_addrinfo() : ptr(0) {}
+    ~auto_addrinfo() { ::freeaddrinfo(ptr); }
+    addrinfo* operator->() const { return ptr; }
+};
+}
+
+descriptor connect(const proton::url& u) {
+    descriptor fd = INVALID_DESCRIPTOR;
+    try{
+        auto_addrinfo addr;
+        gai_check(::getaddrinfo(u.host().empty() ? 0 : u.host().c_str(),
+                                u.port().empty() ? 0 : u.port().c_str(),
+                                0, &addr.ptr));
+        fd = check(::socket(addr->ai_family, SOCK_STREAM, 0), "connect: ");
+        check(::connect(fd, addr->ai_addr, addr->ai_addrlen), "connect: ");
+        return fd;
+    } catch (...) {
+        if (fd >= 0) close(fd);
+        throw;
+    }
+}
+
+listener::listener(const std::string& host, const std::string &port) : socket_(INVALID_DESCRIPTOR) {
+    try {
+        auto_addrinfo addr;
+        gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(),
+                                port.empty() ? 0 : port.c_str(), 0, &addr.ptr),
+                  "listener address invalid: ");
+        socket_ = check(::socket(addr->ai_family, SOCK_STREAM, 0), "listen: ");
+        int yes = 1;
+        check(setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)), "setsockopt: ");
+        check(::bind(socket_, addr->ai_addr, addr->ai_addrlen), "bind: ");
+        check(::listen(socket_, 32), "listen: ");
+    } catch (...) {
+        if (socket_ >= 0) close(socket_);
+        throw;
+    }
+}
+
+listener::~listener() { ::close(socket_); }
+
+descriptor listener::accept(std::string& host_str, std::string& port_str) {
+    struct sockaddr_in addr;
+    ::memset(&addr, 0, sizeof(addr));
+    socklen_t size = sizeof(addr);
+    int fd = check(::accept(socket_, (struct sockaddr *)&addr, &size), "accept: ");
+    char host[NI_MAXHOST], port[NI_MAXSERV];
+    gai_check(getnameinfo((struct sockaddr *) &addr, sizeof(addr),
+                          host, sizeof(host), port, sizeof(port), 0),
+              "accept invalid remote address: ");
+    host_str = host;
+    port_str = port;
+    return fd;
+}
+
+// Empty stubs, only needed on windows.
+void initialize() {}
+void finalize() {}
+}}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/src/proton_event.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proton_event.hpp b/proton-c/bindings/cpp/src/proton_event.hpp
index f40e02a..5bee7db 100644
--- a/proton-c/bindings/cpp/src/proton_event.hpp
+++ b/proton-c/bindings/cpp/src/proton_event.hpp
@@ -31,6 +31,7 @@ namespace proton {
 class proton_handler;
 class container;
 class connection;
+class connection_engine;
 
 /** Event information for a proton::proton_handler */
 class proton_event
@@ -290,6 +291,7 @@ class proton_event
     event_type type_;
     class container *container_;
   friend class messaging_event;
+  friend class connection_engine;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/src/scalar_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/scalar_test.cpp b/proton-c/bindings/cpp/src/scalar_test.cpp
index cb66e41..a9700b0 100644
--- a/proton-c/bindings/cpp/src/scalar_test.cpp
+++ b/proton-c/bindings/cpp/src/scalar_test.cpp
@@ -29,6 +29,7 @@
 
 using namespace std;
 using namespace proton;
+using namespace test;
 
 // Inserting and extracting simple C++ values.
 template <class T> void type_test(T x, type_id tid, T y) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/src/session.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/session.cpp b/proton-c/bindings/cpp/src/session.cpp
index 9602e40..6b3628d 100644
--- a/proton-c/bindings/cpp/src/session.cpp
+++ b/proton-c/bindings/cpp/src/session.cpp
@@ -27,6 +27,8 @@
 #include "contexts.hpp"
 #include "container_impl.hpp"
 
+#include <string>
+
 namespace proton {
 
 void session::open() {
@@ -38,19 +40,19 @@ connection session::connection() const {
 }
 
 namespace {
-std::string set_name(const std::string& name, session* s) {
-    if (name.empty())
-        return connection_context::get(s->connection()).container_impl->next_link_name();
-    return name;
+std::string link_name(const std::string& name, session* s) {
+    if (!name.empty()) return name;
+    std::string gen(connection_context::get(s->connection()).link_gen.next());
+    return gen;
 }
 }
 
 receiver session::create_receiver(const std::string& name) {
-    return pn_receiver(pn_object(), set_name(name, this).c_str());
+    return pn_receiver(pn_object(), link_name(name, this).c_str());
 }
 
 sender session::create_sender(const std::string& name) {
-    return pn_sender(pn_object(), set_name(name, this).c_str());
+    return pn_sender(pn_object(), link_name(name, this).c_str());
 }
 
 sender session::open_sender(const std::string &addr, const link_options &lo) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/src/test_bits.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/test_bits.hpp b/proton-c/bindings/cpp/src/test_bits.hpp
index 49b67e9..39d4be5 100644
--- a/proton-c/bindings/cpp/src/test_bits.hpp
+++ b/proton-c/bindings/cpp/src/test_bits.hpp
@@ -21,16 +21,20 @@
 
 #include <stdexcept>
 #include <iostream>
+#include <vector>
+#include <deque>
+#include <iterator>
 #include <sstream>
 #include "msg.hpp"
 
-namespace {
+namespace test {
 
 struct fail : public std::logic_error { fail(const std::string& what) : logic_error(what) {} };
+
 #define FAIL(WHAT) throw fail(MSG(__FILE__ << ":" << __LINE__ << ": " << WHAT))
 #define ASSERT(TEST) do { if (!(TEST)) FAIL("assert failed: " << #TEST); } while(false)
 #define ASSERT_EQUAL(WANT, GOT) if (!((WANT) == (GOT))) \
-        FAIL(#WANT << " !=  " << #GOT << ": " << WANT << " != " << GOT)
+        FAIL(#WANT << " !=  " << #GOT << ": " << (WANT) << " != " << (GOT))
 
 #define RUN_TEST(BAD_COUNT, TEST)                                       \
     do {                                                                \
@@ -47,5 +51,40 @@ struct fail : public std::logic_error { fail(const std::string& what) : logic_er
 
 template<class T> std::string str(const T& x) { std::ostringstream s; s << x; return s.str(); }
 
+// A way to easily create literal collections that can be compared to std:: collections
+// and to print std collections
+// e.g.
+//     std::vector<string> v = ...;
+//     ASSERT_EQUAL(many<string>() + "a" + "b" + "c", v);
+template <class T> struct many : public std::vector<T> {
+    many() {}
+    template<class S> explicit many(const S& s) : std::vector<T>(s.begin(), s.end()) {}
+    many operator+(const T& t) { many<T> l(*this); l.push_back(t); return l; }
+};
+
+template <class T, class S> bool operator==(const many<T>& m, const S& s) {
+    return S(m.begin(), m.end()) == s;
+}
+
+template <class T, class S> bool operator==(const S& s, const many<T>& m) {
+    return S(m.begin(), m.end()) == s;
+}
+
+template <class T> std::ostream& operator<<(std::ostream& o, const many<T>& m) {
+    std::ostream_iterator<T> oi(o, " ");
+    std::copy(m.begin(), m.end(), oi);
+    return o;
+}
+
+}
+
+namespace std {
+template <class T> std::ostream& operator<<(std::ostream& o, const std::vector<T>& s) {
+    return o << test::many<T>(s);
+}
+
+template <class T> std::ostream& operator<<(std::ostream& o, const std::deque<T>& s) {
+    return o << test::many<T>(s);
+}
 }
 #endif // TEST_BITS_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/src/uuid.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/uuid.cpp b/proton-c/bindings/cpp/src/uuid.cpp
index ca6dec9..f2c3335 100644
--- a/proton-c/bindings/cpp/src/uuid.cpp
+++ b/proton-c/bindings/cpp/src/uuid.cpp
@@ -46,13 +46,35 @@ uuid::uuid() {
     bytes[8] = (bytes[8] & 0x3F) | 0x80;
 }
 
-std::string uuid::str() {
-    // UUID standard format: 8-4-4-4-12 (36 chars, 32 alphanumeric and 4 hypens)
+namespace {
+
+struct ios_guard {
+    std::ios &guarded;
+    std::ios old;
+    ios_guard(std::ios& x) : guarded(x), old(0) { old.copyfmt(guarded); }
+    ~ios_guard() { guarded.copyfmt(old); }
+};
+
+}
+
+/// UUID standard format: 8-4-4-4-12 (36 chars, 32 alphanumeric and 4 hypens)
+std::ostream& operator<<(std::ostream& o, const uuid& u) {
+    ios_guard guard(o);
+    o << std::hex << std::setfill('0');
+    static const int segments[] = {4,2,2,2,6}; // 1 byte is 2 hex chars.
+    const uint8_t *p = u.bytes;
+    for (size_t i = 0; i < sizeof(segments)/sizeof(segments[0]); ++i) {
+        if (i > 0)
+            o << '-';
+        for (int j = 0; j < segments[i]; ++j)
+            o << std::setw(2) << int(*(p++));
+    }
+    return o;
+}
+
+std::string uuid::str() const {
     std::ostringstream s;
-    s << std::hex << std::setw(2) << std::setfill('0');
-    s << b(0) << b(1) << b(2) << b(3);
-    s << '-' << b(4) << b(5) << '-' << b(6) << b(7) << '-' << b(8) << b(9);
-    s << '-' << b(10) << b(11) << b(12) << b(13) << b(14) << b(15);
+    s << *this;
     return s.str();
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/src/uuid.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/uuid.hpp b/proton-c/bindings/cpp/src/uuid.hpp
index 12e891e..e8ee908 100644
--- a/proton-c/bindings/cpp/src/uuid.hpp
+++ b/proton-c/bindings/cpp/src/uuid.hpp
@@ -22,18 +22,20 @@
 #include <proton/types.hpp>
 
 #include <string>
+#include <iosfwd>
 
 namespace proton {
 
-/// A simple random UUID-like value. Fallback if user does not provide a container id.
+/// A random UUID.
 struct uuid {
-    uuid();
+    PN_CPP_EXTERN uuid();
     uint8_t bytes[16];
-    std::string str();
-  private:
-    int b(int i) { return bytes[i]; }
+    PN_CPP_EXTERN std::string str()  const;
 };
 
+/// UUID standard format: 8-4-4-4-12 (36 chars, 32 alphanumeric and 4 hypens)
+PN_CPP_EXTERN std::ostream& operator<<(std::ostream&, const uuid&);
+
 }
 
 #endif // UUID_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/src/value_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/value_test.cpp b/proton-c/bindings/cpp/src/value_test.cpp
index 2c6a64c..b19ea76 100644
--- a/proton-c/bindings/cpp/src/value_test.cpp
+++ b/proton-c/bindings/cpp/src/value_test.cpp
@@ -29,6 +29,7 @@
 
 using namespace std;
 using namespace proton;
+using namespace test;
 
 // Inserting and extracting simple C++ values.
 template <class T> void value_test(T x, type_id tid, const std::string& s, T y) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/src/windows/io.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/windows/io.cpp b/proton-c/bindings/cpp/src/windows/io.cpp
new file mode 100644
index 0000000..ce05e99
--- /dev/null
+++ b/proton-c/bindings/cpp/src/windows/io.cpp
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "msg.hpp"
+#include <proton/io.hpp>
+#include <proton/url.hpp>
+
+#define FD_SETSIZE 2048
+#ifndef _WIN32_WINNT
+#define _WIN32_WINNT 0x0501
+#endif
+#if _WIN32_WINNT < 0x0501
+#error "Proton requires Windows API support for XP or later."
+#endif
+#include <winsock2.h>
+#include <mswsock.h>
+#include <Ws2tcpip.h>
+
+#include <ctype.h>
+#include <errno.h>
+#include <stdio.h>
+#include <assert.h>
+
+namespace proton {
+namespace io {
+
+const descriptor INVALID_DESCRIPTOR = INVALID_SOCKET;
+
+std::string error_str() {
+    HRESULT code = WSAGetLastError();
+    char err[1024] = {0};
+    FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS |
+                  FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, code, 0, (LPSTR)&err, sizeof(err), NULL);
+    return err;
+}
+
+namespace {
+
+template <class T> T check(T result, const std::string& msg=std::string()) {
+    if (result == SOCKET_ERROR)
+        throw io_error(msg + error_str());
+    return result;
+}
+
+void gai_check(int result, const std::string& msg="") {
+    if (result)
+        throw io_error(msg + gai_strerror(result));
+}
+} // namespace
+
+void initialize() {
+    WSADATA unused;
+    check(WSAStartup(0x0202, &unused), "can't load WinSock: "); // Version 2.2
+}
+
+void finalize() {
+    WSACleanup();
+}
+
+void socket_engine::init() {
+    u_long nonblock = 1;
+    check(::ioctlsocket(socket_, FIONBIO, &nonblock), "ioctlsocket: ");
+}
+
+socket_engine::socket_engine(descriptor fd, handler& h, const connection_options &opts)
+    : connection_engine(h, opts), socket_(fd)
+{
+    init();
+}
+
+socket_engine::socket_engine(const url& u, handler& h, const connection_options &opts)
+    : connection_engine(h, opts), socket_(connect(u))
+{
+    init();
+}
+
+size_t socket_engine::io_read(char *buf, size_t size) {
+    int n = ::recv(socket_, buf, size, 0);
+    if (n == SOCKET_ERROR && WSAGetLastError() == WSAEWOULDBLOCK) return 0;
+    return check(n, "read: ");
+}
+
+size_t socket_engine::io_write(const char *buf, size_t size) {
+    int n = ::send(socket_, buf, size, 0);
+    if (n == SOCKET_ERROR && n == WSAEWOULDBLOCK) return 0;
+    return check(n, "write: ");
+}
+
+void socket_engine::io_close() { ::closesocket(socket_); }
+
+void socket_engine::run() {
+    fd_set self;
+    FD_ZERO(&self);
+    FD_SET(socket_, &self);
+    while (!closed()) {
+        process();
+        if (!closed()) {
+            int n = ::select(FD_SETSIZE,
+                           can_read() ? &self : NULL,
+                           can_write() ? &self : NULL,
+                           NULL, NULL);
+            check(n, "select: ");
+        }
+    }
+}
+
+namespace {
+struct auto_addrinfo {
+    struct addrinfo *ptr;
+    auto_addrinfo() : ptr(0) {}
+    ~auto_addrinfo() { ::freeaddrinfo(ptr); }
+    addrinfo* operator->() const { return ptr; }
+};
+
+static const char *amqp_service(const char *port) {
+  // Help older Windows to know about amqp[s] ports
+  if (port) {
+    if (!strcmp("amqp", port)) return "5672";
+    if (!strcmp("amqps", port)) return "5671";
+  }
+  return port;
+}
+}
+
+descriptor connect(const proton::url& u) {
+    // convert "0.0.0.0" to "127.0.0.1" on Windows for outgoing sockets
+    std::string host = (u.host() == "0.0.0.0") ? "127.0.0.1" : u.host();
+    descriptor fd = INVALID_SOCKET;
+    try{
+        auto_addrinfo addr;
+        gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(),
+                                amqp_service(u.port().empty() ? 0 : u.port().c_str()),
+                                0, &addr.ptr),
+                  "connect address invalid: ");
+        fd = check(::socket(addr->ai_family, SOCK_STREAM, 0), "connect socket: ");
+        check(::connect(fd, addr->ai_addr, addr->ai_addrlen), "connect: ");
+        return fd;
+    } catch (...) {
+        if (fd != INVALID_SOCKET) ::closesocket(fd);
+        throw;
+    }
+}
+
+listener::listener(const std::string& host, const std::string &port) : socket_(INVALID_SOCKET) {
+    try {
+        auto_addrinfo addr;
+        gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(),
+                                port.empty() ? 0 : port.c_str(), 0, &addr.ptr),
+                  "listener address invalid: ");
+        socket_ = check(::socket(addr->ai_family, SOCK_STREAM, 0), "listener socket: ");
+        bool yes = true;
+        check(setsockopt(socket_, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char*)&yes, sizeof(yes)), "setsockopt: ");
+        check(::bind(socket_, addr->ai_addr, addr->ai_addrlen), "listener bind: ");
+        check(::listen(socket_, 32), "listener listen: ");
+    } catch (...) {
+        if (socket_ != INVALID_SOCKET) ::closesocket(socket_);
+        throw;
+    }
+}
+
+listener::~listener() { ::closesocket(socket_); }
+
+descriptor listener::accept(std::string& host_str, std::string& port_str) {
+    struct sockaddr_in addr;
+    ::memset(&addr, 0, sizeof(addr));
+    socklen_t size = sizeof(addr);
+    int fd = check(::accept(socket_, (struct sockaddr *)&addr, &size), "accept: ");
+    char host[NI_MAXHOST], port[NI_MAXSERV];
+    gai_check(getnameinfo((struct sockaddr *) &addr, sizeof(addr),
+                          host, sizeof(host), port, sizeof(port), 0),
+              "accept invalid remote address: ");
+    host_str = host;
+    port_str = port;
+    return fd;
+}
+
+}}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/include/proton/error.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/error.h b/proton-c/include/proton/error.h
index 94ce449..122e4b6 100644
--- a/proton-c/include/proton/error.h
+++ b/proton-c/include/proton/error.h
@@ -29,7 +29,7 @@
 extern "C" {
 #endif
 
-/** A pn_error_t has an int error `code` and some string `text` to describe the error */ 
+/** A pn_error_t has an int error `code` and some string `text` to describe the error */
 typedef struct pn_error_t pn_error_t;
 
 #define PN_OK (0)
@@ -44,7 +44,7 @@ typedef struct pn_error_t pn_error_t;
 #define PN_INPROGRESS (-9)
 #define PN_OUT_OF_MEMORY (-10)
 
-/** @return name of the error code. Returned pointer is to a static constant, do not delete.*/ 
+/** @return name of the error code. Returned pointer is to a static constant, do not delete.*/
 PN_EXTERN const char *pn_code(int code);
 
 PN_EXTERN pn_error_t *pn_error(void);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/src/transport/transport.c
----------------------------------------------------------------------
diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c
index 052e8b9..cf7e593 100644
--- a/proton-c/src/transport/transport.c
+++ b/proton-c/src/transport/transport.c
@@ -46,13 +46,13 @@ static ssize_t transport_consume(pn_transport_t *transport);
 /*
  * Call this any time anything happens that may affect channel_max:
  * i.e. when the app indicates a preference, or when we receive the
- * OPEN frame from the remote peer.  And call it to do the final 
- * calculation just before we communicate our limit to the remote 
+ * OPEN frame from the remote peer.  And call it to do the final
+ * calculation just before we communicate our limit to the remote
  * peer by sending our OPEN frame.
  */
 static void pni_calculate_channel_max(pn_transport_t *transport) {
   /*
-   * The application cannot make the limit larger than 
+   * The application cannot make the limit larger than
    * what this library will allow.
    */
   transport->channel_max = (PN_IMPL_CHANNEL_MAX < transport->local_channel_max)
@@ -60,7 +60,7 @@ static void pni_calculate_channel_max(pn_transport_t *transport) {
                            : transport->local_channel_max;
 
   /*
-   * The remote peer's constraint is not valid until the 
+   * The remote peer's constraint is not valid until the
    * peer's open frame has been received.
    */
   if(transport->open_rcvd) {
@@ -409,18 +409,18 @@ static void pn_transport_initialize(void *object)
   transport->remote_max_frame = (uint32_t) 0xffffffff;
 
   /*
-   * We set the local limit on channels to 2^15, because 
+   * We set the local limit on channels to 2^15, because
    * parts of the code use the topmost bit (of a short)
    * as a flag.
-   * The peer that this transport connects to may also 
+   * The peer that this transport connects to may also
    * place its own limit on max channel number, and the
    * application may also set a limit.
-   * The maximum that we use will be the minimum of all 
+   * The maximum that we use will be the minimum of all
    * these constraints.
    */
-  // There is no constraint yet from remote peer, 
+  // There is no constraint yet from remote peer,
   // so set to max possible.
-  transport->remote_channel_max = 65535;  
+  transport->remote_channel_max = 65535;
   transport->local_channel_max  = PN_IMPL_CHANNEL_MAX;
   transport->channel_max        = transport->local_channel_max;
 
@@ -1183,7 +1183,7 @@ int pn_do_begin(pn_transport_t *transport, uint8_t frame_type, uint16_t channel,
   int err = pn_data_scan(args, "D.[?HI]", &reply, &remote_channel, &next);
   if (err) return err;
 
-  // AMQP 1.0 section 2.7.1 - if the peer doesn't honor our channel_max -- 
+  // AMQP 1.0 section 2.7.1 - if the peer doesn't honor our channel_max --
   // express our displeasure by closing the connection with a framing error.
   if (remote_channel > transport->channel_max) {
     pn_do_error(transport,
@@ -2707,11 +2707,11 @@ uint16_t pn_transport_get_channel_max(pn_transport_t *transport)
 int pn_transport_set_channel_max(pn_transport_t *transport, uint16_t requested_channel_max)
 {
   /*
-   * Once the OPEN frame has been sent, we have communicated our 
+   * Once the OPEN frame has been sent, we have communicated our
    * wishes to the remote client and there is no way to renegotiate.
    * After that point, we do not allow the application to make changes.
-   * Before that point, however, the app is free to either raise or 
-   * lower our local limit.  (But the app cannot raise it above the 
+   * Before that point, however, the app is free to either raise or
+   * lower our local limit.  (But the app cannot raise it above the
    * limit imposed by this library.)
    * The channel-max value will be finalized just before the OPEN frame
    * is sent.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/src/windows/io.c
----------------------------------------------------------------------
diff --git a/proton-c/src/windows/io.c b/proton-c/src/windows/io.c
index 2161ebb..261a56f 100644
--- a/proton-c/src/windows/io.c
+++ b/proton-c/src/windows/io.c
@@ -459,4 +459,3 @@ static int pni_socket_pair (pn_io_t *io, SOCKET sv[2]) {
   closesocket(sock);
   return 0;
 }
-

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/tests/python/proton_tests/common.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/common.py b/tests/python/proton_tests/common.py
index b4bf8fc..eee87c2 100644
--- a/tests/python/proton_tests/common.py
+++ b/tests/python/proton_tests/common.py
@@ -158,7 +158,7 @@ def ensureCanTestExtendedSASL():
 
 class DefaultConfig:
     defines = {}
-    
+
 class Test(TestCase):
   config = DefaultConfig()
 
@@ -496,7 +496,7 @@ class MessengerSenderValgrind(MessengerSenderC):
         if not suppressions:
             suppressions = os.path.join(os.path.dirname(__file__),
                                         "valgrind.supp" )
-        self._command = [os.environ["VALGRIND"], "--error-exitcode=1", "--quiet",
+        self._command = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet",
                          "--trace-children=yes", "--leak-check=full",
                          "--suppressions=%s" % suppressions] + self._command
 
@@ -515,7 +515,7 @@ class MessengerReceiverValgrind(MessengerReceiverC):
         if not suppressions:
             suppressions = os.path.join(os.path.dirname(__file__),
                                         "valgrind.supp" )
-        self._command = [os.environ["VALGRIND"], "--error-exitcode=1", "--quiet",
+        self._command = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet",
                          "--trace-children=yes", "--leak-check=full",
                          "--suppressions=%s" % suppressions] + self._command
 
@@ -547,7 +547,7 @@ class ReactorSenderValgrind(ReactorSenderC):
         if not suppressions:
             suppressions = os.path.join(os.path.dirname(__file__),
                                         "valgrind.supp" )
-        self._command = [os.environ["VALGRIND"], "--error-exitcode=1", "--quiet",
+        self._command = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet",
                          "--trace-children=yes", "--leak-check=full",
                          "--suppressions=%s" % suppressions] + self._command
 
@@ -566,7 +566,6 @@ class ReactorReceiverValgrind(ReactorReceiverC):
         if not suppressions:
             suppressions = os.path.join(os.path.dirname(__file__),
                                         "valgrind.supp" )
-        self._command = [os.environ["VALGRIND"], "--error-exitcode=1", "--quiet",
+        self._command = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet",
                          "--trace-children=yes", "--leak-check=full",
                          "--suppressions=%s" % suppressions] + self._command
-


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message