qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [2/3] qpid-proton git commit: PROTON-1553: c++ wake() events
Date Mon, 28 Aug 2017 21:57:32 GMT
PROTON-1553: c++  wake() events


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/68c8cf43
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/68c8cf43
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/68c8cf43

Branch: refs/heads/master
Commit: 68c8cf43e51bbe71d7042688fbcda52de1ed5cce
Parents: f1ee268
Author: Alan Conway <aconway@redhat.com>
Authored: Thu Aug 24 15:34:35 2017 -0400
Committer: Alan Conway <aconway@redhat.com>
Committed: Sat Aug 26 13:20:59 2017 -0400

----------------------------------------------------------------------
 examples/cpp/mt_queue.hpp                       | 102 +++++++++++++++++++
 .../bindings/cpp/include/proton/connection.hpp  |  20 ++++
 .../cpp/include/proton/internal/object.hpp      |   4 +-
 .../cpp/include/proton/messaging_handler.hpp    |  15 ++-
 proton-c/bindings/cpp/src/connection.cpp        |   5 +
 proton-c/bindings/cpp/src/handler.cpp           |   2 +
 proton-c/bindings/cpp/src/messaging_adapter.cpp |   7 ++
 .../cpp/src/proactor_container_impl.cpp         |   6 +-
 8 files changed, 153 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/68c8cf43/examples/cpp/mt_queue.hpp
----------------------------------------------------------------------
diff --git a/examples/cpp/mt_queue.hpp b/examples/cpp/mt_queue.hpp
new file mode 100644
index 0000000..f053ebe
--- /dev/null
+++ b/examples/cpp/mt_queue.hpp
@@ -0,0 +1,102 @@
+#ifndef MT_QUEUE_HPP
+#define MT_QUEUE_HPP
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <condition_variable>
+#include <stdexcept>
+#include <mutex>
+#include <queue>
+
+class closed_error : public std::runtime_error {
+  public:
+    closed_error() : std::runtime_error("closed") {}
+};
+
+// A bounded, thread-safe queue.
+// Objects are moved on and off the queue, not copied. Avoids overhead of copy operations.
+template <class T, size_t CAPACITY> class mt_queue {
+    std::queue<T> q_;
+    std::mutex lock_;
+    std::condition_variable push_;
+    std::condition_variable pop_;
+    bool closed_;
+
+    void do_push(T&& x) {
+        q_.push(std::move(x));
+        pop_.notify_one();
+    }
+
+    T do_pop() {
+        T x(std::move(q_.front()));
+        q_.pop();
+        push_.notify_one();
+        return x;
+    }
+
+    bool can_push() { return q_.size() < CAPACITY; }
+    bool can_pop() { return q_.size() > 0; }
+
+  public:
+
+    mt_queue() : closed_(false) {}
+
+    void push(T&& x) {
+        std::unique_lock<std::mutex> l(lock_);
+        while(!can_push())
+            push_.wait(l);
+        do_push(std::move(x));
+    }
+
+    T pop() {
+        std::unique_lock<std::mutex> l(lock_);
+        while(!can_pop())
+            pop_.wait(l);
+        return do_pop();
+    }
+
+    bool try_push(T&& x) noexcept {
+        std::lock_guard<std::mutex> l(lock_);
+        bool ok = can_push();
+        if (ok)
+            do_push(std::move(x));
+        return ok;
+    }
+
+    bool try_pop(T& x) noexcept {
+        std::lock_guard<std::mutex> l(lock_);
+        bool ok = can_pop();
+        if (ok)
+            x = std::move(do_pop());
+        return ok;
+    }
+
+    size_t capacity() noexcept {
+        return CAPACITY;
+    }
+
+    size_t size() noexcept {
+        std::lock_guard<std::mutex> l(lock_);
+        return q_.size();
+    }
+};
+
+
+#endif // MT_QUEUE_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/68c8cf43/proton-c/bindings/cpp/include/proton/connection.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection.hpp b/proton-c/bindings/cpp/include/proton/connection.hpp
index 10ea61b..58e2afc 100644
--- a/proton-c/bindings/cpp/include/proton/connection.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection.hpp
@@ -139,6 +139,26 @@ PN_CPP_CLASS_EXTERN connection : public internal::object<pn_connection_t>,
publi
     /// @see @ref connection_options::idle_timeout
     PN_CPP_EXTERN uint32_t idle_timeout() const;
 
+    /// **Experimental** - trigger thread-safe call to messaging_handler::on_wake()
+    ///
+    /// *Thread safe*: this is the *only* @ref connection function that can be
+    /// called from outside the handler thread.
+    ///
+    /// messaging_handler::on_wake() will be called on the handler as soon as
+    /// possible after the call to wake(), possibly in a different thread.
+    ///
+    /// @note
+    /// * Multiple calls to wake() may be coalesced into a single call to on_wake()
+    ///   that occurs after all of them.
+    /// * Spurious on_wake() calls can occur even if the application does not call
+    ///   on_wake()
+    ///
+    /// wake() is the primitive building-block for thread-safe applications.
+    /// With C++11 or greater, @ref work_queue provides an easier way execute
+    /// code safely in the handler thread.
+    ///
+    PN_CPP_EXTERN void wake() const;
+
     /// @cond INTERNAL
   friend class internal::factory<connection>;
   friend class container;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/68c8cf43/proton-c/bindings/cpp/include/proton/internal/object.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/internal/object.hpp b/proton-c/bindings/cpp/include/proton/internal/object.hpp
