qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From astitc...@apache.org
Subject [05/20] qpid-proton git commit: PROTON-1400: [C++ binding] Proactor container implementation - Remove all reactor use - Rearrange object context code - Change container includes to proactor container includes - Add sender/receiver options API to connecti
Date Fri, 21 Jul 2017 17:02:01 GMT
PROTON-1400: [C++ binding] Proactor container implementation
- Remove all reactor use
- Rearrange object context code
- Change container includes to proactor container includes
- Add sender/receiver options API to connection so we never need container in handlers
- Rework connection_driver remove all use of container
- Change signature of listener_handler callbacks to supply the listener


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/9fad779c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/9fad779c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/9fad779c

Branch: refs/heads/master
Commit: 9fad779c98dcc2ccc75e5055d7333e9dd862c235
Parents: 6f88f52
Author: Andrew Stitcher <astitcher@apache.org>
Authored: Wed Feb 8 02:32:36 2017 -0500
Committer: Andrew Stitcher <astitcher@apache.org>
Committed: Fri Jul 21 12:50:06 2017 -0400

----------------------------------------------------------------------
 examples/cpp/ssl.cpp                            |   9 +-
 proton-c/bindings/cpp/CMakeLists.txt            |   6 +-
 .../bindings/cpp/include/proton/connection.hpp  |   6 +
 .../bindings/cpp/include/proton/container.hpp   |   6 -
 .../cpp/include/proton/internal/config.hpp      |   8 +
 .../cpp/include/proton/io/connection_driver.hpp |  25 +-
 .../include/proton/io/container_impl_base.hpp   | 144 -------
 .../cpp/include/proton/listen_handler.hpp       |   6 +-
 .../bindings/cpp/include/proton/listener.hpp    |  18 +-
 proton-c/bindings/cpp/src/connection.cpp        |  27 +-
 .../bindings/cpp/src/connection_driver_test.cpp |  11 +-
 .../bindings/cpp/src/connection_options.cpp     |  30 +-
 proton-c/bindings/cpp/src/container.cpp         |  20 +-
 proton-c/bindings/cpp/src/container_test.cpp    |  11 +-
 proton-c/bindings/cpp/src/contexts.cpp          |  55 +--
 proton-c/bindings/cpp/src/event_loop.cpp        |   2 +-
 proton-c/bindings/cpp/src/include/contexts.hpp  |  56 +--
 .../cpp/src/include/messaging_adapter.hpp       |   2 -
 .../cpp/src/include/proactor_container_impl.hpp | 133 ++++++
 .../src/include/proactor_event_loop_impl.hpp    |  54 +++
 .../bindings/cpp/src/include/proton_bits.hpp    |  18 +-
 .../bindings/cpp/src/include/proton_event.hpp   |  16 +-
 .../cpp/src/include/test_dummy_container.hpp    |  82 ----
 .../bindings/cpp/src/io/connection_driver.cpp   |  31 +-
 proton-c/bindings/cpp/src/listener.cpp          |  11 +-
 proton-c/bindings/cpp/src/messaging_adapter.cpp |  21 +-
 .../cpp/src/proactor_container_impl.cpp         | 419 +++++++++++++++++++
 proton-c/bindings/cpp/src/receiver.cpp          |   1 -
 proton-c/bindings/cpp/src/receiver_options.cpp  |   2 +-
 proton-c/bindings/cpp/src/reconnect_timer.cpp   |   1 -
 proton-c/bindings/cpp/src/sender_options.cpp    |   2 +-
 proton-c/bindings/cpp/src/session_options.cpp   |   2 +-
 32 files changed, 765 insertions(+), 470 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/examples/cpp/ssl.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/ssl.cpp b/examples/cpp/ssl.cpp
