From: acon...@apache.org
Subject: [84/89] [abbrv] qpid-proton git commit: PROTON-1847: [C++ binding] Ensure that excessive scheduled events can't starve other events
Date: Tue, 03 Jul 2018 22:14:13 GMT
PROTON-1847: [C++ binding] Ensure that excessive scheduled events can't starve other events


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/9fd19bcf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/9fd19bcf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/9fd19bcf

Branch: refs/heads/go1
Commit: 9fd19bcf960dacc046b2f6867c538149eceb8e5c
Parents: 1764c4d
Author: Andrew Stitcher <astitcher@apache.org>
Authored: Wed May 16 20:15:08 2018 -0400
Committer: Andrew Stitcher <astitcher@apache.org>
Committed: Wed May 16 20:17:31 2018 -0400

----------------------------------------------------------------------
 cpp/src/proactor_container_impl.cpp | 80 +++++++++++++++++++++-----------
 cpp/src/proactor_container_impl.hpp |  3 +-
 2 files changed, 54 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
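
The fix applies a drain-then-run pattern: while holding the lock, snapshot the
tasks that are already due, then release the lock and run the snapshot. Anything
scheduled while the snapshot is running lands in the queue for the next pass, so
a task that keeps rescheduling itself cannot monopolize the loop. A minimal
sketch of the pattern (hypothetical names, not the Proton API):

----------------------------------------------------------------------
// Drain-then-run: snapshot under the lock, execute unlocked.
// Hypothetical standalone sketch; not the Proton API.
#include <functional>
#include <mutex>
#include <vector>

class timer_queue {
    std::mutex lock_;
    std::vector<std::function<void()>> due_;   // tasks already due

  public:
    void schedule(std::function<void()> f) {
        std::lock_guard<std::mutex> g(lock_);
        due_.push_back(std::move(f));
    }

    // Run only the tasks that were due when we started: a task that
    // reschedules itself goes back into due_ and waits for the next
    // call, so it cannot starve other work.
    void run_due() {
        std::vector<std::function<void()>> batch;
        {
            std::lock_guard<std::mutex> g(lock_);
            batch.swap(due_);          // snapshot under the lock
        }
        for (auto& f : batch) f();     // run unlocked
    }
};
----------------------------------------------------------------------

In the real code deferred_ is additionally a heap ordered by due time, so only
the tasks whose due time has passed are snapshotted; the sketch omits that to
isolate the locking pattern.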


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fd19bcf/cpp/src/proactor_container_impl.cpp
----------------------------------------------------------------------
diff --git a/cpp/src/proactor_container_impl.cpp b/cpp/src/proactor_container_impl.cpp
index f865a16..3a89816 100644
--- a/cpp/src/proactor_container_impl.cpp
+++ b/cpp/src/proactor_container_impl.cpp
@@ -459,31 +459,54 @@ void container::impl::receiver_options(const proton::receiver_options &opts) {
 
 void container::impl::run_timer_jobs() {
     timestamp now = timestamp::now();
+    std::vector<scheduled> tasks;
 
-    // Check head of timer queue
-    for (;;) {
-        work task;
-        {
-            GUARD(deferred_lock_);
-            if  ( deferred_.size()==0 ) return;
+    // We first extract all the runnable tasks and then run them -  this is to avoid having tasks
+    // injected as we are running them (which could potentially never end)
+    {
+        GUARD(deferred_lock_);
 
-            timestamp next_time = deferred_.front().time;
+        // Figure out how many tasks we need to execute and pop them to the back of the
+        // queue (in reverse order)
+        int i = 0;
+        for (;;) {
+            // Have we seen all the queued tasks?
+            if  ( deferred_.size()-i==0 ) break;
 
-            if (next_time>now) {
+            // Is the next task in the future?
+            timestamp next_time = deferred_.front().time;
+            if ( next_time>now ) {
                 pn_proactor_set_timeout(proactor_, (next_time-now).milliseconds());
-                return;
+                break;
             }
 
-            task = deferred_.front().task;
-            std::pop_heap(deferred_.begin(), deferred_.end());
-            deferred_.pop_back();
+            std::pop_heap(deferred_.begin(), deferred_.end()-i);
+            ++i;
+        }
+        // Nothing to do
+        if ( i==0 ) return;
+
+        // Now we know how many tasks to run
+        if ( deferred_.size()==i ) {
+            // If we sorted the entire heap, then we're executing every task
+            // so don't need to copy and can just swap
+            tasks.swap(deferred_);
+        } else {
+            // Otherwise just copy the ones we sorted
+            tasks = std::vector<scheduled>(deferred_.end()-i, deferred_.end());
+
+            // Remove tasks to be executed
+            deferred_.resize(deferred_.size()-i);
         }
-        task();
     }
+    // We've now taken the tasks to run from the deferred tasks
+    // so we can run them unlocked
+    // NB. We copied the due tasks in reverse order so execute from end
+    for (int i = tasks.size()-1; i>=0; --i) tasks[i].task();
 }
 
 // Return true if this thread is finished
-bool container::impl::handle(pn_event_t* event) {
+container::impl::dispatch_result container::impl::dispatch(pn_event_t* event) {
 
     // If we have any pending connection work, do it now
     pn_connection_t* c = pn_event_connection(event);
@@ -498,14 +521,14 @@ bool container::impl::handle(pn_event_t* event) {
     case PN_PROACTOR_INACTIVE: /* listener and all connections closed */
         // If we're stopping interrupt all other threads still running
         if (auto_stop_) pn_proactor_interrupt(proactor_);
-        return false;
+        return ContinueLoop;
 
     // We only interrupt to stop threads
     case PN_PROACTOR_INTERRUPT: {
         // Interrupt any other threads still running
         GUARD(lock_);
         if (threads_>1) pn_proactor_interrupt(proactor_);
-        return true;
+        return EndLoop;
     }
 
     case PN_PROACTOR_TIMEOUT: {
@@ -523,7 +546,7 @@ bool container::impl::handle(pn_event_t* event) {
         for (work_queues::iterator queue = queues.begin(); queue!=queues.end(); ++queue) {
             (*queue)->run_all_jobs();
         }
-        return false;
+        return EndBatch;
     }
     case PN_LISTENER_OPEN: {
         pn_listener_t* l = pn_event_listener(event);
@@ -537,7 +560,7 @@ bool container::impl::handle(pn_event_t* event) {
             listener lstnr(l);
             handler->on_open(lstnr);
         }
-        return false;
+        return ContinueLoop;
     }
     case PN_LISTENER_ACCEPT: {
         pn_listener_t* l = pn_event_listener(event);
@@ -568,7 +591,7 @@ bool container::impl::handle(pn_event_t* event) {
         pn_transport_set_server(pnt);
         opts.apply_unbound_server(pnt);
         pn_listener_accept2(l, c, pnt);
-        return false;
+        return ContinueLoop;
     }
     case PN_LISTENER_CLOSE: {
         pn_listener_t* l = pn_event_listener(event);
@@ -586,15 +609,15 @@ bool container::impl::handle(pn_event_t* event) {
             }
             handler->on_close(lstnr);
         }
-        return false;
+        return ContinueLoop;
     }
     // Connection driver will bind a new transport to the connection at this point
     case PN_CONNECTION_INIT:
-        return false;
+        return ContinueLoop;
 
     // We've already applied options, so don't need to do it here
     case PN_CONNECTION_BOUND:
-        return false;
+        return ContinueLoop;
 
     case PN_CONNECTION_REMOTE_OPEN: {
         // This is the only event that we get indicating that the connection succeeded so
@@ -618,7 +641,7 @@ bool container::impl::handle(pn_event_t* event) {
             pn_condition_t* tc = pn_transport_condition(t);
             pn_condition_copy(tc, cc);
             pn_connection_close(c);
-            return false;
+            return ContinueLoop;
         }
         break;
     }
@@ -628,7 +651,7 @@ bool container::impl::handle(pn_event_t* event) {
         pn_transport_t* t = pn_event_transport(event);
         // If we successfully schedule a re-connect then hide the event from
         // user handlers by returning here.
-        if (pn_condition_is_set(pn_transport_condition(t)) && setup_reconnect(c)) return false;
+        if (pn_condition_is_set(pn_transport_condition(t)) && setup_reconnect(c)) return ContinueLoop;
         // Otherwise, this connection will be freed by the proactor.
         // Mark its work_queue finished so it won't try to use the freed connection.
         connection_context::get(c).work_queue_.impl_.get()->finished();
@@ -657,10 +680,10 @@ bool container::impl::handle(pn_event_t* event) {
 
     // If we still have no handler don't do anything!
     // This is pretty unusual, but possible if we use the default constructor for container
-    if (!mh) return false;
+    if (!mh) return ContinueLoop;
 
     messaging_adapter::dispatch(*mh, event);
-    return false;
+    return ContinueLoop;
 }
 
 void container::impl::thread() {
@@ -676,8 +699,9 @@ void container::impl::thread() {
         error_condition error;
         try {
             while ((e = pn_event_batch_next(events))) {
-                finished = handle(e);
-                if (finished) break;
+                dispatch_result r = dispatch(e);
+                finished = r==EndLoop;
+                if (r!=ContinueLoop) break;
             }
         } catch (const std::exception& e) {
             // If we caught an exception then shutdown the (other threads of the) container
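
run_timer_jobs() keeps deferred_ as a heap ordered by due time and drains it
with std::pop_heap: each pop moves the earliest remaining task to the back of
the still-heaped range, so after i pops the due tasks sit at the back of the
vector in reverse chronological order and can be executed back-to-front. A
standalone sketch of that drain (illustrative types and values, not Proton's
scheduled struct):

----------------------------------------------------------------------
// How std::pop_heap collects due items at the back of the vector.
// Illustrative sketch; types and values are made up.
#include <algorithm>
#include <cstdio>
#include <functional>
#include <vector>

struct scheduled_item {
    long time;   // due time, arbitrary ticks
    int id;
    // With std::greater the heap front is the *earliest* item.
    bool operator>(const scheduled_item& o) const { return time > o.time; }
};

int main() {
    std::vector<scheduled_item> heap = {{30, 3}, {10, 1}, {20, 2}, {40, 4}};
    std::make_heap(heap.begin(), heap.end(), std::greater<scheduled_item>());

    long now = 25;
    int i = 0;
    // Pop every due item: each pop_heap moves the earliest remaining
    // item to position end()-1-i, exactly as in run_timer_jobs().
    while (i < (int)heap.size() && heap.front().time <= now) {
        std::pop_heap(heap.begin(), heap.end() - i, std::greater<scheduled_item>());
        ++i;
    }
    // The due items are at the back in reverse order; run from the end
    // to get chronological order (prints id=1 then id=2).
    for (int k = 0; k < i; ++k) {
        const scheduled_item& s = heap[heap.size() - 1 - k];
        std::printf("run id=%d time=%ld\n", s.id, s.time);
    }
    heap.resize(heap.size() - i);   // keep only the not-yet-due items
}
----------------------------------------------------------------------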

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fd19bcf/cpp/src/proactor_container_impl.hpp
----------------------------------------------------------------------
diff --git a/cpp/src/proactor_container_impl.hpp b/cpp/src/proactor_container_impl.hpp
index 43b695f..dcc9381 100644
--- a/cpp/src/proactor_container_impl.hpp
+++ b/cpp/src/proactor_container_impl.hpp
@@ -110,7 +110,8 @@ class container::impl {
 
     // Event loop to run in each container thread
     void thread();
-    bool handle(pn_event_t*);
+    enum dispatch_result {ContinueLoop, EndBatch, EndLoop};
+    dispatch_result dispatch(pn_event_t*);
     void run_timer_jobs();
 
     int threads_;
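
The old bool return from handle() conflated "stop this thread" with "keep
going"; dispatch_result adds a third outcome, EndBatch, so PN_PROACTOR_TIMEOUT
can abandon the rest of the current event batch (letting other batches and
threads get a turn) without finishing the thread. A simplified sketch of the
tri-state loop, with made-up event types standing in for the proactor's:

----------------------------------------------------------------------
// Tri-state dispatch loop; simplified stand-in for thread()/dispatch().
#include <deque>

enum dispatch_result { ContinueLoop, EndBatch, EndLoop };

struct event { int type; };

// Made-up event types: 0 = ordinary event, 1 = timeout (end the
// batch so other work runs), 2 = interrupt (finish the thread).
dispatch_result dispatch(const event& e) {
    switch (e.type) {
    case 1:  return EndBatch;
    case 2:  return EndLoop;
    default: return ContinueLoop;
    }
}

// Returns true when the thread should finish, mirroring thread():
// EndBatch and EndLoop both abandon the rest of the batch, but only
// EndLoop also ends the event loop.
bool run_batch(std::deque<event>& batch) {
    bool finished = false;
    while (!batch.empty()) {
        event e = batch.front();
        batch.pop_front();
        dispatch_result r = dispatch(e);
        finished = (r == EndLoop);
        if (r != ContinueLoop) break;
    }
    return finished;
}
----------------------------------------------------------------------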