index 442b09d..388c65c 100644
--- a/proton-c/bindings/cpp/include/proton/internal/object.hpp
+++ b/proton-c/bindings/cpp/include/proton/internal/object.hpp
@@ -85,9 +85,9 @@ template <class T> pn_ptr<T> take_ownership(T* p) { return pn_ptr<T>::take_owner
 /// Base class for proton object types.
 template <class T> class object : private comparable<object<T> > {
   public:
-    bool operator!() const { return !object_; }
+    bool operator!() const { return !object_.get(); }
 #if PN_CPP_HAS_EXPLICIT_CONVERSIONS
-    explicit operator bool() const { return object_; }
+    explicit operator bool() const { return object_.get(); }
 #endif
 
   protected:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/68c8cf43/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/messaging_handler.hpp b/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
index a5e2bdd..0792dba 100644
--- a/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
+++ b/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
@@ -146,11 +146,24 @@ PN_CPP_CLASS_EXTERN messaging_handler {
     /// **Experimental** - The receiving peer has requested a drain of
     /// remaining credit.
     PN_CPP_EXTERN virtual void on_sender_drain_start(sender &s);
-    
+
     /// **Experimental** - The credit outstanding at the time of the
     /// call to receiver::drain has been consumed or returned.
     PN_CPP_EXTERN virtual void on_receiver_drain_finish(receiver &r);
 
+    /// *Experimental** - a wakeup event that can be triggered from another thread.
+    ///
+    /// @see connection::wake()
+    ///
+    /// The on_wake() event carries no information about why it was called.
+    /// Recommended use is that the messaging_handler have shared, thread-safe
+    /// members that it examines to decide how/if to respond to the wake.
+    ///
+    /// @note on_wake() can be called internally by proton without any
+    /// application calls to connection::wake()
+    ///
+    PN_CPP_EXTERN virtual void on_wake(connection&);
+
     /// Fallback error handling.
     PN_CPP_EXTERN virtual void on_error(const error_condition &c);
 };

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/68c8cf43/proton-c/bindings/cpp/src/connection.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection.cpp b/proton-c/bindings/cpp/src/connection.cpp
index a37d3b5..1d66c41 100644
--- a/proton-c/bindings/cpp/src/connection.cpp
+++ b/proton-c/bindings/cpp/src/connection.cpp
@@ -40,6 +40,7 @@
 #include <proton/session.h>
 #include <proton/transport.h>
 #include <proton/object.h>
+#include <proton/proactor.h>
 
 namespace proton {
 
@@ -174,4 +175,8 @@ uint32_t connection::idle_timeout() const {
     return pn_transport_get_remote_idle_timeout(pn_connection_transport(pn_object()));
 }
 
+void connection::wake() const {
+    pn_connection_wake(pn_object());
+}
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/68c8cf43/proton-c/bindings/cpp/src/handler.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/handler.cpp b/proton-c/bindings/cpp/src/handler.cpp
index 84d9e8a..8f7fbe9 100644
--- a/proton-c/bindings/cpp/src/handler.cpp
+++ b/proton-c/bindings/cpp/src/handler.cpp
@@ -85,7 +85,9 @@ void messaging_handler::on_tracker_settle(tracker &) {}
 void messaging_handler::on_delivery_settle(delivery &) {}
 void messaging_handler::on_sender_drain_start(sender &) {}
 void messaging_handler::on_receiver_drain_finish(receiver &) {}
+void messaging_handler::on_wake(connection&) {}
 
 void messaging_handler::on_error(const error_condition& c) { throw proton::error(c.what());
}
 
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/68c8cf43/proton-c/bindings/cpp/src/messaging_adapter.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/messaging_adapter.cpp b/proton-c/bindings/cpp/src/messaging_adapter.cpp
index cb1b776..2357cfa 100644
--- a/proton-c/bindings/cpp/src/messaging_adapter.cpp
+++ b/proton-c/bindings/cpp/src/messaging_adapter.cpp
@@ -296,6 +296,11 @@ void on_transport_closed(messaging_handler& handler, pn_event_t*
event) {
     handler.on_transport_close(t);
 }
 
+void on_wake(messaging_handler& handler, pn_event_t* event) {
+    connection c(make_wrapper(pn_event_connection(event)));
+    handler.on_wake(c);
+}
+
 }
 
 void messaging_adapter::dispatch(messaging_handler& handler, pn_event_t* event)
@@ -321,6 +326,8 @@ void messaging_adapter::dispatch(messaging_handler& handler, pn_event_t*
event)
 
       case PN_TRANSPORT_CLOSED: on_transport_closed(handler, event); break;
 
+      case PN_CONNECTION_WAKE: on_wake(handler, event); break;
+
       // Ignore everything else
       default: break;
     }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/68c8cf43/proton-c/bindings/cpp/src/proactor_container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
index 1389306..b262f8c 100644
--- a/proton-c/bindings/cpp/src/proactor_container_impl.cpp
+++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
@@ -381,10 +381,6 @@ bool container::impl::handle(pn_event_t* event) {
         }
         return false;
     }
-    // If the event was just connection wake then there isn't anything more to do
-    case PN_CONNECTION_WAKE:
-        return false;
-
     // Connection driver will bind a new transport to the connection at this point
     case PN_CONNECTION_INIT:
         return false;
@@ -443,7 +439,7 @@ void container::impl::thread() {
           finished = handle(e);
           if (finished) break;
         }
-      } catch (proton::error& e) {
+      } catch (std::exception& e) {
         // If we caught an exception then shutdown the (other threads of the) container
         disconnect_error_ = error_condition("exception", e.what());
         if (!stopping_) stop(disconnect_error_);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message