qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From astitc...@apache.org
Subject [qpid-proton] 02/03: PROTON-2156: [python] Rework connector scheme to work with tornado ioloop - This also required a small change to the tornado interface code - Add back the helloworld_tornado and helloworld_direct_tornado tests
Date Mon, 20 Jan 2020 17:36:26 GMT
This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 826b6eff203dee34bc45082e9d05baa805327473
Author: Andrew Stitcher <astitcher@apache.org>
AuthorDate: Fri May 10 12:58:07 2019 -0400

    PROTON-2156: [python] Rework connector scheme to work with tornado ioloop
    - This also required a small change to the tornado interface code
    - Add back the helloworld_tornado and helloworld_direct_tornado tests
---
 python/examples/proton_tornado.py |  7 +++++--
 python/examples/test_examples.py  |  6 ++++++
 python/proton/_handlers.py        | 26 +++++++++++++++-----------
 3 files changed, 26 insertions(+), 13 deletions(-)

diff --git a/python/examples/proton_tornado.py b/python/examples/proton_tornado.py
index 8829a4a..55e8db8 100755
--- a/python/examples/proton_tornado.py
+++ b/python/examples/proton_tornado.py
@@ -91,8 +91,11 @@ class TornadoLoopHandler:
             self.loop.add_callback(self._stop)
 
     def _stop(self):
-        self.reactor.stop()
-        self.loop.stop()
+        # We could have received a new selectable since we sent the stop
+        if self.count == 0:
+            self.reactor.stop()
+            self.loop.stop()
+
 
 class Container(object):
     def __init__(self, *handlers, **kwargs):
diff --git a/python/examples/test_examples.py b/python/examples/test_examples.py
index 01bcc94..8df68ec 100644
--- a/python/examples/test_examples.py
+++ b/python/examples/test_examples.py
@@ -69,6 +69,12 @@ class ExamplesTest(unittest.TestCase):
     def test_helloworld_blocking(self):
         self.test_helloworld('helloworld_blocking.py')
 
+    def test_helloworld_tornado(self):
+        self.test_helloworld('helloworld_tornado.py')
+
+    def test_helloworld_direct_tornado(self):
+        self.test_helloworld('helloworld_direct_tornado.py')
+
     def test_simple_send_recv(self, recv='simple_recv.py', send='simple_send.py'):
         with Popen([recv]) as r:
             with Popen([send]):
diff --git a/python/proton/_handlers.py b/python/proton/_handlers.py
index 3525ffb..5ff45dd 100644
--- a/python/proton/_handlers.py
+++ b/python/proton/_handlers.py
@@ -1396,26 +1396,33 @@ class ConnectSelectable(Selectable):
     def writable(self):
         e = self._delegate.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
         t = self._transport
+
+        # Always cleanup this ConnectSelectable: either we failed or created a new one
+        # Do it first to ensure the socket gets deregistered before being registered again
+        # in the case of connecting
+        self.terminate()
+        self._transport = None
+        self.update()
+
         if e == 0:
             log.debug("Connection succeeded")
+
+            # Disassociate from the socket (which will be passed on)
+            self.release()
+
             s = self._reactor.selectable(delegate=self._delegate)
             s._transport = t
             t._selectable = s
             self._iohandler.update(t, s, t._reactor.now)
 
-            # Disassociate from the socket (which has been passed on)
-            self._delegate = None
-            self.terminate()
-            self._transport = None
-            self.update()
             return
         elif e == errno.ECONNREFUSED:
             if len(self._addrs) > 0:
                 log.debug("Connection refused: trying next transport address: %s", self._addrs[0])
+
                 sock = IO.connect(self._addrs[0])
-                self._addrs = self._addrs[1:]
-                self._delegate.close()
-                self._delegate = sock
+                # New ConnectSelectable for the new socket with rest of addresses
+                ConnectSelectable(sock, self._reactor, self._addrs[1:], t, self._iohandler)
                 return
             else:
                 log.debug("Connection refused, but tried all transport addresses")
@@ -1426,6 +1433,3 @@ class ConnectSelectable(Selectable):
 
         t.close_tail()
         t.close_head()
-        self.terminate()
-        self._transport = None
-        self.update()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message