qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From astitc...@apache.org
Subject [09/20] qpid-proton git commit: PROTON-1400: [C++ binding] Implement container level event_loops
Date Fri, 21 Jul 2017 17:02:05 GMT
PROTON-1400: [C++ binding] Implement container level event_loops


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/570d0a1a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/570d0a1a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/570d0a1a

Branch: refs/heads/master
Commit: 570d0a1aae29adfab861a76f37d2aa9f14488a9a
Parents: ec2364f
Author: Andrew Stitcher <astitcher@apache.org>
Authored: Thu Apr 20 15:20:40 2017 -0400
Committer: Andrew Stitcher <astitcher@apache.org>
Committed: Fri Jul 21 12:50:06 2017 -0400

----------------------------------------------------------------------
 .../bindings/cpp/include/proton/container.hpp   |   1 +
 .../bindings/cpp/include/proton/event_loop.hpp  |   1 +
 proton-c/bindings/cpp/src/event_loop.cpp        |   3 +
 .../cpp/src/include/proactor_container_impl.hpp |  10 ++
 .../src/include/proactor_event_loop_impl.hpp    |  23 +--
 .../cpp/src/proactor_container_impl.cpp         | 156 ++++++++++++++-----
 6 files changed, 139 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/570d0a1a/proton-c/bindings/cpp/include/proton/container.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/container.hpp b/proton-c/bindings/cpp/include/proton/container.hpp
