qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [2/2] qpid-proton git commit: PROTON-1062: c++: proton::connection_engine with client and server examples.
Date Sun, 24 Jan 2016 07:19:17 GMT
PROTON-1062: c++: proton::connection_engine with client and server examples.

Easier to use proton::connction_engine:
- inherit and override io_read, io_write, io_close to provide IO functionality.
- processing logic (read/write/dispatch) built into connction_engine.

Support for socket IO out of the box
- socket_engine implements socket-based IO
- examples/engine/direct_*.cpp simple single connection servers.
- examples/engine/broker.cpp is a full select-based broker
  - uses same handler as the reactor broker, only changes the IO logic

Full set of engine-based examples using socket IO.

TODO:
- Broker example needs to be ported to windows.
- Documentation& fixes.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b0c66544
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b0c66544
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b0c66544

Branch: refs/heads/master
Commit: b0c6654486a26e16d3877a23fb9580b4b021f50b
Parents: e39baba
Author: Alan Conway <aconway@redhat.com>
Authored: Tue Jan 19 17:16:05 2016 -0500
Committer: Alan Conway <aconway@redhat.com>
Committed: Sun Jan 24 02:17:49 2016 -0500

----------------------------------------------------------------------
 examples/cpp/CMakeLists.txt                     |  12 +-
 examples/cpp/README.hpp                         |  11 +-
 examples/cpp/broker.hpp                         |   3 +-
 examples/cpp/engine/CMakeLists.txt              |  58 ++++++
 examples/cpp/engine/broker.cpp                  | 175 ++++++++++++++++
 examples/cpp/engine/client.cpp                  |  96 +++++++++
 examples/cpp/engine/direct_recv.cpp             |  82 ++++++++
 examples/cpp/engine/direct_send.cpp             |  94 +++++++++
 examples/cpp/engine/example_test.py             | 182 ++++++++++++++++
 examples/cpp/engine/helloworld.cpp              |  65 ++++++
 examples/cpp/engine/options.hpp                 | 173 ++++++++++++++++
 examples/cpp/engine/server.cpp                  |  88 ++++++++
 examples/cpp/engine/simple_recv.cpp             |  87 ++++++++
 examples/cpp/engine/simple_send.cpp             |  94 +++++++++
 examples/cpp/select_broker.cpp                  | 184 -----------------
 proton-c/CMakeLists.txt                         |   8 +-
 proton-c/bindings/cpp/CMakeLists.txt            |   9 +-
 .../bindings/cpp/include/proton/connection.hpp  |  11 +-
 .../cpp/include/proton/connection_engine.hpp    | 172 ++++++++-------
 .../cpp/include/proton/connection_options.hpp   |   2 +
 .../bindings/cpp/include/proton/container.hpp   |   6 +-
 proton-c/bindings/cpp/include/proton/error.hpp  |  15 +-
 .../bindings/cpp/include/proton/handler.hpp     |   2 +-
 .../cpp/include/proton/id_generator.hpp         |  41 ++++
 proton-c/bindings/cpp/include/proton/io.hpp     | 119 +++++++++++
 proton-c/bindings/cpp/include/proton/object.hpp |   1 +
 .../bindings/cpp/include/proton/session.hpp     |   4 +-
 proton-c/bindings/cpp/src/connection.cpp        |   9 +-
 proton-c/bindings/cpp/src/connection_engine.cpp | 207 ++++++++++++-------
 .../bindings/cpp/src/connection_options.cpp     |  17 +-
 proton-c/bindings/cpp/src/connector.cpp         |   1 -
 proton-c/bindings/cpp/src/container_impl.cpp    |  23 +--
 proton-c/bindings/cpp/src/container_impl.hpp    |   3 +-
 proton-c/bindings/cpp/src/contexts.cpp          |  16 +-
 proton-c/bindings/cpp/src/contexts.hpp          |  64 ++++--
 proton-c/bindings/cpp/src/encoder.cpp           |   5 +-
 proton-c/bindings/cpp/src/engine_test.cpp       | 189 +++++++++++++++++
 proton-c/bindings/cpp/src/error.cpp             |   8 +-
 proton-c/bindings/cpp/src/id_generator.cpp      |  36 ++++
 proton-c/bindings/cpp/src/interop_test.cpp      |   1 +
 proton-c/bindings/cpp/src/link.cpp              |   3 +-
 proton-c/bindings/cpp/src/message_test.cpp      |   1 +
 proton-c/bindings/cpp/src/messaging_event.hpp   |   1 +
 proton-c/bindings/cpp/src/posix/io.cpp          | 183 ++++++++++++++++
 proton-c/bindings/cpp/src/proton_event.hpp      |   2 +
 proton-c/bindings/cpp/src/scalar_test.cpp       |   1 +
 proton-c/bindings/cpp/src/session.cpp           |  14 +-
 proton-c/bindings/cpp/src/test_bits.hpp         |  43 +++-
 proton-c/bindings/cpp/src/uuid.cpp              |  34 ++-
 proton-c/bindings/cpp/src/uuid.hpp              |  12 +-
 proton-c/bindings/cpp/src/value_test.cpp        |   1 +
 proton-c/bindings/cpp/src/windows/io.cpp        | 193 +++++++++++++++++
 proton-c/include/proton/error.h                 |   4 +-
 proton-c/src/transport/transport.c              |  26 +--
 proton-c/src/windows/io.c                       |   1 -
 tests/python/proton_tests/common.py             |  11 +-
 56 files changed, 2443 insertions(+), 460 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/examples/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt
index bb5f9c8..14d5748 100644
--- a/examples/cpp/CMakeLists.txt
+++ b/examples/cpp/CMakeLists.txt
@@ -40,11 +40,6 @@ set(examples
   ssl_client_cert
   encode_decode)
 
-if (NOT WIN32)
-  list(APPEND examples
-    select_broker)
-endif()
-
 foreach(example ${examples})
   add_executable(${example} ${example}.cpp)
   target_link_libraries(${example} ${ProtonCpp_LIBRARIES})
@@ -65,9 +60,4 @@ endif(WIN32)
 add_test(NAME cpp_example_test
   COMMAND ${PYTHON_EXECUTABLE} ${env_py} -- "PATH=${test_path}" ${VALGRIND_ENV} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v)
 
-set(broker_tests example_test.ExampleTest.test_request_response example_test.ExampleTest.test_simple_send_recv)
-
-if (NOT WIN32)
-  add_test(NAME cpp_example_select_test
-    COMMAND ${PYTHON_EXECUTABLE} ${env_py} -- "PATH=${test_path}" "PYTHONPATH=${CMAKE_CURRENT_SOURCE_DIR}" "TEST_BROKER=select_broker" ${VALGRIND_ENV} ${PYTHON_EXECUTABLE} -m unittest -v ${broker_tests})
-endif()
+add_subdirectory(engine)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/examples/cpp/README.hpp
----------------------------------------------------------------------
diff --git a/examples/cpp/README.hpp b/examples/cpp/README.hpp
index 4e94ec0..de65d6e 100644
--- a/examples/cpp/README.hpp
+++ b/examples/cpp/README.hpp
@@ -94,7 +94,7 @@ alternatives.
 
 */
 
-** \example broker.hpp
+/** \example broker.hpp
 
 Common logic for a simple "mini broker" that creates creates queues
 automatically when a client tries to send or subscribe. This file contains
@@ -114,12 +114,3 @@ broker. This broker creates queues automatically when a client tries to send or
 subscribe.
 
 */
