qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From astitc...@apache.org
Subject qpid-proton git commit: PROTON-1243: Stop pn_reactor_stop from causing an infinite loop when called from within callback
Date Thu, 04 Aug 2016 17:33:08 GMT
Repository: qpid-proton
Updated Branches:
  refs/heads/0.14.x 3f9004ca0 -> 43461aa03


PROTON-1243: Stop pn_reactor_stop from causing an infinite loop when called from within callback


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/43461aa0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/43461aa0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/43461aa0

Branch: refs/heads/0.14.x
Commit: 43461aa037ee2da6a5bb2c2e24849d0df1c49452
Parents: 3f9004c
Author: Andrew Stitcher <astitcher@apache.org>
Authored: Thu Jul 28 23:49:27 2016 -0400
Committer: Andrew Stitcher <astitcher@apache.org>
Committed: Thu Aug 4 13:32:55 2016 -0400

----------------------------------------------------------------------
 proton-c/bindings/cpp/src/container_impl.cpp    |  1 +
 proton-c/bindings/cpp/src/container_test.cpp    | 72 ++++++++++++++++----
 proton-c/bindings/python/proton/reactor.py      |  5 +-
 proton-c/src/reactor/reactor.c                  | 28 ++++----
 .../qpid/proton/reactor/impl/ReactorImpl.java   | 11 +--
 5 files changed, 84 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43461aa0/proton-c/bindings/cpp/src/container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container_impl.cpp b/proton-c/bindings/cpp/src/container_impl.cpp