index be83e5e..0262e0f 100644
--- a/proton-c/bindings/cpp/include/proton/container.hpp
+++ b/proton-c/bindings/cpp/include/proton/container.hpp
@@ -224,6 +224,7 @@ class PN_CPP_CLASS_EXTERN container {
   friend class session_options;
   friend class receiver_options;
   friend class sender_options;
+  friend class event_loop;
 };
 
 } // proton

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/570d0a1a/proton-c/bindings/cpp/include/proton/event_loop.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/event_loop.hpp b/proton-c/bindings/cpp/include/proton/event_loop.hpp
index f49d211..6d1646e 100644
--- a/proton-c/bindings/cpp/include/proton/event_loop.hpp
+++ b/proton-c/bindings/cpp/include/proton/event_loop.hpp
@@ -50,6 +50,7 @@ class PN_CPP_CLASS_EXTERN event_loop {
   public:
     /// Create event_loop
     PN_CPP_EXTERN event_loop();
+    PN_CPP_EXTERN event_loop(container&);
 
     PN_CPP_EXTERN ~event_loop();
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/570d0a1a/proton-c/bindings/cpp/src/event_loop.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/event_loop.cpp b/proton-c/bindings/cpp/src/event_loop.cpp
index ab39aa7..5320011 100644
--- a/proton-c/bindings/cpp/src/event_loop.cpp
+++ b/proton-c/bindings/cpp/src/event_loop.cpp
@@ -20,6 +20,7 @@
 #include "proton/event_loop.hpp"
 
 #include "contexts.hpp"
+#include "proactor_container_impl.hpp"
 #include "proactor_event_loop_impl.hpp"
 
 #include <proton/session.h>
@@ -28,6 +29,8 @@
 namespace proton {
 
 event_loop::event_loop() {}
+event_loop::event_loop(container& c) { *this = container::impl::make_event_loop(c); }
+
 event_loop::~event_loop() {}
 
 event_loop& event_loop::operator=(impl* i) { impl_.reset(i); return *this; }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/570d0a1a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
index 859493d..4b84a6e 100644
--- a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
+++ b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
@@ -28,6 +28,7 @@
 #include "proton/connection_options.hpp"
 #include "proton/duration.hpp"
 #include "proton/error_condition.hpp"
+#include "proton/event_loop.hpp"
 #include "proton/messaging_handler.hpp"
 #include "proton/receiver.hpp"
 #include "proton/receiver_options.hpp"
@@ -38,6 +39,7 @@
 
 #include <list>
 #include <map>
+#include <set>
 #include <string>
 #include <vector>
 
@@ -77,8 +79,12 @@ class container::impl {
 #endif
     template <class T> static void set_handler(T s, messaging_handler* h);
     template <class T> static messaging_handler* get_handler(T s);
+    static event_loop::impl* make_event_loop(container&);
 
   private:
+    class common_event_loop;
+    class connection_event_loop;
+    class container_event_loop;
     pn_listener_t* listen_common_lh(const std::string&);
     connection connect_common(const std::string&, const connection_options&);
 
@@ -89,6 +95,10 @@ class container::impl {
 
     container& container_;
 
+    typedef std::set<container_event_loop*> event_loops;
+    event_loops event_loops_;
+    container_event_loop* add_event_loop();
+    void remove_event_loop(container_event_loop*);
     struct scheduled {
         timestamp time; // duration from epoch for task
 #if PN_CPP_HAS_STD_FUNCTION

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/570d0a1a/proton-c/bindings/cpp/src/include/proactor_event_loop_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/proactor_event_loop_impl.hpp b/proton-c/bindings/cpp/src/include/proactor_event_loop_impl.hpp
index 8fa7acf..82ec129 100644
--- a/proton-c/bindings/cpp/src/include/proactor_event_loop_impl.hpp
+++ b/proton-c/bindings/cpp/src/include/proactor_event_loop_impl.hpp
@@ -23,30 +23,19 @@
  */
 
 #include "proton/fwd.hpp"
-
-struct pn_connection_t;
+#include "proton/internal/config.hpp"
 
 namespace proton {
 
 class event_loop::impl {
   public:
-    impl(pn_connection_t*);
-
-    bool inject(void_function0& f);
+    virtual ~impl() {};
+    virtual bool inject(void_function0& f) = 0;
 #if PN_CPP_HAS_STD_FUNCTION
-    bool inject(std::function<void()> f);
-    typedef std::vector<std::function<void()> > jobs;
-#else
-    typedef std::vector<void_function0*> jobs;
+    virtual bool inject(std::function<void()> f) = 0;
 #endif
-
-
-    void run_all_jobs();
-    void finished();
-
-    jobs jobs_;
-    pn_connection_t* connection_;
-    bool finished_;
+    virtual void run_all_jobs() = 0;
+    virtual void finished() = 0;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/570d0a1a/proton-c/bindings/cpp/src/proactor_container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
index 2486e2b..4d526f2 100644
--- a/proton-c/bindings/cpp/src/proactor_container_impl.cpp
+++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
@@ -43,30 +43,25 @@
 
 namespace proton {
 
-event_loop::impl::impl(pn_connection_t* c)
-    : connection_(c), finished_(false)
-{}
-
-void event_loop::impl::finished() {
-    finished_ = true;
-}
+class container::impl::common_event_loop : public event_loop::impl {
+  public:
+    common_event_loop(): finished_(false) {}
 
 #if PN_CPP_HAS_STD_FUNCTION
-bool event_loop::impl::inject(std::function<void()> f) {
-    // Note this is an unbounded work queue.
-    // A resource-safe implementation should be bounded.
-    if (finished_)
-         return false;
-    jobs_.push_back(f);
-    pn_connection_wake(connection_);
-    return true;
-}
+    typedef std::vector<std::function<void()> > jobs;
+#else
+    typedef std::vector<void_function0*> jobs;
+#endif
 
-bool event_loop::impl::inject(proton::void_function0& f) {
-    return inject([&f]() { f(); });
-}
+    void run_all_jobs();
+    void finished() { finished_ = true; }
 
-void event_loop::impl::run_all_jobs() {
+    jobs jobs_;
+    bool finished_;
+};
+
+#if PN_CPP_HAS_STD_FUNCTION
+void container::impl::common_event_loop::run_all_jobs() {
     decltype(jobs_) j;
     {
         std::swap(j, jobs_);
@@ -77,25 +72,93 @@ void event_loop::impl::run_all_jobs() {
     } catch (...) {};
 }
 #else
-bool event_loop::impl::inject(proton::void_function0& f) {
+void container::impl::common_event_loop::run_all_jobs() {
+    // Run queued work, but ignore any exceptions
+    for (jobs::iterator f = jobs_.begin(); f != jobs_.end(); ++f) try {
+        (**f)();
+    } catch (...) {};
+    jobs_.clear();
+    return;
+}
+#endif
+
+class container::impl::connection_event_loop : public common_event_loop {
+  public:
+    connection_event_loop(pn_connection_t* c): connection_(c) {}
+
+    bool inject(void_function0& f);
+#if PN_CPP_HAS_STD_FUNCTION
+    bool inject(std::function<void()> f);
+#endif
+
+    pn_connection_t* connection_;
+};
+
+#if PN_CPP_HAS_STD_FUNCTION
+bool container::impl::connection_event_loop::inject(std::function<void()> f) {
+    // Note this is an unbounded work queue.
+    // A resource-safe implementation should be bounded.
+    if (finished_) return false;
+    jobs_.emplace_back(std::move(f));
+    pn_connection_wake(connection_);
+    return true;
+}
+
+bool container::impl::connection_event_loop::inject(proton::void_function0& f) {
+    return inject([&f]() { f(); });
+}
+#else
+bool container::impl::connection_event_loop::inject(proton::void_function0& f) {
     // Note this is an unbounded work queue.
     // A resource-safe implementation should be bounded.
-    if (finished_)
-         return false;
+    if (finished_) return false;
     jobs_.push_back(&f);
     pn_connection_wake(connection_);
     return true;
 }
+#endif
 
-void event_loop::impl::run_all_jobs() {
-    // Run queued work, but ignore any exceptions
-    for (event_loop::impl::jobs::iterator f = jobs_.begin(); f != jobs_.end(); ++f) try {
-        (**f)();
-    } catch (...) {};
-    jobs_.clear();
-    return;
+class container::impl::container_event_loop : public common_event_loop {
+  public:
+    container_event_loop(container::impl& c): container_(c) {}
+    ~container_event_loop() { container_.remove_event_loop(this); }
+
+    bool inject(void_function0& f);
+#if PN_CPP_HAS_STD_FUNCTION
+    bool inject(std::function<void()> f);
+#endif
+
+    container::impl& container_;
+};
+
+#if PN_CPP_HAS_STD_FUNCTION
+bool container::impl::container_event_loop::inject(std::function<void()> f) {
+    // Note this is an unbounded work queue.
+    // A resource-safe implementation should be bounded.
+    if (finished_) return false;
+    jobs_.emplace_back(std::move(f));
+    pn_proactor_set_timeout(container_.proactor_, 0);
+    return true;
+}
+
+bool container::impl::container_event_loop::inject(proton::void_function0& f) {
+    return inject([&f]() { f(); });
+}
+#else
+bool container::impl::container_event_loop::inject(proton::void_function0& f) {
+    // Note this is an unbounded work queue.
+    // A resource-safe implementation should be bounded.
+    if (finished_) return false;
+    jobs_.push_back(&f);
+    pn_proactor_set_timeout(container_.proactor_, 0);
+    return true;
 }
 #endif
+
+class event_loop::impl* container::impl::make_event_loop(container& c) {
+    return c.impl_->add_event_loop();
+}
+
 container::impl::impl(container& c, const std::string& id, messaging_handler* mh)
     : container_(c), proactor_(pn_proactor()), handler_(mh), id_(id),
       auto_stop_(true), stopping_(false)
@@ -109,6 +172,16 @@ container::impl::~impl() {
     pn_proactor_free(proactor_);
 }
 
+container::impl::container_event_loop* container::impl::add_event_loop() {
+    container_event_loop* c = new container_event_loop(*this);
+    event_loops_.insert(c);
+    return c;
+}
+
+void container::impl::remove_event_loop(container::impl::container_event_loop* l) {
+    event_loops_.erase(l);
+}
+
 proton::connection container::impl::connect_common(
     const std::string& addr,
     const proton::connection_options& user_opts)
@@ -125,7 +198,7 @@ proton::connection container::impl::connect_common(
     connection_context& cc(connection_context::get(pnc));
     cc.container = &container_;
     cc.handler = mh;
-    cc.event_loop_ = new event_loop::impl(pnc);
+    cc.event_loop_ = new container::impl::connection_event_loop(pnc);
 
     pn_connection_set_container(pnc, id_.c_str());
     pn_connection_set_hostname(pnc, url.host().c_str());
@@ -225,7 +298,7 @@ void container::impl::schedule(duration delay, void_function0& f) {
     pn_proactor_set_timeout(proactor_, delay.milliseconds());
 
     // Record timeout; Add callback to timeout sorted list
-    scheduled s={timestamp::now()+delay, &f};
+    scheduled s = {timestamp::now()+delay, &f};
     deferred_.push_back(s);
     std::push_heap(deferred_.begin(), deferred_.end());
 }
@@ -285,13 +358,20 @@ bool container::impl::handle(pn_event_t* event) {
     case PN_PROACTOR_INTERRUPT:
         return false;
 
-    case PN_PROACTOR_TIMEOUT:
-        // Maybe we got a timeout and have nothing scheduled (not sure if this is possible)
-        if  ( deferred_.size()==0 ) return false;
+    case PN_PROACTOR_TIMEOUT: {
+        // Can get an immediate timeout, if we have a container event loop inject
+        if  ( deferred_.size()>0 ) {
+            run_timer_jobs();
+        }
 
-        run_timer_jobs();
+        // Run every container event loop job
+        // This is not at all efficient and single threads all these jobs, but it does correctly
+        // serialise them
+        for (event_loops::iterator loop = event_loops_.begin(); loop!=event_loops_.end(); ++loop) {
+            (*loop)->run_all_jobs();
+        }
         return false;
-
+    }
     case PN_LISTENER_OPEN:
         return false;
 
@@ -312,7 +392,7 @@ bool container::impl::handle(pn_event_t* event) {
         cc.container = &container_;
         cc.listener_context_ = &lc;
         cc.handler = opts.handler();
-        cc.event_loop_ = new event_loop::impl(c);
+        cc.event_loop_ = new container::impl::connection_event_loop(c);
         pn_listener_accept(l, c);
         return false;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message