-
-/** \example select_broker.cpp
-
-A simple, single-threaded, select-based broker using the `proton::engine`. This
-broker implementation uses the standard `select` call as an illustration of
-how to integrate proton with an external IO "framework", instead of letting
-the `proton::container` manage IO.
-
-*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/examples/cpp/broker.hpp
----------------------------------------------------------------------
diff --git a/examples/cpp/broker.hpp b/examples/cpp/broker.hpp
index 2b50128..4313bab 100644
--- a/examples/cpp/broker.hpp
+++ b/examples/cpp/broker.hpp
@@ -106,7 +106,8 @@ class queues {
 
     // Get or create a queue.
     virtual queue &get(const std::string &address = std::string()) {
-        if (address.empty()) throw std::runtime_error("empty queue name");
+        if (address.empty())
+            throw std::runtime_error("empty queue name");
         queue*& q = queues_[address];
         if (!q) q = new queue(address);
         return *q;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/examples/cpp/engine/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/CMakeLists.txt b/examples/cpp/engine/CMakeLists.txt
new file mode 100644
index 0000000..ceecc2b
--- /dev/null
+++ b/examples/cpp/engine/CMakeLists.txt
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+find_package(ProtonCpp REQUIRED)
+
+include_directories(${ProtonCpp_INCLUDE_DIRS})
+
+set(examples
+  broker
+  helloworld
+  simple_recv
+  simple_send
+  direct_recv
+  direct_send
+  client
+  server)
+
+foreach(example ${examples})
+  set(extra_source "")
+  if (example EQUAL broker)
+    set(extra_source broker.hpp)
+  endif()
+  add_executable(engine-${example} ${example}.cpp ${extra_source})
+  target_link_libraries(engine-${example} ${ProtonCpp_LIBRARIES})
+  set_source_files_properties(engine-${example}.cpp PROPERTIES COMPILE_FLAGS "${CXX_WARNING_FLAGS}")
+  set_target_properties(engine-${example} PROPERTIES OUTPUT_NAME ${example})
+endforeach()
+
+set(env_py "${CMAKE_SOURCE_DIR}/proton-c/env.py")
+set(test_bin_dir "$<TARGET_FILE_DIR:engine-broker>")
+
+if (WIN32)
+  # Ignore existing path (usualy containting spaces, escape chars).
+  # Choose just enough path for Windows, ';' separated.
+  set(test_path "${test_bin_dir}" "$<TARGET_FILE_DIR:qpid-proton>" "$<TARGET_FILE_DIR:qpid-proton-cpp>")
+else(WIN32)
+  # ':' separated path with test_bin_dir first.
+  set(test_path "${test_bin_dir}:$ENV{PATH}")
+endif(WIN32)
+
+add_test(NAME cpp_example_engine_test
+  COMMAND ${PYTHON_EXECUTABLE} ${env_py} -- "PATH=${test_path}" ${VALGRIND_ENV} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.py -v)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/examples/cpp/engine/broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/broker.cpp b/examples/cpp/engine/broker.cpp
new file mode 100644
index 0000000..518929c
--- /dev/null
+++ b/examples/cpp/engine/broker.cpp
@@ -0,0 +1,175 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "../options.hpp"
+#include "../broker.hpp"
+
+#include <iostream>
+
+// FIXME aconway 2016-01-23: port broker to windows, need io::poller.
+#ifdef WIN32
+#include "proton/acceptor.hpp"
+#include "proton/container.hpp"
+#include "proton/value.hpp"
+
+class broker {
+  public:
+    broker(const proton::url& url) : handler_(url, queues_) {}
+
+    proton::handler& handler() { return handler_; }
+
+  private:
+
+    class my_handler : public broker_handler {
+      public:
+        my_handler(const proton::url& u, queues& qs) : broker_handler(qs), url_(u) {}
+
+        void on_start(proton::event &e) {
+            e.container().listen(url_);
+            std::cout << "broker listening on " << url_ << std::endl;
+        }
+
+      private:
+        const proton::url& url_;
+    };
+
+  private:
+    queues queues_;
+    my_handler handler_;
+};
+
+int main(int argc, char **argv) {
+    // Command line options
+    proton::url url("0.0.0.0");
+    options opts(argc, argv);
+    opts.add_value(url, 'a', "address", "listen on URL", "URL");
+    try {
+        opts.parse();
+        broker b(url);
+        proton::container(b.handler()).run();
+        return 0;
+    } catch (const bad_option& e) {
+        std::cout << opts << std::endl << e.what() << std::endl;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+    return 1;
+}
+#else // WIN32
+#include <proton/io.hpp>
+#include <sys/select.h>
+#include <set>
+
+template <class T> T check(T result, const std::string& msg="io_error: ") {
+    if (result < 0)
+        throw proton::io_error(msg + proton::io::error_str());
+    return result;
+}
+
+void fd_set_if(bool on, int fd, fd_set *fds);
+
+class broker {
+    typedef std::set<proton::io::socket_engine*> engines;
+
+    queues queues_;
+    broker_handler handler_;
+    proton::connection_engine::container container_;
+    engines engines_;
+    fd_set reading_, writing_;
+
+  public:
+    broker() : handler_(queues_) {
+        FD_ZERO(&reading_);
+        FD_ZERO(&writing_);
+    }
+
+    ~broker() {
+        for (engines::iterator i = engines_.begin(); i != engines_.end(); ++i)
+            delete *i;
+    }
+
+    void run(const proton::url& url) {
+        proton::io::listener listener(url.host(), url.port());
+        std::cout << "listening on " << url << " fd=" << listener.socket() << std::endl;
+        FD_SET(listener.socket(), &reading_);
+        while(true) {
+            fd_set readable_set = reading_;
+            fd_set writable_set = writing_;
+            check(select(FD_SETSIZE, &readable_set, &writable_set, NULL, NULL), "select");
+
+            if (FD_ISSET(listener.socket(), &readable_set)) {
+                std::string client_host, client_port;
+                int fd = listener.accept(client_host, client_port);
+                std::cout << "accepted " << client_host << ":" << client_port
+                          << " fd=" << fd << std::endl;
+                engines_.insert(new proton::io::socket_engine(fd, handler_, container_.make_options()));
+                FD_SET(fd, &reading_);
+                FD_SET(fd, &writing_);
+            }
+
+            for (engines::iterator i = engines_.begin(); i != engines_.end(); ) {
+                engines::iterator j = i++;        // Save iterator in case we need to erase it.
+                proton::io::socket_engine *eng = *j;
+                int flags = 0;
+                if (FD_ISSET(eng->socket(), &readable_set))
+                    flags |= proton::io::socket_engine::READ;
+                if (FD_ISSET(eng->socket(), &writable_set))
+                    flags |= proton::io::socket_engine::WRITE;
+                if (flags) eng->process_nothrow(flags);
+                // Set reading/writing bits for next time around
+                fd_set_if(eng->can_read(), eng->socket(), &reading_);
+                fd_set_if(eng->can_write(), eng->socket(), &writing_);
+
+                if (eng->closed()) {
+                    std::cout << "closed fd=" << eng->socket() << " "
+                              << eng->error_str() << std::endl;
+                    engines_.erase(j);
+                    delete eng;
+                }
+            }
+        }
+    }
+};
+
+void fd_set_if(bool on, int fd, fd_set *fds) {
+    if (on)
+        FD_SET(fd, fds);
+    else
+        FD_CLR(fd, fds);
+}
+
+int main(int argc, char **argv) {
+    // Command line options
+    std::string address("0.0.0.0");
+    options opts(argc, argv);
+    opts.add_value(address, 'a', "address", "listen on URL", "URL");
+    try {
+        opts.parse();
+        broker().run(address);
+        return 0;
+    } catch (const bad_option& e) {
+        std::cout << opts << std::endl << e.what() << std::endl;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+    return 1;
+}
+#endif

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/examples/cpp/engine/client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/client.cpp b/examples/cpp/engine/client.cpp
new file mode 100644
index 0000000..a010d0f
--- /dev/null
+++ b/examples/cpp/engine/client.cpp
@@ -0,0 +1,96 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+#include "proton/io.hpp"
+#include "proton/url.hpp"
+#include "proton/event.hpp"
+#include "proton/handler.hpp"
+#include "proton/connection.hpp"
+
+#include <iostream>
+#include <vector>
+
+class client : public proton::handler {
+  private:
+    proton::url url;
+    std::vector<std::string> requests;
+    proton::sender sender;
+    proton::receiver receiver;
+
+  public:
+    client(const proton::url &u, const std::vector<std::string>& r) : url(u), requests(r) {}
+
+    void on_start(proton::event &e) {
+        e.connection().open();
+        sender = e.connection().open_sender(url.path());
+        receiver = e.connection().open_receiver("", proton::link_options().dynamic_address(true));
+    }
+
+    void send_request() {
+        proton::message req;
+        req.body(requests.front());
+        req.reply_to(receiver.remote_source().address());
+        sender.send(req);
+    }
+
+    void on_link_open(proton::event &e) {
+        if (e.link() == receiver)
+            send_request();
+    }
+
+    void on_message(proton::event &e) {
+        if (requests.empty()) return; // Spurious extra message!
+        proton::message& response = e.message();
+        std::cout << requests.front() << " => " << response.body() << std::endl;
+        requests.erase(requests.begin());
+        if (!requests.empty()) {
+            send_request();
+        } else {
+            e.connection().close();
+        }
+    }
+};
+
+int main(int argc, char **argv) {
+    // Command line options
+    std::string address("127.0.0.1:5672/examples");
+    options opts(argc, argv);
+    opts.add_value(address, 'a', "address", "connect and send to URL", "URL");
+
+    try {
+        opts.parse();
+
+        std::vector<std::string> requests;
+        requests.push_back("Twas brillig, and the slithy toves");
+        requests.push_back("Did gire and gymble in the wabe.");
+        requests.push_back("All mimsy were the borogroves,");
+        requests.push_back("And the mome raths outgrabe.");
+        client handler(address, requests);
+        proton::io::socket_engine(address, handler).run();
+        return 0;
+    } catch (const bad_option& e) {
+        std::cout << opts << std::endl << e.what() << std::endl;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/examples/cpp/engine/direct_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/direct_recv.cpp b/examples/cpp/engine/direct_recv.cpp
new file mode 100644
index 0000000..3579310
--- /dev/null
+++ b/examples/cpp/engine/direct_recv.cpp
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include "proton/io.hpp"
+#include "proton/event.hpp"
+#include "proton/handler.hpp"
+#include "proton/link.hpp"
+#include "proton/url.hpp"
+#include "proton/value.hpp"
+
+#include <iostream>
+#include <map>
+
+class direct_recv : public proton::handler {
+  private:
+    uint64_t expected;
+    uint64_t received;
+
+  public:
+    direct_recv(int c) : expected(c), received(0) {}
+
+    void on_start(proton::event &e) {
+        e.connection().open();
+    }
+
+    void on_message(proton::event &e) {
+        proton::message& msg = e.message();
+        if (msg.id().get<uint64_t>() < received)
+            return; // ignore duplicate
+        if (expected == 0 || received < expected) {
+            std::cout << msg.body() << std::endl;
+            received++;
+        }
+        if (received == expected) {
+            e.receiver().close();
+            e.connection().close();
+        }
+    }
+};
+
+int main(int argc, char **argv) {
+    // Command line options
+    std::string address("127.0.0.1:5672/examples");
+    int message_count = 100;
+    options opts(argc, argv);
+    opts.add_value(address, 'a', "address", "listen and receive on URL", "URL");
+    opts.add_value(message_count, 'm', "messages", "receive COUNT messages", "COUNT");
+    try {
+        opts.parse();
+        proton::url url(address);
+        proton::io::listener listener(url.host(), url.port());
+        std::cout << "direct_recv listening on " << url << std::endl;
+        direct_recv handler(message_count);
+        proton::io::socket_engine(listener.accept(), handler).run();
+        return 0;
+    } catch (const bad_option& e) {
+        std::cout << opts << std::endl << e.what() << std::endl;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/examples/cpp/engine/direct_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/direct_send.cpp b/examples/cpp/engine/direct_send.cpp
new file mode 100644
index 0000000..3659aea
--- /dev/null
+++ b/examples/cpp/engine/direct_send.cpp
@@ -0,0 +1,94 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include "proton/acceptor.hpp"
+#include "proton/connection.hpp"
+#include "proton/io.hpp"
+#include "proton/url.hpp"
+#include "proton/event.hpp"
+#include "proton/handler.hpp"
+#include "proton/value.hpp"
+
+#include <iostream>
+#include <map>
+
+class simple_send : public proton::handler {
+  private:
+    int sent;
+    int confirmed;
+    int total;
+  public:
+    simple_send(int c) : sent(0), confirmed(0), total(c) {}
+
+    void on_start(proton::event &e) {
+        e.connection().open();
+    }
+
+    void on_sendable(proton::event &e) {
+        proton::sender sender = e.sender();
+        while (sender.credit() && sent < total) {
+            proton::message msg;
+            msg.id(sent + 1);
+            std::map<std::string, int> m;
+            m["sequence"] = sent+1;
+            msg.body(m);
+            sender.send(msg);
+            sent++;
+        }
+    }
+
+    void on_delivery_accept(proton::event &e) {
+        confirmed++;
+        if (confirmed == total) {
+            std::cout << "all messages confirmed" << std::endl;
+            e.connection().close();
+        }
+    }
+
+    void on_disconnect(proton::event &e) {
+        sent = confirmed;
+    }
+};
+
+int main(int argc, char **argv) {
+    // Command line options
+    std::string address("127.0.0.1:5672/examples");
+    int message_count = 100;
+    options opts(argc, argv);
+    opts.add_value(address, 'a', "address", "listen and send on URL", "URL");
+    opts.add_value(message_count, 'm', "messages", "send COUNT messages", "COUNT");
+    try {
+        opts.parse();
+        proton::url url(address);
+        proton::io::listener listener(url.host(), url.port());
+        std::cout << "direct_send listening on " << url << std::endl;
+        simple_send handler(message_count);
+        proton::io::socket_engine(listener.accept(), handler).run();
+        return 0;
+    } catch (const bad_option& e) {
+        std::cout << opts << std::endl << e.what() << std::endl;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/examples/cpp/engine/example_test.py
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/example_test.py b/examples/cpp/engine/example_test.py
new file mode 100644
index 0000000..a4c4c17
--- /dev/null
+++ b/examples/cpp/engine/example_test.py
@@ -0,0 +1,182 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License
+#
+
+# This is a test script to run the examples and verify that they behave as expected.
+
+import unittest
+import os, sys, socket, time
+from  random import randrange
+from subprocess import Popen, PIPE, STDOUT
+from copy import copy
+import platform
+from os.path import dirname as dirname
+
+def cmdline(*args):
+    """Adjust executable name args[0] for windows and/or valgrind"""
+    args = list(args)
+    if platform.system() == "Windows":
+        args[0] += ".exe"
+    if "VALGRIND" in os.environ and os.environ["VALGRIND"]:
+        args = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet",
+                "--leak-check=full"] + args
+    return args
+
+def background(*args):
+    """Run executable in the backround, return the popen"""
+    p = Popen(cmdline(*args), stdout=PIPE, stderr=sys.stderr)
+    p.args = args               # Save arguments for debugging output
+    return p
+
+def verify(p):
+    """Wait for executable to exit and verify status."""
+    try:
+        out, err = p.communicate()
+    except Exception as e:
+        raise Exception("Error running %s: %s", p.args, e)
+    if p.returncode:
+        raise Exception("""%s exit code %s
+vvvvvvvvvvvvvvvv
+%s
+^^^^^^^^^^^^^^^^
+""" % (p.args, p.returncode, out))
+    if platform.system() == "Windows":
+        # Just \n please
+        if out:
+            out = out.translate(None, '\r')
+    return out
+
+def execute(*args):
+    return verify(background(*args))
+
+NULL = open(os.devnull, 'w')
+
+def wait_addr(addr, timeout=10):
+    """Wait up to timeout for something to listen on port"""
+    deadline = time.time() + timeout
+    while time.time() < deadline:
+        try:
+            c = socket.create_connection(addr.split(":"), deadline - time.time())
+            c.close()
+            return
+        except socket.error as e:
+            time.sleep(0.01)
+    raise Exception("Timed out waiting for %s", addr)
+
+def pick_addr():
+    """Pick a new host:port address."""
+    # TODO aconway 2015-07-14: need a safer way to pick ports.
+    p =  randrange(10000, 20000)
+    return "127.0.0.1:%s" % p
+
+def ssl_certs_dir():
+    """Absolute path to the test SSL certificates"""
+    pn_root = dirname(dirname(dirname(sys.argv[0])))
+    return os.path.join(pn_root, "examples/cpp/ssl_certs")
+
+class Broker(object):
+    """Run the test broker"""
+
+    @classmethod
+    def get(cls):
+        if not hasattr(cls, "_broker"):
+            cls._broker = Broker()
+        return cls._broker
+
+    @classmethod
+    def stop(cls):
+        if cls.get() and cls._broker.process:
+            cls._broker.process.kill()
+            cls._broker = None
+
+    def __init__(self):
+        broker_exe = os.environ.get("TEST_BROKER") or "broker"
+        self.addr = pick_addr()
+        cmd = cmdline(broker_exe, "-a", self.addr)
+        try:
+            self.process = Popen(cmd, stdout=NULL, stderr=sys.stderr)
+            wait_addr(self.addr)
+            self.addr += "/examples"
+        except Exception as e:
+            raise Exception("Error running %s: %s", cmd, e)
+
+class ExampleTest(unittest.TestCase):
+    """Run the examples, verify they behave as expected."""
+
+    @classmethod
+    def tearDownClass(self):
+        Broker.stop()
+
+    def test_helloworld(self):
+        b = Broker.get()
+        hw = execute("helloworld", b.addr)
+        self.assertEqual('Hello World!\n', hw)
+
+    def test_simple_send_recv(self):
+        b = Broker.get()
+        send = execute("simple_send", "-a", b.addr)
+        self.assertEqual("all messages confirmed\n", send)
+        recv = execute("simple_recv", "-a", b.addr)
+        recv_expect = "simple_recv listening on amqp://%s\n" % (b.addr)
+        recv_expect += "".join(['{"sequence"=%s}\n' % (i+1) for i in range(100)])
+        self.assertEqual(recv_expect, recv)
+
+    def test_simple_send_direct_recv(self):
+        addr = pick_addr()
+        recv = background("direct_recv", "-a", addr)
+        while not "listening" in recv.stdout.readline():
+            pass
+        self.assertEqual("all messages confirmed\n", execute("simple_send", "-a", addr))
+        recv_expect = "".join(['{"sequence"=%s}\n' % (i+1) for i in range(100)])
+        self.assertEqual(recv_expect, verify(recv))
+
+    def test_simple_recv_direct_send(self):
+        addr = pick_addr()
+        send = background("direct_send", "-a", addr)
+        while not "listening" in send.stdout.readline():
+            pass
+        recv_expect = "simple_recv listening on amqp://%s\n" % (addr)
+        recv_expect += "".join(['{"sequence"=%s}\n' % (i+1) for i in range(100)])
+        self.assertEqual(recv_expect, execute("simple_recv", "-a", addr))
+        send_expect = "all messages confirmed\n"
+        self.assertEqual(send_expect, verify(send))
+
+    CLIENT_EXPECT="""Twas brillig, and the slithy toves => TWAS BRILLIG, AND THE SLITHY TOVES
+Did gire and gymble in the wabe. => DID GIRE AND GYMBLE IN THE WABE.
+All mimsy were the borogroves, => ALL MIMSY WERE THE BOROGROVES,
+And the mome raths outgrabe. => AND THE MOME RATHS OUTGRABE.
+"""
+    def test_simple_recv_send(self):
+        # Start receiver first, then run sender"""
+        b = Broker.get()
+        recv = background("simple_recv", "-a", b.addr)
+        self.assertEqual("all messages confirmed\n", execute("simple_send", "-a", b.addr))
+        recv_expect = "simple_recv listening on amqp://%s\n" % (b.addr)
+        recv_expect += "".join(['{"sequence"=%s}\n' % (i+1) for i in range(100)])
+        self.assertEqual(recv_expect, verify(recv))
+
+    def test_client_server(self):
+        b = Broker.get()
+        server = background("server", "-a", b.addr)
+        try:
+            self.assertEqual(execute("client", "-a", b.addr), self.CLIENT_EXPECT)
+        finally:
+            server.kill()
+
+if __name__ == "__main__":
+    unittest.main()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/examples/cpp/engine/helloworld.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/helloworld.cpp b/examples/cpp/engine/helloworld.cpp
new file mode 100644
index 0000000..72be78e
--- /dev/null
+++ b/examples/cpp/engine/helloworld.cpp
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/event.hpp"
+#include "proton/handler.hpp"
+#include "proton/url.hpp"
+#include "proton/io.hpp"
+
+#include <iostream>
+
+class hello_world : public proton::handler {
+  private:
+    std::string address_;
+
+  public:
+
+    hello_world(const std::string& address) : address_(address) {}
+
+    void on_start(proton::event &e) {
+        e.connection().open();
+        e.connection().open_receiver(address_);
+        e.connection().open_sender(address_);
+    }
+
+    void on_sendable(proton::event &e) {
+        proton::message m("Hello World!");
+        e.sender().send(m);
+        e.sender().close();
+    }
+
+    void on_message(proton::event &e) {
+        std::cout << e.message().body() << std::endl;
+        e.connection().close();
+    }
+};
+
+int main(int argc, char **argv) {
+    try {
+        proton::url url(argc > 1 ? argv[1] : "127.0.0.1:5672/examples");
+        hello_world hw(url.path());
+        proton::io::socket_engine(url, hw).run();
+        return 0;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/examples/cpp/engine/options.hpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/options.hpp b/examples/cpp/engine/options.hpp
new file mode 100644
index 0000000..bd477b5
--- /dev/null
+++ b/examples/cpp/engine/options.hpp
@@ -0,0 +1,173 @@
+#ifndef OPTIONS_HPP
+#define OPTIONS_HPP
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <string>
+#include <sstream>
+#include <ostream>
+#include <vector>
+#include <stdexcept>
+
+/** bad_option is thrown for option parsing errors */
+struct bad_option : public std::runtime_error {
+    bad_option(const std::string& s) : std::runtime_error(s) {}
+};
+
+/** Simple command-line option parser for example programs */
+class options {
+  public:
+
+    options(int argc, char const * const * argv) : argc_(argc), argv_(argv), prog_(argv[0]), help_() {
+        size_t slash = prog_.find_last_of("/\\");
+        if (slash != std::string::npos)
+            prog_ = prog_.substr(slash+1); // Extract prog name from path
+        add_flag(help_, 'h', "help", "Print the help message");
+    }
+
+    ~options() {
+        for (opts::iterator i = opts_.begin(); i != opts_.end(); ++i)
+            delete *i;
+    }
+
+    /** Updates value when parse() is called if option is present with a value. */
+    template<class T>
+    void add_value(T& value, char short_name, const std::string& long_name, const std::string& description, const std::string var) {
+        opts_.push_back(new option_value<T>(value, short_name, long_name, description, var));
+    }
+
+    /** Sets flag when parse() is called if option is present. */
+    void add_flag(bool& flag, char short_name, const std::string& long_name, const std::string& description) {
+        opts_.push_back(new option_flag(flag, short_name, long_name, description));
+    }
+
+    /** Parse the command line, return the index of the first non-option argument.
+     *@throws bad_option if there is a parsing error or unknown option.
+     */
+    int parse() {
+        int arg = 1;
+        for (; arg < argc_ && argv_[arg][0] == '-'; ++arg) {
+            opts::iterator i = opts_.begin();
+            while (i != opts_.end() && !(*i)->parse(argc_, argv_, arg))
+                ++i;
+            if (i == opts_.end())
+                throw bad_option(std::string("unknown option ") + argv_[arg]);
+        }
+        if (help_) throw bad_option("");
+        return arg;
+    }
+
+    /** Print a usage message */
+  friend std::ostream& operator<<(std::ostream& os, const options& op) {
+      os << std::endl << "usage: " << op.prog_ << " [options]" << std::endl;
+      os << std::endl << "options:" << std::endl;
+      for (opts::const_iterator i = op.opts_.begin(); i < op.opts_.end(); ++i)
+          os << **i << std::endl;
+      return os;
+  }
+
+ private:
+    class option {
+      public:
+        option(char s, const std::string& l, const std::string& d, const std::string v) :
+            short_(std::string("-") + s), long_("--" + l), desc_(d), var_(v) {}
+        virtual ~option() {}
+
+        virtual bool parse(int argc, char const * const * argv, int &i) = 0;
+        virtual void print_default(std::ostream&) const {};
+
+      friend std::ostream& operator<<(std::ostream& os, const option& op) {
+          os << "  " << op.short_;
+          if (!op.var_.empty()) os << " " << op.var_;
+          os << ", " << op.long_;
+          if (!op.var_.empty()) os << "=" << op.var_;
+          os << std::endl << "        " << op.desc_;
+          op.print_default(os);
+          return os;
+      }
+
+      protected:
+        std::string short_, long_, desc_, var_;
+    };
+
+    template <class T>
+    class option_value : public option {
+      public:
+        option_value(T& value, char s, const std::string& l, const std::string& d, const std::string& v) :
+            option(s, l, d, v), value_(value) {}
+
+        bool parse(int argc, char const * const * argv, int &i) {
+            std::string arg(argv[i]);
+            if (arg == short_ || arg == long_) {
+                if (i < argc-1) {
+                    set_value(arg, argv[++i]);
+                    return true;
+                } else {
+                    throw bad_option("missing value for " + arg);
+                }
+            }
+            if (arg.compare(0, long_.size(), long_) == 0 && arg[long_.size()] == '=' ) {
+                set_value(long_, arg.substr(long_.size()+1));
+                return true;
+            }
+            return false;
+        }
+
+        virtual void print_default(std::ostream& os) const { os << " (default " << value_ << ")"; }
+
+        void set_value(const std::string& opt, const std::string& s) {
+            std::istringstream is(s);
+            is >> value_;
+            if (is.fail() || is.bad())
+                throw bad_option("bad value for " + opt + ": " + s);
+        }
+
+      private:
+        T& value_;
+    };
+
+    class option_flag: public option {
+      public:
+        option_flag(bool& flag, const char s, const std::string& l, const std::string& d) :
+            option(s, l, d, ""), flag_(flag)
+        { flag_ = false; }
+
+        bool parse(int /*argc*/, char const * const * argv, int &i) {
+            if (argv[i] == short_ || argv[i] == long_) {
+                flag_ = true;
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+      private:
+        bool &flag_;
+    };
+
+    typedef std::vector<option*> opts;
+
+    int argc_;
+    char const * const * argv_;
+    std::string prog_;
+    opts opts_;
+    bool help_;
+};
+
+#endif // OPTIONS_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/examples/cpp/engine/server.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/server.cpp b/examples/cpp/engine/server.cpp
new file mode 100644
index 0000000..4641c4c
--- /dev/null
+++ b/examples/cpp/engine/server.cpp
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include "proton/connection.hpp"
+#include "proton/io.hpp"
+#include "proton/url.hpp"
+#include "proton/event.hpp"
+#include "proton/handler.hpp"
+#include "proton/url.hpp"
+
+#include <iostream>
+#include <map>
+#include <string>
+#include <cctype>
+
+class server : public proton::handler {
+  private:
+    typedef std::map<std::string, proton::sender> sender_map;
+    proton::url url;
+    sender_map senders;
+
+  public:
+
+    server(const std::string &u) : url(u) {}
+
+    void on_start(proton::event &e) {
+        e.connection().open();
+        e.connection().open_receiver(url.path());
+        std::cout << "server connected to " << url << std::endl;
+    }
+
+    std::string to_upper(const std::string &s) {
+        std::string uc(s);
+        size_t l = uc.size();
+        for (size_t i=0; i<l; i++) uc[i] = std::toupper(uc[i]);
+        return uc;
+    }
+
+    void on_message(proton::event &e) {
+        std::cout << "Received " << e.message().body() << std::endl;
+        std::string reply_to = e.message().reply_to();
+        proton::message reply;
+        reply.address(reply_to);
+        reply.body(to_upper(e.message().body().get<std::string>()));
+        reply.correlation_id(e.message().correlation_id());
+        if (!senders[reply_to])
+            senders[reply_to] = e.connection().open_sender(reply_to);
+        senders[reply_to].send(reply);
+    }
+};
+
+int main(int argc, char **argv) {
+    // Command line options
+    std::string address("amqp://0.0.0.0:5672/examples");
+    options opts(argc, argv);
+    opts.add_value(address, 'a', "address", "listen on URL", "URL");
+    try {
+        opts.parse();
+        server handler(address);
+        proton::io::socket_engine(address, handler).run();
+        return 0;
+    } catch (const bad_option& e) {
+        std::cout << opts << std::endl << e.what() << std::endl;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/examples/cpp/engine/simple_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/simple_recv.cpp b/examples/cpp/engine/simple_recv.cpp
new file mode 100644
index 0000000..1d07a96
--- /dev/null
+++ b/examples/cpp/engine/simple_recv.cpp
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include "proton/io.hpp"
+#include "proton/url.hpp"
+#include "proton/event.hpp"
+#include "proton/handler.hpp"
+#include "proton/link.hpp"
+#include "proton/value.hpp"
+#include "proton/message_id.hpp"
+
+#include <iostream>
+#include <map>
+
+
+
+class simple_recv : public proton::handler {
+  private:
+    proton::url url;
+    proton::receiver receiver;
+    uint64_t expected;
+    uint64_t received;
+  public:
+
+    simple_recv(const std::string &s, int c) : url(s), expected(c), received(0) {}
+
+    void on_start(proton::event &e) {
+        e.connection().open();
+        receiver = e.connection().open_receiver(url.path());
+        std::cout << "simple_recv listening on " << url << std::endl;
+    }
+
+    void on_message(proton::event &e) {
+        proton::message& msg = e.message();
+        if (msg.id().get<uint64_t>() < received)
+            return; // ignore duplicate
+        if (expected == 0 || received < expected) {
+            std::cout << msg.body() << std::endl;
+            received++;
+            if (received == expected) {
+                e.receiver().close();
+                e.connection().close();
+            }
+        }
+    }
+};
+
+int main(int argc, char **argv) {
+    // Command line options
+    std::string address("127.0.0.1:5672/examples");
+    int message_count = 100;
+    options opts(argc, argv);
+    opts.add_value(address, 'a', "address", "connect to and receive from URL", "URL");
+    opts.add_value(message_count, 'm', "messages", "receive COUNT messages", "COUNT");
+
+    try {
+        opts.parse();
+        simple_recv handler(address, message_count);
+        proton::io::socket_engine(address, handler).run();
+        return 0;
+    } catch (const bad_option& e) {
+        std::cout << opts << std::endl << e.what() << std::endl;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/examples/cpp/engine/simple_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/engine/simple_send.cpp b/examples/cpp/engine/simple_send.cpp
new file mode 100644
index 0000000..3b9a6f0
--- /dev/null
+++ b/examples/cpp/engine/simple_send.cpp
@@ -0,0 +1,94 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include "proton/io.hpp"
+#include "proton/url.hpp"
+#include "proton/event.hpp"
+#include "proton/handler.hpp"
+#include "proton/connection.hpp"
+#include "proton/value.hpp"
+
+#include <iostream>
+#include <map>
+
+class simple_send : public proton::handler {
+  private:
+    proton::url url;
+    proton::sender sender;
+    int sent;
+    int confirmed;
+    int total;
+  public:
+
+    simple_send(const std::string &s, int c) : url(s), sent(0), confirmed(0), total(c) {}
+
+    void on_start(proton::event &e) {
+        e.connection().open();
+        sender = e.connection().open_sender(url.path());
+    }
+
+    void on_sendable(proton::event &e) {
+        proton::sender sender = e.sender();
+        while (sender.credit() && sent < total) {
+            proton::message msg;
+            msg.id(sent + 1);
+            std::map<std::string, int> m;
+            m["sequence"] = sent+1;
+            msg.body(m);
+            sender.send(msg);
+            sent++;
+        }
+    }
+
+    void on_delivery_accept(proton::event &e) {
+        confirmed++;
+        if (confirmed == total) {
+            std::cout << "all messages confirmed" << std::endl;
+            e.connection().close();
+        }
+    }
+
+    void on_disconnect(proton::event &e) {
+        sent = confirmed;
+    }
+};
+
+int main(int argc, char **argv) {
+    // Command line options
+    std::string address("127.0.0.1:5672/examples");
+    int message_count = 100;
+    options opts(argc, argv);
+    opts.add_value(address, 'a', "address", "connect and send to URL", "URL");
+    opts.add_value(message_count, 'm', "messages", "send COUNT messages", "COUNT");
+    try {
+        opts.parse();
+        simple_send handler(address, message_count);
+        proton::io::socket_engine(address, handler).run();
+        return 0;
+    } catch (const bad_option& e) {
+        std::cout << opts << std::endl << e.what() << std::endl;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/examples/cpp/select_broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/select_broker.cpp b/examples/cpp/select_broker.cpp
deleted file mode 100644
index 871a180..0000000
--- a/examples/cpp/select_broker.cpp
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "options.hpp"
-#include "broker.hpp"
-
-#include "proton/connection_engine.hpp"
-
-#include <sstream>
-#include <arpa/inet.h>
-#include <sys/select.h>
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <unistd.h>
-#include <errno.h>
-
-template <class T> T check(T result, const std::string& msg=std::string()) {
-     // Note strerror is thread unsafe, this example is single-threaded.
-    if (result < 0) throw std::runtime_error(msg + ": " + strerror(errno));
-    return result;
-}
-
-void fd_set_if(bool on, int fd, fd_set *fds);
-int do_listen(uint16_t port);
-int do_accept(int listen_fd);
-
-class broker {
-
-    typedef std::map<int, proton::connection_engine*> engine_map;
-
-    queues queues_;
-    broker_handler handler_;
-    engine_map engines_;
-    fd_set reading_, writing_;
-
-  public:
-    broker() : handler_(queues_) {
-        FD_ZERO(&reading_);
-        FD_ZERO(&writing_);
-    }
-
-    ~broker() {
-        for (engine_map::iterator i = engines_.begin(); i != engines_.end(); ++i)
-            delete i->second;
-    }
-
-    void run(uint16_t port) {
-
-        int listen_fd = do_listen(port);
-        FD_SET(listen_fd, &reading_);
-
-        while(true) {
-            fd_set readable_set = reading_;
-            fd_set writable_set = writing_;
-
-            check(select(FD_SETSIZE, &readable_set, &writable_set, NULL, NULL), "select");
-            for (int fd = 0; fd < FD_SETSIZE; ++fd) {
-                if (fd == listen_fd) {
-                    if (FD_ISSET(listen_fd, &readable_set)) {
-                        int new_fd = do_accept(listen_fd);
-                        engines_[new_fd] = new proton::connection_engine(handler_);
-                        FD_SET(new_fd, &reading_);
-                        FD_SET(new_fd, &writing_);
-                    }
-                    continue;
-                }
-                if (engines_.find(fd) != engines_.end()) {
-                    proton::connection_engine& eng = *engines_[fd];
-                    try {
-                        if (FD_ISSET(fd, &readable_set))
-                            readable(fd, eng);
-
-                        if (FD_ISSET(fd, &writable_set))
-                            writable(fd, eng);
-                    } catch (const std::exception& e) {
-                        std::cout << e.what() << " fd=" << fd << std::endl;
-                        eng.close_input();
-                        eng.close_output();
-                    }
-                    // Set reading/writing bits for next time around
-                    fd_set_if(eng.input().size(), fd, &reading_);
-                    fd_set_if(eng.output().size(), fd, &writing_);
-
-                    if (eng.closed()) {
-                        ::close(fd);
-                        delete engines_[fd];
-                        engines_.erase(fd);
-                    }
-                }
-            }
-        }
-    }
-
-  private:
-
-    void readable(int fd, proton::connection_engine& eng) {
-        proton::buffer<char> input = eng.input();
-        if (input.size()) {
-            ssize_t n = check(read(fd, input.begin(), input.size()));
-            if (n > 0) {
-                eng.received(n);
-            } else {
-                eng.close_input();
-            }
-        }
-    }
-
-    void writable(int fd, proton::connection_engine& eng) {
-        proton::buffer<const char> output = eng.output();
-        if (output.size()) {
-            ssize_t n = check(write(fd, output.begin(), output.size()));
-            if (n > 0)
-                eng.sent(n);
-            else {
-                eng.close_output();
-            }
-        }
-    }
-
-};
-
-void fd_set_if(bool on, int fd, fd_set *fds) {
-    if (on)
-        FD_SET(fd, fds);
-    else
-        FD_CLR(fd, fds);
-}
-
-int do_listen(uint16_t port) {
-    int listen_fd = check(socket(PF_INET, SOCK_STREAM, 0), "create listener");
-    int yes = 1;
-    check(setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)), "setsockopt");
-    struct sockaddr_in name;
-    name.sin_family = AF_INET;
-    name.sin_port = htons (port);
-    name.sin_addr.s_addr = htonl (INADDR_ANY);
-    check(bind(listen_fd, (struct sockaddr *)&name, sizeof(name)), "bind listener");
-    check(listen(listen_fd, 32), "listen");
-    std::cout << "listening on port " << port << " fd=" << listen_fd << std::endl;
-    return listen_fd;
-}
-
-int do_accept(int listen_fd) {
-    struct sockaddr_in client_addr;
-    socklen_t size = sizeof(client_addr);
-    int fd = check(accept(listen_fd, (struct sockaddr *)&client_addr, &size), "accept");
-    std::cout << "accept " << ::inet_ntoa(client_addr.sin_addr)
-              << ":" << ntohs(client_addr.sin_port)
-              << " fd=" << fd << std::endl;
-    return fd;
-}
-
-int main(int argc, char **argv) {
-    // Command line options
-    proton::url url("0.0.0.0");
-    options opts(argc, argv);
-    opts.add_value(url, 'a', "address", "listen on URL", "URL");
-    try {
-        opts.parse();
-        broker().run(url.port_int());
-        return 0;
-    } catch (const bad_option& e) {
-        std::cout << opts << std::endl << e.what() << std::endl;
-    } catch (const std::exception& e) {
-        std::cerr << e.what() << std::endl;
-    }
-    return 1;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt
index 563c8ab..9d97348 100644
--- a/proton-c/CMakeLists.txt
+++ b/proton-c/CMakeLists.txt
@@ -248,7 +248,7 @@ if (CMAKE_CXX_COMPILER_ID MATCHES "Clang")
   endif (ENABLE_WARNING_ERROR)
   # TODO aconway 2016-01-06: we should be able to clean up the code and turn on
   # some of these warnings.
-  set (CXX_WARNING_FLAGS "${WERROR} -pedantic -Weverything -Wno-c++98-compat -Wno-c++98-compat-pedantic -Wno-float-equal -Wno-padded -Wno-sign-conversion -Wno-switch-enum -Wno-weak-vtables -Wno-exit-time-destructors -Wno-global-constructors -Wno-shorten-64-to-32 -Wno-documentation -Wno-documentation-unknown-command -Wno-old-style-cast")
+  set (CXX_WARNING_FLAGS "${WERROR} -pedantic -Weverything -Wno-c++98-compat -Wno-c++98-compat-pedantic -Wno-float-equal -Wno-padded -Wno-sign-conversion -Wno-switch-enum -Wno-weak-vtables -Wno-exit-time-destructors -Wno-global-constructors -Wno-shorten-64-to-32 -Wno-documentation -Wno-documentation-unknown-command -Wno-old-style-cast -Wno-missing-noreturn")
 endif()
 
 if (MSVC)
@@ -449,11 +449,11 @@ install (TARGETS qpid-proton
 # Install windows qpid-proton pdb files
 if (MSVC)
   install(FILES ${CMAKE_CURRENT_BINARY_DIR}/Debug/qpid-proton${CMAKE_DEBUG_POSTFIX}.pdb
-    DESTINATION bin 
+    DESTINATION bin
     CONFIGURATIONS Debug
     OPTIONAL)
   install(FILES ${CMAKE_CURRENT_BINARY_DIR}/RelWithDebInfo/qpid-proton.pdb
-    DESTINATION bin 
+    DESTINATION bin
     CONFIGURATIONS RelWithDebInfo
     OPTIONAL)
 endif (MSVC)
@@ -468,7 +468,7 @@ install (FILES  ${CMAKE_CURRENT_BINARY_DIR}/include/proton/version.h
 configure_file(
   ${CMAKE_CURRENT_SOURCE_DIR}/src/libqpid-proton.pc.in
   ${CMAKE_CURRENT_BINARY_DIR}/libqpid-proton.pc @ONLY)
-install (FILES 
+install (FILES
   ${CMAKE_CURRENT_BINARY_DIR}/libqpid-proton.pc
   DESTINATION ${LIB_INSTALL_DIR}/pkgconfig)
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt
index 8ae1c94..59f4d5b 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -48,6 +48,7 @@ set(qpid-proton-cpp-source
   src/connection_engine.cpp
   src/error.cpp
   src/event.cpp
+  src/id_generator.cpp
   src/link.cpp
   src/link_options.cpp
   src/message.cpp
@@ -76,6 +77,11 @@ set(qpid-proton-cpp-source
   src/value.cpp
   )
 
+if(MSVC)
+  list(APPEND qpid-proton-cpp-source src/windows/io.cpp)
+else(MSVC)
+  list(APPEND qpid-proton-cpp-source src/posix/io.cpp)
+endif(MSVC)
 
 set_source_files_properties (
   ${qpid-proton-cpp-source}
@@ -151,7 +157,7 @@ install (FILES
 
 ## Test
 if (ENABLE_VALGRIND AND VALGRIND_EXE)
-  set(memcheck-cmd ${VALGRIND_EXE} --error-exitcode=1 --quiet --leak-check=full --trace-children=yes)
+  set(memcheck-cmd ${VALGRIND_EXE} --error-exitcode=42 --quiet --leak-check=full --trace-children=yes)
 endif ()
 
 macro(add_cpp_test test)
@@ -171,3 +177,4 @@ add_cpp_test(interop_test ${CMAKE_SOURCE_DIR}/tests)
 add_cpp_test(message_test)
 add_cpp_test(value_test)
 add_cpp_test(scalar_test)
+add_cpp_test(engine_test)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/include/proton/connection.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection.hpp b/proton-c/bindings/cpp/include/proton/connection.hpp
index c32452a..6cbd8c3 100644
--- a/proton-c/bindings/cpp/include/proton/connection.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection.hpp
@@ -42,7 +42,8 @@ class connection : public object<pn_connection_t>, endpoint
   public:
     connection(pn_connection_t* c=0) : object<pn_connection_t>(c) {}
 
-    /// Get the container, throw an exception if this connection is not managed by a container.
+    /// Get the container, throw an exception if this connection is not managed
+    /// by a container.
     PN_CPP_EXTERN class container &container() const;
 
     /// Get the transport for the connection.
@@ -57,9 +58,6 @@ class connection : public object<pn_connection_t>, endpoint
     /// Return the container-ID for the connection.
     PN_CPP_EXTERN std::string container_id() const;
 
-    // Set the container-ID for the connection
-    PN_CPP_EXTERN void container_id(const std::string& id);
-
     /** Initiate local open, not complete till messaging_handler::on_connection_opened()
      * or proton_handler::on_connection_remote_open()
      */
@@ -95,14 +93,19 @@ class connection : public object<pn_connection_t>, endpoint
     /** Get the endpoint state */
     PN_CPP_EXTERN endpoint::state state() const;
 
+    /// True if the connection is fully closed, i.e. local and remote ends are closed.
+    bool closed() const { return (state()&LOCAL_CLOSED) && (state()&REMOTE_CLOSED); }
+
   private:
     PN_CPP_EXTERN void user(const std::string &);
     PN_CPP_EXTERN void password(const std::string &);
 
   friend class connection_context;
+  friend class connection_engine;
   friend class connection_options;
   friend class connector;
   friend class transport;
+  friend class container_impl;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/include/proton/connection_engine.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection_engine.hpp b/proton-c/bindings/cpp/include/proton/connection_engine.hpp
index 3a39be1..83839e8 100644
--- a/proton-c/bindings/cpp/include/proton/connection_engine.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection_engine.hpp
@@ -19,8 +19,12 @@
  * under the License.
  */
 
-#include "proton/pn_unique_ptr.hpp"
+#include "proton/connection.hpp"
+#include "proton/connection_options.hpp"
 #include "proton/export.hpp"
+#include "proton/id_generator.hpp"
+#include "proton/pn_unique_ptr.hpp"
+#include "proton/types.hpp"
 
 #include <cstddef>
 #include <utility>
@@ -28,23 +32,12 @@
 
 namespace proton {
 
+class connection_engine_context;
+
 class handler;
 class connection;
 
-/// Pointers to a byte range to use as a buffer.
-template <class T> class buffer {
-  public:
-    explicit buffer(T* begin__=0, T* end__=0) : begin_(begin__), end_(end__) {}
-    explicit buffer(T* ptr, size_t n) : begin_(ptr), end_(ptr + n) {}
-    T* begin() const { return begin_; }
-    T* end() const { return end_; }
-    size_t size() const { return end() - begin(); }
-    bool empty() const { return !size(); }
-
- private:
-    T* begin_;
-    T* end_;
-};
+// TODO aconway 2016-01-23: doc contrast with container.
 
 /**
  * A connection_engine manages a single AMQP connection.  It is useful for
@@ -61,11 +54,6 @@ template <class T> class buffer {
  * something else such as an RDMA connection, a shared-memory buffer or a Unix
  * pipe.
  *
- * The engine is an alternative event_loop to the proton::container. The
- * container is easier to use in single-threaded, stand-alone applications that
- * want to use standard socket connections.  The engine can be embedding into
- * any existing IO framework for any type of IO.
- *
  * The application is coded the same way for engine or container: you implement
  * proton::handler. Handlers attached to an engine will receive transport,
  * connection, session, link and message events. They will not receive reactor,
@@ -76,80 +64,120 @@ template <class T> class buffer {
  */
 class connection_engine {
   public:
-
-    // TODO aconway 2015-11-02: engine() take connection-options, handle SSL
-    // TODO aconway 2015-11-02: engine needs to accept application events.
-    // TODO aconway 2015-11-02: generalize reconnect logic for container and engine.
+    // FIXME aconway 2016-01-23: DOC
+    class container {
+      public:
+        /// Create a container with id, default to random UUID if id == "".
+        PN_CPP_EXTERN container(const std::string &id = "");
+
+        /// Return the container-id
+        PN_CPP_EXTERN std::string id() const;
+
+        /// Make options to configure a new engine, using the default options.
+        ///
+        /// Call this once for each new engine as the options include a generated unique link_prefix.
+        /// You can modify the configuration before creating the engine but you should not
+        /// modify the container_id or link_prefix.
+        PN_CPP_EXTERN connection_options make_options();
+
+        /// Set the default options to be used for connection engines.
+        /// The container will set the container_id and link_prefix when make_options is called.
+        PN_CPP_EXTERN void options(const connection_options&);
+
+      private:
+        const std::string id_;
+        id_generator id_gen_;
+        connection_options options_;
+    };
+
+    /** Create a connection engine that dispatches to handler. */
+    PN_CPP_EXTERN connection_engine(handler&, const connection_options& = no_opts);
+
+    PN_CPP_EXTERN virtual ~connection_engine();
+
+    /// Return the number of bytes that the engine is currently ready to read.
+    PN_CPP_EXTERN size_t can_read() const;
+
+    /// Return the number of bytes that the engine is currently ready to write.
+    PN_CPP_EXTERN size_t can_write() const;
+
+    /// Combine these flags with | to indicate read, write, both or neither
+    enum io_flag {
+        READ = 1,
+        WRITE = 2
+    };
+
+    /// Read, write and dispatch events.
+    ///
+    /// io_flags indicates whether to read, write, both or neither.
+    /// dispatches all events generated by reading or writing.
+    ///
+    /// @throw proton::closed_error if closed() is true before calling process()
+    /// @throw proton::io_error if the engine is closed by an error.
+    /// @return true if process should be called again, i.e. !closed()
+    PN_CPP_EXTERN bool process(int io_flags=READ|WRITE);
+
+    /// Non-throwing version of process.
+    /// Use closed() and error_str() to check the status of the engine.
+    PN_CPP_EXTERN bool process_nothrow(int io_flags=READ|WRITE);
 
     /**
-     * Create an engine that will advertise id as the AMQP container-id for its connection.
+     * True if the engine is closed, meaning there are no further
+     * events to process and close_io has been called.
+     * Call error_str() to get an error description.
      */
-    PN_CPP_EXTERN connection_engine(handler&, const std::string& id=std::string());
+    PN_CPP_EXTERN bool closed() const;
 
-    PN_CPP_EXTERN ~connection_engine();
+    /** If the engine was closed by an error, return a pointer */
+    PN_CPP_EXTERN std::string error_str() const;
 
-    /**
-     * Input buffer. If input.size() == 0 means no input can be accepted right now, but
-     * sending output might free up space.
-     */
-    PN_CPP_EXTERN buffer<char> input();
+    /** Get the AMQP connection associated with this connection_engine. */
+    PN_CPP_EXTERN class connection connection() const;
 
-    /**
-     * Process n bytes from input(). Calls handler functions for the AMQP events
-     * encoded by the received data.
-     *
-     * After the call the input() and output() buffers may have changed.
-     */
-    PN_CPP_EXTERN void received(size_t n);
+    /** Get the transport object connection associated with this connection_engine. */
+    PN_CPP_EXTERN class transport transport() const;
 
-    /**
-     * Indicate that no more input data is available from the external
-     * connection. May call handler functions.
+    /** Disconnect the engine. Calls io::close and dispatches final events to
+     * the handler. Neither the handler nor the io will be used after this call.
      */
-    PN_CPP_EXTERN void close_input();
+    PN_CPP_EXTERN void disconnect();
 
-    /**
-     * Output buffer. Send data from this buffer and call sent() to indicate it
-     * has been sent. If output().size() == 0 there is no data to send,
-     * receiving more input data might make output data available.
+  protected:
+    /** Does a non-blocking read of up to max bytes into buf.
+     * Return the number read, 0 if no data could be read without blocking.
+     *
+     *@throw proton::closed_error if the input reaches EOF.
+     *@throw proton::io_error if there is a read error.
      */
-    PN_CPP_EXTERN buffer<const char> output();
+    virtual size_t io_read(char* buf, size_t max) = 0;
 
-    /**
-     * Call when the first n bytes from the start of the output buffer have been
-     * sent.  May call handler functions.
+    /** Does a non-blocking write of up to max bytes from buf.
+     * Return the number written, 0 if no data could be written without blocking.
      *
-     * After the call the input() and output() buffers may have changed.
+     *@throw proton::io_error if there is a write error.
      */
-    PN_CPP_EXTERN void sent(size_t n);
+    virtual size_t io_write(const char*, size_t) = 0;
 
     /**
-     * Indicate that no more output data can be sent to the external connection.
-     * May call handler functions.
+     * Close the io, no more _io methods will be called after this is called.
      */
-    PN_CPP_EXTERN void close_output();
+    virtual void io_close() = 0;
 
-    /**
-     * True if connection_engine is closed. This can either be because close_input() and
-     * close_output() were called to indicate the external connection has
-     * closed, or because AMQP close frames were received in the AMQP data.  In
-     * either case no more input() buffer space or output() data will be
-     * available for this connection_engine.
-     */
-    PN_CPP_EXTERN bool closed() const;
+    PN_CPP_EXTERN static const connection_options no_opts;
 
-    /** The AMQP connection associated with this connection_engine. */
-    PN_CPP_EXTERN class connection  connection() const;
+  private:
 
-    /** The AMQP container-id associated with this connection_engine. */
-    PN_CPP_EXTERN  std::string id() const;
+    connection_engine(const connection_engine&);
+    connection_engine& operator=(const connection_engine&);
 
-  private:
-    void run();
+    void dispatch();
+    void try_read();
+    void try_write();
 
-    struct impl;
-    pn_unique_ptr<impl> impl_;
+    class connection connection_;
+    connection_engine_context* ctx_;
 };
 
+
 }
 #endif // CONNECTION_ENGINE_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/include/proton/connection_options.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection_options.hpp b/proton-c/bindings/cpp/include/proton/connection_options.hpp
index 21468f9..932fc58 100644
--- a/proton-c/bindings/cpp/include/proton/connection_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection_options.hpp
@@ -69,6 +69,7 @@ class connection_options {
     PN_CPP_EXTERN connection_options& idle_timeout(duration);
     PN_CPP_EXTERN connection_options& heartbeat(duration);
     PN_CPP_EXTERN connection_options& container_id(const std::string &id);
+    PN_CPP_EXTERN connection_options& link_prefix(const std::string &id);
     PN_CPP_EXTERN connection_options& reconnect(const reconnect_timer &);
     PN_CPP_EXTERN connection_options& client_domain(const class client_domain &);
     PN_CPP_EXTERN connection_options& server_domain(const class server_domain &);
@@ -92,6 +93,7 @@ class connection_options {
 
   friend class container_impl;
   friend class connector;
+  friend class connection_engine;
 };
 
 } // namespace

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/include/proton/container.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/container.hpp b/proton-c/bindings/cpp/include/proton/container.hpp
index 4251038..8e907b7 100644
--- a/proton-c/bindings/cpp/include/proton/container.hpp
+++ b/proton-c/bindings/cpp/include/proton/container.hpp
@@ -51,8 +51,9 @@ class container_impl;
  */
 class container {
   public:
-    /// Container ID should be unique within your system. By default a random ID is generated.
-    PN_CPP_EXTERN container(const std::string& id=std::string());
+    /// Container ID should be unique within your system.
+    /// By default a random ID is generated.
+    PN_CPP_EXTERN container(const std::string& id="");
 
     /// Container ID should be unique within your system. By default a random ID is generated.
     PN_CPP_EXTERN container(handler& mhandler, const std::string& id=std::string());
@@ -109,6 +110,7 @@ class container {
   private:
     pn_unique_ptr<container_impl> impl_;
     friend class connector;
+  friend class link;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/include/proton/error.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/error.hpp b/proton-c/bindings/cpp/include/proton/error.hpp
index f3475cf..3083406 100644
--- a/proton-c/bindings/cpp/include/proton/error.hpp
+++ b/proton-c/bindings/cpp/include/proton/error.hpp
@@ -21,11 +21,12 @@
  * under the License.
  *
  */
-#include <stdexcept>
-#include <string>
 #include "proton/config.hpp"
 #include "proton/export.hpp"
 
+#include <stdexcept>
+#include <string>
+
 namespace proton {
 
 /** Functions in the proton namespace throw a subclass of proton::error on error. */
@@ -40,6 +41,14 @@ struct decode_error : public error { PN_CPP_EXTERN explicit decode_error(const s
 /** Raised if there is an error encoding a C++ value as AMQP data. */
 struct encode_error : public error { PN_CPP_EXTERN explicit encode_error(const std::string&); };
 
-}
+/** Error reading or writing external IO. */
+struct io_error : public error { PN_CPP_EXTERN explicit io_error(const std::string&); };
 
+/** Attempt to use a closed resource (connnection, session or link). */
+struct closed_error : public io_error {
+    PN_CPP_EXTERN explicit closed_error(const std::string& = default_msg);
+    static const std::string default_msg;
+};
+
+}
 #endif  /*!PROTON_CPP_EXCEPTIONS_H*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/include/proton/handler.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/handler.hpp b/proton-c/bindings/cpp/include/proton/handler.hpp
index 971ffab..fde9716 100644
--- a/proton-c/bindings/cpp/include/proton/handler.hpp
+++ b/proton-c/bindings/cpp/include/proton/handler.hpp
@@ -47,7 +47,7 @@ class handler
      *@param peer_close_is_error treat orderly remote connection close as error.
      */
     PN_CPP_EXTERN handler(int prefetch=10, bool auto_accept=true, bool auto_settle=true,
-                                    bool peer_close_is_error=false);
+                          bool peer_close_is_error=false);
 
     PN_CPP_EXTERN virtual ~handler();
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/include/proton/id_generator.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/id_generator.hpp b/proton-c/bindings/cpp/include/proton/id_generator.hpp
new file mode 100644
index 0000000..d653c81
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/id_generator.hpp
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "proton/types.hpp"
+
+#ifndef ID_GENERATOR_HPP
+#define ID_GENERATOR_HPP
+
+namespace proton {
+
+///@internal
+class id_generator {
+  public:
+    PN_CPP_EXTERN id_generator(const std::string &prefix="");
+    PN_CPP_EXTERN void prefix(const std::string &);
+    PN_CPP_EXTERN std::string next();
+
+  private:
+    std::string prefix_;
+    uint64_t count_;
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/include/proton/io.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io.hpp b/proton-c/bindings/cpp/include/proton/io.hpp
new file mode 100644
index 0000000..c4a3aa4
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/io.hpp
@@ -0,0 +1,119 @@
+
+#ifndef SOCKET_IO_HPP
+#define SOCKET_IO_HPP
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/connection_engine.hpp>
+
+namespace proton {
+
+/** IO using sockets, file descriptors or handles.
+ *
+ * Note that you can use proton::connection_engine to communicate using AMQP
+ * over your own IO implementation or to integrate an existing IO framework of
+ * your choice.
+ */
+namespace io {
+
+///@name Setup and tear-down IO functionality.
+///
+/// Call proton::io::initialize before using any functions in the proton::io namespace.
+/// Call proton::io::finalize when you are done.
+///
+/// You can call initialize/finalize more than once as long as they are in
+/// matching pairs. Use proton::io::guard to call initialize/finalize around a scope.
+///
+/// Note on POSIX systems these are no-ops, but they are required for Windows.
+///
+///@{
+///
+/// Initialize the proton::io subsystem
+PN_CPP_EXTERN void initialize();
+
+/// Finalize the proton::io subsystem
+PN_CPP_EXTERN void finalize(); //nothrow
+struct guard {
+    guard() { initialize(); }
+    ~guard() { finalize(); }
+};
+///@}
+
+typedef int64_t descriptor;
+
+PN_CPP_EXTERN extern const descriptor INVALID_DESCRIPTOR;
+
+// Return the string describing the most recent IO error.
+PN_CPP_EXTERN std::string error_str();
+
+/// Open a TCP connection to the host:port (port can be a service name or number) from a proton::url.
+PN_CPP_EXTERN descriptor connect(const proton::url&);
+
+/// Listening socket.
+class listener {
+  public:
+    /// Listen on host/port. Empty host means listen on all interfaces.
+    /// port can be a service name or number
+    PN_CPP_EXTERN listener(const std::string& host, const std::string& port);
+    PN_CPP_EXTERN ~listener();
+
+    /// Accept a connection. Return the descriptor, set host, port to the remote address.
+    /// port can be a service name or number.
+    PN_CPP_EXTERN descriptor accept(std::string& host, std::string& port);
+
+    /// Accept a connection, does not provide address info.
+    descriptor accept() { std::string dummy; return accept(dummy, dummy); }
+
+    /// Convert to descriptor
+    descriptor socket() const { return socket_; }
+
+  private:
+    guard guard_;
+    listener(const listener&);
+    listener& operator=(const listener&);
+    descriptor socket_;
+};
+
+class socket_engine : public connection_engine {
+  public:
+    /// Wrap an open socket. Sets non-blocking mode.
+    PN_CPP_EXTERN socket_engine(descriptor socket_, handler&, const connection_options& = no_opts);
+
+    /// Create socket engine connected to url.
+    PN_CPP_EXTERN socket_engine(const url&, handler&, const connection_options& = no_opts);
+
+    /// Get the socket descriptor.
+    descriptor socket() const { return socket_; }
+
+    PN_CPP_EXTERN void run();
+
+  protected:
+    PN_CPP_EXTERN size_t io_read(char* buf, size_t max);
+    PN_CPP_EXTERN size_t io_write(const char*, size_t);
+    PN_CPP_EXTERN void io_close();
+
+  private:
+    void init();
+    guard guard_;
+    descriptor socket_;
+};
+
+}} // proton::io
+
+#endif // SOCKET_IO_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/include/proton/object.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/object.hpp b/proton-c/bindings/cpp/include/proton/object.hpp
index 1f2032b..cc2bd31 100644
--- a/proton-c/bindings/cpp/include/proton/object.hpp
+++ b/proton-c/bindings/cpp/include/proton/object.hpp
@@ -48,6 +48,7 @@ template <class T> class pn_ptr : private pn_ptr_base, public comparable<pn_ptr<
     pn_ptr& operator=(pn_ptr o) { std::swap(ptr_, o.ptr_); return *this; }
 
     T* get() const { return ptr_; }
+    T* release() { T *p = ptr_; ptr_ = 0; return p; }
     bool operator!() const { return !ptr_; }
 
   friend bool operator==(const pn_ptr& a, const pn_ptr& b) { return a.ptr_ == b.ptr_; }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/include/proton/session.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/session.hpp b/proton-c/bindings/cpp/include/proton/session.hpp
index 5b5c67a..49ed5d2 100644
--- a/proton-c/bindings/cpp/include/proton/session.hpp
+++ b/proton-c/bindings/cpp/include/proton/session.hpp
@@ -61,14 +61,14 @@ class session : public object<pn_session_t>, public endpoint
      *@param name if specified must be unique, by default the container generates a name
      * of the form: <hex-digits> + "@" + container.id()
      */
-    PN_CPP_EXTERN receiver create_receiver(const std::string& name=std::string());
+    PN_CPP_EXTERN receiver create_receiver(const std::string& name="");
 
     /** An un-opened sender link, you can set link properties before calling open().
      *
      *@param name if specified must be unique, by default the container generates a name
      * of the form: <hex-digits> + "@" + container.id()
      */
-    PN_CPP_EXTERN sender create_sender(const std::string& name=std::string());
+    PN_CPP_EXTERN sender create_sender(const std::string& name="");
 
     /** Create and open a sender with target=addr and optional link options opts*/
     PN_CPP_EXTERN sender open_sender(const std::string &addr, const link_options &opts = link_options());

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b0c66544/proton-c/bindings/cpp/src/connection.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection.cpp b/proton-c/bindings/cpp/src/connection.cpp
index 75465af..603fa91 100644
--- a/proton-c/bindings/cpp/src/connection.cpp
+++ b/proton-c/bindings/cpp/src/connection.cpp
@@ -67,12 +67,11 @@ std::string connection::container_id() const {
     return id ? std::string(id) : std::string();
 }
 
-void connection::container_id(const std::string& id) {
-    pn_connection_set_container(pn_object(), id.c_str());
-}
-
 container& connection::container() const {
-    return container_context::get(pn_object_reactor(pn_object()));
+    pn_reactor_t *r = pn_object_reactor(pn_object());
+    if (!r)
+        throw error("connection does not have a container");
+    return container_context::get(r);
 }
 
 link_range connection::find_links(endpoint::state mask) const {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message