index 52ae42b..f2cad7f 100644
--- a/proton-c/bindings/cpp/src/container_impl.cpp
+++ b/proton-c/bindings/cpp/src/container_impl.cpp
@@ -354,6 +354,7 @@ void container_impl::run() {
 
 void container_impl::stop(const error_condition&) {
     reactor_.stop();
+    auto_stop_ = true;
 }
 
 void container_impl::auto_stop(bool set) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43461aa0/proton-c/bindings/cpp/src/container_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container_test.cpp b/proton-c/bindings/cpp/src/container_test.cpp
index e6e02a0..1a3e2c4 100644
--- a/proton-c/bindings/cpp/src/container_test.cpp
+++ b/proton-c/bindings/cpp/src/container_test.cpp
@@ -40,6 +40,22 @@ static std::string int2string(int n) {
     return strm.str();
 }
 
+int listen_on_random_port(proton::container& c, proton::listener& l) {
+    int port;
+    // I'm going to hell for this:
+    srand((unsigned int)time(0));
+    while (true) {
+        port = 20000 + (rand() % 30000);
+        try {
+            l = c.listen("0.0.0.0:" + int2string(port));
+            break;
+        } catch (...) {
+            // keep trying
+        }
+    }
+    return port;
+}
+
 class test_handler : public proton::messaging_handler {
   public:
     const std::string host;
@@ -55,19 +71,7 @@ class test_handler : public proton::messaging_handler {
     {}
 
     void on_container_start(proton::container &c) PN_CPP_OVERRIDE {
-        int port;
-
-        // I'm going to hell for this:
-        srand((unsigned int)time(0));
-        while (true) {
-            port = 20000 + (rand() % 30000);
-            try {
-                listener = c.listen("0.0.0.0:" + int2string(port));
-                break;
-            } catch (...) {
-                // keep trying
-            }
-        }
+        int port = listen_on_random_port(c, listener);
         proton::connection conn = c.connect(host + ":" + int2string(port), opts);
     }
 
@@ -143,6 +147,47 @@ int test_container_bad_address() {
     return 0;
 }
 
+class stop_tester : public proton::messaging_handler {
+    proton::listener listener;
+
+    // Set up a listener which would block forever
+    void on_container_start(proton::container& c) PN_CPP_OVERRIDE {
+        ASSERT(state==0);
+        int port = listen_on_random_port(c, listener);
+        c.connect("127.0.0.1:" + int2string(port));
+        c.auto_stop(false);
+        state = 1;
+    }
+
+    // Get here twice - once for listener, once for connector
+    void on_connection_open(proton::connection &c) PN_CPP_OVERRIDE {
+        c.close();
+        state++;
+    }
+
+    void on_connection_close(proton::connection &c) PN_CPP_OVERRIDE {
+        ASSERT(state==3);
+        c.container().stop();
+        state = 4;
+    }
+    void on_container_stop(proton::container & ) PN_CPP_OVERRIDE {
+        ASSERT(state==4);
+        state = 5;
+    }
+
+public:
+    stop_tester(): state(0) {}
+
+    int state;
+};
+
+int test_container_stop() {
+    stop_tester t;
+    proton::default_container(t).run();
+    ASSERT(t.state==5);
+    return 0;
+}
+
 }
 
 int main(int, char**) {
@@ -151,6 +196,7 @@ int main(int, char**) {
     RUN_TEST(failed, test_container_default_vhost());
     RUN_TEST(failed, test_container_no_vhost());
     RUN_TEST(failed, test_container_bad_address());
+    RUN_TEST(failed, test_container_stop());
     return failed;
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43461aa0/proton-c/bindings/python/proton/reactor.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactor.py b/proton-c/bindings/python/proton/reactor.py
index 202820c..ee9cfde 100644
--- a/proton-c/bindings/python/proton/reactor.py
+++ b/proton-c/bindings/python/proton/reactor.py
@@ -132,6 +132,9 @@ class Reactor(Wrapper):
         self.start()
         while self.process(): pass
         self.stop()
+        self.process()
+        self.global_handler = None
+        self.handler = None
 
     def wakeup(self):
         n = pn_reactor_wakeup(self._impl)
@@ -159,8 +162,6 @@ class Reactor(Wrapper):
     def stop(self):
         pn_reactor_stop(self._impl)
         self._check_errors()
-        self.global_handler = None
-        self.handler = None
 
     def schedule(self, delay, task):
         impl = _chandler(task, self.on_error)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43461aa0/proton-c/src/reactor/reactor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/reactor.c b/proton-c/src/reactor/reactor.c
index 3397e40..31cce08 100644
--- a/proton-c/src/reactor/reactor.c
+++ b/proton-c/src/reactor/reactor.c
@@ -51,6 +51,7 @@ struct pn_reactor_t {
   int selectables;
   int timeout;
   bool yield;
+  bool stop;
 };
 
 pn_timestamp_t pn_reactor_mark(pn_reactor_t *reactor) {
@@ -79,6 +80,7 @@ static void pn_reactor_initialize(pn_reactor_t *reactor) {
   reactor->selectables = 0;
   reactor->timeout = 0;
   reactor->yield = false;
+  reactor->stop = false;
   pn_reactor_mark(reactor);
 }
 
@@ -406,14 +408,21 @@ bool pn_reactor_process(pn_reactor_t *reactor) {
       previous = reactor->previous = type;
       pn_decref(event);
       pn_collector_pop(reactor->collector);
-    } else if (pni_reactor_more(reactor)) {
+    } else if (!reactor->stop && pni_reactor_more(reactor)) {
       if (previous != PN_REACTOR_QUIESCED && reactor->previous != PN_REACTOR_FINAL)
{
         pn_collector_put(reactor->collector, PN_OBJECT, reactor, PN_REACTOR_QUIESCED);
       } else {
         return true;
       }
     } else {
-      return false;
+      if (reactor->selectable) {
+        pn_selectable_terminate(reactor->selectable);
+        pn_reactor_update(reactor, reactor->selectable);
+        reactor->selectable = NULL;
+      } else {
+        pn_collector_put(reactor->collector, PN_OBJECT, reactor, PN_REACTOR_FINAL);
+        return false;
+      }
     }
   }
 }
@@ -458,19 +467,11 @@ void pn_reactor_start(pn_reactor_t *reactor) {
   assert(reactor);
   pn_collector_put(reactor->collector, PN_OBJECT, reactor, PN_REACTOR_INIT);
   reactor->selectable = pni_timer_selectable(reactor);
- }
+}
 
 void pn_reactor_stop(pn_reactor_t *reactor) {
   assert(reactor);
-  if (reactor->selectable) {
-    pn_selectable_terminate(reactor->selectable);
-    pn_reactor_update(reactor, reactor->selectable);
-    reactor->selectable = NULL;
-  }
-  pn_collector_put(reactor->collector, PN_OBJECT, reactor, PN_REACTOR_FINAL);
-  // XXX: should consider removing this from stop to avoid reentrance
-  pn_reactor_process(reactor);
-  pn_collector_release(reactor->collector);
+  reactor->stop = true;
 }
 
 void pn_reactor_run(pn_reactor_t *reactor) {
@@ -478,5 +479,6 @@ void pn_reactor_run(pn_reactor_t *reactor) {
   pn_reactor_set_timeout(reactor, 3141);
   pn_reactor_start(reactor);
   while (pn_reactor_process(reactor)) {}
-  pn_reactor_stop(reactor);
+  pn_reactor_process(reactor);
+  pn_collector_release(reactor->collector);
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43461aa0/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
index 5949ae8..7448648 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
@@ -64,6 +64,7 @@ public class ReactorImpl implements Reactor, Extendable {
     private Set<ReactorChild> children;
     private int selectables;
     private boolean yield;
+    private boolean stop;
     private Selectable selectable;
     private EventType previous;
     private Timer timer;
@@ -283,7 +284,7 @@ public class ReactorImpl implements Reactor, Extendable {
                 collector.pop();
 
             } else {
-                if (more()) {
+                if (!stop && more()) {
                     if (previous != Type.REACTOR_QUIESCED && this.previous != Type.REACTOR_FINAL)
{
                         collector.put(Type.REACTOR_QUIESCED, this);
                     } else {
@@ -295,6 +296,7 @@ public class ReactorImpl implements Reactor, Extendable {
                         update(selectable);
                         selectable = null;
                     } else {
+                        collector.put(Type.REACTOR_FINAL, this);
                         return false;
                     }
                 }
@@ -326,10 +328,7 @@ public class ReactorImpl implements Reactor, Extendable {
 
     @Override
     public void stop() throws HandlerException {
-        collector.put(Type.REACTOR_FINAL, this);
-        // (Comment from C code) XXX: should consider removing this from stop to avoid reentrance
-        process();
-        collector = null;
+        stop = true;
     }
 
     private boolean more() {
@@ -342,6 +341,8 @@ public class ReactorImpl implements Reactor, Extendable {
         start();
         while(process()) {}
         stop();
+        process();
+        collector = null;
     }
 
     // pn_reactor_schedule from reactor.c


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message