qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [3/4] qpid-proton git commit: PROTON-1184: C++ merge APIs for single and multi-threaded use.
Date Mon, 16 May 2016 14:43:32 GMT
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/mt/epoll_controller.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/mt/epoll_controller.cpp b/examples/cpp/mt/epoll_controller.cpp
deleted file mode 100644
index 80aba0c..0000000
--- a/examples/cpp/mt/epoll_controller.cpp
+++ /dev/null
@@ -1,517 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <proton/controller.hpp>
-#include <proton/work_queue.hpp>
-#include <proton/url.hpp>
-
-#include <proton/io/connection_engine.hpp>
-#include <proton/io/default_controller.hpp>
-
-#include <atomic>
-#include <memory>
-#include <mutex>
-#include <condition_variable>
-#include <thread>
-#include <set>
-#include <sstream>
-#include <system_error>
-
-// Linux native IO
-#include <assert.h>
-#include <errno.h>
-#include <fcntl.h>
-#include <netdb.h>
-#include <sys/epoll.h>
-#include <sys/eventfd.h>
-#include <unistd.h>
-
-// This is the only public function, the implementation is private.
-std::unique_ptr<proton::controller> make_epoll_controller();
-
-// Private implementation
-namespace  {
-
-
-using lock_guard = std::lock_guard<std::mutex>;
-
-// Get string from errno
-std::string errno_str(const std::string& msg) {
-    return std::system_error(errno, std::system_category(), msg).what();
-}
-
-// Throw proton::error(errno_str(msg)) if result < 0
-int check(int result, const std::string& msg) {
-    if (result < 0)
-        throw proton::error(errno_str(msg));
-    return result;
-}
-
-// Wrapper for getaddrinfo() that cleans up in destructor.
-class unique_addrinfo {
-  public:
-    unique_addrinfo(const std::string& addr) : addrinfo_(0) {
-        proton::url u(addr);
-        int result = ::getaddrinfo(char_p(u.host()), char_p(u.port()), 0, &addrinfo_);
-        if (result)
-            throw proton::error(std::string("bad address: ") + gai_strerror(result));
-    }
-    ~unique_addrinfo() { if (addrinfo_) ::freeaddrinfo(addrinfo_); }
-
-    ::addrinfo* operator->() const { return addrinfo_; }
-
-  private:
-    static const char* char_p(const std::string& s) { return s.empty() ? 0 : s.c_str(); }
-    ::addrinfo *addrinfo_;
-};
-
-// File descriptor wrapper that calls ::close in destructor.
-class unique_fd {
-  public:
-    unique_fd(int fd) : fd_(fd) {}
-    ~unique_fd() { if (fd_ >= 0) ::close(fd_); }
-    operator int() const { return fd_; }
-    int release() { int ret = fd_; fd_ = -1; return ret; }
-
-  protected:
-    int fd_;
-};
-
-class pollable;
-class pollable_engine;
-class pollable_listener;
-
-class epoll_controller : public proton::controller {
-  public:
-    epoll_controller();
-    ~epoll_controller();
-
-    // Implemenet the proton::controller interface
-    void connect(const std::string& addr,
-                 proton::handler& h,
-                 const proton::connection_options& opts) override;
-
-    void listen(const std::string& addr,
-                std::function<proton::handler*(const std::string&)> factory,
-                const proton::connection_options& opts) override;
-
-    void stop_listening(const std::string& addr) override;
-
-    void options(const proton::connection_options& opts) override;
-    proton::connection_options options() override;
-
-    void run() override;
-
-    void stop_on_idle() override;
-    void stop(const proton::error_condition& err) override;
-    void wait() override;
-
-    // Functions used internally.
-
-    void add_engine(proton::handler* h, proton::connection_options opts, int fd);
-    void erase(pollable*);
-
-  private:
-    void idle_check(const lock_guard&);
-    void interrupt();
-
-    const unique_fd epoll_fd_;
-    const unique_fd interrupt_fd_;
-
-    mutable std::mutex lock_;
-
-    proton::connection_options options_;
-    std::map<std::string, std::unique_ptr<pollable_listener> > listeners_;
-    std::map<pollable*, std::unique_ptr<pollable_engine> > engines_;
-
-    std::condition_variable stopped_;
-    bool stopping_;
-    proton::error_condition stop_err_;
-    std::atomic<size_t> threads_;
-};
-
-// Base class for pollable file-descriptors. Manages epoll interaction,
-// subclasses implement virtual work() to do their serialized work.
-class pollable {
-  public:
-    pollable(int fd, int epoll_fd) : fd_(fd), epoll_fd_(epoll_fd), notified_(false), working_(false)
-    {
-        int flags = check(::fcntl(fd, F_GETFL, 0), "non-blocking");
-        check(::fcntl(fd, F_SETFL,  flags | O_NONBLOCK), "non-blocking");
-        ::epoll_event ev = {};
-        ev.data.ptr = this;
-        ::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd_, &ev);
-    }
-
-    virtual ~pollable() {
-        ::epoll_event ev = {};
-        ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd_, &ev); // Ignore errors.
-    }
-
-    bool do_work(uint32_t events) {
-        {
-            lock_guard g(lock_);
-            if (working_)
-                return true;         // Another thread is already working.
-            working_ = true;
-            notified_ = false;
-        }
-        uint32_t new_events = work(events);  // Serialized, outside the lock.
-        if (new_events) {
-            lock_guard g(lock_);
-            rearm(notified_ ?  EPOLLIN|EPOLLOUT : new_events);
-        }
-        return new_events;
-    }
-
-    // Called by work_queue to notify that there are jobs.
-    void notify() {
-        lock_guard g(lock_);
-        if (!notified_) {
-            notified_ = true;
-            if (!working_) // No worker thread, rearm now.
-                rearm(EPOLLIN|EPOLLOUT);
-        }
-    }
-
-  protected:
-
-    // Subclass implements  work.
-    // Returns epoll events to re-enable or 0 if finished.
-    virtual uint32_t work(uint32_t events) = 0;
-
-    const unique_fd fd_;
-    const int epoll_fd_;
-
-  private:
-
-    void rearm(uint32_t events) {
-        epoll_event ev;
-        ev.data.ptr = this;
-        ev.events = EPOLLONESHOT | events;
-        check(::epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd_, &ev), "re-arm epoll");
-        working_ = false;
-    }
-
-    std::mutex lock_;
-    bool notified_;
-    bool working_;
-};
-
-class work_queue : public proton::work_queue {
-  public:
-    typedef std::vector<std::function<void()> > jobs;
-
-    work_queue(pollable& p, proton::controller& c) :
-        pollable_(p), closed_(false), controller_(c) {}
-
-    bool push(std::function<void()> f) override {
-        // Note this is an unbounded work queue.
-        // A resource-safe implementation should be bounded.
-        lock_guard g(lock_);
-        if (closed_)
-            return false;
-        jobs_.push_back(f);
-        pollable_.notify();
-        return true;
-    }
-
-    jobs pop_all() {
-        lock_guard g(lock_);
-        return std::move(jobs_);
-    }
-
-    void close() {
-        lock_guard g(lock_);
-        closed_ = true;
-    }
-
-    proton::controller& controller() const override { return controller_; }
-
-  private:
-    std::mutex lock_;
-    pollable& pollable_;
-    jobs jobs_;
-    bool closed_;
-    proton::controller& controller_;
-};
-
-// Handle epoll wakeups for a connection_engine.
-class pollable_engine : public pollable {
-  public:
-
-    pollable_engine(
-        proton::handler* h, proton::connection_options opts, epoll_controller& c,
-        int fd, int epoll_fd
-    ) : pollable(fd, epoll_fd),
-        engine_(*h, opts),
-        queue_(new work_queue(*this, c))
-    {
-        engine_.work_queue(queue_.get());
-    }
-
-    ~pollable_engine() {
-        queue_->close();               // No calls to notify() after this.
-        engine_.dispatch();            // Run any final events.
-        try { write(); } catch(...) {} // Write connection close if we can.
-        for (auto f : queue_->pop_all()) {// Run final queued work for side-effects.
-            try { f(); } catch(...) {}
-        }
-    }
-
-    uint32_t work(uint32_t events) {
-        try {
-            bool can_read = events & EPOLLIN, can_write = events && EPOLLOUT;
-            do {
-                can_write = can_write && write();
-                can_read = can_read && read();
-                for (auto f : queue_->pop_all()) // Run queued work
-                    f();
-                engine_.dispatch();
-            } while (can_read || can_write);
-            return (engine_.read_buffer().size ? EPOLLIN:0) |
-                (engine_.write_buffer().size ? EPOLLOUT:0);
-        } catch (const std::exception& e) {
-            close(proton::error_condition("exception", e.what()));
-        }
-        return 0;               // Ending
-    }
-
-    void close(const proton::error_condition& err) {
-        engine_.connection().close(err);
-    }
-
-  private:
-
-    bool write() {
-        if (engine_.write_buffer().size) {
-            ssize_t n = ::write(fd_, engine_.write_buffer().data, engine_.write_buffer().size);
-            while (n == EINTR)
-                n = ::write(fd_, engine_.write_buffer().data, engine_.write_buffer().size);
-            if (n > 0) {
-                engine_.write_done(n);
-                return true;
-            } else if (errno != EAGAIN && errno != EWOULDBLOCK)
-                check(n, "write");
-        }
-        return false;
-    }
-
-    bool read() {
-        if (engine_.read_buffer().size) {
-            ssize_t n = ::read(fd_, engine_.read_buffer().data, engine_.read_buffer().size);
-            while (n == EINTR)
-                n = ::read(fd_, engine_.read_buffer().data, engine_.read_buffer().size);
-            if (n > 0) {
-                engine_.read_done(n);
-                return true;
-            }
-            else if (n == 0)
-                engine_.read_close();
-            else if (errno != EAGAIN && errno != EWOULDBLOCK)
-                check(n, "read");
-        }
-        return false;
-    }
-
-    proton::io::connection_engine engine_;
-    std::shared_ptr<work_queue> queue_;
-};
-
-// A pollable listener fd that creates pollable_engine for incoming connections.
-class pollable_listener : public pollable {
-  public:
-    pollable_listener(
-        const std::string& addr,
-        std::function<proton::handler*(const std::string&)> factory,
-        int epoll_fd,
-        epoll_controller& c,
-        const proton::connection_options& opts
-    ) :
-        pollable(listen(addr), epoll_fd),
-        addr_(addr),
-        factory_(factory),
-        controller_(c),
-        opts_(opts)
-    {}
-
-    uint32_t work(uint32_t events) {
-        if (events & EPOLLRDHUP)
-            return 0;
-        int accepted = check(::accept(fd_, NULL, 0), "accept");
-        controller_.add_engine(factory_(addr_), opts_, accepted);
-        return EPOLLIN;
-    }
-
-    std::string addr() { return addr_; }
-
-  private:
-
-    int listen(const std::string& addr) {
-        std::string msg = "listen on "+addr;
-        unique_addrinfo ainfo(addr);
-        unique_fd fd(check(::socket(ainfo->ai_family, SOCK_STREAM, 0), msg));
-        int yes = 1;
-        check(::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)), msg);
-        check(::bind(fd, ainfo->ai_addr, ainfo->ai_addrlen), msg);
-        check(::listen(fd, 32), msg);
-        return fd.release();
-    }
-
-    std::string addr_;
-    std::function<proton::handler*(const std::string&)> factory_;
-    epoll_controller& controller_;
-    proton::connection_options opts_;
-};
-
-
-epoll_controller::epoll_controller()
-    : epoll_fd_(check(epoll_create(1), "epoll_create")),
-      interrupt_fd_(check(eventfd(1, 0), "eventfd")),
-      stopping_(false), threads_(0)
-{}
-
-epoll_controller::~epoll_controller() {
-    try {
-        stop(proton::error_condition("exception", "controller shut-down"));
-        wait();
-    } catch (...) {}
-}
-
-void epoll_controller::add_engine(proton::handler* h, proton::connection_options opts, int fd) {
-    lock_guard g(lock_);
-    if (stopping_)
-        throw proton::error("controller is stopping");
-    std::unique_ptr<pollable_engine> e(new pollable_engine(h, opts, *this, fd, epoll_fd_));
-    e->notify();
-    engines_[e.get()] = std::move(e);
-}
-
-void epoll_controller::erase(pollable* e) {
-    lock_guard g(lock_);
-    if (!engines_.erase(e)) {
-        pollable_listener* l = dynamic_cast<pollable_listener*>(e);
-        if (l)
-            listeners_.erase(l->addr());
-    }
-    idle_check(g);
-}
-
-void epoll_controller::idle_check(const lock_guard&) {
-    if (stopping_  && engines_.empty() && listeners_.empty())
-        interrupt();
-}
-
-void epoll_controller::connect(const std::string& addr,
-                               proton::handler& h,
-                               const proton::connection_options& opts)
-{
-    std::string msg = "connect to "+addr;
-    unique_addrinfo ainfo(addr);
-    unique_fd fd(check(::socket(ainfo->ai_family, SOCK_STREAM, 0), msg));
-    check(::connect(fd, ainfo->ai_addr, ainfo->ai_addrlen), msg);
-    add_engine(&h, options().update(opts), fd.release());
-}
-
-void epoll_controller::listen(const std::string& addr,
-                              std::function<proton::handler*(const std::string&)> factory,
-                              const proton::connection_options& opts)
-{
-    lock_guard g(lock_);
-    if (!factory)
-        throw proton::error("null function to listen on "+addr);
-    if (stopping_)
-        throw proton::error("controller is stopping");
-    auto& l = listeners_[addr];
-    l.reset(new pollable_listener(addr, factory, epoll_fd_, *this, options_.update(opts)));
-    l->notify();
-}
-
-void epoll_controller::stop_listening(const std::string& addr) {
-    lock_guard g(lock_);
-    listeners_.erase(addr);
-    idle_check(g);
-}
-
-void epoll_controller::options(const proton::connection_options& opts) {
-    lock_guard g(lock_);
-    options_ = opts;
-}
-
-proton::connection_options epoll_controller::options() {
-    lock_guard g(lock_);
-    return options_;
-}
-
-void epoll_controller::run() {
-    ++threads_;
-    try {
-        epoll_event e;
-        while(true) {
-            check(::epoll_wait(epoll_fd_, &e, 1, -1), "epoll_wait");
-            pollable* p = reinterpret_cast<pollable*>(e.data.ptr);
-            if (!p)
-                break;          // Interrupted
-            if (!p->do_work(e.events))
-                erase(p);
-        }
-    } catch (const std::exception& e) {
-        stop(proton::error_condition("exception", e.what()));
-    }
-    if (--threads_ == 0)
-        stopped_.notify_all();
-}
-
-void epoll_controller::stop_on_idle() {
-    lock_guard g(lock_);
-    stopping_ = true;
-    idle_check(g);
-}
-
-void epoll_controller::stop(const proton::error_condition& err) {
-    lock_guard g(lock_);
-    stop_err_ = err;
-    interrupt();
-}
-
-void epoll_controller::wait() {
-    std::unique_lock<std::mutex> l(lock_);
-    stopped_.wait(l, [this]() { return this->threads_ == 0; } );
-    for (auto& eng : engines_)
-        eng.second->close(stop_err_);
-    listeners_.clear();
-    engines_.clear();
-}
-
-void epoll_controller::interrupt() {
-    // Add an always-readable fd with 0 data and no ONESHOT to interrupt all threads.
-    epoll_event ev = {};
-    ev.events = EPOLLIN;
-    check(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fd_, &ev), "interrupt");
-}
-
-// Register make_epoll_controller() as proton::controller::create().
-proton::io::default_controller instance(make_epoll_controller);
-
-}
-
-// This is the only public function.
-std::unique_ptr<proton::controller> make_epoll_controller() {
-    return std::unique_ptr<proton::controller>(new epoll_controller());
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/mt/mt_container.hpp
----------------------------------------------------------------------
diff --git a/examples/cpp/mt/mt_container.hpp b/examples/cpp/mt/mt_container.hpp
new file mode 100644
index 0000000..164fe72
--- /dev/null
+++ b/examples/cpp/mt/mt_container.hpp
@@ -0,0 +1,29 @@
+#ifndef MT_MT_CONTROLLER_HPP
+#define MT_MT_CONTROLLER_HPP
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/default_container.hpp>
+#include <memory>
+
+// Defined in whichever MT container implementation we are linked with.
+std::unique_ptr<proton::container> make_mt_container(const std::string& id);
+
+#endif // MT_MT_DEFAULT_CONTAINER.HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/queue_browser.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/queue_browser.cpp b/examples/cpp/queue_browser.cpp
index e4fc29e..9f52a9d 100644
--- a/examples/cpp/queue_browser.cpp
+++ b/examples/cpp/queue_browser.cpp
@@ -20,7 +20,7 @@
  */
 
 #include "proton/connection.hpp"
-#include "proton/container.hpp"
+#include "proton/default_container.hpp"
 #include "proton/delivery.hpp"
 #include "proton/handler.hpp"
 #include "proton/receiver_options.hpp"
@@ -29,7 +29,7 @@
 
 #include <iostream>
 
-#include "fake_cpp11.hpp"
+#include <proton/config.hpp>
 
 using proton::source_options;
 
@@ -40,13 +40,13 @@ class browser : public proton::handler {
   public:
     browser(const std::string& u) : url(u) {}
 
-    void on_container_start(proton::container &c) override {
+    void on_container_start(proton::container &c) PN_CPP_OVERRIDE {
         proton::connection conn = c.connect(url);
         source_options browsing = source_options().distribution_mode(proton::source::COPY);
         conn.open_receiver(url.path(), proton::receiver_options().source(browsing));
     }
 
-    void on_message(proton::delivery &, proton::message &m) override {
+    void on_message(proton::delivery &, proton::message &m) PN_CPP_OVERRIDE {
         std::cout << m.body() << std::endl;
     }
 };
@@ -56,7 +56,7 @@ int main(int argc, char **argv) {
         std::string url = argc > 1 ? argv[1] : "127.0.0.1:5672/examples";
 
         browser b(url);
-        proton::container(b).run();
+        proton::default_container(b).run();
 
         return 0;
     } catch (const std::exception& e) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/selected_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/selected_recv.cpp b/examples/cpp/selected_recv.cpp
index 5bb65ff..3c57905 100644
--- a/examples/cpp/selected_recv.cpp
+++ b/examples/cpp/selected_recv.cpp
@@ -20,7 +20,7 @@
  */
 
 #include "proton/connection.hpp"
-#include "proton/container.hpp"
+#include "proton/default_container.hpp"
 #include "proton/handler.hpp"
 #include "proton/receiver_options.hpp"
 #include "proton/source_options.hpp"
@@ -28,7 +28,7 @@
 
 #include <iostream>
 
-#include "fake_cpp11.hpp"
+#include <proton/config.hpp>
 
 namespace {
 
@@ -60,14 +60,14 @@ class selected_recv : public proton::handler {
   public:
     selected_recv(const std::string& u) : url(u) {}
 
-    void on_container_start(proton::container &c) override {
+    void on_container_start(proton::container &c) PN_CPP_OVERRIDE {
         proton::source_options opts;
         set_filter(opts, "colour = 'green'");
         proton::connection conn = c.connect(url);
         conn.open_receiver(url.path(), proton::receiver_options().source(opts));
     }
 
-    void on_message(proton::delivery &, proton::message &m) override {
+    void on_message(proton::delivery &, proton::message &m) PN_CPP_OVERRIDE {
         std::cout << m.body() << std::endl;
     }
 };
@@ -77,7 +77,7 @@ int main(int argc, char **argv) {
         std::string url = argc > 1 ? argv[1] : "127.0.0.1:5672/examples";
 
         selected_recv recv(url);
-        proton::container(recv).run();
+        proton::default_container(recv).run();
 
         return 0;
     } catch (const std::exception& e) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/server.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/server.cpp b/examples/cpp/server.cpp
index 6f2f150..332fb1b 100644
--- a/examples/cpp/server.cpp
+++ b/examples/cpp/server.cpp
@@ -22,7 +22,7 @@
 #include "options.hpp"
 
 #include "proton/connection.hpp"
-#include "proton/container.hpp"
+#include "proton/default_container.hpp"
 #include "proton/handler.hpp"
 #include "proton/tracker.hpp"
 #include "proton/url.hpp"
@@ -32,7 +32,7 @@
 #include <string>
 #include <cctype>
 
-#include "fake_cpp11.hpp"
+#include <proton/config.hpp>
 
 class server : public proton::handler {
   private:
@@ -44,7 +44,7 @@ class server : public proton::handler {
   public:
     server(const std::string &u) : url(u) {}
 
-    void on_container_start(proton::container &c) override {
+    void on_container_start(proton::container &c) PN_CPP_OVERRIDE {
         connection = c.connect(url);
         connection.open_receiver(url.path());
 
@@ -59,7 +59,7 @@ class server : public proton::handler {
         return uc;
     }
 
-    void on_message(proton::delivery &, proton::message &m) override {
+    void on_message(proton::delivery &, proton::message &m) PN_CPP_OVERRIDE {
         std::cout << "Received " << m.body() << std::endl;
 
         std::string reply_to = m.reply_to();
@@ -87,7 +87,7 @@ int main(int argc, char **argv) {
         opts.parse();
 
         server srv(address);
-        proton::container(srv).run();
+        proton::default_container(srv).run();
 
         return 0;
     } catch (const example::bad_option& e) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/server_direct.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/server_direct.cpp b/examples/cpp/server_direct.cpp
index fc7aa67..74dcfd9 100644
--- a/examples/cpp/server_direct.cpp
+++ b/examples/cpp/server_direct.cpp
@@ -21,8 +21,7 @@
 
 #include "options.hpp"
 
-#include "proton/acceptor.hpp"
-#include "proton/container.hpp"
+#include "proton/default_container.hpp"
 #include "proton/handler.hpp"
 #include "proton/sender.hpp"
 #include "proton/source_options.hpp"
@@ -34,7 +33,7 @@
 #include <sstream>
 #include <cctype>
 
-#include "fake_cpp11.hpp"
+#include <proton/config.hpp>
 
 class server : public proton::handler {
   private:
@@ -46,7 +45,7 @@ class server : public proton::handler {
   public:
     server(const std::string &u) : url(u), address_counter(0) {}
 
-    void on_container_start(proton::container &c) override {
+    void on_container_start(proton::container &c) PN_CPP_OVERRIDE {
         c.listen(url);
         std::cout << "server listening on " << url << std::endl;
     }
@@ -68,7 +67,7 @@ class server : public proton::handler {
         return addr.str();
     }
 
-    void on_sender_open(proton::sender &sender) override {
+    void on_sender_open(proton::sender &sender) PN_CPP_OVERRIDE {
         if (sender.source().dynamic()) {
             std::string addr = generate_address();
             sender.open(proton::sender_options().source(proton::source_options().address(addr)));
@@ -76,7 +75,7 @@ class server : public proton::handler {
         }
     }
 
-    void on_message(proton::delivery &, proton::message &m) override {
+    void on_message(proton::delivery &, proton::message &m) PN_CPP_OVERRIDE {
         std::cout << "Received " << m.body() << std::endl;
 
         std::string reply_to = m.reply_to();
@@ -107,7 +106,7 @@ int main(int argc, char **argv) {
         opts.parse();
 
         server srv(address);
-        proton::container(srv).run();
+        proton::default_container(srv).run();
 
         return 0;
     } catch (const example::bad_option& e) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/simple_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/simple_recv.cpp b/examples/cpp/simple_recv.cpp
index 3eadf32..b1578ec 100644
--- a/examples/cpp/simple_recv.cpp
+++ b/examples/cpp/simple_recv.cpp
@@ -22,7 +22,7 @@
 #include "options.hpp"
 
 #include "proton/connection.hpp"
-#include "proton/container.hpp"
+#include "proton/default_container.hpp"
 #include "proton/delivery.hpp"
 #include "proton/handler.hpp"
 #include "proton/link.hpp"
@@ -32,7 +32,7 @@
 #include <iostream>
 #include <map>
 
-#include "fake_cpp11.hpp"
+#include <proton/config.hpp>
 
 class simple_recv : public proton::handler {
   private:
@@ -44,12 +44,12 @@ class simple_recv : public proton::handler {
   public:
     simple_recv(const std::string &s, int c) : url(s), expected(c), received(0) {}
 
-    void on_container_start(proton::container &c) override {
+    void on_container_start(proton::container &c) PN_CPP_OVERRIDE {
         receiver = c.open_receiver(url);
         std::cout << "simple_recv listening on " << url << std::endl;
     }
 
-    void on_message(proton::delivery &d, proton::message &msg) override {
+    void on_message(proton::delivery &d, proton::message &msg) PN_CPP_OVERRIDE {
         if (msg.id().get<uint64_t>() < received) {
             return; // Ignore duplicate
         }
@@ -79,7 +79,7 @@ int main(int argc, char **argv) {
         opts.parse();
 
         simple_recv recv(address, message_count);
-        proton::container(recv).run();
+        proton::default_container(recv).run();
 
         return 0;
     } catch (const example::bad_option& e) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/simple_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/simple_send.cpp b/examples/cpp/simple_send.cpp
index 594b7d6..921bb95 100644
--- a/examples/cpp/simple_send.cpp
+++ b/examples/cpp/simple_send.cpp
@@ -22,7 +22,7 @@
 #include "options.hpp"
 
 #include "proton/connection.hpp"
-#include "proton/container.hpp"
+#include "proton/default_container.hpp"
 #include "proton/handler.hpp"
 #include "proton/tracker.hpp"
 #include "proton/value.hpp"
@@ -30,7 +30,7 @@
 #include <iostream>
 #include <map>
 
-#include "fake_cpp11.hpp"
+#include <proton/config.hpp>
 
 class simple_send : public proton::handler {
   private:
@@ -43,11 +43,11 @@ class simple_send : public proton::handler {
   public:
     simple_send(const std::string &s, int c) : url(s), sent(0), confirmed(0), total(c) {}
 
-    void on_container_start(proton::container &c) override {
+    void on_container_start(proton::container &c) PN_CPP_OVERRIDE {
         sender = c.open_sender(url);
     }
 
-    void on_sendable(proton::sender &s) override {
+    void on_sendable(proton::sender &s) PN_CPP_OVERRIDE {
         while (s.credit() && sent < total) {
             proton::message msg;
             std::map<std::string, int> m;
@@ -61,7 +61,7 @@ class simple_send : public proton::handler {
         }
     }
 
-    void on_tracker_accept(proton::tracker &t) override {
+    void on_tracker_accept(proton::tracker &t) PN_CPP_OVERRIDE {
         confirmed++;
 
         if (confirmed == total) {
@@ -70,7 +70,7 @@ class simple_send : public proton::handler {
         }
     }
 
-    void on_transport_close(proton::transport &) override {
+    void on_transport_close(proton::transport &) PN_CPP_OVERRIDE {
         sent = confirmed;
     }
 };
@@ -87,7 +87,7 @@ int main(int argc, char **argv) {
         opts.parse();
 
         simple_send send(address, message_count);
-        proton::container(send).run();
+        proton::default_container(send).run();
 
         return 0;
     } catch (const example::bad_option& e) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/ssl.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/ssl.cpp b/examples/cpp/ssl.cpp
index f4e724d..379c95b 100644
--- a/examples/cpp/ssl.cpp
+++ b/examples/cpp/ssl.cpp
@@ -19,10 +19,9 @@
  *
  */
 
-#include "proton/acceptor.hpp"
 #include "proton/connection_options.hpp"
 #include "proton/connection.hpp"
-#include "proton/container.hpp"
+#include "proton/default_container.hpp"
 #include "proton/handler.hpp"
 #include "proton/ssl.hpp"
 #include "proton/tracker.hpp"
@@ -30,7 +29,7 @@
 
 #include <iostream>
 
-#include "fake_cpp11.hpp"
+#include <proton/config.hpp>
 
 using proton::connection_options;
 using proton::ssl_client_options;
@@ -46,15 +45,15 @@ std::string find_CN(const std::string &);
 
 
 struct server_handler : public proton::handler {
-    proton::acceptor acceptor;
+    std::string url;
 
-    void on_connection_open(proton::connection &c) override {
+    void on_connection_open(proton::connection &c) PN_CPP_OVERRIDE {
         std::cout << "Inbound server connection connected via SSL.  Protocol: " <<
             c.transport().ssl().protocol() << std::endl;
-        acceptor.close();
+        c.container().stop_listening(url);
     }
 
-    void on_message(proton::delivery &, proton::message &m) override {
+    void on_message(proton::delivery &, proton::message &m) PN_CPP_OVERRIDE {
         std::cout << m.body() << std::endl;
     }
 };
@@ -68,12 +67,12 @@ class hello_world_direct : public proton::handler {
   public:
     hello_world_direct(const std::string& u) : url(u) {}
 
-    void on_container_start(proton::container &c) override {
+    void on_container_start(proton::container &c) PN_CPP_OVERRIDE {
         // Configure listener.  Details vary by platform.
         ssl_certificate server_cert = platform_certificate("tserver", "tserverpw");
         ssl_server_options ssl_srv(server_cert);
         connection_options server_opts;
-        server_opts.ssl_server_options(ssl_srv).handler(&s_handler);
+        server_opts.ssl_server_options(ssl_srv).handler(s_handler);
         c.server_connection_options(server_opts);
 
         // Configure client with a Certificate Authority database populated with the server's self signed certificate.
@@ -84,24 +83,25 @@ class hello_world_direct : public proton::handler {
         client_opts.ssl_client_options(ssl_cli);
         c.client_connection_options(client_opts);
 
-        s_handler.acceptor = c.listen(url);
+        s_handler.url = url;
+        c.listen(url);
         c.open_sender(url);
     }
 
-    void on_connection_open(proton::connection &c) override {
+    void on_connection_open(proton::connection &c) PN_CPP_OVERRIDE {
         std::string subject = c.transport().ssl().remote_subject();
         std::cout << "Outgoing client connection connected via SSL.  Server certificate identity " <<
             find_CN(subject) << std::endl;
     }
 
-    void on_sendable(proton::sender &s) override {
+    void on_sendable(proton::sender &s) PN_CPP_OVERRIDE {
         proton::message m;
         m.body("Hello World!");
         s.send(m);
         s.close();
     }
 
-    void on_tracker_accept(proton::tracker &t) override {
+    void on_tracker_accept(proton::tracker &t) PN_CPP_OVERRIDE {
         // All done.
         t.connection().close();
     }
@@ -122,7 +122,7 @@ int main(int argc, char **argv) {
         else cert_directory = "ssl_certs/";
 
         hello_world_direct hwd(url);
-        proton::container(hwd).run();
+        proton::default_container(hwd).run();
         return 0;
     } catch (const std::exception& e) {
         std::cerr << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/ssl_client_cert.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/ssl_client_cert.cpp b/examples/cpp/ssl_client_cert.cpp
index 0708d2c..b16aad4 100644
--- a/examples/cpp/ssl_client_cert.cpp
+++ b/examples/cpp/ssl_client_cert.cpp
@@ -19,10 +19,9 @@
  *
  */
 
-#include "proton/acceptor.hpp"
 #include "proton/connection.hpp"
 #include "proton/connection_options.hpp"
-#include "proton/container.hpp"
+#include "proton/default_container.hpp"
 #include "proton/handler.hpp"
 #include "proton/sasl.hpp"
 #include "proton/ssl.hpp"
@@ -31,7 +30,7 @@
 
 #include <iostream>
 
-#include "fake_cpp11.hpp"
+#include <proton/config.hpp>
 
 using proton::connection_options;
 using proton::ssl_client_options;
@@ -48,9 +47,9 @@ static std::string find_CN(const std::string &);
 
 
 struct server_handler : public proton::handler {
-    proton::acceptor inbound_listener;
+    proton::listener listener;
 
-    void on_connection_open(proton::connection &c) override {
+    void on_connection_open(proton::connection &c) PN_CPP_OVERRIDE {
         std::cout << "Inbound server connection connected via SSL.  Protocol: " <<
             c.transport().ssl().protocol() << std::endl;
         if (c.transport().sasl().outcome() == sasl::OK) {
@@ -61,10 +60,10 @@ struct server_handler : public proton::handler {
             std::cout << "Inbound client authentication failed" <<std::endl;
             c.close();
         }
-        inbound_listener.close();
+        listener.stop();
     }
 
-    void on_message(proton::delivery &, proton::message &m) override {
+    void on_message(proton::delivery &, proton::message &m) PN_CPP_OVERRIDE {
         std::cout << m.body() << std::endl;
     }
 };
@@ -78,14 +77,14 @@ class hello_world_direct : public proton::handler {
   public:
     hello_world_direct(const std::string& u) : url(u) {}
 
-    void on_container_start(proton::container &c) override {
+    void on_container_start(proton::container &c) PN_CPP_OVERRIDE {
         // Configure listener.  Details vary by platform.
         ssl_certificate server_cert = platform_certificate("tserver", "tserverpw");
         std::string client_CA = platform_CA("tclient");
         // Specify an SSL domain with CA's for client certificate verification.
         ssl_server_options srv_ssl(server_cert, client_CA);
         connection_options server_opts;
-        server_opts.ssl_server_options(srv_ssl).handler(&s_handler);
+        server_opts.ssl_server_options(srv_ssl).handler(s_handler);
         server_opts.sasl_allowed_mechs("EXTERNAL");
         c.server_connection_options(server_opts);
 
@@ -99,24 +98,24 @@ class hello_world_direct : public proton::handler {
         client_opts.ssl_client_options(ssl_cli).sasl_allowed_mechs("EXTERNAL");
         c.client_connection_options(client_opts);
 
-        s_handler.inbound_listener = c.listen(url);
+        s_handler.listener = c.listen(url);
         c.open_sender(url);
     }
 
-    void on_connection_open(proton::connection &c) override {
+    void on_connection_open(proton::connection &c) PN_CPP_OVERRIDE {
         std::string subject = c.transport().ssl().remote_subject();
         std::cout << "Outgoing client connection connected via SSL.  Server certificate identity " <<
             find_CN(subject) << std::endl;
     }
 
-    void on_sendable(proton::sender &s) override {
+    void on_sendable(proton::sender &s) PN_CPP_OVERRIDE {
         proton::message m;
         m.body("Hello World!");
         s.send(m);
         s.close();
     }
 
-    void on_tracker_accept(proton::tracker &t) override {
+    void on_tracker_accept(proton::tracker &t) PN_CPP_OVERRIDE {
         // All done.
         t.connection().close();
     }
@@ -137,7 +136,7 @@ int main(int argc, char **argv) {
         else cert_directory = "ssl_certs/";
 
         hello_world_direct hwd(url);
-        proton::container(hwd).run();
+        proton::default_container(hwd).run();
         return 0;
     } catch (const std::exception& e) {
         std::cerr << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/examples/cpp/tutorial.dox
----------------------------------------------------------------------
diff --git a/examples/cpp/tutorial.dox b/examples/cpp/tutorial.dox
index b8383f7..ab46a1d 100644
--- a/examples/cpp/tutorial.dox
+++ b/examples/cpp/tutorial.dox
@@ -268,7 +268,7 @@ implicitly a connection), we listen for incoming connections.
 \until }
 
 When we have received all the expected messages, we then stop listening for
-incoming connections by closing the acceptor object.
+incoming connections by calling `container::stop_listening()`
 
 \skip on_message
 \until }
@@ -292,8 +292,8 @@ link, we listen for incoming connections.
 \skip on_start
 \until }
 
-When we have received confirmation of all the messages we sent, we can
-close the acceptor in order to exit.
+When we have received confirmation of all the messages we sent, we call
+`container::stop_listening()` to exit.
 
 \skip on_delivery_accept
 \until }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt
index 26b8dd4..e18d4b9 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -27,7 +27,6 @@ include_directories(
 add_definitions(${CXX_WARNING_FLAGS})
 
 set(qpid-proton-cpp-source
-  ${qpid-proton-mt-source}
   src/acceptor.cpp
   src/binary.cpp
   src/byte_array.cpp
@@ -46,10 +45,11 @@ set(qpid-proton-cpp-source
   src/endpoint.cpp
   src/error.cpp
   src/error_condition.cpp
+  src/event_loop.cpp
   src/handler.cpp
-  src/id_generator.cpp
   src/io/connection_engine.cpp
   src/link.cpp
+  src/listener.cpp
   src/message.cpp
   src/messaging_adapter.cpp
   src/node_options.cpp
@@ -75,20 +75,14 @@ set(qpid-proton-cpp-source
   src/terminus.cpp
   src/timestamp.cpp
   src/tracker.cpp
-  src/transport.cpp
   src/transfer.cpp
+  src/transport.cpp
   src/type_id.cpp
   src/url.cpp
   src/uuid.cpp
   src/value.cpp
   )
 
-if (HAS_CPP11)
-  message(STATUS "Enable C++11 extensions")
-  list(APPEND qpid-proton-cpp-source src/controller.cpp)
-endif()
-
-
 set_source_files_properties (
   ${qpid-proton-cpp-source}
   PROPERTIES
@@ -181,6 +175,7 @@ endmacro(add_cpp_test)
 
 add_cpp_test(codec_test)
 add_cpp_test(engine_test)
+add_cpp_test(thread_safe_test)
 add_cpp_test(interop_test ${CMAKE_SOURCE_DIR}/tests)
 add_cpp_test(message_test)
 add_cpp_test(scalar_test)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/proton-c/bindings/cpp/docs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/CMakeLists.txt b/proton-c/bindings/cpp/docs/CMakeLists.txt
index c5ae4e5..d512d15 100644
--- a/proton-c/bindings/cpp/docs/CMakeLists.txt
+++ b/proton-c/bindings/cpp/docs/CMakeLists.txt
@@ -23,15 +23,19 @@ if (DOXYGEN_FOUND)
   configure_file (
     ${CMAKE_CURRENT_SOURCE_DIR}/user.doxygen.in
     ${CMAKE_CURRENT_BINARY_DIR}/user.doxygen)
+
+  file(GLOB_RECURSE headers ../include/proton/*.hpp)
   add_custom_target (docs-cpp
     COMMAND ${CMAKE_COMMAND} -E remove_directory html # get rid of old files
-    COMMAND ${DOXYGEN_EXECUTABLE} user.doxygen)
+    COMMAND ${DOXYGEN_EXECUTABLE} user.doxygen
+    DEPENDS ${headers}
+    )
   add_dependencies (docs docs-cpp)
   # HTML files are generated to ./html - put those in the install.
   install (DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}/html/"
-           DESTINATION "${PROTON_SHARE}/docs/api-cpp"
-           COMPONENT documentation
-           OPTIONAL)
+    DESTINATION "${PROTON_SHARE}/docs/api-cpp"
+    COMPONENT documentation
+    OPTIONAL)
 endif (DOXYGEN_FOUND)
 
 set_directory_properties(PROPERTIES ADDITIONAL_MAKE_CLEAN_FILES html)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/proton-c/bindings/cpp/docs/mainpage.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/mainpage.md b/proton-c/bindings/cpp/docs/mainpage.md
index dffb3ed..2ba2eaf 100644
--- a/proton-c/bindings/cpp/docs/mainpage.md
+++ b/proton-c/bindings/cpp/docs/mainpage.md
@@ -106,27 +106,15 @@ can be re-used if you change your approach.
 
 ### %proton::container - easy single-threaded applications
 
-`proton::container` is an easy way to get started with single threaded
-applications. It manages the low-level socket connection and polling for you so
-you can write portable applications using just the @ref proton API. You an use
+`proton::container` is the top level object in a proton application.  Use
 proton::connection::connect() and proton::container::listen() to create
-connections. The container polls multiple connections and calls protocol eventsa
+connections. The container polls multiple connections and calls protocol events
 on your `proton::handler` sub-classes.
 
-The container has two limitations:
-- it is not thread-safe, all container work must happen in a single thread.
-- it provides portable IO handling for you but cannot be used for different types of IO.
+The default container implementation is created by `proton::new_default_container()`.
 
-### %proton::mt::controller - multi-threaded applications
-
-The proton::controller is similar to the proton::container but it can process
-connections in multiple threads concurrently. It uses the `proton::handler` like
-the container. If you follow the recommended model you can re-use statefull,
-per-connection, handlers with the controller. See @ref mt_page and the examples
-@ref mt/broker.cpp and @ref mt/epoll\_controller.cpp
-
-Default controller IO implementations are provided but you can also implement
-your own controller using the proton::io::connection_engine, read on...
+You can implement your own container to integrate proton with arbitrary your own
+container using the proton::io::connection_engine.
 
 ### %proton::io::connection_engine - integrating with foreign IO
 
@@ -146,5 +134,5 @@ framework or library, memory-based streams or any other source/sink for byte
 stream data.
 
 You can also use the engine to build a custom implementation of
-proton::mt::controller and proton::mt::work\_queue so portable proton
-applications using the controller can run without modification on your platform.
+proton::container and so portable proton applications can run without
+modification on your platform.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/proton-c/bindings/cpp/docs/mt_page.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/mt_page.md b/proton-c/bindings/cpp/docs/mt_page.md
index bed53e9..a1ac849 100644
--- a/proton-c/bindings/cpp/docs/mt_page.md
+++ b/proton-c/bindings/cpp/docs/mt_page.md
@@ -1,5 +1,7 @@
 # Multi-threaded proton {#mt_page}
 
+<!-- FIXME aconway 2016-05-04: doc -->
+
 Most classes in namespace @ref proton are not thread-safe. Objects associated
 with a single connection *must not* be used concurrently. However objects
 associated with *different* connections *can* be used concurrently in separate
@@ -8,7 +10,7 @@ threads.
 The recommended way to use proton multi-threaded is to *serialize* the work for
 each connection but allow different connections to be processed concurrently.
 
-proton::controller allows you to manage connections in a multi-threaded way. You
+proton::container allows you to manage connections in a multi-threaded way. You
 supply a proton::handler for each connection. Proton will ensure that the
 `proton::handler::on_*()` functions are never called concurrently so
 per-connection handlers do not need a lock even if they have state.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/proton-c/bindings/cpp/include/proton/acceptor.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/acceptor.hpp b/proton-c/bindings/cpp/include/proton/acceptor.hpp
deleted file mode 100644
index b0e11ed..0000000
--- a/proton-c/bindings/cpp/include/proton/acceptor.hpp
+++ /dev/null
@@ -1,61 +0,0 @@
-#ifndef PROTON_CPP_ACCEPTOR_H
-#define PROTON_CPP_ACCEPTOR_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <proton/reactor.h>
-#include <proton/export.hpp>
-#include <proton/object.hpp>
-
-struct pn_connection_t;
-
-namespace proton {
-
-/// A context for accepting inbound connections.
-///
-/// @see container::listen
-class acceptor : public internal::object<pn_acceptor_t> {
-    /// @cond INTERNAL
-    acceptor(pn_acceptor_t* a) : internal::object<pn_acceptor_t>(a) {}
-    /// @endcond
-
-  public:
-    acceptor() : internal::object<pn_acceptor_t>(0) {}
-
-    /// Close the acceptor.
-    PN_CPP_EXTERN void close();
-
-    /// Return the current set of connection options applied to
-    /// inbound connectons by the acceptor.
-    ///
-    /// Note that changes made to the connection options only affect
-    /// connections accepted after this call returns.
-    PN_CPP_EXTERN class connection_options &connection_options();
-
-    /// @cond INTERNAL
-    friend class internal::factory<acceptor>;
-    /// @endcond
-};
-
-}
-
-#endif // PROTON_CPP_ACCEPTOR_H

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/proton-c/bindings/cpp/include/proton/config.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/config.hpp b/proton-c/bindings/cpp/include/proton/config.hpp
index e6cb8ae..f6b10d6 100644
--- a/proton-c/bindings/cpp/include/proton/config.hpp
+++ b/proton-c/bindings/cpp/include/proton/config.hpp
@@ -59,6 +59,12 @@
 #define PN_CPP_HAS_OVERRIDE PN_CPP_HAS_CPP11
 #endif
 
+#if PN_CPP_HAS_OVERRIDE
+#define PN_CPP_OVERRIDE override
+#else
+#define PN_CPP_OVERRIDE
+#endif
+
 #ifndef PN_CPP_HAS_EXPLICIT_CONVERSIONS
 #define PN_CPP_HAS_EXPLICIT_CONVERSIONS PN_CPP_HAS_CPP11
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/proton-c/bindings/cpp/include/proton/connection.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection.hpp b/proton-c/bindings/cpp/include/proton/connection.hpp
index fd7bffa..1851625 100644
--- a/proton-c/bindings/cpp/include/proton/connection.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection.hpp
@@ -27,6 +27,7 @@
 #include "proton/object.hpp"
 #include "proton/session.hpp"
 #include "proton/types.h"
+
 #include <string>
 
 struct pn_connection_t;
@@ -39,16 +40,14 @@ class sender;
 class sender_options;
 class receiver;
 class receiver_options;
-
-namespace io {
-class connection_engine;
-}
+class container;
+template <class T> class thread_safe;
 
 /// A connection to a remote AMQP peer.
 class
 PN_CPP_CLASS_EXTERN connection : public internal::object<pn_connection_t>, public endpoint {
     /// @cond INTERNAL
-    connection(pn_connection_t* c) : internal::object<pn_connection_t>(c) {}
+    PN_CPP_EXTERN connection(pn_connection_t* c) : internal::object<pn_connection_t>(c) {}
     /// @endcond
 
   public:
@@ -119,13 +118,13 @@ PN_CPP_CLASS_EXTERN connection : public internal::object<pn_connection_t>, publi
     PN_CPP_EXTERN uint16_t max_sessions() const;
     PN_CPP_EXTERN uint32_t idle_timeout() const;
 
-    /// @cond INTERNAL
   private:
     void user(const std::string &);
     void password(const std::string &);
 
     friend class internal::factory<connection>;
     friend class connector;
+  friend class proton::thread_safe<connection>;
     /// @endcond
 };
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/proton-c/bindings/cpp/include/proton/connection_options.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection_options.hpp b/proton-c/bindings/cpp/include/proton/connection_options.hpp
index c6a7c04..01826e7 100644
--- a/proton-c/bindings/cpp/include/proton/connection_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection_options.hpp
@@ -70,6 +70,9 @@ class connection_options {
     /// Create an empty set of options.
     PN_CPP_EXTERN connection_options();
 
+    /// Shorthand for connection_options().handler(h)
+    PN_CPP_EXTERN connection_options(class handler& h);
+
     /// Copy options.
     PN_CPP_EXTERN connection_options(const connection_options&);
 
@@ -80,8 +83,10 @@ class connection_options {
 
     // XXX add C++11 move operations
 
-    /// Set a handler for the connection.
-    PN_CPP_EXTERN connection_options& handler(class handler *);
+    /// Set a connection handler.
+    ///
+    /// The handler must not be deleted until handler::on_transport_close() is called.
+    PN_CPP_EXTERN connection_options& handler(class handler&);
 
     /// Set the maximum frame size.
     PN_CPP_EXTERN connection_options& max_frame_size(uint32_t max);
@@ -136,9 +141,6 @@ class connection_options {
     /// Update option values from values set in other.
     PN_CPP_EXTERN connection_options& update(const connection_options& other);
 
-    /// Copy and update option values from values set in other.
-    PN_CPP_EXTERN connection_options update(const connection_options& other) const;
-
   private:
     void apply(connection&) const;
     proton_handler* handler() const;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/proton-c/bindings/cpp/include/proton/container.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/container.hpp b/proton-c/bindings/cpp/include/proton/container.hpp
index 540f4cf..6d613e2 100644
--- a/proton-c/bindings/cpp/include/proton/container.hpp
+++ b/proton-c/bindings/cpp/include/proton/container.hpp
@@ -22,9 +22,13 @@
  *
  */
 
-#include "proton/duration.hpp"
-#include "proton/export.hpp"
-#include "proton/pn_unique_ptr.hpp"
+// FIXME aconway 2016-05-04: doc
+
+#include <proton/connection_options.hpp>
+#include <proton/error_condition.hpp>
+#include <proton/listener.hpp>
+#include <proton/pn_unique_ptr.hpp>
+#include <proton/thread_safe.hpp>
 
 #include <string>
 
@@ -32,131 +36,152 @@ namespace proton {
 
 class connection;
 class connection_options;
-class acceptor;
-class handler;
+class container_impl;
 class handler;
+class listen_handler;
 class receiver;
 class receiver_options;
 class sender;
 class sender_options;
 class task;
-class container_impl;
 
-namespace internal {
-class link;
-}
+class container;
 
 /// A top-level container of connections, sessions, senders and receivers.
 ///
 /// A container gives a unique identity to each communicating peer. It
 /// is often a process-level object.
-
+///
 /// It serves as an entry point to the API, allowing connections, senders
 /// and receivers to be established. It can be supplied with an event handler
 /// in order to intercept important messaging events, such as newly
 /// received messages or newly issued credit for sending
 /// messages.
-class container {
-  public:
-    /// Create a container.
+class PN_CPP_CLASS_EXTERN container {
+ public:
+    PN_CPP_EXTERN virtual ~container();
+
+    /// Connect to url, send an `open` request to the remote peer.
     ///
-    /// Container ID should be unique within your system. By default a
-    /// random ID is generated.
+    /// Options are applied to the connection as follows, values in later
+    /// options override earlier ones:
     ///
-    /// This container will not be very useful unless event handlers are supplied
-    /// as options when creating a connection/listener/sender or receiver.
-    PN_CPP_EXTERN container();
-    PN_CPP_EXTERN container(const std::string& id);
-
-    /// Create a container with an event handler.
+    ///  1. client_connection_options()
+    ///  2. options passed to connect()
+    ///
+    /// The handler in the composed options is used to call
+    /// proton::handler::on_connection_open() when the remote peer's open response
+    /// is received.
+    ///@{
+    virtual returned<connection> connect(const std::string& url, const connection_options &) = 0;
+    PN_CPP_EXTERN returned<connection> connect(const std::string& url);
+    ///@}
+
+    ///@cond INTERNAL
+    /// Stop listening on url, must match the url string given to listen().
+    /// You can also use the proton::listener object returned by listen()
+    virtual void stop_listening(const std::string& url) = 0;
+    ///@endcond
+
+    // FIXME aconway 2016-05-13: doc options
+
+    /// Start listening on url.
     ///
-    /// Container ID should be unique within your system. By default a
-    /// random ID is generated.
-    PN_CPP_EXTERN container(handler& mhandler);
-    PN_CPP_EXTERN container(handler& mhandler, const std::string& id);
+    /// Calls to the @ref listen_handler are serialized for this listener,
+    /// but handlers attached to separate listeners may be called concurrently.
+    ///
+    /// @param url identifies a listening url.
+    /// @param lh handles listening events
+    /// @return listener lets you stop listening
+    virtual listener listen(const std::string& url, listen_handler& lh) = 0;
+
+    /// Listen with a fixed set of options for all accepted connections.
+    /// See listen(const std::string&, listen_handler&)
+    PN_CPP_EXTERN virtual listener listen(const std::string& url, const connection_options&);
 
-    PN_CPP_EXTERN ~container();
+    /// Start listening on URL.
+    /// New connections will use the handler from server_connection_options()
+    PN_CPP_EXTERN virtual listener listen(const std::string& url);
 
-    /// Open a connection to `url`.
-    PN_CPP_EXTERN connection connect(const std::string& url);
-    PN_CPP_EXTERN connection connect(const std::string& url,
-                                     const connection_options &opts);
+    /// Run the container in this thread.
+    /// Returns when the container stops: see auto_stop() and stop().
+    ///
+    /// With a multithreaded container, call run() in multiple threads to create a thread pool.
+    virtual void run() = 0;
 
-    /// Listen on `url` for incoming connections.
-    PN_CPP_EXTERN acceptor listen(const std::string &url);
-    PN_CPP_EXTERN acceptor listen(const std::string &url,
-                                  const connection_options &opts);
+    /// If true, the container will stop (i.e. run() will return) when all
+    /// active connections and listeners are closed. If false the container
+    /// will keep running till stop() is called.
+    ///
+    /// auto_stop is set by default when a new container is created.
+    // FIXME aconway 2016-05-06: doc
+    virtual void auto_stop(bool) = 0;
 
-    /// Start processing events. It returns when all connections and
-    /// acceptors are closed.
-    PN_CPP_EXTERN void run();
+    ///@name Stop the container with an optional error_condition err.
+    /// - abort all open connections and listeners.
+    /// - process final handler events and injected functions
+    /// - if !err.empty(), handlers will receive on_transport_error(err)
+    /// - run() will return in all threads.
+    virtual void stop(const error_condition& err = error_condition()) = 0;
 
     /// Open a connection to `url` and open a sender for `url.path()`.
     /// Any supplied sender or connection options will override the
     /// container's template options.
-    PN_CPP_EXTERN sender open_sender(const std::string &url);
-    PN_CPP_EXTERN sender open_sender(const std::string &url,
-                                     const proton::sender_options &o);
-    PN_CPP_EXTERN sender open_sender(const std::string &url,
-                                     const proton::sender_options &o,
-                                     const connection_options &c);
+    /// @{
+    PN_CPP_EXTERN virtual returned<sender> open_sender(const std::string &url);
+    PN_CPP_EXTERN virtual returned<sender> open_sender(const std::string &url,
+                                                       const proton::sender_options &o);
+    virtual returned<sender> open_sender(const std::string &url,
+                                         const proton::sender_options &o,
+                                         const connection_options &c) = 0;
+    //@}
 
     /// Open a connection to `url` and open a receiver for
     /// `url.path()`.  Any supplied receiver or connection options will
     /// override the container's template options.
-    PN_CPP_EXTERN receiver open_receiver(const std::string&url);
-    PN_CPP_EXTERN receiver open_receiver(const std::string&url,
-                                         const proton::receiver_options &o);
-    PN_CPP_EXTERN receiver open_receiver(const std::string&url,
-                                         const proton::receiver_options &o,
-                                         const connection_options &c);
+    /// @{
+    PN_CPP_EXTERN virtual returned<receiver> open_receiver(const std::string&url);
+    PN_CPP_EXTERN virtual returned<receiver> open_receiver(const std::string&url,
+                                                           const proton::receiver_options &o);
+    virtual returned<receiver> open_receiver(const std::string&url,
+                                             const proton::receiver_options &o,
+                                             const connection_options &c) = 0;
+    ///@}
 
     /// A unique identifier for the container.
-    PN_CPP_EXTERN std::string id() const;
-
-    /// @cond INTERNAL
-    /// XXX settle some API questions
-    /// Schedule a timer task event in delay milliseconds.
-    PN_CPP_EXTERN task schedule(int delay);
-    PN_CPP_EXTERN task schedule(int delay, handler *h);
-
-    /// @endcond
-
-    /// Copy the connection options to a template which will be
-    /// applied to subsequent outgoing connections.  These are applied
-    /// first and overriden by additional connection options provided
-    /// in other methods.
-    PN_CPP_EXTERN void client_connection_options(const connection_options &);
-
-    /// Copy the connection options to a template which will be
-    /// applied to incoming connections.  These are applied before the
-    /// first open event on the connection.
-    PN_CPP_EXTERN void server_connection_options(const connection_options &);
-
-    /// Copy the sender options to a template applied to new senders
-    /// created and opened by this container.  They are applied before
-    /// the open event on the sender and may be overriden by sender
-    /// options in other methods.
-    PN_CPP_EXTERN void sender_options(const sender_options &);
-
-    /// Copy the receiver options to a template applied to new receivers
-    /// created and opened by this container.  They are applied before
-    /// the open event on the receiver and may be overriden by receiver
-    /// options in other methods.
-    PN_CPP_EXTERN void receiver_options(const receiver_options &);
-
-    /// @cond INTERNAL
-  private:
-    internal::pn_unique_ptr<container_impl> impl_;
-
-    friend class connector;
-    friend class messaging_adapter;
-    friend class receiver_options;
-    friend class sender_options;
-    friend class session_options;
-    /// @endcond
+    virtual std::string id() const = 0;
+
+    // FIXME aconway 2016-05-04: need timed injection to replace schedule()
+
+    /// Connection options that will be to outgoing connections. These are
+    /// applied first and overriden by options provided in connect() and
+    /// handler::on_connection_open()
+    /// @{
+    virtual void client_connection_options(const connection_options &) = 0;
+    virtual connection_options client_connection_options() const = 0;
+    ///@}
+
+    /// Connection options that will be applied to incoming connections. These
+    /// are applied first and overridden by options provided in listen(),
+    /// listen_handler::on_accept() and handler::on_connection_open()
+    /// @{
+    virtual void server_connection_options(const connection_options &) = 0;
+    virtual connection_options server_connection_options() const = 0;
+    ///@}
+
+    /// Sender options applied to senders created by this container. They are
+    /// applied before handler::on_sender_open() and can be over-ridden.  @{
+    /// @{
+    virtual void sender_options(const sender_options &) = 0;
+    virtual class sender_options sender_options() const = 0;
+    ///@}
+
+    /// Receiver options applied to receivers created by this container. They
+    /// are applied before handler::on_receiver_open() and can be over-ridden.
+    /// @{
+    virtual void receiver_options(const receiver_options &) = 0;
+    virtual class receiver_options receiver_options() const = 0;
+    /// @}
 };
-
 }
-
 #endif // PROTON_CPP_CONTAINER_H

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/proton-c/bindings/cpp/include/proton/controller.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/controller.hpp b/proton-c/bindings/cpp/include/proton/controller.hpp
deleted file mode 100644
index 6b0784c..0000000
--- a/proton-c/bindings/cpp/include/proton/controller.hpp
+++ /dev/null
@@ -1,118 +0,0 @@
-#ifndef PROTON_MT_HPP
-#define PROTON_MT_HPP
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-#include <proton/handler.hpp>
-#include <proton/connection_options.hpp>
-#include <proton/error_condition.hpp>
-
-#include <functional>
-#include <memory>
-
-namespace proton {
-
-class connection;
-
-/// The controller lets the application initiate and listen for connections, and
-/// start/stop overall IO activity. A single controller can manage many
-/// connections.
-///
-/// A controller associates a proton::handler with a proton::connection (the
-/// AMQP protocol connection) and the corresponding proton::transport
-/// (representing the underlying IO connection)
-///
-/// The first call to a proton::handler is always handler::on_transport_open(),
-/// the last is handler::on_transport_close(). Handlers can be deleted after
-/// handler::on_transport_close().
-///
-class controller {
-  public:
-    /// Create an instance of the default controller implementation.
-    /// @param container_id set on connections for this controller.
-    /// If empty, generate a random QUID default.
-    PN_CPP_EXTERN static std::unique_ptr<controller> create();
-
-    /// Get the controller associated with a connection.
-    /// @throw proton::error if this is not a controller-managed connection.
-    PN_CPP_EXTERN static controller& get(const proton::connection&);
-
-    controller(const controller&) = delete;
-
-    virtual ~controller() {}
-
-    /// Start listening on address.
-    ///
-    /// @param address identifies a listening address.
-    ///
-    /// @param make_handler returns a handler for each accepted connection.
-    /// handler::on_connection_open() is called with the incoming
-    /// proton::connection.  The handler can accept by calling
-    /// connection::open()) or reject by calling connection::close()).
-    ///
-    /// Calls to the factory for this address are serialized. Calls for separate
-    /// addresses in separate calls to listen() may be concurrent.
-    ///
-    virtual void listen(
-        const std::string& address,
-        std::function<proton::handler*(const std::string&)> make_handler,
-        const connection_options& = connection_options()) = 0;
-
-    /// Stop listening on address, must match exactly with address string given to listen().
-    virtual void stop_listening(const std::string& address) = 0;
-
-    /// Connect to address.
-    ///
-    /// The handler will get a proton::handler::on_connection_open() call the
-    /// connection is completed by the other end.
-    virtual void connect(
-        const std::string& address, proton::handler&,
-        const connection_options& = connection_options()) = 0;
-
-    /// Default options for all connections, e.g. container_id.
-    virtual void options(const connection_options&) = 0;
-
-    /// Default options for all connections, e.g. container_id.
-    virtual connection_options options() = 0;
-
-    /// Run the controller in this thread. Returns when the controller is
-    /// stopped.  Multiple threads can call run()
-    virtual void run() = 0;
-
-    /// Stop the controller: abort open connections, run() will return in all threads.
-    /// Handlers will receive on_transport_error() with the error_condition.
-    virtual void stop(const error_condition& = error_condition()) = 0;
-
-    /// The controller will stop (run() will return) when there are no open
-    /// connections or listeners left.
-    virtual void stop_on_idle() = 0;
-
-    /// Wait till the controller is stopped.
-    virtual void wait() = 0;
-
-  protected:
-    controller() {}
-};
-
-}
-
-
-#endif // PROTON_MT_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/proton-c/bindings/cpp/include/proton/default_container.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/default_container.hpp b/proton-c/bindings/cpp/include/proton/default_container.hpp
new file mode 100644
index 0000000..d3a7608
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/default_container.hpp
@@ -0,0 +1,92 @@
+#ifndef PROTON_DEFAULT_CONTAINER_HPP
+#define PROTON_DEFAULT_CONTAINER_HPP
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// FIXME aconway 2016-05-04: doc
+
+#include <proton/container.hpp>
+
+namespace proton {
+
+// FIXME aconway 2016-05-04: doc
+class PN_CPP_CLASS_EXTERN  default_container : public container {
+ public:
+    /// Create a default, single-threaded container with a handler.
+    /// The handler will be called for all events on all connections in the container.
+    ///
+    /// Container ID should be unique within your system. If empty a random UUID is generated.
+    PN_CPP_EXTERN explicit default_container(proton::handler& h, const std::string& id = "");
+
+    /// Create a default, single-threaded container without a handler.
+    ///
+    /// Connections get their handlesr via proton::connection_options.
+    /// Container-wide defaults are set with client_connection_options() and
+    /// server_connection_options(). Per-connection options are set in connect()
+    /// and proton_listen_handler::on_accept for the proton::listen_handler
+    /// passed to listen()
+    ///
+    /// Container ID should be unique within your system. If empty a random UUID is generated.
+    PN_CPP_EXTERN explicit default_container(const std::string& id = "");
+
+    /// Wrap an existing container implementation as a default_container.
+    /// Takes ownership of c.
+    PN_CPP_EXTERN  explicit default_container(container* c) : impl_(c) {}
+
+    // FIXME aconway 2016-05-13: @copydoc all.
+
+    PN_CPP_EXTERN returned<connection> connect(const std::string& url, const connection_options &) PN_CPP_OVERRIDE;
+    PN_CPP_EXTERN listener listen(const std::string& url, listen_handler& l) PN_CPP_OVERRIDE;
+
+    PN_CPP_EXTERN void stop_listening(const std::string& url) PN_CPP_OVERRIDE;
+
+    PN_CPP_EXTERN void run() PN_CPP_OVERRIDE;
+    PN_CPP_EXTERN void auto_stop(bool set) PN_CPP_OVERRIDE;
+
+    PN_CPP_EXTERN void stop(const error_condition& err = error_condition()) PN_CPP_OVERRIDE;
+
+    PN_CPP_EXTERN returned<sender> open_sender(
+        const std::string &url,
+        const proton::sender_options &o = proton::sender_options(),
+        const connection_options &c = connection_options()) PN_CPP_OVERRIDE;
+
+    PN_CPP_EXTERN returned<receiver> open_receiver(
+        const std::string&url,
+        const proton::receiver_options &o = proton::receiver_options(),
+        const connection_options &c = connection_options()) PN_CPP_OVERRIDE;
+
+    PN_CPP_EXTERN std::string id() const PN_CPP_OVERRIDE;
+    PN_CPP_EXTERN void client_connection_options(const connection_options &o) PN_CPP_OVERRIDE;
+    PN_CPP_EXTERN connection_options client_connection_options() const PN_CPP_OVERRIDE;
+    PN_CPP_EXTERN void server_connection_options(const connection_options &o) PN_CPP_OVERRIDE;
+    PN_CPP_EXTERN connection_options server_connection_options() const PN_CPP_OVERRIDE;
+    PN_CPP_EXTERN void sender_options(const class sender_options &o) PN_CPP_OVERRIDE;
+    PN_CPP_EXTERN class sender_options sender_options() const PN_CPP_OVERRIDE;
+    PN_CPP_EXTERN void receiver_options(const class receiver_options & o) PN_CPP_OVERRIDE;
+    PN_CPP_EXTERN class receiver_options receiver_options() const PN_CPP_OVERRIDE;
+
+ private:
+    internal::pn_unique_ptr<container> impl_;
+};
+
+}
+#endif // PROTON_DEFAULT_CONTAINER_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/proton-c/bindings/cpp/include/proton/endpoint.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/endpoint.hpp b/proton-c/bindings/cpp/include/proton/endpoint.hpp
index 593ff32..88259ca 100644
--- a/proton-c/bindings/cpp/include/proton/endpoint.hpp
+++ b/proton-c/bindings/cpp/include/proton/endpoint.hpp
@@ -21,10 +21,11 @@
  * under the License.
  *
  */
+
+#include "proton/comparable.hpp"
 #include "proton/config.hpp"
-#include "proton/export.hpp"
 #include "proton/error_condition.hpp"
-#include "proton/comparable.hpp"
+#include "proton/export.hpp"
 
 namespace proton {
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/proton-c/bindings/cpp/include/proton/event_loop.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/event_loop.hpp b/proton-c/bindings/cpp/include/proton/event_loop.hpp
new file mode 100644
index 0000000..ad358c8
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/event_loop.hpp
@@ -0,0 +1,71 @@
+#ifndef PROTON_IO_EVENT_LOOP_HPP
+#define PROTON_IO_EVENT_LOOP_HPP
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/config.hpp>
+
+#include <functional>
+
+#if PN_CPP_HAS_CPP11
+#include <future>
+#include <type_traits>
+#endif
+
+struct pn_connection_t;
+struct pn_session_t;
+struct pn_link_t;
+
+namespace proton {
+
+// FIXME aconway 2016-05-04: doc
+
+class inject_handler {
+  public:
+    virtual ~inject_handler() {}
+    virtual void on_inject() = 0;
+};
+
+class PN_CPP_CLASS_EXTERN event_loop {
+ public:
+    virtual ~event_loop() {}
+
+    // FIXME aconway 2016-05-05: doc, note bool return not throw because no
+    // atomic way to determine closd status and throw during shutdown is bad.
+    virtual bool inject(inject_handler&) = 0;
+#if PN_CPP_HAS_CPP11
+    virtual bool inject(std::function<void()>) = 0;
+#endif
+
+ protected:
+    event_loop() {}
+
+ private:
+    PN_CPP_EXTERN static event_loop* get(pn_connection_t*);
+    PN_CPP_EXTERN static event_loop* get(pn_session_t*);
+    PN_CPP_EXTERN static event_loop* get(pn_link_t*);
+
+  friend class connection;
+ template <class T> friend class thread_safe;
+};
+
+}
+
+#endif // PROTON_IO_EVENT_LOOP_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/proton-c/bindings/cpp/include/proton/handler.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/handler.hpp b/proton-c/bindings/cpp/include/proton/handler.hpp
index d72e394..c145fed 100644
--- a/proton-c/bindings/cpp/include/proton/handler.hpp
+++ b/proton-c/bindings/cpp/include/proton/handler.hpp
@@ -75,7 +75,7 @@ PN_CPP_CLASS_EXTERN handler
     ///
     /// @{
 
-    /// The event loop is starting.
+    /// The container event loop is starting.
     PN_CPP_EXTERN virtual void on_container_start(container &c);
     /// A message is received.
     PN_CPP_EXTERN virtual void on_message(delivery &d, message &m);
@@ -151,6 +151,7 @@ PN_CPP_CLASS_EXTERN handler
     internal::pn_unique_ptr<messaging_adapter> messaging_adapter_;
 
     friend class container;
+    friend class container_impl;
     friend class io::connection_engine;
     friend class connection_options;
     friend class receiver_options;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp b/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
index ded68de..8b8838f 100644
--- a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
+++ b/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp
@@ -38,8 +38,10 @@ struct pn_collector_t;
 
 namespace proton {
 
-class handler;
-class work_queue;            // Only used for multi-threaded connection_engines.
+class event_loop;
+class proton_handler;
+
+// FIXME aconway 2016-05-04: doc
 
 /** @page integration
 
@@ -51,17 +53,15 @@ from any IO source into proton::handler event calls, and generates AMQP
 byte-encoded output that can be written to any IO destination.
 
 The integration needs to implement two user-visible interfaces:
- - proton::controller lets the user initiate or listen for connections.
- - proton::work_queue lets the user serialize their own work with a connection.
-
- @see epoll_controller.cpp for an example of an integration.
-
-[TODO controller doesn't belong in the mt namespace, a single-threaded
-integration would need a controller too.]
+ - proton::container lets the user initiate or listen for connections.
+ - proton::event_loop lets the user serialize their own work with a connection.
 
+ @see epoll_container.cpp for an example of an integration.
 */
 namespace io {
 
+class link_namer;
+
 /// Pointer to a mutable memory region with a size.
 struct mutable_buffer {
     char* data;                 ///< Beginning of the buffered data.
@@ -111,12 +111,29 @@ struct const_buffer {
 class
 PN_CPP_CLASS_EXTERN connection_engine {
   public:
-    /// Create a connection engine that dispatches to handler.
-    // TODO aconway 2016-04-06: no options, only via handler.
-    PN_CPP_EXTERN connection_engine(handler&, const connection_options& = connection_options());
+    /// Create a connection engine. opts must contain a handler.
+    /// Takes ownership of loop, will be deleted only when the proton::connection is.
+    PN_CPP_EXTERN connection_engine(proton::container&, link_namer&, event_loop* loop = 0);
 
     PN_CPP_EXTERN ~connection_engine();
 
+    /// Configure a connection by applying exactly the options in opts (including proton::handler)
+    /// Does not apply any default options, to apply container defaults use connect() or accept()
+    /// instead.
+    void configure(const connection_options& opts=connection_options());
+
+    /// Call configure() with client options and call connection::open()
+    /// Options applied: container::id(), container::client_connection_options(), opts.
+    PN_CPP_EXTERN void connect(const connection_options& opts);
+
+    /// Call configure() with server options.
+    /// Options applied: container::id(), container::server_connection_options(), opts.
+    ///
+    /// Note this does not call connection::open(). If there is a handler in the
+    /// composed options it will receive handler::on_connection_open() and can
+    /// respond with connection::open() or connection::close()
+    PN_CPP_EXTERN void accept(const connection_options& opts);
+
     /// The engine's read buffer. Read data into this buffer then call read_done() when complete.
     /// Returns mutable_buffer(0, 0) if the engine cannot currently read data.
     /// Calling dispatch() may open up more buffer space.
@@ -144,10 +161,19 @@ PN_CPP_CLASS_EXTERN connection_engine {
     /// Note that there may still be events to dispatch() or data to read.
     PN_CPP_EXTERN void write_close();
 
-    /// Close the engine with an error that will be passed to handler::on_transport_error().
-    /// Calls read_close() and write_close().
-    /// Note: You still need to call dispatch() to process final close-down events.
-    PN_CPP_EXTERN void close(const error_condition&);
+    /// Inform the engine that the transport been disconnected unexpectedly,
+    /// without completing the AMQP connection close sequence.
+    ///
+    /// This calls read_close(), write_close(), sets the transport().error() and
+    /// queues an `on_transport_error` event. You must call dispatch() one more
+    /// time to dispatch the handler::on_transport_error() call and other final
+    /// events.
+    ///
+    /// Note this does not close the connection() so that a proton::handler can
+    /// distinguish between a connection close error sent by the remote peer and
+    /// a transport failure.
+    ///
+    PN_CPP_EXTERN void disconnected(const error_condition& = error_condition());
 
     /// Dispatch all available events and call the corresponding \ref handler methods.
     ///
@@ -168,20 +194,19 @@ PN_CPP_CLASS_EXTERN connection_engine {
     /// Get the transport associated with this connection_engine.
     PN_CPP_EXTERN proton::transport transport() const;
 
-    /// For controller connections, set the connection's work_queue. Set
-    /// via plain pointer, not std::shared_ptr so the connection_engine can be
-    /// compiled with C++03.  The work_queue must outlive the engine. The
-    /// std::shared_ptr<work_queue> will be available via work_queue::get(this->connection())
-    PN_CPP_EXTERN void work_queue(class work_queue*);
+    /// Get the container associated with this connection_engine.
+    PN_CPP_EXTERN proton::container& container() const;
 
-  private:
+ private:
     connection_engine(const connection_engine&);
     connection_engine& operator=(const connection_engine&);
 
-    proton::handler& handler_;
+    // FIXME aconway 2016-05-06: reduce binary compat footprint, move stuff to connection context.
+    proton::proton_handler* handler_;
     proton::connection connection_;
     proton::transport transport_;
     proton::internal::pn_ptr<pn_collector_t> collector_;
+    proton::container& container_;
 };
 
 }}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/proton-c/bindings/cpp/include/proton/io/container_impl_base.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/container_impl_base.hpp b/proton-c/bindings/cpp/include/proton/io/container_impl_base.hpp
new file mode 100644
index 0000000..d3fd74a
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/io/container_impl_base.hpp
@@ -0,0 +1,120 @@
+#ifndef PROTON_IO_CONTAINER_IMPL_BASE_HPP
+#define PROTON_IO_CONTAINER_IMPL_BASE_HPP
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/io/link_namer.hpp>
+#include <proton/container.hpp>
+
+#include <mutex>
+#include <sstream>
+
+namespace proton {
+namespace io {
+
+/// Thread-safe partial implementation of proton::container interface to reduce
+/// boilerplate code in container implementations. Requires C++11.
+///
+/// You can ignore this class if you want to implement the functions in a
+/// different way.
+///
+class container_impl_base : public container {
+  public:
+
+    void client_connection_options(const connection_options & opts) {
+        store(client_copts_, opts);
+    }
+    connection_options client_connection_options() const {
+        return load(client_copts_);
+    }
+    void server_connection_options(const connection_options & opts) {
+        store(server_copts_, opts);
+    }
+    connection_options server_connection_options() const {
+        return load(server_copts_);
+    }
+    void sender_options(const class sender_options & opts) {
+        store(sender_opts_, opts);
+    }
+    class sender_options sender_options() const {
+        return load(sender_opts_);
+    }
+    void receiver_options(const class receiver_options & opts) {
+        store(receiver_opts_, opts);
+    }
+    class receiver_options receiver_options() const {
+        return load(receiver_opts_);
+    }
+
+    returned<sender> open_sender(
+        const std::string &url, const class sender_options &opts, const connection_options &copts)
+    {
+        return open_link<sender, class sender_options>(url, opts, copts, &connection::open_sender);
+    }
+
+    returned<receiver> open_receiver(
+        const std::string &url, const class receiver_options &opts, const connection_options &copts)
+    {
+        return open_link<receiver>(url, opts, copts, &connection::open_receiver);
+    }
+
+
+  private:
+    template<class T, class Opts>
+    returned<T> open_link(
+        const std::string &url_str, const Opts& opts, const connection_options& copts,
+        T (connection::*open_fn)(const std::string&, const Opts&))
+    {
+        std::string addr = url(url_str).path();
+        std::shared_ptr<thread_safe<connection> > ts_connection = connect(url_str, copts);
+        std::promise<returned<T> > result_promise;
+        auto do_open = [ts_connection, addr, opts, open_fn, &result_promise]() {
+            try {
+                connection c = ts_connection->unsafe();
+                returned<T> s = make_thread_safe((c.*open_fn)(addr, opts));
+                result_promise.set_value(s);
+            } catch (...) {
+                result_promise.set_exception(std::current_exception());
+            }
+        };
+        ts_connection->event_loop()->inject(do_open);
+        std::future<returned<T> > result_future = result_promise.get_future();
+        if (!result_future.valid())
+            throw error(url_str+": connection closed");
+        return result_future.get();
+    }
+
+    mutable std::mutex lock_;
+    template <class T> T load(const T& v) const {
+        std::lock_guard<std::mutex> g(lock_);
+        return v;
+    }
+    template <class T> void store(T& v, const T& x) const {
+        std::lock_guard<std::mutex> g(lock_);
+        v = x;
+    }
+    connection_options client_copts_, server_copts_;
+    class receiver_options receiver_opts_;
+    class sender_options sender_opts_;
+};
+
+}}
+
+#endif // PROTON_IO_CONTAINER_IMPL_BASE_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1b8450d6/proton-c/bindings/cpp/include/proton/io/default_controller.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/default_controller.hpp b/proton-c/bindings/cpp/include/proton/io/default_controller.hpp
deleted file mode 100644
index f876d5f..0000000
--- a/proton-c/bindings/cpp/include/proton/io/default_controller.hpp
+++ /dev/null
@@ -1,47 +0,0 @@
-#ifndef PROTON_IO_DRIVER_HPP
-#define PROTON_IO_DRIVER_HPP
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <functional>
-#include <memory>
-
-namespace proton {
-
-class controller;
-
-namespace io {
-
-/// A proton::controller implementation can create a static instance of default_controller
-/// to register as the default implementation.
-/// If more than one implementation is linked, which one becomes the default
-/// is undefined.
-struct default_controller {
-
-    /// A controller make-function takes a string container-id and returns a unique_ptr<controller>
-    typedef std::function<std::unique_ptr<controller>()> make_fn;
-
-    /// Construct a static instance of default_controller to register your controller factory.
-    PN_CPP_EXTERN default_controller(make_fn);
-};
-
-}}
-
-#endif // PROTON_IO_CONTROLLER_HPP


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message