index 2e901c2..00bbccd 100644
--- a/examples/cpp/ssl.cpp
+++ b/examples/cpp/ssl.cpp
@@ -66,16 +66,16 @@ namespace {
 
 
 struct server_handler : public proton::messaging_handler {
-    std::string url;
+    proton::listener listener;
 
     void on_connection_open(proton::connection &c) OVERRIDE {
         std::cout << "Inbound server connection connected via SSL.  Protocol: " <<
             c.transport().ssl().protocol() << std::endl;
-        c.container().stop_listening(url);  // Just expecting the one connection.
+        listener.stop();  // Just expecting the one connection.
     }
 
     void on_transport_error(proton::transport &t) OVERRIDE {
-        t.connection().container().stop_listening(url);
+        listener.stop();
     }
 
     void on_message(proton::delivery &, proton::message &m) OVERRIDE {
@@ -122,8 +122,7 @@ class hello_world_direct : public proton::messaging_handler {
         } else throw std::logic_error("bad verify mode: " + verify);
 
         c.client_connection_options(client_opts);
-        s_handler.url = url;
-        c.listen(url);
+        s_handler.listener = c.listen(url);
         c.open_sender(url);
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt
index d1c6fd1..295a99e 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -32,9 +32,8 @@ set(qpid-proton-cpp-source
   src/map.cpp
   src/connection.cpp
   src/connection_options.cpp
-  src/connector.cpp
   src/container.cpp
-  src/container_impl.cpp
+  src/proactor_container_impl.cpp
   src/contexts.cpp
   src/data.cpp
   src/decimal.cpp
@@ -58,7 +57,6 @@ set(qpid-proton-cpp-source
   src/proton_bits.cpp
   src/proton_event.cpp
   src/proton_handler.cpp
-  src/reactor.cpp
   src/receiver.cpp
   src/receiver_options.cpp
   src/reconnect_timer.cpp
@@ -91,7 +89,7 @@ set_source_files_properties (
 
 add_library(qpid-proton-cpp SHARED ${qpid-proton-cpp-source})
 
-target_link_libraries (qpid-proton-cpp ${PLATFORM_LIBS} qpid-proton)
+target_link_libraries (qpid-proton-cpp ${PLATFORM_LIBS} qpid-proton-core qpid-proton-proactor)
 
 set_target_properties (
   qpid-proton-cpp

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/include/proton/connection.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection.hpp b/proton-c/bindings/cpp/include/proton/connection.hpp
index a4046be..331ba82 100644
--- a/proton-c/bindings/cpp/include/proton/connection.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection.hpp
@@ -106,6 +106,12 @@ PN_CPP_CLASS_EXTERN connection : public internal::object<pn_connection_t>, publi
     PN_CPP_EXTERN receiver open_receiver(const std::string &addr,
                                          const receiver_options &);
 
+    /// @copydoc container::sender_options
+    PN_CPP_EXTERN class sender_options sender_options() const;
+
+    /// @copydoc container::receiver_options
+    PN_CPP_EXTERN class receiver_options receiver_options() const;
+
     /// Return all sessions on this connection.
     PN_CPP_EXTERN session_range sessions() const;
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/include/proton/container.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/container.hpp b/proton-c/bindings/cpp/include/proton/container.hpp
index 6f10c3c..be83e5e 100644
--- a/proton-c/bindings/cpp/include/proton/container.hpp
+++ b/proton-c/bindings/cpp/include/proton/container.hpp
@@ -73,12 +73,6 @@ class PN_CPP_CLASS_EXTERN container {
     /// Connect to `url` and send an open request to the remote peer.
     PN_CPP_EXTERN returned<connection> connect(const std::string& url);
 
-    /// @cond INTERNAL
-    /// Stop listening on url, must match the url string given to listen().
-    /// You can also use the proton::listener object returned by listen()
-    PN_CPP_EXTERN void stop_listening(const std::string& url);
-    /// @endcond
-
     /// Start listening on url.
     ///
     /// Calls to the @ref listen_handler are serialized for this listener,

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/include/proton/internal/config.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/internal/config.hpp b/proton-c/bindings/cpp/include/proton/internal/config.hpp
index da7f480..79d201c 100644
--- a/proton-c/bindings/cpp/include/proton/internal/config.hpp
+++ b/proton-c/bindings/cpp/include/proton/internal/config.hpp
@@ -95,6 +95,14 @@
 #define PN_CPP_HAS_CHRONO PN_CPP_HAS_CPP11
 #endif
 
+#ifndef PN_CPP_HAS_STD_MUTEX
+#define PN_CPP_HAS_STD_MUTEX PN_CPP_HAS_CPP11
+#endif
+
+#ifndef PN_CPP_HAS_STD_ATOMIC
+#define PN_CPP_HAS_STD_ATOMIC PN_CPP_HAS_CPP11
+#endif
+
 #endif // PROTON_INTERNAL_CONFIG_HPP
 
 /// @endcond

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
index 56deb00..8d0be85 100644
--- a/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
+++ b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
@@ -94,28 +94,11 @@ struct const_buffer {
 class
 PN_CPP_CLASS_EXTERN connection_driver {
   public:
-    /// An engine that is not associated with a proton::container or
-    /// proton::event_loop.
-    ///
-    /// Accessing the container or event_loop for this connection in
-    /// a proton::messaging_handler will throw a proton::error exception.
-    ///
+    /// An engine without a container id.
     PN_CPP_EXTERN connection_driver();
 
-    /// Create a connection driver associated with a proton::container and
-    /// optional event_loop. If the event_loop is not provided attempts to use
-    /// it will throw proton::error.
-    ///
-    /// Takes ownership of the event_loop. Note the proton::connection created
-    /// by this connection_driver can outlive the connection_driver itself if
-    /// the user pins it in memory using the proton::thread_safe<> template.
-    /// The event_loop is deleted when, and only when, the proton::connection is.
-    ///
-    PN_CPP_EXTERN connection_driver(proton::container&);
-#if PN_CPP_HAS_RVALUE_REFERENCES
-    /// @copydoc connection_driver()
-    PN_CPP_EXTERN connection_driver(proton::container&, event_loop&& loop);
-#endif
+    /// Create a connection driver associated with a container id.
+    PN_CPP_EXTERN connection_driver(const std::string&);
 
     PN_CPP_EXTERN ~connection_driver();
 
@@ -207,8 +190,8 @@ PN_CPP_CLASS_EXTERN connection_driver {
     connection_driver(const connection_driver&);
     connection_driver& operator=(const connection_driver&);
 
+    std::string container_id_;
     messaging_handler* handler_;
-    proton::container* container_;
     pn_connection_driver_t driver_;
 };
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/include/proton/io/container_impl_base.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/container_impl_base.hpp b/proton-c/bindings/cpp/include/proton/io/container_impl_base.hpp
deleted file mode 100644
index a04b4ff..0000000
--- a/proton-c/bindings/cpp/include/proton/io/container_impl_base.hpp
+++ /dev/null
@@ -1,144 +0,0 @@
-#ifndef PROTON_IO_CONTAINER_IMPL_BASE_HPP
-#define PROTON_IO_CONTAINER_IMPL_BASE_HPP
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "../container.hpp"
-
-#include <future>
-#include <mutex>
-#include <sstream>
-
-namespace proton {
-namespace io {
-
-/// **Experimental** - A base container implementation.
-///
-/// This is a thread-safe partial implementation of the
-/// proton::container interface to reduce boilerplate code in
-/// container implementations. Requires C++11.
-///
-/// You can ignore this class if you want to implement the functions
-/// in a different way.
-class container_impl_base : public standard_container {
-  public:
-    // Pull in base class functions here so that name search finds all the overloads
-    using standard_container::open_receiver;
-    using standard_container::open_sender;
-
-    /// @see proton::container::client_connection_options
-    void client_connection_options(const connection_options & opts) {
-        store(client_copts_, opts);
-    }
-    
-    /// @see proton::container::client_connection_options
-    connection_options client_connection_options() const {
-        return load(client_copts_);
-    }
-    
-    /// @see proton::container::server_connection_options
-    void server_connection_options(const connection_options & opts) {
-        store(server_copts_, opts);
-    }
-    
-    /// @see proton::container::server_connection_options
-    connection_options server_connection_options() const {
-        return load(server_copts_);
-    }
-    
-    /// @see proton::container::sender_options
-    void sender_options(const class sender_options & opts) {
-        store(sender_opts_, opts);
-    }
-    
-    /// @see proton::container::sender_options
-    class sender_options sender_options() const {
-        return load(sender_opts_);
-    }
-    
-    /// @see proton::container::receiver_options
-    void receiver_options(const class receiver_options & opts) {
-        store(receiver_opts_, opts);
-    }
-    
-    /// @see proton::container::receiver_options
-    class receiver_options receiver_options() const {
-        return load(receiver_opts_);
-    }
-
-    /// @see proton::container::open_sender
-    returned<sender> open_sender(
-        const std::string &url, const class sender_options &opts, const connection_options &copts)
-    {
-        return open_link<sender, class sender_options>(url, opts, copts, &connection::open_sender);
-    }
-
-    /// @see proton::container::open_receiver
-    returned<receiver> open_receiver(
-        const std::string &url, const class receiver_options &opts, const connection_options &copts)
-    {
-        return open_link<receiver>(url, opts, copts, &connection::open_receiver);
-    }
-
-  private:
-    template<class T, class Opts>
-    returned<T> open_link(
-        const std::string &url_str, const Opts& opts, const connection_options& copts,
-        T (connection::*open_fn)(const std::string&, const Opts&))
-    {
-        std::string addr = url(url_str).path();
-        std::shared_ptr<thread_safe<connection> > ts_connection = connect(url_str, copts);
-        std::promise<returned<T> > result_promise;
-        auto do_open = [ts_connection, addr, opts, open_fn, &result_promise]() {
-            try {
-                connection c = ts_connection->unsafe();
-                returned<T> s = make_thread_safe((c.*open_fn)(addr, opts));
-                result_promise.set_value(s);
-            } catch (...) {
-                result_promise.set_exception(std::current_exception());
-            }
-        };
-        ts_connection->event_loop()->inject(do_open);
-        std::future<returned<T> > result_future = result_promise.get_future();
-        if (!result_future.valid())
-            throw error(url_str+": connection closed");
-        return result_future.get();
-    }
-
-    mutable std::mutex lock_;
-    template <class T> T load(const T& v) const {
-        std::lock_guard<std::mutex> g(lock_);
-        return v;
-    }
-    template <class T> void store(T& v, const T& x) const {
-        std::lock_guard<std::mutex> g(lock_);
-        v = x;
-    }
-    connection_options client_copts_, server_copts_;
-    class receiver_options receiver_opts_;
-    class sender_options sender_opts_;
-};
-
-} // io
-} // proton
-
-#endif // PROTON_IO_CONTAINER_IMPL_BASE_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/include/proton/listen_handler.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/listen_handler.hpp b/proton-c/bindings/cpp/include/proton/listen_handler.hpp
index 99f7558..08d5e76 100644
--- a/proton-c/bindings/cpp/include/proton/listen_handler.hpp
+++ b/proton-c/bindings/cpp/include/proton/listen_handler.hpp
@@ -41,14 +41,14 @@ class listen_handler {
     /// the connection.  messaging_handler::on_connection_open() will be called with
     /// the proton::connection, it can call connection::open() to accept or
     /// connection::close() to reject the connection.
-    virtual connection_options on_accept()= 0;
+    virtual connection_options on_accept(listener&)= 0;
 
     /// Called if there is a listening error, with an error message.
     /// close() will also be called.
-    virtual void on_error(const std::string&) {}
+    virtual void on_error(listener&, const std::string&) {}
 
     /// Called when this listen_handler is no longer needed, and can be deleted.
-    virtual void on_close() {}
+    virtual void on_close(listener&) {}
 };
 
 } // proton

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/include/proton/listener.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/listener.hpp b/proton-c/bindings/cpp/include/proton/listener.hpp
index 4b4ca24..c7f95a7 100644
--- a/proton-c/bindings/cpp/include/proton/listener.hpp
+++ b/proton-c/bindings/cpp/include/proton/listener.hpp
@@ -20,30 +20,30 @@
  * under the License.
  */
 
-#include "./fwd.hpp"
 #include "./internal/export.hpp"
 
-#include <string>
+struct pn_listener_t;
 
 namespace proton {
 
 /// A listener for incoming connections.
 class PN_CPP_CLASS_EXTERN listener {
+    /// @cond INTERNAL
+    listener(pn_listener_t*);
+    /// @endcond
+
   public:
     /// Create an empty listener.
     PN_CPP_EXTERN listener();
 
-    /// @cond INTERNAL
-    PN_CPP_EXTERN listener(container&, const std::string&);
-    /// @endcond
-
     /// Stop listening on the address provided to the call to
     /// container::listen that returned this listener.
     PN_CPP_EXTERN void stop();
 
- private:
-    std::string url_;
-    container* container_;
+  private:
+    pn_listener_t* listener_;
+
+  friend class container;
 };
 
 } // proton

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/connection.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection.cpp b/proton-c/bindings/cpp/src/connection.cpp
index f706df4..113a06f 100644
--- a/proton-c/bindings/cpp/src/connection.cpp
+++ b/proton-c/bindings/cpp/src/connection.cpp
@@ -38,7 +38,6 @@
 #include <proton/connection.h>
 #include <proton/session.h>
 #include <proton/transport.h>
-#include <proton/reactor.h>
 #include <proton/object.h>
 
 namespace proton {
@@ -72,13 +71,7 @@ std::string connection::user() const {
 
 container& connection::container() const {
     class container* c = connection_context::get(pn_object()).container;
-    if (!c) {
-        pn_reactor_t *r = pn_object_reactor(pn_object());
-        if (r)
-            c = &container_context::get(r);
-    }
-    if (!c)
-        throw proton::error("connection does not have a container");
+    if (!c) throw proton::error("No container");
     return *c;
 }
 
@@ -133,7 +126,7 @@ sender connection::open_sender(const std::string &addr) {
     return open_sender(addr, sender_options());
 }
 
-sender connection::open_sender(const std::string &addr, const sender_options &opts) {
+sender connection::open_sender(const std::string &addr, const class sender_options &opts) {
     return default_session().open_sender(addr, opts);
 }
 
@@ -141,11 +134,25 @@ receiver connection::open_receiver(const std::string &addr) {
     return open_receiver(addr, receiver_options());
 }
 
-receiver connection::open_receiver(const std::string &addr, const receiver_options &opts)
+receiver connection::open_receiver(const std::string &addr, const class receiver_options &opts)
 {
     return default_session().open_receiver(addr, opts);
 }
 
+class sender_options connection::sender_options() const {
+    connection_context& ctx = connection_context::get(pn_object());
+    return ctx.container ?
+        ctx.container->sender_options() :
+        proton::sender_options();
+}
+
+class receiver_options connection::receiver_options() const {
+    connection_context& ctx = connection_context::get(pn_object());
+    return ctx.container ?
+        ctx.container->receiver_options() :
+        proton::receiver_options();
+}
+
 error_condition connection::error() const {
     return make_wrapper(pn_connection_remote_condition(pn_object()));
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/connection_driver_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection_driver_test.cpp b/proton-c/bindings/cpp/src/connection_driver_test.cpp
index a5771f9..ae18ebe 100644
--- a/proton-c/bindings/cpp/src/connection_driver_test.cpp
+++ b/proton-c/bindings/cpp/src/connection_driver_test.cpp
@@ -31,6 +31,7 @@
 #include "proton/sender.hpp"
 #include "proton/sender_options.hpp"
 #include "proton/source_options.hpp"
+#include "proton/thread_safe.hpp"
 #include "proton/types_fwd.hpp"
 #include "proton/uuid.hpp"
 
@@ -57,8 +58,8 @@ struct in_memory_driver : public connection_driver {
     byte_stream& writes;
     int spinning;
 
-    in_memory_driver(byte_stream& rd, byte_stream& wr) :
-        reads(rd), writes(wr), spinning(0) {}
+    in_memory_driver(byte_stream& rd, byte_stream& wr, const std::string& name) :
+        connection_driver(name), reads(rd), writes(wr), spinning(0)  {}
 
     void do_read() {
         mutable_buffer rbuf = read_buffer();
@@ -102,8 +103,10 @@ struct driver_pair {
     byte_stream ab, ba;
     in_memory_driver a, b;
 
-    driver_pair(const connection_options& oa, const connection_options& ob)
-        : a(ba, ab), b(ab, ba)
+    driver_pair(const connection_options& oa, const connection_options& ob,
+                const std::string& name=""
+    ) :
+        a(ba, ab, name+"a"), b(ab, ba, name+"b")
     {
         a.connect(oa);
         b.accept(ob);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/connection_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection_options.cpp b/proton-c/bindings/cpp/src/connection_options.cpp
index 506e84e..4644094 100644
--- a/proton-c/bindings/cpp/src/connection_options.cpp
+++ b/proton-c/bindings/cpp/src/connection_options.cpp
@@ -18,6 +18,7 @@
  * under the License.
  *
  */
+#include "proton/fwd.hpp"
 #include "proton/connection_options.hpp"
 #include "proton/messaging_handler.hpp"
 #include "proton/reconnect_timer.hpp"
@@ -27,12 +28,12 @@
 
 #include "acceptor.hpp"
 #include "contexts.hpp"
-#include "connector.hpp"
 #include "messaging_adapter.hpp"
 #include "msg.hpp"
 #include "proton_bits.hpp"
 
 #include <proton/connection.h>
+#include <proton/proactor.h>
 #include <proton/transport.h>
 
 namespace proton {
@@ -74,15 +75,14 @@ class connection_options::impl {
      */
     void apply_unbound(connection& c) {
         pn_connection_t *pnc = unwrap(c);
-        container::impl::connector *outbound = dynamic_cast<container::impl::connector*>(
-            connection_context::get(unwrap(c)).handler.get());
 
         // Only apply connection options if uninit.
         bool uninit = c.uninitialized();
         if (!uninit) return;
 
+        bool outbound = !connection_context::get(pnc).listener_context_;
         if (reconnect.set && outbound)
-            outbound->reconnect_timer(reconnect.value);
+            connection_context::get(pnc).reconnect.reset(new reconnect_timer(reconnect.value));
         if (container_id.set)
             pn_connection_set_container(pnc, container_id.value.c_str());
         if (virtual_host.set)
@@ -97,31 +97,23 @@ class connection_options::impl {
         // Transport options.  pnt is NULL between reconnect attempts
         // and if there is a pipelined open frame.
         pn_connection_t *pnc = unwrap(c);
-        container::impl::connector *outbound = dynamic_cast<container::impl::connector*>(
-            connection_context::get(unwrap(c)).handler.get());
-
         pn_transport_t *pnt = pn_connection_transport(pnc);
         if (!pnt) return;
 
         // SSL
-        if (outbound && outbound->address().scheme() == url::AMQPS) {
+        connection_context& cc = connection_context::get(pnc);
+        bool outbound = !cc.listener_context_;
+        if (outbound && ssl_client_options.set) {
             // A side effect of pn_ssl() is to set the ssl peer
             // hostname to the connection hostname, which has
             // already been adjusted for the virtual_host option.
             pn_ssl_t *ssl = pn_ssl(pnt);
             if (pn_ssl_init(ssl, ssl_client_options.value.pn_domain(), NULL))
                 throw error(MSG("client SSL/TLS initialization error"));
-        } else if (!outbound) {
-            // TODO aconway 2016-05-13: reactor only
-            pn_acceptor_t *pnp = pn_connection_acceptor(pnc);
-            if (pnp) {
-                listener_context &lc(listener_context::get(pnp));
-                if (lc.ssl) {
-                    pn_ssl_t *ssl = pn_ssl(pnt);
-                    if (pn_ssl_init(ssl, ssl_server_options.value.pn_domain(), NULL))
-                        throw error(MSG("server SSL/TLS initialization error"));
-                }
-            }
+        } else if (!outbound && ssl_server_options.set) {
+                pn_ssl_t *ssl = pn_ssl(pnt);
+                if (pn_ssl_init(ssl, ssl_server_options.value.pn_domain(), NULL))
+                    throw error(MSG("server SSL/TLS initialization error"));
         }
 
         // SASL

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/container.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container.cpp b/proton-c/bindings/cpp/src/container.cpp
index 3daa925..b98da78 100644
--- a/proton-c/bindings/cpp/src/container.cpp
+++ b/proton-c/bindings/cpp/src/container.cpp
@@ -27,7 +27,7 @@
 #include "proton/listener.hpp"
 #include "proton/thread_safe.hpp"
 
-#include "container_impl.hpp"
+#include "proactor_container_impl.hpp"
 
 namespace proton {
 
@@ -65,24 +65,12 @@ returned<receiver> container::open_receiver(const std::string &url, const proton
     return open_receiver(url, receiver_options(), co);
 }
 
-namespace{
-    struct listen_opts : public listen_handler {
-        connection_options  opts;
-        listen_opts(const connection_options& o) : opts(o) {}
-        connection_options on_accept() { return opts; }
-        void on_close() { delete this; }
-    };
-}
-
 listener container::listen(const std::string& url, const connection_options& opts) {
-    // Note: listen_opts::on_close() calls delete(this) so this is not a leak.
-    // The container will always call on_closed() even if there are errors or exceptions.
-    listen_opts* lh = new listen_opts(opts);
-    return listen(url, *lh);
+    return impl_->listen(url, opts);
 }
 
 listener container::listen(const std::string &url) {
-    return listen(url, connection_options());
+    return impl_->listen(url);
 }
 
 void container::stop() { stop(error_condition()); }
@@ -93,8 +81,6 @@ returned<connection> container::connect(const std::string& url, const connection
 
 listener container::listen(const std::string& url, listen_handler& l) { return impl_->listen(url, l); }
 
-void container::stop_listening(const std::string& url) { impl_->stop_listening(url); }
-
 void container::run() { impl_->run(); }
 
 void container::auto_stop(bool set) { impl_->auto_stop(set); }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/container_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container_test.cpp b/proton-c/bindings/cpp/src/container_test.cpp
index e02aff5..d210268 100644
--- a/proton-c/bindings/cpp/src/container_test.cpp
+++ b/proton-c/bindings/cpp/src/container_test.cpp
@@ -124,12 +124,12 @@ struct test_listener : public proton::listen_handler {
     bool on_accept_, on_close_;
     std::string on_error_;
     test_listener() : on_accept_(false), on_close_(false) {}
-    proton::connection_options on_accept() PN_CPP_OVERRIDE {
+    proton::connection_options on_accept(proton::listener&) PN_CPP_OVERRIDE {
         on_accept_ = true;
         return proton::connection_options();
     }
-    void on_close() PN_CPP_OVERRIDE { on_close_ = true; }
-    void on_error(const std::string& e) PN_CPP_OVERRIDE { on_error_ = e; }
+    void on_close(proton::listener&) PN_CPP_OVERRIDE { on_close_ = true; }
+    void on_error(proton::listener&, const std::string& e) PN_CPP_OVERRIDE { on_error_ = e; }
 };
 
 int test_container_bad_address() {
@@ -179,6 +179,11 @@ class stop_tester : public proton::messaging_handler {
         state = 5;
     }
 
+    void on_transport_error(proton::transport & t) PN_CPP_OVERRIDE {
+        // Do nothing - ignore transport errors - we're going to get one when
+        // the container stops.
+    }
+
 public:
     stop_tester(): state(0) {}
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/contexts.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/contexts.cpp b/proton-c/bindings/cpp/src/contexts.cpp
index b1a234f..81ef5eb 100644
--- a/proton-c/bindings/cpp/src/contexts.cpp
+++ b/proton-c/bindings/cpp/src/contexts.cpp
@@ -21,7 +21,6 @@
 
 #include "contexts.hpp"
 #include "msg.hpp"
-#include "reactor.hpp"
 #include "proton_bits.hpp"
 
 #include "proton/error.hpp"
@@ -29,8 +28,9 @@
 #include <proton/connection.h>
 #include <proton/object.h>
 #include <proton/link.h>
+#include <proton/listener.h>
 #include <proton/message.h>
-#include <proton/reactor.h>
+#include "proton/reconnect_timer.hpp"
 #include <proton/session.h>
 
 #include <typeinfo>
@@ -48,16 +48,10 @@ pn_class_t cpp_context_class = PN_CLASS(cpp_context);
 
 // Handles
 PN_HANDLE(CONNECTION_CONTEXT)
-PN_HANDLE(CONTAINER_CONTEXT)
 PN_HANDLE(LISTENER_CONTEXT)
+PN_HANDLE(SESSION_CONTEXT)
 PN_HANDLE(LINK_CONTEXT)
 
-void set_context(pn_record_t* record, pn_handle_t handle, const pn_class_t *clazz, void* value)
-{
-    pn_record_def(record, handle, clazz);
-    pn_record_set(record, handle, value);
-}
-
 template <class T>
 T* get_context(pn_record_t* record, pn_handle_t handle) {
     return reinterpret_cast<T*>(pn_record_get(record, handle));
@@ -71,45 +65,24 @@ void *context::alloc(size_t n) { return pn_object_new(&cpp_context_class, n); }
 
 pn_class_t* context::pn_class() { return &cpp_context_class; }
 
+connection_context::connection_context() :
+    container(0), default_session(0), link_gen(0), handler(0), listener_context_(0)
+{}
 
-context::id connection_context::id(pn_connection_t* c) {
-    return context::id(pn_connection_attachments(c), CONNECTION_CONTEXT);
-}
-
-void container_context::set(const reactor& r, container& c) {
-    set_context(pn_reactor_attachments(unwrap(r)), CONTAINER_CONTEXT, PN_VOID, &c);
+connection_context& connection_context::get(pn_connection_t *c) {
+    return ref<connection_context>(id(pn_connection_attachments(c), CONNECTION_CONTEXT));
 }
 
-container &container_context::get(pn_reactor_t *pn_reactor) {
-    container *ctx = get_context<container>(pn_reactor_attachments(pn_reactor), CONTAINER_CONTEXT);
-    if (!ctx) throw error(MSG("Reactor has no C++ container context"));
-    return *ctx;
+listener_context& listener_context::get(pn_listener_t* l) {
+    return ref<listener_context>(id(pn_listener_attachments(l), LISTENER_CONTEXT));
 }
 
-listener_context& listener_context::get(pn_acceptor_t* a) {
-    // TODO aconway 2016-05-13: reactor only
-    // A Proton C pn_acceptor_t is really just a selectable
-    pn_selectable_t *sel = reinterpret_cast<pn_selectable_t*>(a);
-
-    listener_context* ctx =
-        get_context<listener_context>(pn_selectable_attachments(sel), LISTENER_CONTEXT);
-    if (!ctx) {
-        ctx =  context::create<listener_context>();
-        set_context(pn_selectable_attachments(sel), LISTENER_CONTEXT, context::pn_class(), ctx);
-        pn_decref(ctx);
-    }
-    return *ctx;
+link_context& link_context::get(pn_link_t* l) {
+    return ref<link_context>(id(pn_link_attachments(l), LINK_CONTEXT));
 }
 
-link_context& link_context::get(pn_link_t* l) {
-    link_context* ctx =
-        get_context<link_context>(pn_link_attachments(l), LINK_CONTEXT);
-    if (!ctx) {
-        ctx =  context::create<link_context>();
-        set_context(pn_link_attachments(l), LINK_CONTEXT, context::pn_class(), ctx);
-        pn_decref(ctx);
-    }
-    return *ctx;
+session_context& session_context::get(pn_session_t* s) {
+    return ref<session_context>(id(pn_session_attachments(s), SESSION_CONTEXT));
 }
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/event_loop.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/event_loop.cpp b/proton-c/bindings/cpp/src/event_loop.cpp
index ea4ee71..ab39aa7 100644
--- a/proton-c/bindings/cpp/src/event_loop.cpp
+++ b/proton-c/bindings/cpp/src/event_loop.cpp
@@ -20,7 +20,7 @@
 #include "proton/event_loop.hpp"
 
 #include "contexts.hpp"
-#include "event_loop_impl.hpp"
+#include "proactor_event_loop_impl.hpp"
 
 #include <proton/session.h>
 #include <proton/link.h>

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/include/contexts.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/contexts.hpp b/proton-c/bindings/cpp/src/include/contexts.hpp
index 742b346..c096a6e 100644
--- a/proton-c/bindings/cpp/src/include/contexts.hpp
+++ b/proton-c/bindings/cpp/src/include/contexts.hpp
@@ -34,16 +34,16 @@
 
 #include "proton_handler.hpp"
 
-struct pn_session_t;
-struct pn_event_t;
-struct pn_reactor_t;
 struct pn_record_t;
-struct pn_acceptor_t;
+struct pn_link_t;
+struct pn_session_t;
+struct pn_connection_t;
+struct pn_listener_t;
 
 namespace proton {
 
 class proton_handler;
-class reactor;
+class reconnect_timer;
 
 // Base class for C++ classes that are used as proton contexts.
 // Contexts are pn_objects managed by pn reference counts, the C++ value is allocated in-place.
@@ -82,51 +82,53 @@ class context {
     static void *alloc(size_t n);
 };
 
+class listener_context;
+
 // Connection context used by all connections.
 class connection_context : public context {
   public:
-    connection_context() : container(0), default_session(0), link_gen(0) {}
+    connection_context();
+    static connection_context& get(pn_connection_t *c);
 
     class container* container;
     pn_session_t *default_session; // Owned by connection.
     message event_message;      // re-used by messaging_adapter for performance.
     io::link_namer* link_gen;      // Link name generator.
 
-    internal::pn_unique_ptr<proton_handler> handler;
+    messaging_handler* handler;
+    internal::pn_unique_ptr<reconnect_timer> reconnect;
+    listener_context* listener_context_;
     event_loop event_loop_;
-
-    static connection_context& get(pn_connection_t *c) { return ref<connection_context>(id(c)); }
-
-  protected:
-    static context::id id(pn_connection_t*);
-};
-
-void container_context(const reactor&, container&);
-
-class container_context {
-  public:
-    static void set(const reactor& r, container& c);
-    static container& get(pn_reactor_t*);
 };
 
 class listener_context : public context {
   public:
-    static listener_context& get(pn_acceptor_t* c);
-    listener_context() : listen_handler_(0), ssl(false) {}
-    connection_options  get_options() { return listen_handler_->on_accept(); }
-    class listen_handler* listen_handler_;
-    bool ssl;
+    listener_context() : listen_handler_(0) {}
+    static listener_context& get(pn_listener_t* c);
+
+    listen_handler* listen_handler_;
+    internal::pn_unique_ptr<const connection_options> connection_options_;
 };
 
 class link_context : public context {
   public:
+    link_context() : handler(0), credit_window(10), pending_credit(0), auto_accept(true), auto_settle(true), draining(false) {}
     static link_context& get(pn_link_t* l);
-    link_context() : credit_window(10), auto_accept(true), auto_settle(true), draining(false), pending_credit(0) {}
+
+    messaging_handler* handler;
     int credit_window;
+    uint32_t pending_credit;
     bool auto_accept;
     bool auto_settle;
     bool draining;
-    uint32_t pending_credit;
+};
+
+class session_context : public context {
+  public:
+    session_context() : handler(0) {}
+    static session_context& get(pn_session_t* s);
+
+    messaging_handler* handler;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/include/messaging_adapter.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/messaging_adapter.hpp b/proton-c/bindings/cpp/src/include/messaging_adapter.hpp
index 5371eec..d7eb6a0 100644
--- a/proton-c/bindings/cpp/src/include/messaging_adapter.hpp
+++ b/proton-c/bindings/cpp/src/include/messaging_adapter.hpp
@@ -39,8 +39,6 @@ class messaging_adapter : public proton_handler
   public:
     messaging_adapter(messaging_handler &delegate) : delegate_(delegate) {}
 
-    void on_reactor_init(proton_event &e);
-    void on_reactor_final(proton_event & e);
     void on_link_flow(proton_event &e);
     void on_delivery(proton_event &e);
     void on_connection_remote_open(proton_event &e);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
new file mode 100644
index 0000000..8c12c02
--- /dev/null
+++ b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
@@ -0,0 +1,133 @@
+#ifndef PROTON_CPP_PROACTOR_CONTAINERIMPL_H
+#define PROTON_CPP_PROACTOR_CONTAINERIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/fwd.hpp"
+#include "proton/container.hpp"
+#include "proton/connection.hpp"
+#include "proton/connection_options.hpp"
+#include "proton/duration.hpp"
+#include "proton/error_condition.hpp"
+#include "proton/messaging_handler.hpp"
+#include "proton/receiver.hpp"
+#include "proton/receiver_options.hpp"
+#include "proton/sender.hpp"
+#include "proton/sender_options.hpp"
+
+#include "proton_bits.hpp"
+#include "proton_handler.hpp"
+
+#include <list>
+#include <map>
+#include <string>
+#include <vector>
+
+struct pn_proactor_t;
+struct pn_listener_t;
+struct pn_event_t;
+
+namespace proton {
+
+class container::impl {
+  public:
+    impl(container& c, const std::string& id, messaging_handler* = 0);
+    ~impl();
+    std::string id() const { return id_; }
+    returned<connection> connect(const std::string&, const connection_options&);
+    returned<sender> open_sender(
+        const std::string&, const proton::sender_options &, const connection_options &);
+    returned<receiver> open_receiver(
+        const std::string&, const proton::receiver_options &, const connection_options &);
+    listener listen(const std::string&);
+    listener listen(const std::string&, const connection_options& lh);
+    listener listen(const std::string&, listen_handler& lh);
+    void client_connection_options(const connection_options &);
+    connection_options client_connection_options() const { return client_connection_options_; }
+    void server_connection_options(const connection_options &);
+    connection_options server_connection_options() const { return server_connection_options_; }
+    void sender_options(const proton::sender_options&);
+    class sender_options sender_options() const { return sender_options_; }
+    void receiver_options(const proton::receiver_options&);
+    class receiver_options receiver_options() const { return receiver_options_; }
+    void run();
+    void stop(const error_condition& err);
+    void auto_stop(bool set);
+    void schedule(duration, void_function0&);
+#if PN_CPP_HAS_STD_FUNCTION
+    void schedule(duration, std::function<void()>);
+#endif
+    template <class T> static void set_handler(T s, messaging_handler* h);
+    template <class T> static messaging_handler* get_handler(T s);
+
+  private:
+    pn_listener_t* listen_common_lh(const std::string&);
+    connection connect_common(const std::string&, const connection_options&);
+
+    // Event loop to run in each container thread
+    static void thread(impl&);
+    bool handle(pn_event_t*);
+    void run_timer_jobs();
+
+    container& container_;
+
+    struct scheduled {
+        timestamp time; // duration from epoch for task
+#if PN_CPP_HAS_STD_FUNCTION
+        std::function<void()>  task;
+#else
+        void_function0* task_;
+        void task();
+#endif
+
+        // We want to get to get the *earliest* first so test is "reversed"
+        bool operator < (const scheduled& r) const { return  r.time < time; }
+    };
+    std::vector<scheduled> deferred_; // This vector is kept as a heap
+
+    pn_proactor_t* proactor_;
+    messaging_handler* handler_;
+    std::string id_;
+    connection_options client_connection_options_;
+    connection_options server_connection_options_;
+    proton::sender_options sender_options_;
+    proton::receiver_options receiver_options_;
+
+    proton::error_condition stop_err_;
+    bool auto_stop_;
+    bool stopping_;
+};
+
+template <class T>
+void container::impl::set_handler(T s, messaging_handler* mh) {
+    internal::set_messaging_handler(s, mh);
+}
+
+template <class T>
+messaging_handler* container::impl::get_handler(T s) {
+    return internal::get_messaging_handler(s);
+}
+
+
+}
+
+#endif  /*!PROTON_CPP_PROACTOR_CONTAINERIMPL_H*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/include/proactor_event_loop_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/proactor_event_loop_impl.hpp b/proton-c/bindings/cpp/src/include/proactor_event_loop_impl.hpp
new file mode 100644
index 0000000..8fa7acf
--- /dev/null
+++ b/proton-c/bindings/cpp/src/include/proactor_event_loop_impl.hpp
@@ -0,0 +1,54 @@
+#ifndef PROTON_CPP_EVENT_LOOP_IMPL_HPP
+#define PROTON_CPP_EVENT_LOOP_IMPL_HPP
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/fwd.hpp"
+
+struct pn_connection_t;
+
+namespace proton {
+
+class event_loop::impl {
+  public:
+    impl(pn_connection_t*);
+
+    bool inject(void_function0& f);
+#if PN_CPP_HAS_STD_FUNCTION
+    bool inject(std::function<void()> f);
+    typedef std::vector<std::function<void()> > jobs;
+#else
+    typedef std::vector<void_function0*> jobs;
+#endif
+
+
+    void run_all_jobs();
+    void finished();
+
+    jobs jobs_;
+    pn_connection_t* connection_;
+    bool finished_;
+};
+
+}
+
+#endif // PROTON_CPP_EVENT_LOOP_IMPL_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/include/proton_bits.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/proton_bits.hpp b/proton-c/bindings/cpp/src/include/proton_bits.hpp
index 53f2230..e72f343 100644
--- a/proton-c/bindings/cpp/src/include/proton_bits.hpp
+++ b/proton-c/bindings/cpp/src/include/proton_bits.hpp
@@ -24,6 +24,8 @@
 #include <string>
 #include <iosfwd>
 
+#include "contexts.hpp"
+
 /**@file
  *
  * Assorted internal proton utilities.
@@ -65,6 +67,7 @@ class terminus;
 class source;
 class target;
 class reactor;
+class messaging_handler;
 
 std::string error_str(long code);
 
@@ -127,12 +130,19 @@ public:
     static typename wrapped<T>::type* unwrap(const T& t) { return t.pn_object(); }
 };
 
-// Get attachments for various proton-c types
+template <class T> struct context {};
+template <> struct context<link> {typedef link_context type; };
+template <> struct context<receiver> {typedef link_context type; };
+template <> struct context<sender> {typedef link_context type; };
+template <> struct context<session> {typedef session_context type; };
+template <> struct context<connection> {typedef connection_context type; };
+
+template <class T>
+inline void set_messaging_handler(T t, messaging_handler* mh) { context<T>::type::get(factory<T>::unwrap(t)).handler = mh; }
+
 template <class T>
-inline pn_record_t* get_attachments(T*);
+inline messaging_handler* get_messaging_handler(T* t) { return context<typename internal::wrapper<T>::type>::type::get(t).handler; }
 
-template <> inline pn_record_t* get_attachments(pn_session_t* s) { return pn_session_attachments(s); }
-template <> inline pn_record_t* get_attachments(pn_link_t* l) { return pn_link_attachments(l); }
 }
 
 template <class T>

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/include/proton_event.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/proton_event.hpp b/proton-c/bindings/cpp/src/include/proton_event.hpp
index 374da85..be324e7 100644
--- a/proton-c/bindings/cpp/src/include/proton_event.hpp
+++ b/proton-c/bindings/cpp/src/include/proton_event.hpp
@@ -266,23 +266,12 @@ class proton_event
     };
     ///@}
 
-    proton_event(pn_event_t *ce, class container* cont) :
-      pn_event_(ce),
-      container_(cont)
+    proton_event(pn_event_t *ce) :
+      pn_event_(ce)
     {}
 
     pn_event_t* pn_event() const { return pn_event_; }
 
-    /** Return a reference to the container, throws proton::error if there is none. */
-    class container& container() const {
-        if (!container_)
-            throw proton::error("event does not have a container");
-        return *container_;
-    }
-
-    /** Return a pointer to the container if there is one, NULL otherwise. */
-    class container* container_ptr() const { return container_; }
-
     /// Get type of event
     event_type type() const { return event_type(pn_event_type(pn_event_)); }
 
@@ -290,7 +279,6 @@ class proton_event
 
   private:
     pn_event_t *pn_event_;
-    class container* container_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/include/test_dummy_container.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/test_dummy_container.hpp b/proton-c/bindings/cpp/src/include/test_dummy_container.hpp
deleted file mode 100644
index daed435..0000000
--- a/proton-c/bindings/cpp/src/include/test_dummy_container.hpp
+++ /dev/null
@@ -1,82 +0,0 @@
-#ifndef TEST_DUMMY_CONTAINER_HPP
-#define TEST_DUMMY_CONTAINER_HPP
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "proton/container.hpp"
-#include "proton/event_loop.hpp"
-#include "proton/thread_safe.hpp"
-
-namespace test {
-
-using namespace proton;
-
-class dummy_container : public standard_container {
-  public:
-    dummy_container(const std::string cid="") :
-        id_(cid), fail("not implemented for dummy_container") {}
-
-    // Pull in base class functions here so that name search finds all the overloads
-    using standard_container::stop;
-    using standard_container::connect;
-    using standard_container::listen;
-    using standard_container::open_receiver;
-    using standard_container::open_sender;
-
-    returned<connection> connect(const std::string&, const connection_options&) { throw fail; }
-    listener listen(const std::string& , listen_handler& ) { throw fail; }
-    void stop_listening(const std::string&) { throw fail; }
-    void run() { throw fail; }
-    void auto_stop(bool) { throw fail; }
-    void stop(const proton::error_condition& ) { throw fail; }
-    returned<sender> open_sender(const std::string &, const proton::sender_options &, const connection_options&) { throw fail; }
-    returned<receiver> open_receiver( const std::string &, const proton::receiver_options &, const connection_options &) { throw fail; }
-    std::string id() const { return id_; }
-    void client_connection_options(const connection_options &o) { ccopts_ = o; }
-    connection_options client_connection_options() const { return ccopts_; }
-    void server_connection_options(const connection_options &o) { scopts_ = o; }
-    connection_options server_connection_options() const { return scopts_; }
-    void sender_options(const class sender_options &o) { sopts_ = o; }
-    class sender_options sender_options() const { return sopts_; }
-    void receiver_options(const class receiver_options &o) { ropts_ = o; }
-    class receiver_options receiver_options() const { return ropts_; }
-#if PN_CPP_HAS_STD_FUNCTION
-    void schedule(duration, std::function<void()>) { throw fail; }
-#endif
-    void schedule(duration, void_function0&) { throw fail; }
-
-  private:
-    std::string id_;
-    connection_options ccopts_, scopts_;
-    class sender_options sopts_;
-    class receiver_options ropts_;
-    std::runtime_error fail;
-};
-
-class dummy_event_loop : public event_loop {
-#if PN_CPP_HAS_CPP11
-    bool inject(std::function<void()> f) PN_CPP_OVERRIDE { f(); return true; }
-#endif
-    bool inject(proton::void_function0& h) PN_CPP_OVERRIDE { h(); return true; }
-};
-
-}
-
-#endif // TEST_DUMMY_CONTAINER_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/io/connection_driver.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/connection_driver.cpp b/proton-c/bindings/cpp/src/io/connection_driver.cpp
index da8c2a4..d7c5e5c 100644
--- a/proton-c/bindings/cpp/src/io/connection_driver.cpp
+++ b/proton-c/bindings/cpp/src/io/connection_driver.cpp
@@ -47,23 +47,12 @@ void connection_driver::init() {
     }
 }
 
-connection_driver::connection_driver() : handler_(0), container_(0) { init(); }
+connection_driver::connection_driver() : handler_(0) { init(); }
 
-connection_driver::connection_driver(class container& cont) : handler_(0), container_(&cont) {
+connection_driver::connection_driver(const std::string& id) : container_id_(id), handler_(0) {
     init();
-    connection_context& ctx = connection_context::get(unwrap(connection()));
-    ctx.container = container_;
 }
 
-#if PN_CPP_HAS_RVALUE_REFERENCES
-connection_driver::connection_driver(class container& cont, event_loop&& loop) : handler_(0), container_(&cont) {
-    init();
-    connection_context& ctx = connection_context::get(unwrap(connection()));
-    ctx.container = container_;
-    ctx.event_loop_ = loop.impl_.get();
-}
-#endif
-
 connection_driver::~connection_driver() {
     pn_connection_driver_destroy(&driver_);
 }
@@ -79,10 +68,7 @@ void connection_driver::configure(const connection_options& opts, bool server) {
 
 void connection_driver::connect(const connection_options& opts) {
     connection_options all;
-    if (container_) {
-        all.container_id(container_->id());
-        all.update(container_->client_connection_options());
-    }
+    all.container_id(container_id_);
     all.update(opts);
     configure(all, false);
     connection().open();
@@ -90,10 +76,7 @@ void connection_driver::connect(const connection_options& opts) {
 
 void connection_driver::accept(const connection_options& opts) {
     connection_options all;
-    if (container_) {
-        all.container_id(container_->id());
-        all.update(container_->server_connection_options());
-    }
+    all.container_id(container_id_);
     all.update(opts);
     configure(all, true);
 }
@@ -105,7 +88,7 @@ bool connection_driver::has_events() const {
 bool connection_driver::dispatch() {
     pn_event_t* c_event;
     while ((c_event = pn_connection_driver_next_event(&driver_)) != NULL) {
-        proton_event cpp_event(c_event, container_);
+        proton_event cpp_event(c_event);
         try {
             if (handler_ != 0) {
                 messaging_adapter adapter(*handler_);
@@ -163,8 +146,4 @@ proton::transport connection_driver::transport() const {
     return make_wrapper(driver_.transport);
 }
 
-proton::container* connection_driver::container() const {
-    return container_;
-}
-
 }}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/listener.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/listener.cpp b/proton-c/bindings/cpp/src/listener.cpp
index 2639f5e..a9ca53d 100644
--- a/proton-c/bindings/cpp/src/listener.cpp
+++ b/proton-c/bindings/cpp/src/listener.cpp
@@ -18,12 +18,15 @@
  */
 
 #include "proton/listener.hpp"
-#include "proton/container.hpp"
+
+#include <proton/listener.h>
+
+#include "contexts.hpp"
 
 namespace proton {
 
-listener::listener() : container_(0) {}
-listener::listener(container& c, const std::string& u) : url_(u), container_(&c) {}
-void listener::stop() { if (container_) container_->stop_listening(url_); }
+listener::listener(): listener_(0) {}
+listener::listener(pn_listener_t* l) : listener_(l) {}
+void listener::stop() { if (listener_) pn_listener_close(listener_); }
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/messaging_adapter.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/messaging_adapter.cpp b/proton-c/bindings/cpp/src/messaging_adapter.cpp
index a70703e..613808b 100644
--- a/proton-c/bindings/cpp/src/messaging_adapter.cpp
+++ b/proton-c/bindings/cpp/src/messaging_adapter.cpp
@@ -58,16 +58,6 @@ void credit_topup(pn_link_t *link) {
 }
 }
 
-void messaging_adapter::on_reactor_init(proton_event &pe) {
-    container* c = pe.container_ptr();
-    if (c) delegate_.on_container_start(*c);
-}
-
-void messaging_adapter::on_reactor_final(proton_event &pe) {
-    container* c = pe.container_ptr();
-    if (c) delegate_.on_container_stop(*c);
-}
-
 void messaging_adapter::on_link_flow(proton_event &pe) {
     pn_event_t *pne = pe.pn_event();
     pn_link_t *lnk = pn_event_link(pne);
@@ -281,24 +271,17 @@ void messaging_adapter::on_link_local_open(proton_event &pe) {
 
 void messaging_adapter::on_link_remote_open(proton_event &pe) {
     pn_link_t *lnk = pn_event_link(pe.pn_event());
-    container* c = pe.container_ptr();
     if (pn_link_is_receiver(lnk)) {
       receiver r(make_wrapper<receiver>(lnk));
       delegate_.on_receiver_open(r);
       if (is_local_unititialised(pn_link_state(lnk))) {
-          if (c)
-              r.open(c->receiver_options());
-          else
-              r.open();
+          r.open(r.connection().receiver_options());
       }
     } else {
       sender s(make_wrapper<sender>(lnk));
       delegate_.on_sender_open(s);
       if (is_local_unititialised(pn_link_state(lnk))) {
-          if (c)
-              s.open(c->sender_options());
-          else
-              s.open();
+          s.open(s.connection().sender_options());
       }
     }
     credit_topup(lnk);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/proactor_container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
new file mode 100644
index 0000000..2b6b1de
--- /dev/null
+++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "proactor_container_impl.hpp"
+#include "proactor_event_loop_impl.hpp"
+
+#include "proton/error_condition.hpp"
+#include "proton/function.hpp"
+#include "proton/listener.hpp"
+#include "proton/listen_handler.hpp"
+#include "proton/thread_safe.hpp"
+#include "proton/url.hpp"
+
+#include "proton/connection.h"
+#include "proton/listener.h"
+#include "proton/proactor.h"
+#include "proton/transport.h"
+
+#include "contexts.hpp"
+#include "messaging_adapter.hpp"
+#include "proton_bits.hpp"
+#include "proton_event.hpp"
+
+#include <assert.h>
+
+#include <algorithm>
+#include <vector>
+
+namespace proton {
+
+event_loop::impl::impl(pn_connection_t* c)
+    : connection_(c), finished_(false)
+{}
+
+void event_loop::impl::finished() {
+    finished_ = true;
+}
+
+#if PN_CPP_HAS_STD_FUNCTION
+bool event_loop::impl::inject(std::function<void()> f) {
+    // Note this is an unbounded work queue.
+    // A resource-safe implementation should be bounded.
+    if (finished_)
+         return false;
+    jobs_.push_back(f);
+    pn_connection_wake(connection_);
+    return true;
+}
+
+bool event_loop::impl::inject(proton::void_function0& f) {
+    return inject([&f]() { f(); });
+}
+
+void event_loop::impl::run_all_jobs() {
+    decltype(jobs_) j;
+    {
+        std::swap(j, jobs_);
+    }
+    // Run queued work, but ignore any exceptions
+    for (auto& f : j) try {
+        f();
+    } catch (...) {};
+}
+#else
+bool event_loop::impl::inject(proton::void_function0& f) {
+    // Note this is an unbounded work queue.
+    // A resource-safe implementation should be bounded.
+    if (finished_)
+         return false;
+    jobs_.push_back(&f);
+    pn_connection_wake(connection_);
+    return true;
+}
+
+void event_loop::impl::run_all_jobs() {
+    // Run queued work, but ignore any exceptions
+    for (event_loop::impl::jobs::iterator f = jobs_.begin(); f != jobs_.end(); ++f) try {
+        (**f)();
+    } catch (...) {};
+    jobs_.clear();
+    return;
+}
+#endif
+container::impl::impl(container& c, const std::string& id, messaging_handler* mh)
+    : container_(c), proactor_(pn_proactor()), handler_(mh), id_(id),
+      auto_stop_(true), stopping_(false)
+{}
+
+container::impl::~impl() {
+    try {
+        stop(error_condition("exception", "container shut-down"));
+        //wait();
+    } catch (...) {}
+    pn_proactor_free(proactor_);
+}
+
+proton::connection container::impl::connect_common(
+    const std::string& addr,
+    const proton::connection_options& user_opts)
+{
+    if (stopping_)
+        throw proton::error("container is stopping");
+
+    connection_options opts = client_connection_options_; // Defaults
+    opts.update(user_opts);
+    messaging_handler* mh = opts.handler();
+
+    proton::url url(addr);
+    pn_connection_t *pnc = pn_connection();
+    connection_context& cc(connection_context::get(pnc));
+    cc.container = &container_;
+    cc.handler = mh;
+    cc.event_loop_ = new event_loop::impl(pnc);
+
+    pn_connection_set_container(pnc, id_.c_str());
+    pn_connection_set_hostname(pnc, url.host().c_str());
+    if (!url.user().empty())
+        pn_connection_set_user(pnc, url.user().c_str());
+    if (!url.password().empty())
+        pn_connection_set_password(pnc, url.password().c_str());
+
+    connection conn = make_wrapper(pnc);
+    conn.open(opts);
+    // Figure out correct string len then create connection address
+    int len = pn_proactor_addr(0, 0, url.host().c_str(), url.port().c_str());
+    std::vector<char> caddr(len+1);
+    pn_proactor_addr(&caddr[0], len+1, url.host().c_str(), url.port().c_str());
+    pn_proactor_connect(proactor_, pnc, &caddr[0]);
+    return conn;
+}
+
+proton::returned<proton::connection> container::impl::connect(
+    const std::string& addr,
+    const proton::connection_options& user_opts)
+{
+    connection conn = connect_common(addr, user_opts);
+    return make_thread_safe(conn);
+}
+
+returned<sender> container::impl::open_sender(const std::string &url, const proton::sender_options &o1, const connection_options &o2) {
+    proton::sender_options lopts(sender_options_);
+    lopts.update(o1);
+    connection conn = connect_common(url, o2);
+
+    return make_thread_safe(conn.default_session().open_sender(proton::url(url).path(), lopts));
+}
+
+returned<receiver> container::impl::open_receiver(const std::string &url, const proton::receiver_options &o1, const connection_options &o2) {
+    proton::receiver_options lopts(receiver_options_);
+    lopts.update(o1);
+    connection conn = connect_common(url, o2);
+
+    return make_thread_safe(
+        conn.default_session().open_receiver(proton::url(url).path(), lopts));
+}
+
+pn_listener_t* container::impl::listen_common_lh(const std::string& addr) {
+    if (stopping_)
+        throw proton::error("container is stopping");
+
+    proton::url url(addr);
+
+    // Figure out correct string len then create connection address
+    int len = pn_proactor_addr(0, 0, url.host().c_str(), url.port().c_str());
+    std::vector<char> caddr(len+1);
+    pn_proactor_addr(&caddr[0], len+1, url.host().c_str(), url.port().c_str());
+
+    pn_listener_t* listener = pn_listener();
+    pn_proactor_listen(proactor_, listener, &caddr[0], 16);
+    return listener;
+}
+
+proton::listener container::impl::listen(const std::string& addr) {
+    pn_listener_t* listener = listen_common_lh(addr);
+    return proton::listener(listener);
+}
+
+proton::listener container::impl::listen(const std::string& addr, const proton::connection_options& opts) {
+    pn_listener_t* listener = listen_common_lh(addr);
+    listener_context& lc=listener_context::get(listener);
+    lc.connection_options_.reset(new connection_options(opts));
+    return proton::listener(listener);
+}
+
+proton::listener container::impl::listen(const std::string& addr, proton::listen_handler& lh) {
+    pn_listener_t* listener = listen_common_lh(addr);
+    listener_context& lc=listener_context::get(listener);
+    lc.listen_handler_ = &lh;
+    return proton::listener(listener);
+}
+
+#if PN_CPP_HAS_STD_FUNCTION
+void container::impl::schedule(duration delay, void_function0& f) {
+    schedule(delay, [&f](){ f(); } );
+}
+
+void container::impl::schedule(duration delay, std::function<void()> f) {
+    // Set timeout
+    pn_proactor_set_timeout(proactor_, delay.milliseconds());
+
+    // Record timeout; Add callback to timeout sorted list
+    deferred_.emplace_back(scheduled{timestamp::now()+delay, f});
+    std::push_heap(deferred_.begin(), deferred_.end());
+}
+#else
+void container::impl::scheduled::task() {(*task_)();}
+
+void container::impl::schedule(duration delay, void_function0& f) {
+    // Set timeout
+    pn_proactor_set_timeout(proactor_, delay.milliseconds());
+
+    // Record timeout; Add callback to timeout sorted list
+    scheduled s={timestamp::now()+delay, &f};
+    deferred_.push_back(s);
+    std::push_heap(deferred_.begin(), deferred_.end());
+}
+#endif
+
+void container::impl::client_connection_options(const connection_options &opts) {
+    client_connection_options_ = opts;
+}
+
+void container::impl::server_connection_options(const connection_options &opts) {
+    server_connection_options_ = opts;
+}
+
+void container::impl::sender_options(const proton::sender_options &opts) {
+    sender_options_ = opts;
+}
+
+void container::impl::receiver_options(const proton::receiver_options &opts) {
+    receiver_options_ = opts;
+}
+
+void container::impl::run_timer_jobs() {
+    // Check head of timer queue
+    timestamp now = timestamp::now();
+    scheduled* next = &deferred_.front();
+
+    // So every scheduled element that has past run and remove head
+    while ( next->time<=now ) {
+        next->task();
+        std::pop_heap(deferred_.begin(), deferred_.end());
+        deferred_.pop_back();
+        // If there are no more scheduled items finish now
+        if  ( deferred_.size()==0 ) return;
+        next = &deferred_.front();
+    };
+
+    // To get here we know we must have at least one more thing scheduled
+    pn_proactor_set_timeout(proactor_, (next->time-now).milliseconds());
+}
+
+bool container::impl::handle(pn_event_t* event) {
+
+    // If we have any pending connection work, do it now
+    pn_connection_t* c = pn_event_connection(event);
+    if (c) {
+        event_loop::impl* loop = connection_context::get(c).event_loop_.impl_.get();
+        loop->run_all_jobs();
+    }
+
+    // Process events that shouldn't be sent to messaging_handler
+    switch (pn_event_type(event)) {
+
+    case PN_PROACTOR_INACTIVE: /* listener and all connections closed */
+        return auto_stop_;
+
+    // We never interrupt the proactor so ignore
+    case PN_PROACTOR_INTERRUPT:
+        return false;
+
+    case PN_PROACTOR_TIMEOUT:
+        // Maybe we got a timeout and have nothing scheduled (not sure if this is possible)
+        if  ( deferred_.size()==0 ) return false;
+
+        run_timer_jobs();
+        return false;
+
+    case PN_LISTENER_OPEN:
+        return false;
+
+    case PN_LISTENER_ACCEPT: {
+        pn_listener_t* l = pn_event_listener(event);
+        pn_connection_t* c = pn_connection();
+        listener_context &lc(listener_context::get(l));
+        pn_connection_set_container(c, id_.c_str());
+        connection_options opts = server_connection_options_;
+        if (lc.listen_handler_) {
+            listener lstr(l);
+            opts.update(lc.listen_handler_->on_accept(lstr));
+        }
+        else if (!!lc.connection_options_) opts.update(*lc.connection_options_);
+        lc.connection_options_.reset(new connection_options(opts));
+        // Handler applied separately
+        connection_context& cc = connection_context::get(c);
+        cc.container = &container_;
+        cc.listener_context_ = &lc;
+        cc.handler = opts.handler();
+        cc.event_loop_ = new event_loop::impl(c);
+        pn_listener_accept(l, c);
+        return false;
+    }
+    case PN_LISTENER_CLOSE: {
+        pn_listener_t* l = pn_event_listener(event);
+        listener_context &lc(listener_context::get(l));
+        listener lstnr(l);
+        if (lc.listen_handler_) {
+            pn_condition_t* c = pn_listener_condition(l);
+            if (pn_condition_is_set(c)) {
+                lc.listen_handler_->on_error(lstnr, make_wrapper(c).what());
+            }
+            lc.listen_handler_->on_close(lstnr);
+        }
+        return false;
+    }
+    // If the event was just connection wake then there isn't anything more to do
+    case PN_CONNECTION_WAKE:
+        return false;
+
+    // Connection driver will bind a new transport to the connection at this point
+    case PN_CONNECTION_INIT:
+        return false;
+
+    case PN_CONNECTION_BOUND: {
+        // Need to apply post bind connection options
+        pn_connection_t* c = pn_event_connection(event);
+        connection conn = make_wrapper(c);
+        connection_context& cc = connection_context::get(c);
+        if (cc.listener_context_) {
+            cc.listener_context_->connection_options_->apply_bound(conn);
+        } else {
+            client_connection_options_.apply_bound(conn);
+        }
+
+        return false;
+    }
+    default:
+        break;
+    }
+
+    // Figure out the handler for the primary object for event
+    messaging_handler* mh = 0;
+
+    // First try for a link (send/receiver) handler
+    pn_link_t *link = pn_event_link(event);
+    if (link) mh = get_handler(link);
+
+    // Try for session handler if no link handler
+    pn_session_t *session = pn_event_session(event);
+    if (session && !mh) mh = get_handler(session);
+
+    // Try for connection handler if none of the above
+    pn_connection_t *connection = pn_event_connection(event);
+    if (connection && !mh) mh = get_handler(connection);
+
+    // Use container handler if nothing more specific (must be a container handler)
+    if (!mh) mh = handler_;
+
+    // If we still have no handler don't do anything!
+    // This is pretty unusual, but possible if we use the default constructor for container
+    if (!mh) return false;
+
+    // TODO: Currently create a throwaway messaging_adapter and proton_event so we can call dispatch, a bit inefficient
+    messaging_adapter ma(*mh);
+    proton_event pe(event);
+    pe.dispatch(ma);
+    return false;
+}
+
+void container::impl::thread(container::impl& ci) {
+  bool finished = false;
+  do {
+    pn_event_batch_t *events = pn_proactor_wait(ci.proactor_);
+    pn_event_t *e;
+    while ((e = pn_event_batch_next(events))) {
+      finished = ci.handle(e) || finished;
+    }
+    pn_proactor_done(ci.proactor_, events);
+  } while(!finished);
+}
+
+void container::impl::run() {
+    // Have to "manually" generate container events
+    if (handler_) handler_->on_container_start(container_);
+    thread(*this);
+    if (handler_) handler_->on_container_stop(container_);
+}
+
+void container::impl::auto_stop(bool set) {
+    auto_stop_ = set;
+}
+
+void container::impl::stop(const proton::error_condition& err) {
+    auto_stop_ = true;
+    stopping_ = true;
+    pn_condition_t* error_condition = pn_condition();
+    set_error_condition(err, error_condition);
+    pn_proactor_disconnect(proactor_, error_condition);
+    pn_condition_free(error_condition);
+}
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/receiver.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/receiver.cpp b/proton-c/bindings/cpp/src/receiver.cpp
index 68d55d0..b7239a5 100644
--- a/proton-c/bindings/cpp/src/receiver.cpp
+++ b/proton-c/bindings/cpp/src/receiver.cpp
@@ -34,7 +34,6 @@
 #include <proton/session.h>
 #include <proton/link.h>
 #include <proton/event.h>
-#include <proton/reactor.h>
 
 namespace proton {
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/receiver_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/receiver_options.cpp b/proton-c/bindings/cpp/src/receiver_options.cpp
index 4a4d80f..2b134bc 100644
--- a/proton-c/bindings/cpp/src/receiver_options.cpp
+++ b/proton-c/bindings/cpp/src/receiver_options.cpp
@@ -27,7 +27,7 @@
 #include <proton/link.h>
 
 #include "contexts.hpp"
-#include "container_impl.hpp"
+#include "proactor_container_impl.hpp"
 #include "messaging_adapter.hpp"
 #include "proton_bits.hpp"
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/reconnect_timer.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/reconnect_timer.cpp b/proton-c/bindings/cpp/src/reconnect_timer.cpp
index c63f8a1..a299b0e 100644
--- a/proton-c/bindings/cpp/src/reconnect_timer.cpp
+++ b/proton-c/bindings/cpp/src/reconnect_timer.cpp
@@ -23,7 +23,6 @@
 #include "proton/error.hpp"
 #include "msg.hpp"
 #include <proton/types.h>
-#include <proton/reactor.h>
 
 namespace proton {
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/sender_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/sender_options.cpp b/proton-c/bindings/cpp/src/sender_options.cpp
index 4f501e6..9305666 100644
--- a/proton-c/bindings/cpp/src/sender_options.cpp
+++ b/proton-c/bindings/cpp/src/sender_options.cpp
@@ -24,7 +24,7 @@
 #include "proton/source_options.hpp"
 #include "proton/target_options.hpp"
 
-#include "container_impl.hpp"
+#include "proactor_container_impl.hpp"
 #include "contexts.hpp"
 #include "messaging_adapter.hpp"
 #include "proton_bits.hpp"

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/session_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/session_options.cpp b/proton-c/bindings/cpp/src/session_options.cpp
index 2147fd4..fc03ebb 100644
--- a/proton-c/bindings/cpp/src/session_options.cpp
+++ b/proton-c/bindings/cpp/src/session_options.cpp
@@ -27,7 +27,7 @@
 #include <proton/session.h>
 
 #include "messaging_adapter.hpp"
-#include "container_impl.hpp"
+#include "proactor_container_impl.hpp"
 #include "proton_bits.hpp"
 
 namespace proton {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message