qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [2/3] qpid-proton git commit: PROTON-1046: C++ multi-threaded controller and improved broker example
Date Wed, 27 Apr 2016 14:54:58 GMT
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/mt/epoll_controller.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/mt/epoll_controller.cpp b/examples/cpp/mt/epoll_controller.cpp
new file mode 100644
index 0000000..6c75b04
--- /dev/null
+++ b/examples/cpp/mt/epoll_controller.cpp
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/controller.hpp>
+#include <proton/url.hpp>
+#include <proton/work_queue.hpp>
+
+#include <proton/io/connection_engine.hpp>
+#include <proton/io/default_controller.hpp>
+
+#include <atomic>
+#include <memory>
+#include <mutex>
+#include <condition_variable>
+#include <thread>
+#include <set>
+#include <sstream>
+#include <system_error>
+
+// Linux native IO
+#include <assert.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+#include <unistd.h>
+
+// This is the only public function, the implementation is private.
+std::unique_ptr<proton::controller> make_epoll_controller();
+
+// Private implementation
+namespace  {
+
+
+using lock_guard = std::lock_guard<std::mutex>;
+
+// Get string from errno
+std::string errno_str(const std::string& msg) {
+    return std::system_error(errno, std::system_category(), msg).what();
+}
+
+// Throw proton::error(errno_str(msg)) if result < 0
+int check(int result, const std::string& msg) {
+    if (result < 0)
+        throw proton::error(errno_str(msg));
+    return result;
+}
+
+// Wrapper for getaddrinfo() that cleans up in destructor.
+class unique_addrinfo {
+  public:
+    unique_addrinfo(const std::string& addr) : addrinfo_(0) {
+        proton::url u(addr);
+        int result = ::getaddrinfo(char_p(u.host()), char_p(u.port()), 0, &addrinfo_);
+        if (result)
+            throw proton::error(std::string("bad address: ") + gai_strerror(result));
+    }
+    ~unique_addrinfo() { if (addrinfo_) ::freeaddrinfo(addrinfo_); }
+
+    ::addrinfo* operator->() const { return addrinfo_; }
+
+  private:
+    static const char* char_p(const std::string& s) { return s.empty() ? 0 : s.c_str(); }
+    ::addrinfo *addrinfo_;
+};
+
+// File descriptor wrapper that calls ::close in destructor.
+class unique_fd {
+  public:
+    unique_fd(int fd) : fd_(fd) {}
+    ~unique_fd() { if (fd_ >= 0) ::close(fd_); }
+    operator int() const { return fd_; }
+    int release() { int ret = fd_; fd_ = -1; return ret; }
+
+  protected:
+    int fd_;
+};
+
+class pollable;
+class pollable_engine;
+class pollable_listener;
+
+class epoll_controller : public proton::controller {
+  public:
+    epoll_controller();
+    ~epoll_controller();
+
+    // Implemenet the proton::controller interface
+    void connect(const std::string& addr,
+                 proton::handler& h,
+                 const proton::connection_options& opts) override;
+
+    void listen(const std::string& addr,
+                std::function<proton::handler*(const std::string&)> factory,
+                const proton::connection_options& opts) override;
+
+    void stop_listening(const std::string& addr) override;
+
+    void options(const proton::connection_options& opts) override;
+    proton::connection_options options() override;
+
+    void run() override;
+
+    void stop_on_idle() override;
+    void stop(const proton::error_condition& err) override;
+    void wait() override;
+
+    // Functions used internally.
+
+    void add_engine(proton::handler* h, proton::connection_options opts, int fd);
+    void erase(pollable*);
+
+  private:
+    void idle_check(const lock_guard&);
+    void interrupt();
+
+    const unique_fd epoll_fd_;
+    const unique_fd interrupt_fd_;
+
+    mutable std::mutex lock_;
+
+    proton::connection_options options_;
+    std::map<std::string, std::unique_ptr<pollable_listener> > listeners_;
+    std::map<pollable*, std::unique_ptr<pollable_engine> > engines_;
+
+    std::condition_variable stopped_;
+    std::atomic<size_t> threads_;
+    bool stopping_;
+    proton::error_condition stop_err_;
+};
+
+// Base class for pollable file-descriptors. Manages epoll interaction,
+// subclasses implement virtual work() to do their serialized work.
+class pollable {
+  public:
+    pollable(int fd, int epoll_fd) : fd_(fd), epoll_fd_(epoll_fd), notified_(false), working_(false)
+    {
+        int flags = check(::fcntl(fd, F_GETFL, 0), "non-blocking");
+        check(::fcntl(fd, F_SETFL,  flags | O_NONBLOCK), "non-blocking");
+        ::epoll_event ev = {0};
+        ev.data.ptr = this;
+        ::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd_, &ev);
+    }
+
+    virtual ~pollable() {
+        ::epoll_event ev = {0};
+        ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd_, &ev); // Ignore errors.
+    }
+
+    bool do_work(uint32_t events) {
+        {
+            lock_guard g(lock_);
+            if (working_)
+                return true;         // Another thread is already working.
+            working_ = true;
+            notified_ = false;
+        }
+        uint32_t new_events = work(events);  // Serialized, outside the lock.
+        if (new_events) {
+            lock_guard g(lock_);
+            rearm(notified_ ?  EPOLLIN|EPOLLOUT : new_events);
+        }
+        return new_events;
+    }
+
+    // Called by work_queue to notify that there are jobs.
+    void notify() {
+        lock_guard g(lock_);
+        if (!notified_) {
+            notified_ = true;
+            if (!working_) // No worker thread, rearm now.
+                rearm(EPOLLIN|EPOLLOUT);
+        }
+    }
+
+  protected:
+
+    // Subclass implements  work.
+    // Returns epoll events to re-enable or 0 if finished.
+    virtual uint32_t work(uint32_t events) = 0;
+
+    const unique_fd fd_;
+    const int epoll_fd_;
+
+  private:
+
+    void rearm(uint32_t events) {
+        epoll_event ev;
+        ev.data.ptr = this;
+        ev.events = EPOLLONESHOT | events;
+        check(::epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd_, &ev), "re-arm epoll");
+        working_ = false;
+    }
+
+    std::mutex lock_;
+    bool notified_;
+    bool working_;
+};
+
+class work_queue : public proton::work_queue {
+  public:
+    typedef std::vector<std::function<void()> > jobs;
+
+    work_queue(pollable& p, proton::controller& c) :
+        pollable_(p), closed_(false), controller_(c) {}
+
+    bool push(std::function<void()> f) override {
+        // Note this is an unbounded work queue.
+        // A resource-safe implementation should be bounded.
+        lock_guard g(lock_);
+        if (closed_)
+            return false;
+        jobs_.push_back(f);
+        pollable_.notify();
+        return true;
+    }
+
+    jobs pop_all() {
+        lock_guard g(lock_);
+        return std::move(jobs_);
+    }
+
+    void close() {
+        lock_guard g(lock_);
+        closed_ = true;
+    }
+
+    proton::controller& controller() const override { return controller_; }
+
+  private:
+    std::mutex lock_;
+    pollable& pollable_;
+    jobs jobs_;
+    bool closed_;
+    proton::controller& controller_;
+};
+
+// Handle epoll wakeups for a connection_engine.
+class pollable_engine : public pollable {
+  public:
+
+    pollable_engine(
+        proton::handler* h, proton::connection_options opts, epoll_controller& c,
+        int fd, int epoll_fd
+    ) : pollable(fd, epoll_fd),
+        engine_(*h, opts.link_prefix(std::to_string(fd)+":")),
+        queue_(new work_queue(*this, c))
+    {
+        engine_.work_queue(queue_.get());
+    }
+
+    ~pollable_engine() {
+        queue_->close();               // No calls to notify() after this.
+        engine_.dispatch();            // Run any final events.
+        try { write(); } catch(...) {} // Write connection close if we can.
+        for (auto f : queue_->pop_all()) {// Run final queued work for side-effects.
+            try { f(); } catch(...) {}
+        }
+    }
+
+    uint32_t work(uint32_t events) {
+        try {
+            bool can_read = events & EPOLLIN, can_write = events && EPOLLOUT;
+            do {
+                can_write = can_write && write();
+                can_read = can_read && read();
+                for (auto f : queue_->pop_all()) // Run queued work
+                    f();
+                engine_.dispatch();
+            } while (can_read || can_write);
+            return (engine_.read_buffer().size ? EPOLLIN:0) |
+                (engine_.write_buffer().size ? EPOLLOUT:0);
+        } catch (const std::exception& e) {
+            close(proton::error_condition("exception", e.what()));
+        }
+        return 0;               // Ending
+    }
+
+    void close(const proton::error_condition& err) {
+        engine_.connection().close(err);
+    }
+
+  private:
+
+    bool write() {
+        if (engine_.write_buffer().size) {
+            ssize_t n = ::write(fd_, engine_.write_buffer().data, engine_.write_buffer().size);
+            while (n == EINTR)
+                n = ::write(fd_, engine_.write_buffer().data, engine_.write_buffer().size);
+            if (n > 0) {
+                engine_.write_done(n);
+                return true;
+            } else if (errno != EAGAIN && errno != EWOULDBLOCK)
+                check(n, "write");
+        }
+        return false;
+    }
+
+    bool read() {
+        if (engine_.read_buffer().size) {
+            ssize_t n = ::read(fd_, engine_.read_buffer().data, engine_.read_buffer().size);
+            while (n == EINTR)
+                n = ::read(fd_, engine_.read_buffer().data, engine_.read_buffer().size);
+            if (n > 0) {
+                engine_.read_done(n);
+                return true;
+            }
+            else if (n == 0)
+                engine_.read_close();
+            else if (errno != EAGAIN && errno != EWOULDBLOCK)
+                check(n, "read");
+        }
+        return false;
+    }
+
+    proton::io::connection_engine engine_;
+    std::shared_ptr<work_queue> queue_;
+};
+
+// A pollable listener fd that creates pollable_engine for incoming connections.
+class pollable_listener : public pollable {
+  public:
+    pollable_listener(
+        const std::string& addr,
+        std::function<proton::handler*(const std::string&)> factory,
+        int epoll_fd,
+        epoll_controller& c,
+        const proton::connection_options& opts
+    ) :
+        pollable(listen(addr), epoll_fd),
+        addr_(addr),
+        factory_(factory),
+        controller_(c),
+        opts_(opts)
+    {}
+
+    uint32_t work(uint32_t events) {
+        if (events & EPOLLRDHUP)
+            return 0;
+        int accepted = check(::accept(fd_, NULL, 0), "accept");
+        controller_.add_engine(factory_(addr_), opts_, accepted);
+        return EPOLLIN;
+    }
+
+    std::string addr() { return addr_; }
+
+  private:
+
+    int listen(const std::string& addr) {
+        std::string msg = "listen on "+addr;
+        unique_addrinfo ainfo(addr);
+        unique_fd fd(check(::socket(ainfo->ai_family, SOCK_STREAM, 0), msg));
+        int yes = 1;
+        check(::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)), msg);
+        check(::bind(fd, ainfo->ai_addr, ainfo->ai_addrlen), msg);
+        check(::listen(fd, 32), msg);
+        return fd.release();
+    }
+
+    std::string addr_;
+    std::function<proton::handler*(const std::string&)> factory_;
+    epoll_controller& controller_;
+    proton::connection_options opts_;
+};
+
+
+epoll_controller::epoll_controller()
+    : epoll_fd_(check(epoll_create(1), "epoll_create")),
+      interrupt_fd_(check(eventfd(1, 0), "eventfd")),
+      stopping_(false), threads_(0)
+{}
+
+epoll_controller::~epoll_controller() {
+    try {
+        stop(proton::error_condition("exception", "controller shut-down"));
+        wait();
+    } catch (...) {}
+}
+
+void epoll_controller::add_engine(proton::handler* h, proton::connection_options opts, int fd) {
+    lock_guard g(lock_);
+    if (stopping_)
+        throw proton::error("controller is stopping");
+    std::unique_ptr<pollable_engine> e(new pollable_engine(h, opts, *this, fd, epoll_fd_));
+    e->notify();
+    engines_[e.get()] = std::move(e);
+}
+
+void epoll_controller::erase(pollable* e) {
+    lock_guard g(lock_);
+    if (!engines_.erase(e)) {
+        pollable_listener* l = dynamic_cast<pollable_listener*>(e);
+        if (l)
+            listeners_.erase(l->addr());
+    }
+    idle_check(g);
+}
+
+void epoll_controller::idle_check(const lock_guard&) {
+    if (stopping_  && engines_.empty() && listeners_.empty())
+        interrupt();
+}
+
+void epoll_controller::connect(const std::string& addr,
+                               proton::handler& h,
+                               const proton::connection_options& opts)
+{
+    std::string msg = "connect to "+addr;
+    unique_addrinfo ainfo(addr);
+    unique_fd fd(check(::socket(ainfo->ai_family, SOCK_STREAM, 0), msg));
+    check(::connect(fd, ainfo->ai_addr, ainfo->ai_addrlen), msg);
+    add_engine(&h, options().update(opts), fd.release());
+}
+
+void epoll_controller::listen(const std::string& addr,
+                              std::function<proton::handler*(const std::string&)> factory,
+                              const proton::connection_options& opts)
+{
+    lock_guard g(lock_);
+    if (!factory)
+        throw proton::error("null function to listen on "+addr);
+    if (stopping_)
+        throw proton::error("controller is stopping");
+    auto& l = listeners_[addr];
+    l.reset(new pollable_listener(addr, factory, epoll_fd_, *this, options_.update(opts)));
+    l->notify();
+}
+
+void epoll_controller::stop_listening(const std::string& addr) {
+    lock_guard g(lock_);
+    listeners_.erase(addr);
+    idle_check(g);
+}
+
+void epoll_controller::options(const proton::connection_options& opts) {
+    lock_guard g(lock_);
+    options_ = opts;
+}
+
+proton::connection_options epoll_controller::options() {
+    lock_guard g(lock_);
+    return options_;
+}
+
+void epoll_controller::run() {
+    ++threads_;
+    try {
+        epoll_event e;
+        while(true) {
+            check(::epoll_wait(epoll_fd_, &e, 1, -1), "epoll_wait");
+            pollable* p = reinterpret_cast<pollable*>(e.data.ptr);
+            if (!p)
+                break;          // Interrupted
+            if (!p->do_work(e.events))
+                erase(p);
+        }
+    } catch (const std::exception& e) {
+        stop(proton::error_condition("exception", e.what()));
+    }
+    if (--threads_ == 0)
+        stopped_.notify_all();
+}
+
+void epoll_controller::stop_on_idle() {
+    lock_guard g(lock_);
+    stopping_ = true;
+    idle_check(g);
+}
+
+void epoll_controller::stop(const proton::error_condition& err) {
+    lock_guard g(lock_);
+    stop_err_ = err;
+    interrupt();
+}
+
+void epoll_controller::wait() {
+    std::unique_lock<std::mutex> l(lock_);
+    stopped_.wait(l, [this]() { return this->threads_ == 0; } );
+    for (auto& eng : engines_)
+        eng.second->close(stop_err_);
+    listeners_.clear();
+    engines_.clear();
+}
+
+void epoll_controller::interrupt() {
+    // Add an always-readable fd with 0 data and no ONESHOT to interrupt all threads.
+    epoll_event ev = {0};
+    ev.events = EPOLLIN;
+    check(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fd_, &ev), "interrupt");
+}
+
+// Register make_epoll_controller() as proton::controller::create().
+proton::io::default_controller instance(make_epoll_controller);
+
+}
+
+// This is the only public function.
+std::unique_ptr<proton::controller> make_epoll_controller() {
+    return std::unique_ptr<proton::controller>(new epoll_controller());
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/options.hpp
----------------------------------------------------------------------
diff --git a/examples/cpp/options.hpp b/examples/cpp/options.hpp
index bd477b5..54f6503 100644
--- a/examples/cpp/options.hpp
+++ b/examples/cpp/options.hpp
@@ -25,6 +25,7 @@
 #include <vector>
 #include <stdexcept>
 
+namespace example {
 /** bad_option is thrown for option parsing errors */
 struct bad_option : public std::runtime_error {
     bad_option(const std::string& s) : std::runtime_error(s) {}
@@ -169,5 +170,6 @@ class options {
     opts opts_;
     bool help_;
 };
+}
 
 #endif // OPTIONS_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/recurring_timer.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/recurring_timer.cpp b/examples/cpp/recurring_timer.cpp
index a4841b2..ef1d827 100644
--- a/examples/cpp/recurring_timer.cpp
+++ b/examples/cpp/recurring_timer.cpp
@@ -84,7 +84,7 @@ int main(int argc, char **argv) {
     // Command line options
     double running_time = 5;
     double tick = 0.25;
-    options opts(argc, argv);
+    example::options opts(argc, argv);
     opts.add_value(running_time, 't', "running time", "running time in seconds", "RUNTIME");
     opts.add_value(tick, 'k', "tick time", "tick time as fraction of second", "TICK");
     try {
@@ -92,7 +92,7 @@ int main(int argc, char **argv) {
         recurring recurring_handler(int(running_time * 1000), int(tick * 1000));
         proton::container(recurring_handler).run();
         return 0;
-    } catch (const bad_option& e) {
+    } catch (const example::bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;
     } catch (const std::exception& e) {
         std::cerr << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/server.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/server.cpp b/examples/cpp/server.cpp
index 370c33b..f4c75d7 100644
--- a/examples/cpp/server.cpp
+++ b/examples/cpp/server.cpp
@@ -80,7 +80,7 @@ class server : public proton::handler {
 
 int main(int argc, char **argv) {
     std::string address("amqp://0.0.0.0:5672/examples");
-    options opts(argc, argv);
+    example::options opts(argc, argv);
 
     opts.add_value(address, 'a', "address", "listen on URL", "URL");
 
@@ -91,7 +91,7 @@ int main(int argc, char **argv) {
         proton::container(srv).run();
 
         return 0;
-    } catch (const bad_option& e) {
+    } catch (const example::bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;
     } catch (const std::exception& e) {
         std::cerr << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/server_direct.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/server_direct.cpp b/examples/cpp/server_direct.cpp
index e092d5f..c4ecf8f 100644
--- a/examples/cpp/server_direct.cpp
+++ b/examples/cpp/server_direct.cpp
@@ -99,7 +99,7 @@ class server : public proton::handler {
 
 int main(int argc, char **argv) {
     std::string address("amqp://127.0.0.1:5672/examples");
-    options opts(argc, argv);
+    example::options opts(argc, argv);
 
     opts.add_value(address, 'a', "address", "listen on URL", "URL");
 
@@ -110,7 +110,7 @@ int main(int argc, char **argv) {
         proton::container(srv).run();
 
         return 0;
-    } catch (const bad_option& e) {
+    } catch (const example::bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;
     } catch (const std::exception& e) {
         std::cerr << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/simple_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/simple_recv.cpp b/examples/cpp/simple_recv.cpp
index 091264a..53f87d9 100644
--- a/examples/cpp/simple_recv.cpp
+++ b/examples/cpp/simple_recv.cpp
@@ -70,7 +70,7 @@ int main(int argc, char **argv) {
     std::string address("127.0.0.1:5672/examples");
 
     int message_count = 100;
-    options opts(argc, argv);
+    example::options opts(argc, argv);
 
     opts.add_value(address, 'a', "address", "connect to and receive from URL", "URL");
     opts.add_value(message_count, 'm', "messages", "receive COUNT messages", "COUNT");
@@ -82,7 +82,7 @@ int main(int argc, char **argv) {
         proton::container(recv).run();
 
         return 0;
-    } catch (const bad_option& e) {
+    } catch (const example::bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;
     } catch (const std::exception& e) {
         std::cerr << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/simple_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/simple_send.cpp b/examples/cpp/simple_send.cpp
index 70aa4bc..f2a2477 100644
--- a/examples/cpp/simple_send.cpp
+++ b/examples/cpp/simple_send.cpp
@@ -78,7 +78,7 @@ class simple_send : public proton::handler {
 int main(int argc, char **argv) {
     std::string address("127.0.0.1:5672/examples");
     int message_count = 100;
-    options opts(argc, argv);
+    example::options opts(argc, argv);
 
     opts.add_value(address, 'a', "address", "connect and send to URL", "URL");
     opts.add_value(message_count, 'm', "messages", "send COUNT messages", "COUNT");
@@ -90,7 +90,7 @@ int main(int argc, char **argv) {
         proton::container(send).run();
 
         return 0;
-    } catch (const bad_option& e) {
+    } catch (const example::bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;
     } catch (const std::exception& e) {
         std::cerr << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/examples/cpp/tutorial.dox
----------------------------------------------------------------------
diff --git a/examples/cpp/tutorial.dox b/examples/cpp/tutorial.dox
new file mode 100644
index 0000000..b8383f7
--- /dev/null
+++ b/examples/cpp/tutorial.dox
@@ -0,0 +1,403 @@
+// -*-markdown-*-
+// NOTE: doxygen can include markdown pages directly but there seems to be a bug
+// that shows messed-up line numbers in \skip \until code extracts. This file
+// is markdown wrapped in a doxygen comment - which works. The file is best viewed/editied
+// as markdown.
+
+/**@page tutorial Tutorial
+
+This is a brief tutorial that will walk you through the fundamentals of building
+messaging applications in incremental steps. There are further examples, in
+addition the ones mentioned in the tutorial.
+
+Proton provides an "event-driven" programming model, where you implement a
+subclass of `proton::handler` and override functions that react to various AMQP
+events (connections opening and closing, messages being delivered and so on.)
+
+The examples below show how to implement handlers for clients and servers and
+how to run them using the `proton::container`, a portable, easy-to-use way to
+build single-threaded clients or servers.
+
+@note Proton can also be embedded or integrated into arbitrary IO frameworks and used
+to build multi-threaded applications. See namespace proton::mt for more.
+
+Some of the examples require an AMQP *broker* that can receive, store and send
+messages. @ref broker.hpp and @ref broker.cpp define a simple example
+broker. Run without arguments it listens on `0.0.0.0:5672`, the standard AMQP
+port on all network interfaces. To use a different port or network interface:
+
+    broker -a <host>:<port>
+
+Instead of the example broker, you can use any AMQP 1.0 compliant broker. You
+must configure your broker to have a queue (or topic) named "examples".
+
+The `helloworld` examples take an optional URL argument. The other examples take
+an option `-a URL`. A URL looks like:
+
+    HOST:PORT/ADDRESS
+
+It usually defaults to `127.0.0.1:5672/examples`, but you can change this if
+your broker is on a different host or port, or you want to use a different queue
+or topic name (the ADDRESS part of the URL). URL details are at `proton::url`
+
+Hello World!
+------------
+
+\dontinclude helloworld.cpp
+
+Tradition dictates that we start with hello world! This example sends a message
+to a broker and the receives the same message back to demonstrate sending and
+receiving. In a realistic system the sender and receiver would normally be in
+different processes. The complete example is @ref helloworld.cpp
+
+We will include the following classes: `proton::container` runs an event loop
+which dispatches events to a `proton::handler`. This allows a *reactive*
+style of programming which is well suited to messaging applications. `proton::url` is a simple parser for the URL format mentioned above.
+
+\skip   proton/container
+\until  proton/url
+
+We will define a class `hello_world` which is a subclass of
+`proton::handler` and over-rides functions to handle the events
+of interest in sending and receiving a message.
+
+\skip class hello_world
+\until {}
+
+`on_start()` is called when the event loop first starts. We handle that by
+establishing a connection and creating a sender and a receiver.
+
+\skip on_start
+\until }
+
+`on_sendable()` is called when message can be transferred over the associated
+sender link to the remote peer. We create a `proton::message`, set the message
+body to `"Hello World!"` and send the message. Then we close the sender as we only
+want to send one message. Closing the sender will prevent further calls to
+`on_sendable()`.
+
+\skip on_sendable
+\until }
+
+`on_message()` is called when a message is received. We just print the body of
+the message and close the connection, as we only want one message
+
+\skip on_message
+\until }
+
+The message body is a `proton::value`, see the documentation for more on how to
+extract the message body as type-safe C++ values.
+
+Our `main` function creates an instance of the `hello_world` handler and a
+proton::container using that handler. Calling `proton::container::run` sets
+things in motion and returns when we close the connection as there is nothing
+further to do. It may throw an exception, which will be a subclass of
+`proton::error`. That in turn is a subclass of `std::exception`.
+
+\skip main
+\until }
+\until }
+\until }
+
+Hello World, Direct!
+--------------------
+
+\dontinclude helloworld_direct.cpp
+
+Though often used in conjunction with a broker, AMQP does not *require* this. It
+also allows senders and receivers to communicate directly if desired.
+
+We will modify our example to send a message directly to itself. This is a bit
+contrived but illustrates both sides of the direct send/receive scenario. Full
+code at @ref helloworld_direct.cpp
+
+The first difference, is that rather than creating a receiver on the same
+connection as our sender, we listen for incoming connections by invoking the
+`proton::container::listen()` method on the container.
+
+\skip on_start
+\until }
+
+As we only need then to initiate one link, the sender, we can do that by
+passing in a url rather than an existing connection, and the connection
+will also be automatically established for us.
+
+We send the message in response to the `on_sendable()` callback and
+print the message out in response to the `on_message()` callback exactly
+as before.
+
+\skip on_sendable
+\until }
+\until }
+
+However we also handle two new events. We now close the connection from
+the senders side once the message has been accepted.
+The acceptance of the message is an indication of successful transfer to the
+peer. We are notified of that event through the `on_delivery_accept()`
+callback.
+
+\skip on_delivery_accept
+\until }
+
+Then, once the connection has been closed, of which we are
+notified through the `on_connection_close()` callback, we stop accepting incoming
+connections at which point there is no work to be done and the
+event loop exits, and the run() method will return.
+
+\skip on_connection_close
+\until }
+
+So now we have our example working without a broker involved!
+
+Note that for this example we pick an "unusual" port 8888 since we are talking
+to ourselves rather than a broker.
+
+\skipline url =
+
+Asynchronous Send and Receive
+-----------------------------
+
+Of course, these `HelloWorld!` examples are very artificial, communicating as
+they do over a network connection but with the same process. A more realistic
+example involves communication between separate processes (which could indeed be
+running on completely separate machines).
+
+Let's separate the sender from the receiver, and transfer more than a single
+message between them.
+
+We'll start with a simple sender @ref simple_send.cpp.
+
+\dontinclude simple_send.cpp
+
+As with the previous example, we define the application logic in a class that
+handles events. Because we are transferring more than one message, we need to
+keep track of how many we have sent. We'll use a `sent` member variable for
+that.  The `total` member variable will hold the number of messages we want to
+send.
+
+\skip class simple_send
+\until total
+
+As before, we use the `on_start()` event to establish our sender link over which
+we will transfer messages.
+
+\skip on_start
+\until }
+
+AMQP defines a credit-based flow control mechanism. Flow control allows
+the receiver to control how many messages it is prepared to receive at a
+given time and thus prevents any component being overwhelmed by the
+number of messages it is sent.
+
+In the `on_sendable()` callback, we check that our sender has credit
+before sending messages. We also check that we haven't already sent the
+required number of messages.
+
+\skip on_sendable
+\until }
+\until }
+
+The `proton::sender::send()` call above is asynchronous. When it returns the
+message has not yet actually been transferred across the network to the
+receiver. By handling the `on_accepted()` event, we can get notified when the
+receiver has received and accepted the message. In our example we use this event
+to track the confirmation of the messages we have sent. We only close the
+connection and exit when the receiver has received all the messages we wanted to
+send.
+
+\skip on_delivery_accept
+\until }
+\until }
+
+If we are disconnected after a message is sent and before it has been
+confirmed by the receiver, it is said to be `in doubt`. We don't know
+whether or not it was received. In this example, we will handle that by
+resending any in-doubt messages. This is known as an 'at-least-once'
+guarantee, since each message should eventually be received at least
+once, though a given message may be received more than once (i.e.
+duplicates are possible). In the `on_disconnected()` callback, we reset
+the sent count to reflect only those that have been confirmed. The
+library will automatically try to reconnect for us, and when our sender
+is sendable again, we can restart from the point we know the receiver
+got to.
+
+\skip on_disconnect
+\until }
+
+\dontinclude simple_recv.cpp
+
+Now let's look at the corresponding receiver @ref simple_recv.cpp
+
+This time we'll use an `expected` member variable for for the number of messages we expect and
+a `received` variable to count how many we have received so far.
+
+\skip class simple_recv
+\until received
+
+We handle `on_start()` by creating our receiver, much like we
+did for the sender.
+
+\skip on_start
+\until }
+
+We also handle the `on_message()` event for received messages and print the
+message out as in the `Hello World!` examples.  However we add some logic to
+allow the receiver to wait for a given number of messages, then to close the
+connection and exit. We also add some logic to check for and ignore duplicates,
+using a simple sequential id scheme.
+
+\skip on_message
+\until }
+
+Direct Send and Receive
+-----------------------
+
+Sending between these two examples requires an intermediary broker since neither
+accepts incoming connections. AMQP allows us to send messages directly between
+two processes. In that case one or other of the processes needs to accept
+incoming connections. Let's create a modified version of the receiving example
+that does this with @ref direct_recv.cpp
+
+\dontinclude direct_recv.cpp
+
+There are only two differences here. Instead of initiating a link (and
+implicitly a connection), we listen for incoming connections.
+
+
+\skip on_start
+\until }
+
+When we have received all the expected messages, we then stop listening for
+incoming connections by closing the acceptor object.
+
+\skip on_message
+\until }
+\until }
+\until }
+\until }
+
+You can use the @ref simple_send.cpp example to send to this receiver
+directly. (Note: you will need to stop any broker that is listening on the 5672
+port, or else change the port used by specifying a different address to each
+example via the -a command line switch).
+
+We can also modify the sender to allow the original receiver to connect to it,
+in @ref direct_send.cpp. Again that just requires two modifications:
+
+\dontinclude direct_send.cpp
+
+As with the modified receiver, instead of initiating establishment of a
+link, we listen for incoming connections.
+
+\skip on_start
+\until }
+
+When we have received confirmation of all the messages we sent, we can
+close the acceptor in order to exit.
+
+\skip on_delivery_accept
+\until }
+\until }
+
+To try this modified sender, run the original @ref simple_recv.cpp against it.
+
+The symmetry in the underlying AMQP that enables this is quite unique and
+elegant, and in reflecting this the proton API provides a flexible toolkit for
+implementing all sorts of interesting intermediaries (@ref broker.hpp and @ref
+broker.cpp provide a simple broker for testing purposes is an example of this).
+
+Request/Response
+----------------
+
+A common pattern is to send a request message and expect a response message in
+return. AMQP has special support for this pattern. Let's have a look at a simple
+example. We'll start with @ref server.cpp, the program that will process the
+request and send the response. Note that we are still using a broker in this
+example.
+
+Our server will provide a very simple service: it will respond with the
+body of the request converted to uppercase.
+
+\dontinclude server.cpp
+\skip class server
+\until };
+
+The code here is not too different from the simple receiver example.  When we
+receive a request in `on_message` however, we look at the
+`proton::message::reply_to` address and create a sender with that address for
+the response. We'll cache the senders incase we get further requests with the
+same `reply_to`.
+
+Now let's create a simple @ref client.cpp to test this service out.
+
+\dontinclude client.cpp
+
+Our client takes a list of strings to send as requests
+
+\skipline client(
+
+Since we will be sending and receiving, we create a sender and a receiver in
+`on_start`.  Our receiver has a blank address and sets the `dynamic` flag to
+true, which means we expect the remote end (broker or server) to assign a unique
+address for us.
+
+\skip on_start
+\until }
+
+Now a function to send the next request from our list of requests. We set the
+reply_to address to be the dynamically assigned address of our receiver.
+
+\skip send_request
+\until }
+
+We need to use the address assigned by the broker as the `reply_to` address of
+our requests, so we can't send them until our receiver has been set up. To do
+that, we add an `on_link_open()` method to our handler class, and if the link
+associated with event is the receiver, we use that as the trigger to send our
+first request.
+
+\skip on_link_open
+\until }
+
+When we receive a reply, we send the next request.
+
+\skip on_message
+\until }
+\until }
+\until }
+
+Direct Request/Response
+-----------------------
+
+We can avoid the intermediary process by writing a server that accepts
+connections directly, @ref server_direct.cpp. It involves the following changes
+to our original server:
+
+\dontinclude server_direct.cpp
+
+Our server must generate a unique reply-to addresses for links from the
+client that request a dynamic address (previously this was done by the broker.)
+We use a simple counter.
+
+\skip generate_address
+\until }
+
+Next we need to handle incoming requests for links with dynamic addresses from
+the client.  We give the link a unique address and record it in our `senders`
+map.
+
+\skip on_link_open
+\until }
+
+Note we are interested in *sender* links above because we are implementing the
+server. A *receiver* link created on the client corresponds to a *sender* link
+on the server.
+
+Finally when we receive a message we look up its `reply_to` in our senders map and send the reply.
+
+\skip on_message
+\until }
+\until }
+\until }
+
+*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt
index e1a826f..6b1459e 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -25,6 +25,7 @@ include_directories(
   "${CMAKE_CURRENT_SOURCE_DIR}/src")
 
 set(qpid-proton-cpp-source
+  ${qpid-proton-mt-source}
   src/acceptor.cpp
   src/binary.cpp
   src/byte_array.cpp
@@ -79,11 +80,10 @@ set(qpid-proton-cpp-source
   src/value.cpp
   )
 
-if(MSVC)
-  list(APPEND qpid-proton-cpp-source src/io/windows/socket.cpp)
-else(MSVC)
-  list(APPEND qpid-proton-cpp-source src/io/posix/socket.cpp)
-endif(MSVC)
+if (BUILD_CPP_MT)
+  list(APPEND qpid-proton-cpp-source src/controller.cpp)
+endif()
+
 
 set_source_files_properties (
   ${qpid-proton-cpp-source}
@@ -103,6 +103,9 @@ set_target_properties (
   SOVERSION "${PN_LIB_SOMAJOR}"
   LINK_FLAGS "${CATCH_UNDEFINED} ${LTO}"
   )
+if (BUILD_CPP_MT)
+  set_target_properties(qpid-proton-cpp PROPERTIES CXX_STANDARD 11)
+endif()
 
 ## Install
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/cpp.cmake
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/cpp.cmake b/proton-c/bindings/cpp/cpp.cmake
index 3f4c313..f89cc0d 100644
--- a/proton-c/bindings/cpp/cpp.cmake
+++ b/proton-c/bindings/cpp/cpp.cmake
@@ -31,4 +31,7 @@ if (CMAKE_CXX_COMPILER)
   if (HAS_STD_PTR)
     add_definitions(-DPN_CPP_HAS_STD_PTR=1)
   endif()
+  check_cxx_source_compiles("#if defined(__cplusplus) && __cplusplus >= 201100\nint main(int, char**) { return 0; }\n#endif" HAS_CPP11)
+
+  option(BUILD_CPP_MT "Build C++ multi-thread extensions, requires C++11" ${HAS_CPP11})
 endif()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/docs/mainpage.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/mainpage.md b/proton-c/bindings/cpp/docs/mainpage.md
index 8ad34cb..dffb3ed 100644
--- a/proton-c/bindings/cpp/docs/mainpage.md
+++ b/proton-c/bindings/cpp/docs/mainpage.md
@@ -4,7 +4,7 @@ This is the C++ API for the Proton AMQP protocol engine. It allows you
 to write client and server applications that send and receive AMQP
 messages.
 
-The best way to start is with the \ref tutorial "tutorial".
+The best way to start is with the @ref tutorial
 
 ## An overview of the AMQP model
 
@@ -22,13 +22,13 @@ Sessions are created over a `proton::connection`, which represents the
 network connection. You can create links directly on a connection
 using its default session if you don't need multiple sessions.
 
-`proton::message` represents the message: the body (content), headers,
-annotations, and so on. A `proton::delivery` represents the act of
-transferring a message over a link. The receiver acknowledges the
-delivery by accepting or rejecting it. The delivery is *settled* when
-both ends are done with it.  Different settlement methods give
-different levels of reliability: *at-most-once*, *at-least-once*, and
-*exactly-once*. See "Delivery Guarantees" below for details.
+`proton::message` represents the message: the body (content), properties,
+headers and annotations. A `proton::delivery` represents the act of transferring
+a message over a link. The receiver acknowledges the delivery by accepting or
+rejecting it. The delivery is *settled* when both ends are done with it.
+Different settlement methods give different levels of reliability:
+*at-most-once*, *at-least-once*, and *exactly-once*. See "Delivery Guarantees"
+below for details.
 
 ## Sources and targets
 
@@ -58,78 +58,9 @@ the client's dynamic receiver.
 
 In the case of a broker, a dynamic address usually corresponds to a
 temporary queue, but any AMQP request-response server can use this
-technique. The \ref server_direct.cpp example illustrates how to
+technique. The @ref server_direct.cpp example illustrates how to
 implement a queueless request-response server.
 
-## Anatomy of a Proton application
-
-To send AMQP commands, call methods on classes like
-`proton::connection`, `proton::sender`, `proton::receiver`, or
-`proton::delivery`. To handle incoming commands, implement the
-`proton::handler` interface. The handler receives calls like
-`on_message` (a message is received), `on_link_open` (a link is
-opened), and `on_sendable` (a link is ready to send messages).
-
-Messages are represented by `proton::message`. AMQP defines a type
-encoding that you can use for interoperability, but you can also use
-any encoding you wish and pass binary data as the
-`proton::message::body`. `proton::value` and `proton::scalar` provide
-conversion between AMQP and C++ data types.
-
-<!-- See the example \ref encode_decode.cpp. -->
-
-There are two alternative ways to manage handlers and AMQP objects,
-the `proton::container` and the `proton::connection_engine`. You can
-code your application so that it can be run with either. Say you find
-the `proton::container` easier to get started but later need more
-flexibility.  You can switch to the `proton::connection_engine` with
-little or no change to your handlers.
-
-### %proton::container
-
-`proton::container` is a *reactor* framework that manages multiple
-connections and dispatches events to handlers. You implement
-`proton::handler` with your logic to react to events, and register it
-with the container. The container lets you open multiple connections
-and links, receive incoming connections and links, and send, receive,
-and acknowledge messages.
-
-The reactor handles IO for multiple connections using sockets and
-`poll`. It dispatches events to your handlers in a single thread,
-where you call `proton::container::run`. The container is not
-thread-safe: once it is running you can only operate on Proton objects
-from your handler methods.
-
-### %proton::connection_engine
-
-`proton::connection_engine` dispatches events for a *single
-connection*. The subclass `proton::io::socket::engine` does
-socket-based IO. An application with a single connection is just like
-using `proton::container` except you attach your handler to a
-`proton::io::socket::engine` instead. You can compare examples, such as
-\ref helloworld.cpp and \ref engine/helloworld.cpp.
-
-Now consider multiple connections. `proton::container` is easy to use
-but restricted to a single thread. `proton::connection_engine` is not
-thread-safe either, but *each engine is independent*. You can process
-different connections in different threads, or use a thread pool to
-process a dynamic set of connections.
-
-The engine does not provide built-in polling and listening like the
-`proton::container`, but you can drive engines using any polling or
-threading strategy and any IO library (for example, epoll, kqueue,
-solaris completion ports, IOCP proactor, boost::asio, libevent, etc.)
-This can be important for optimizing performance or supporting diverse
-platforms. The example \ref engine/broker.cpp shows a broker using
-sockets and poll, but you can see how the code could be adapted.
-
-`proton::connection_engine` also does not dictate the IO mechanism,
-but it is an abstract class. `proton::socket::engine` provides
-ready-made socket-based IO, but you can write your own subclass with
-any IO code. Just override the `io_read`, `io_write` and `io_close`
-methods. For example, the proton test suite implements an in-memory
-connection using `std::deque` for test purposes.
-
 ## Delivery guarantees
 
 For *at-most-once*, the sender settles the message as soon as it sends
@@ -152,3 +83,68 @@ informs the sender of all the unsettled deliveries it knows about, and
 from this the sender can deduce which need to be redelivered. The
 sender likewise informs the receiver which deliveries it knows about,
 from which the receiver can deduce which have already been settled.
+
+## Anatomy of a Proton application
+
+To send AMQP commands, call methods on classes like `proton::connection`,
+`proton::sender`, `proton::receiver`, or `proton::delivery`.
+
+To handle incoming commands, subclass the `proton::handler` interface. The
+handler member functions are called when AMQP protocol events occur on a
+connection. For example `proton::handler::on_message` is called when a message
+is received.
+
+Messages are represented by `proton::message`. AMQP defines a type
+encoding that you can use for interoperability, but you can also use
+any encoding you wish and pass binary data as the
+`proton::message::body`. `proton::value` and `proton::scalar` provide
+conversion between AMQP and C++ data types.
+
+There are several ways to manage handlers and AMQP objects, for different types
+of application. All of them use the same `proton::handler` sub-classes so code
+can be re-used if you change your approach.
+
+### %proton::container - easy single-threaded applications
+
+`proton::container` is an easy way to get started with single threaded
+applications. It manages the low-level socket connection and polling for you so
+you can write portable applications using just the @ref proton API. You an use
+proton::connection::connect() and proton::container::listen() to create
+connections. The container polls multiple connections and calls protocol eventsa
+on your `proton::handler` sub-classes.
+
+The container has two limitations:
+- it is not thread-safe, all container work must happen in a single thread.
+- it provides portable IO handling for you but cannot be used for different types of IO.
+
+### %proton::mt::controller - multi-threaded applications
+
+The proton::controller is similar to the proton::container but it can process
+connections in multiple threads concurrently. It uses the `proton::handler` like
+the container. If you follow the recommended model you can re-use statefull,
+per-connection, handlers with the controller. See @ref mt_page and the examples
+@ref mt/broker.cpp and @ref mt/epoll\_controller.cpp
+
+Default controller IO implementations are provided but you can also implement
+your own controller using the proton::io::connection_engine, read on...
+
+### %proton::io::connection_engine - integrating with foreign IO
+
+The `proton::io::connection_engine` is different from the other proton APIs. You
+might think of it as more like an SPI (Service Provided Interface).
+
+The engine provides a very low-level way of driving a proton::handler: You feed
+raw byte-sequence fragments of an AMQP-encoded stream to the engine and it
+converts that into calls on a proton::handler. The engine provides you with
+outgoing protocol stream bytes in return.
+
+The engine is deliberately very simple and low level. It does no IO, no
+thread-related locking, and is written in simple C++98 compatible code.
+
+You can use the engine directly to connect your application to any kind of IO
+framework or library, memory-based streams or any other source/sink for byte
+stream data.
+
+You can also use the engine to build a custom implementation of
+proton::mt::controller and proton::mt::work\_queue so portable proton
+applications using the controller can run without modification on your platform.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/docs/mt_page.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/mt_page.md b/proton-c/bindings/cpp/docs/mt_page.md
new file mode 100644
index 0000000..bed53e9
--- /dev/null
+++ b/proton-c/bindings/cpp/docs/mt_page.md
@@ -0,0 +1,21 @@
+# Multi-threaded proton {#mt_page}
+
+Most classes in namespace @ref proton are not thread-safe. Objects associated
+with a single connection *must not* be used concurrently. However objects
+associated with *different* connections *can* be used concurrently in separate
+threads.
+
+The recommended way to use proton multi-threaded is to *serialize* the work for
+each connection but allow different connections to be processed concurrently.
+
+proton::controller allows you to manage connections in a multi-threaded way. You
+supply a proton::handler for each connection. Proton will ensure that the
+`proton::handler::on_*()` functions are never called concurrently so
+per-connection handlers do not need a lock even if they have state.
+
+proton::work_queue allows you to make calls to arbitrary functions or other
+code, serialized in the same way as `proton::handler::on_()` calls. Typically
+this is used to call your own handler's member functions in the same way as
+proton::handler override functions.
+
+For examples see @ref mt/broker.cpp, mt/simple\_send.cpp and mt/simple\_recv.cpp

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/docs/tutorial.dox
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/tutorial.dox b/proton-c/bindings/cpp/docs/tutorial.dox
deleted file mode 100644
index e40a3e7..0000000
--- a/proton-c/bindings/cpp/docs/tutorial.dox
+++ /dev/null
@@ -1,428 +0,0 @@
-// -*-markdown-*-
-// NOTE: doxygen can include markdown pages directly but there seems to be a bug
-// that shows messed-up line numbers in \skip \until code extracts so this file
-// is markdown wrapped in a C++ comment - which works.
-
-/**\page tutorial Tutorial
-
-This is a brief tutorial that will walk you through the fundamentals of building
-messaging applications in incremental steps. There are further examples, in
-addition the ones mentioned in the tutorial.
-
-Some of the examples require an AMQP *broker* that can receive, store and send
-messages. \ref broker.hpp and \ref broker.cpp define a simple example
-broker. Run without arguments it listens on `0.0.0.0:5672`, the standard AMQP
-port on all network interfaces. To use a different port or network interface:
-
-    broker -a <host>:<port>
-
-Instead of the example broker, you can use any AMQP 1.0 compliant broker. You
-must configure your broker to have a queue (or topic) named "examples".
-
-The `helloworld` examples take an optional URL argument. The other examples take
-an option `-a URL`. A URL looks like:
-
-    HOST:PORT/ADDRESS
-
-It usually defaults to `127.0.0.1:5672/examples`, but you can change this if
-your broker is on a different host or port, or you want to use a different queue
-or topic name (the ADDRESS part of the URL). URL details are at `proton::url`
-
-The first part of the tutorial uses the `proton::container`, later we will
-show some of the same examples implemented using the `proton::connection_engine`.
-Most of the code is the same for either approach.
-
-Hello World!
-------------
-
-\dontinclude helloworld.cpp
-
-Tradition dictates that we start with hello world! This example sends a message
-to a broker and the receives the same message back to demonstrate sending and
-receiving. In a realistic system the sender and receiver would normally be in
-different processes. The complete example is \ref helloworld.cpp
-
-We will include the following classes: `proton::container` runs an event loop
-which dispatches events to a `proton::handler`. This allows a *reactive*
-style of programming which is well suited to messaging applications. `proton::url` is a simple parser for the URL format mentioned above.
-
-\skip   proton/container
-\until  proton/url
-
-We will define a class `hello_world` which is a subclass of
-`proton::handler` and over-rides functions to handle the events
-of interest in sending and receiving a message.
-
-\skip class hello_world
-\until {}
-
-`on_start()` is called when the event loop first starts. We handle that by
-establishing a connection and creating a sender and a receiver.
-
-\skip on_start
-\until }
-
-`on_sendable()` is called when message can be transferred over the associated
-sender link to the remote peer. We create a `proton::message`, set the message
-body to `"Hello World!"` and send the message. Then we close the sender as we only
-want to send one message. Closing the sender will prevent further calls to
-`on_sendable()`.
-
-\skip on_sendable
-\until }
-
-`on_message()` is called when a message is received. We just print the body of
-the message and close the connection, as we only want one message
-
-\skip on_message
-\until }
-
-The message body is a `proton::value`, see the documentation for more on how to
-extract the message body as type-safe C++ values.
-
-Our `main` function creates an instance of the `hello_world` handler and a
-proton::container using that handler. Calling `proton::container::run` sets
-things in motion and returns when we close the connection as there is nothing
-further to do. It may throw an exception, which will be a subclass of
-`proton::error`. That in turn is a subclass of `std::exception`.
-
-\skip main
-\until }
-\until }
-\until }
-
-Hello World, Direct!
---------------------
-
-\dontinclude helloworld_direct.cpp
-
-Though often used in conjunction with a broker, AMQP does not *require* this. It
-also allows senders and receivers to communicate directly if desired.
-
-We will modify our example to send a message directly to itself. This is a bit
-contrived but illustrates both sides of the direct send/receive scenario. Full
-code at \ref helloworld_direct.cpp
-
-The first difference, is that rather than creating a receiver on the same
-connection as our sender, we listen for incoming connections by invoking the
-`proton::container::listen()` method on the container.
-
-\skip on_start
-\until }
-
-As we only need then to initiate one link, the sender, we can do that by
-passing in a url rather than an existing connection, and the connection
-will also be automatically established for us.
-
-We send the message in response to the `on_sendable()` callback and
-print the message out in response to the `on_message()` callback exactly
-as before.
-
-\skip on_sendable
-\until }
-\until }
-
-However we also handle two new events. We now close the connection from
-the senders side once the message has been accepted.
-The acceptance of the message is an indication of successful transfer to the
-peer. We are notified of that event through the `on_delivery_accept()`
-callback.
-
-\skip on_delivery_accept
-\until }
-
-Then, once the connection has been closed, of which we are
-notified through the `on_connection_close()` callback, we stop accepting incoming
-connections at which point there is no work to be done and the
-event loop exits, and the run() method will return.
-
-\skip on_connection_close
-\until }
-
-So now we have our example working without a broker involved!
-
-Note that for this example we pick an "unusual" port 8888 since we are talking
-to ourselves rather than a broker.
-
-\skipline url =
-
-Asynchronous Send and Receive
------------------------------
-
-Of course, these `HelloWorld!` examples are very artificial, communicating as
-they do over a network connection but with the same process. A more realistic
-example involves communication between separate processes (which could indeed be
-running on completely separate machines).
-
-Let's separate the sender from the receiver, and transfer more than a single
-message between them.
-
-We'll start with a simple sender \ref simple_send.cpp.
-
-\dontinclude simple_send.cpp
-
-As with the previous example, we define the application logic in a class that
-handles events. Because we are transferring more than one message, we need to
-keep track of how many we have sent. We'll use a `sent` member variable for
-that.  The `total` member variable will hold the number of messages we want to
-send.
-
-\skip class simple_send
-\until total
-
-As before, we use the `on_start()` event to establish our sender link over which
-we will transfer messages.
-
-\skip on_start
-\until }
-
-AMQP defines a credit-based flow control mechanism. Flow control allows
-the receiver to control how many messages it is prepared to receive at a
-given time and thus prevents any component being overwhelmed by the
-number of messages it is sent.
-
-In the `on_sendable()` callback, we check that our sender has credit
-before sending messages. We also check that we haven't already sent the
-required number of messages.
-
-\skip on_sendable
-\until }
-\until }
-
-The `proton::sender::send()` call above is asynchronous. When it returns the
-message has not yet actually been transferred across the network to the
-receiver. By handling the `on_accepted()` event, we can get notified when the
-receiver has received and accepted the message. In our example we use this event
-to track the confirmation of the messages we have sent. We only close the
-connection and exit when the receiver has received all the messages we wanted to
-send.
-
-\skip on_delivery_accept
-\until }
-\until }
-
-If we are disconnected after a message is sent and before it has been
-confirmed by the receiver, it is said to be `in doubt`. We don't know
-whether or not it was received. In this example, we will handle that by
-resending any in-doubt messages. This is known as an 'at-least-once'
-guarantee, since each message should eventually be received at least
-once, though a given message may be received more than once (i.e.
-duplicates are possible). In the `on_disconnected()` callback, we reset
-the sent count to reflect only those that have been confirmed. The
-library will automatically try to reconnect for us, and when our sender
-is sendable again, we can restart from the point we know the receiver
-got to.
-
-\skip on_disconnect
-\until }
-
-\dontinclude simple_recv.cpp
-
-Now let's look at the corresponding receiver \ref simple_recv.cpp
-
-This time we'll use an `expected` member variable for for the number of messages we expect and
-a `received` variable to count how many we have received so far.
-
-\skip class simple_recv
-\until received
-
-We handle `on_start()` by creating our receiver, much like we
-did for the sender.
-
-\skip on_start
-\until }
-
-We also handle the `on_message()` event for received messages and print the
-message out as in the `Hello World!` examples.  However we add some logic to
-allow the receiver to wait for a given number of messages, then to close the
-connection and exit. We also add some logic to check for and ignore duplicates,
-using a simple sequential id scheme.
-
-\skip on_message
-\until }
-
-Direct Send and Receive
------------------------
-
-Sending between these two examples requires an intermediary broker since neither
-accepts incoming connections. AMQP allows us to send messages directly between
-two processes. In that case one or other of the processes needs to accept
-incoming connections. Let's create a modified version of the receiving example
-that does this with \ref direct_recv.cpp
-
-\dontinclude direct_recv.cpp
-
-There are only two differences here. Instead of initiating a link (and
-implicitly a connection), we listen for incoming connections.
-
-
-\skip on_start
-\until }
-
-When we have received all the expected messages, we then stop listening for
-incoming connections by closing the acceptor object.
-
-\skip on_message
-\until }
-\until }
-\until }
-\until }
-
-You can use the \ref simple_send.cpp example to send to this receiver
-directly. (Note: you will need to stop any broker that is listening on the 5672
-port, or else change the port used by specifying a different address to each
-example via the -a command line switch).
-
-We can also modify the sender to allow the original receiver to connect to it,
-in \ref direct_send.cpp. Again that just requires two modifications:
-
-\dontinclude direct_send.cpp
-
-As with the modified receiver, instead of initiating establishment of a
-link, we listen for incoming connections.
-
-\skip on_start
-\until }
-
-When we have received confirmation of all the messages we sent, we can
-close the acceptor in order to exit.
-
-\skip on_delivery_accept
-\until }
-\until }
-
-To try this modified sender, run the original \ref simple_recv.cpp against it.
-
-The symmetry in the underlying AMQP that enables this is quite unique and
-elegant, and in reflecting this the proton API provides a flexible toolkit for
-implementing all sorts of interesting intermediaries (\ref broker.hpp and \ref
-broker.cpp provide a simple broker for testing purposes is an example of this).
-
-Request/Response
-----------------
-
-A common pattern is to send a request message and expect a response message in
-return. AMQP has special support for this pattern. Let's have a look at a simple
-example. We'll start with \ref server.cpp, the program that will process the
-request and send the response. Note that we are still using a broker in this
-example.
-
-Our server will provide a very simple service: it will respond with the
-body of the request converted to uppercase.
-
-\dontinclude server.cpp
-\skip class server
-\until };
-
-The code here is not too different from the simple receiver example.  When we
-receive a request in `on_message` however, we look at the
-`proton::message::reply_to` address and create a sender with that address for
-the response. We'll cache the senders incase we get further requests with the
-same `reply_to`.
-
-Now let's create a simple \ref client.cpp to test this service out.
-
-\dontinclude client.cpp
-
-Our client takes a list of strings to send as requests
-
-\skipline client(
-
-Since we will be sending and receiving, we create a sender and a receiver in
-`on_start`.  Our receiver has a blank address and sets the `dynamic` flag to
-true, which means we expect the remote end (broker or server) to assign a unique
-address for us.
-
-\skip on_start
-\until }
-
-Now a function to send the next request from our list of requests. We set the
-reply_to address to be the dynamically assigned address of our receiver.
-
-\skip send_request
-\until }
-
-We need to use the address assigned by the broker as the `reply_to` address of
-our requests, so we can't send them until our receiver has been set up. To do
-that, we add an `on_link_open()` method to our handler class, and if the link
-associated with event is the receiver, we use that as the trigger to send our
-first request.
-
-\skip on_link_open
-\until }
-
-When we receive a reply, we send the next request.
-
-\skip on_message
-\until }
-\until }
-\until }
-
-Direct Request/Response
------------------------
-
-We can avoid the intermediary process by writing a server that accepts
-connections directly, \ref server_direct.cpp. It involves the following changes
-to our original server:
-
-\dontinclude server_direct.cpp
-
-Our server must generate a unique reply-to addresses for links from the
-client that request a dynamic address (previously this was done by the broker.)
-We use a simple counter.
-
-\skip generate_address
-\until }
-
-Next we need to handle incoming requests for links with dynamic addresses from
-the client.  We give the link a unique address and record it in our `senders`
-map.
-
-\skip on_link_open
-\until }
-
-Note we are interested in *sender* links above because we are implementing the
-server. A *receiver* link created on the client corresponds to a *sender* link
-on the server.
-
-Finally when we receive a message we look up its `reply_to` in our senders map and send the reply.
-
-\skip on_message
-\until }
-\until }
-\until }
-
-Connection Engine
------------------
-
-The `proton::connection_engine` is an alternative to the container. For simple
-applications with a single connection, its use is about the same as the the
-`proton::container`, but it allows more flexibility for multi-threaded
-applications or applications with unusual IO requirements.
-
-\dontinclude engine/helloworld.cpp
-
-We'll look at the \ref engine/helloworld.cpp example step-by-step to see how it differs
-from the container \ref helloworld.cpp version.
-
-First we include the `proton::io::socket::engine` class, which is a `proton::connection_engine`
-that uses socket IO.
-
-\skipline proton/io.hpp
-
-Our `hello_world` class differs only in the `on_start()` method. Instead of
-calling `container.connect()`, we simply call `proton::connection::open` to open the
-engine's' connection:
-
-\skip on_start
-\until }
-
-Our `main` function only differs in that it creates and runs a `socket::engine`
-instead of a `container`.
-
-\skip main
-\until }
-\until }
-\until }
-
-*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/docs/user.doxygen.in
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/user.doxygen.in b/proton-c/bindings/cpp/docs/user.doxygen.in
index 26b1784..7628151 100644
--- a/proton-c/bindings/cpp/docs/user.doxygen.in
+++ b/proton-c/bindings/cpp/docs/user.doxygen.in
@@ -53,8 +53,9 @@ WARNINGS               = YES
 
 # Configuration options related to the input files
 
-INPUT = @CMAKE_SOURCE_DIR@/proton-c/bindings/cpp/include @CMAKE_SOURCE_DIR@/proton-c/bindings/cpp/docs @CMAKE_SOURCE_DIR@/examples/cpp/README.dox
+INPUT = @CMAKE_SOURCE_DIR@/proton-c/bindings/cpp/include @CMAKE_SOURCE_DIR@/proton-c/bindings/cpp/docs @CMAKE_SOURCE_DIR@/examples/cpp
 FILE_PATTERNS          = *.hpp *.md *.dox
+EXCLUDE_PATTERNS       = @CMAKE_SOURCE_DIR@/examples/*.?pp  # Don't parse example sources, only *.dox
 FULL_PATH_NAMES        = YES
 RECURSIVE              = YES
 STRIP_FROM_PATH        = @CMAKE_SOURCE_DIR@/proton-c/bindings/cpp/include

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/include/proton/connection_options.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection_options.hpp b/proton-c/bindings/cpp/include/proton/connection_options.hpp
index d2efab9..a4b7246 100644
--- a/proton-c/bindings/cpp/include/proton/connection_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection_options.hpp
@@ -139,14 +139,18 @@ class connection_options {
     PN_CPP_EXTERN connection_options& sasl_config_path(const std::string &);
     /// @endcond
 
-    /// @cond INTERNAL
+    /// Update option values from values set in other.
+    PN_CPP_EXTERN connection_options& update(const connection_options& other);
+
+    /// Copy and update option values from values set in other.
+    PN_CPP_EXTERN connection_options update(const connection_options& other) const;
+
   private:
     void apply(connection&) const;
     proton_handler* handler() const;
     static pn_connection_t *pn_connection(connection &);
     class ssl_client_options &ssl_client_options();
     class ssl_server_options &ssl_server_options();
-    PN_CPP_EXTERN void update(const connection_options& other);
 
     class impl;
     internal::pn_unique_ptr<impl> impl_;
@@ -154,7 +158,6 @@ class connection_options {
     friend class container_impl;
     friend class connector;
     friend class io::connection_engine;
-    /// @endcond
 };
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/include/proton/controller.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/controller.hpp b/proton-c/bindings/cpp/include/proton/controller.hpp
new file mode 100644
index 0000000..6b0784c
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/controller.hpp
@@ -0,0 +1,118 @@
+#ifndef PROTON_MT_HPP
+#define PROTON_MT_HPP
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+#include <proton/handler.hpp>
+#include <proton/connection_options.hpp>
+#include <proton/error_condition.hpp>
+
+#include <functional>
+#include <memory>
+
+namespace proton {
+
+class connection;
+
+/// The controller lets the application initiate and listen for connections, and
+/// start/stop overall IO activity. A single controller can manage many
+/// connections.
+///
+/// A controller associates a proton::handler with a proton::connection (the
+/// AMQP protocol connection) and the corresponding proton::transport
+/// (representing the underlying IO connection)
+///
+/// The first call to a proton::handler is always handler::on_transport_open(),
+/// the last is handler::on_transport_close(). Handlers can be deleted after
+/// handler::on_transport_close().
+///
+class controller {
+  public:
+    /// Create an instance of the default controller implementation.
+    /// @param container_id set on connections for this controller.
+    /// If empty, generate a random QUID default.
+    PN_CPP_EXTERN static std::unique_ptr<controller> create();
+
+    /// Get the controller associated with a connection.
+    /// @throw proton::error if this is not a controller-managed connection.
+    PN_CPP_EXTERN static controller& get(const proton::connection&);
+
+    controller(const controller&) = delete;
+
+    virtual ~controller() {}
+
+    /// Start listening on address.
+    ///
+    /// @param address identifies a listening address.
+    ///
+    /// @param make_handler returns a handler for each accepted connection.
+    /// handler::on_connection_open() is called with the incoming
+    /// proton::connection.  The handler can accept by calling
+    /// connection::open()) or reject by calling connection::close()).
+    ///
+    /// Calls to the factory for this address are serialized. Calls for separate
+    /// addresses in separate calls to listen() may be concurrent.
+    ///
+    virtual void listen(
+        const std::string& address,
+        std::function<proton::handler*(const std::string&)> make_handler,
+        const connection_options& = connection_options()) = 0;
+
+    /// Stop listening on address, must match exactly with address string given to listen().
+    virtual void stop_listening(const std::string& address) = 0;
+
+    /// Connect to address.
+    ///
+    /// The handler will get a proton::handler::on_connection_open() call the
+    /// connection is completed by the other end.
+    virtual void connect(
+        const std::string& address, proton::handler&,
+        const connection_options& = connection_options()) = 0;
+
+    /// Default options for all connections, e.g. container_id.
+    virtual void options(const connection_options&) = 0;
+
+    /// Default options for all connections, e.g. container_id.
+    virtual connection_options options() = 0;
+
+    /// Run the controller in this thread. Returns when the controller is
+    /// stopped.  Multiple threads can call run()
+    virtual void run() = 0;
+
+    /// Stop the controller: abort open connections, run() will return in all threads.
+    /// Handlers will receive on_transport_error() with the error_condition.
+    virtual void stop(const error_condition& = error_condition()) = 0;
+
+    /// The controller will stop (run() will return) when there are no open
+    /// connections or listeners left.
+    virtual void stop_on_idle() = 0;
+
+    /// Wait till the controller is stopped.
+    virtual void wait() = 0;
+
+  protected:
+    controller() {}
+};
+
+}
+
+
+#endif // PROTON_MT_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/include/proton/error.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/error.hpp b/proton-c/bindings/cpp/include/proton/error.hpp
index dd30867..1b77b3f 100644
--- a/proton-c/bindings/cpp/include/proton/error.hpp
+++ b/proton-c/bindings/cpp/include/proton/error.hpp
@@ -22,14 +22,17 @@
  *
  */
 
-#include "proton/config.hpp"
-#include "proton/export.hpp"
+#include <proton/config.hpp>
+#include <proton/export.hpp>
+#include <proton/value.hpp>
 
 #include <stdexcept>
 #include <string>
 
 namespace proton {
 
+class value;
+
 /// The base proton error.
 ///
 /// All exceptions thrown from functions in the proton namespace are

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/include/proton/handler.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/handler.hpp b/proton-c/bindings/cpp/include/proton/handler.hpp
index ef87ff3..4526036 100644
--- a/proton-c/bindings/cpp/include/proton/handler.hpp
+++ b/proton-c/bindings/cpp/include/proton/handler.hpp
@@ -50,6 +50,18 @@ class connection_engine;
 ///
 /// Subclass and override event-handling member functions.
 ///
+/// Close and error handling: there are several objects that have on_X_close and on_X_error functions.
+/// They are called as follows:
+///
+/// - If X is closed cleanly, with no error status then on_X_close() is called.
+/// - If X is closed with an error then on_X_error() is called followed by on_X_close()
+///   Note the error condition is also available in on_X_close from X::condition().
+///
+/// By default, if you do not implement on_X_error, it will call
+/// on_unhandled_error().  If you do not implement on_unhandled_error() it will
+/// throw a proton::error exception, which may not be what you want but does
+/// help to identify forgotten error handling quickly.
+///
 /// @see proton::event
 class
 PN_CPP_CLASS_EXTERN handler

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp b/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
index acf47fa..ded68de 100644
--- a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
+++ b/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
@@ -20,6 +20,7 @@
  * under the License.
  */
 
+#include "proton/config.hpp"
 #include "proton/connection.hpp"
 #include "proton/connection_options.hpp"
 #include "proton/error.hpp"
@@ -38,27 +39,44 @@ struct pn_collector_t;
 namespace proton {
 
 class handler;
+class work_queue;            // Only used for multi-threaded connection_engines.
 
-/// Contains classes to integrate proton into different IO and threading environments.
-namespace io {
+/** @page integration
 
-///@cond INTERNAL
-class connection_engine_context;
-///
+This namespace contains a low-level "Service Provider Interface" that can be
+used to implement the proton API over any native or 3rd party IO library.
+
+The io::connection_engine is the core engine that converts raw AMQP bytes read
+from any IO source into proton::handler event calls, and generates AMQP
+byte-encoded output that can be written to any IO destination.
+
+The integration needs to implement two user-visible interfaces:
+ - proton::controller lets the user initiate or listen for connections.
+ - proton::work_queue lets the user serialize their own work with a connection.
+
+ @see epoll_controller.cpp for an example of an integration.
+
+[TODO controller doesn't belong in the mt namespace, a single-threaded
+integration would need a controller too.]
+
+*/
+namespace io {
 
 /// Pointer to a mutable memory region with a size.
 struct mutable_buffer {
-    char* data;
-    size_t size;
+    char* data;                 ///< Beginning of the buffered data.
+    size_t size;                ///< Number of bytes in the buffer.
 
+    /// Construct a buffer starting at data_ with size_ bytes.
     mutable_buffer(char* data_=0, size_t size_=0) : data(data_), size(size_) {}
 };
 
 /// Pointer to a const memory region with a size.
 struct const_buffer {
-    const char* data;
-    size_t size;
+    const char* data;           ///< Beginning of the buffered data.
+    size_t size;                ///< Number of bytes in the buffer.
 
+    /// Construct a buffer starting at data_ with size_ bytes.
     const_buffer(const char* data_=0, size_t size_=0) : data(data_), size(size_) {}
 };
 
@@ -88,39 +106,16 @@ struct const_buffer {
 /// epoll) or an async-request driven proactor (e.g. windows completion ports,
 /// boost.asio, libuv etc.)
 ///
+/// The engine never throws exceptions.
+///
 class
 PN_CPP_CLASS_EXTERN connection_engine {
   public:
-    // TODO aconway 2016-03-18: this will change
-    class container {
-      public:
-        /// Create a container with id.  Default to random UUID.
-        PN_CPP_EXTERN container(const std::string &id = "");
-        PN_CPP_EXTERN ~container();
-
-        /// Return the container-id
-        PN_CPP_EXTERN std::string id() const;
-
-        /// Make options to configure a new engine, using the default options.
-        ///
-        /// Call this once for each new engine as the options include a generated unique link_prefix.
-        /// You can modify the configuration before creating the engine but you should not
-        /// modify the container_id or link_prefix.
-        PN_CPP_EXTERN connection_options make_options();
-
-        /// Set the default options to be used for connection engines.
-        /// The container will set the container_id and link_prefix when make_options is called.
-        PN_CPP_EXTERN void options(const connection_options&);
-
-      private:
-        class impl;
-        internal::pn_unique_ptr<impl> impl_;
-    };
-
     /// Create a connection engine that dispatches to handler.
+    // TODO aconway 2016-04-06: no options, only via handler.
     PN_CPP_EXTERN connection_engine(handler&, const connection_options& = connection_options());
 
-    PN_CPP_EXTERN virtual ~connection_engine();
+    PN_CPP_EXTERN ~connection_engine();
 
     /// The engine's read buffer. Read data into this buffer then call read_done() when complete.
     /// Returns mutable_buffer(0, 0) if the engine cannot currently read data.
@@ -132,6 +127,7 @@ PN_CPP_CLASS_EXTERN connection_engine {
     PN_CPP_EXTERN void read_done(size_t n);
 
     /// Indicate that the read side of the transport is closed and no more data will be read.
+    /// Note that there may still be events to dispatch() or data to write.
     PN_CPP_EXTERN void read_close();
 
     /// The engine's write buffer. Write data from this buffer then call write_done()
@@ -141,9 +137,11 @@ PN_CPP_CLASS_EXTERN connection_engine {
 
     /// Indicate that the first n bytes of write_buffer() have been written successfully.
     /// This changes the buffer, call write_buffer() to get the updated buffer.
+
     PN_CPP_EXTERN void write_done(size_t n);
 
-    /// Indicate that the write side of the transport has closed and no more data will be written.
+    /// Indicate that the write side of the transport has closed and no more data can be written.
+    /// Note that there may still be events to dispatch() or data to read.
     PN_CPP_EXTERN void write_close();
 
     /// Close the engine with an error that will be passed to handler::on_transport_error().
@@ -154,15 +152,14 @@ PN_CPP_CLASS_EXTERN connection_engine {
     /// Dispatch all available events and call the corresponding \ref handler methods.
     ///
     /// Returns true if the engine is still active, false if it is finished and
-    /// can be destroyed. The engine is finished when either of the following is
-    /// true:
+    /// can be destroyed. The engine is finished when all events are dispatched
+    /// and one of the following is true:
     ///
     /// - both read_close() and write_close() have been called, no more IO is possible.
-    /// - The AMQP connection() is closed AND write_buffer() is empty.
+    /// - The AMQP connection() is closed AND the write_buffer() is empty.
     ///
-    /// May expand the read_buffer() and/or the write_buffer().
+    /// May modify the read_buffer() and/or the write_buffer().
     ///
-    /// @throw any exceptions thrown by the \ref handler.
     PN_CPP_EXTERN bool dispatch();
 
     /// Get the AMQP connection associated with this connection_engine.
@@ -171,6 +168,12 @@ PN_CPP_CLASS_EXTERN connection_engine {
     /// Get the transport associated with this connection_engine.
     PN_CPP_EXTERN proton::transport transport() const;
 
+    /// For controller connections, set the connection's work_queue. Set
+    /// via plain pointer, not std::shared_ptr so the connection_engine can be
+    /// compiled with C++03.  The work_queue must outlive the engine. The
+    /// std::shared_ptr<work_queue> will be available via work_queue::get(this->connection())
+    PN_CPP_EXTERN void work_queue(class work_queue*);
+
   private:
     connection_engine(const connection_engine&);
     connection_engine& operator=(const connection_engine&);
@@ -180,6 +183,7 @@ PN_CPP_CLASS_EXTERN connection_engine {
     proton::transport transport_;
     proton::internal::pn_ptr<pn_collector_t> collector_;
 };
+
 }}
 
 #endif // CONNECTION_ENGINE_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/include/proton/io/default_controller.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/default_controller.hpp b/proton-c/bindings/cpp/include/proton/io/default_controller.hpp
new file mode 100644
index 0000000..f876d5f
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/io/default_controller.hpp
@@ -0,0 +1,47 @@
+#ifndef PROTON_IO_DRIVER_HPP
+#define PROTON_IO_DRIVER_HPP
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <functional>
+#include <memory>
+
+namespace proton {
+
+class controller;
+
+namespace io {
+
+/// A proton::controller implementation can create a static instance of default_controller
+/// to register as the default implementation.
+/// If more than one implementation is linked, which one becomes the default
+/// is undefined.
+struct default_controller {
+
+    /// A controller make-function takes a string container-id and returns a unique_ptr<controller>
+    typedef std::function<std::unique_ptr<controller>()> make_fn;
+
+    /// Construct a static instance of default_controller to register your controller factory.
+    PN_CPP_EXTERN default_controller(make_fn);
+};
+
+}}
+
+#endif // PROTON_IO_CONTROLLER_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/include/proton/io/socket.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/socket.hpp b/proton-c/bindings/cpp/include/proton/io/socket.hpp
deleted file mode 100644
index bcbcecf..0000000
--- a/proton-c/bindings/cpp/include/proton/io/socket.hpp
+++ /dev/null
@@ -1,130 +0,0 @@
-#ifndef PROTON_IO_IO_HPP
-#define PROTON_IO_IO_HPP
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <proton/io/connection_engine.hpp>
-#include <proton/url.hpp>
-
-
-namespace proton {
-namespace io {
-namespace socket {
-
-struct
-PN_CPP_CLASS_EXTERN io_error : public proton::error {
-    PN_CPP_EXTERN explicit io_error(const std::string&); ///< Construct with message
-};
-
-/// @name Setup and teardown
-///
-/// Call initialize() before using any functions in the proton::io::socket
-/// namespace.  Call finalize() when you are done.
-///
-/// You can call initialize/finalize more than once as long as they are in
-/// matching pairs. Use \ref guard to call initialize/finalize around a scope.
-///
-/// Note that on POSIX systems these are no-ops, but they are required
-/// for Windows.
-///
-/// @{
-
-/// Initialize the proton::io subsystem.
-PN_CPP_EXTERN void initialize();
-
-/// Finalize the proton::io subsystem.
-PN_CPP_EXTERN void finalize(); // nothrow
-
-/// Use to call io::initialize and io::finalize around a scope.
-struct guard {
-    guard() { initialize(); }
-    ~guard() { finalize(); }
-};
-
-/// @}
-
-/// An IO resource.
-typedef int64_t descriptor;
-
-/// An invalid descriptor.
-PN_CPP_EXTERN extern const descriptor INVALID_DESCRIPTOR;
-
-/// Return a string describing the most recent IO error.
-PN_CPP_EXTERN std::string error_str();
-
-/// Open a TCP connection to the host:port (port can be a service name or number) from a proton::url.
-PN_CPP_EXTERN descriptor connect(const proton::url&);
-
-/// Listening socket.
-class listener {
-  public:
-    /// Listen on host/port. Empty host means listen on all interfaces.
-    /// port can be a service name or number
-    PN_CPP_EXTERN listener(const std::string& host, const std::string& port);
-    PN_CPP_EXTERN ~listener();
-
-    /// Accept a connection. Return the descriptor, set host, port to the remote address.
-    /// port can be a service name or number.
-    PN_CPP_EXTERN descriptor accept(std::string& host, std::string& port);
-
-    /// Accept a connection, does not provide address info.
-    descriptor accept() { std::string dummy; return accept(dummy, dummy); }
-
-    /// Convert to descriptor
-    descriptor socket() const { return socket_; }
-
-  private:
-    guard guard_;
-    listener(const listener&);
-    listener& operator=(const listener&);
-    descriptor socket_;
-};
-
-/// A \ref connection_engine with non-blocking socket IO.
-class engine : public connection_engine {
-  public:
-    /// Wrap an open socket. Does not automatically open the connection.
-    PN_CPP_EXTERN engine(descriptor socket_, handler&, const connection_options& = connection_options());
-
-    /// Create socket engine connected to url, open the connection as a client.
-    PN_CPP_EXTERN engine(const url&, handler&, const connection_options& = connection_options());
-
-    PN_CPP_EXTERN ~engine();
-
-    /// Run the engine until it closes
-    PN_CPP_EXTERN void run();
-
-    /// Non-blocking read from socket to engine
-    PN_CPP_EXTERN void read();
-
-    /// Non-blocking write from engine to socket
-    PN_CPP_EXTERN void write();
-
-    descriptor socket() const { return socket_; }
-
-  private:
-    void init();
-    guard guard_;
-    descriptor socket_;
-};
-
-}}}
-
-#endif  /*!PROTON_IO_IO_HPP*/


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message