qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject qpid-dispatch git commit: DISPATCH-1092: clear the cached session pointer when the session closes
Date Thu, 02 Aug 2018 13:15:30 GMT
Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 7285a47dc -> ecfd32527


DISPATCH-1092: clear the cached session pointer when the session closes


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/ecfd3252
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/ecfd3252
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/ecfd3252

Branch: refs/heads/master
Commit: ecfd325276cadc8c5da3e81159e30aa8ed582fef
Parents: 7285a47
Author: Kenneth Giusti <kgiusti@apache.org>
Authored: Tue Jul 31 14:44:42 2018 -0400
Committer: Kenneth Giusti <kgiusti@apache.org>
Committed: Wed Aug 1 14:21:11 2018 -0400

----------------------------------------------------------------------
 src/container.c                   |   6 +-
 tests/system_test.py              |  41 +++++------
 tests/system_tests_link_routes.py | 125 ++++++++++++++++++++++++++++++++-
 3 files changed, 147 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ecfd3252/src/container.c
----------------------------------------------------------------------
diff --git a/src/container.c b/src/container.c
index d4fbe51..c23f951 100644
--- a/src/container.c
+++ b/src/container.c
@@ -465,6 +465,8 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
 
     case PN_SESSION_LOCAL_CLOSE :
         ssn = pn_event_session(event);
+        if (ssn == qd_conn->pn_sess)
+            qd_conn->pn_sess = 0;
         pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
         while (pn_link) {
             if (pn_link_session(pn_link) == ssn) {
@@ -481,8 +483,10 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t
*event,
         break;
 
     case PN_SESSION_REMOTE_CLOSE :
+        ssn = pn_event_session(event);
+        if (ssn == qd_conn->pn_sess)
+            qd_conn->pn_sess = 0;
         if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) {
-            ssn = pn_event_session(event);
             if (pn_session_state(ssn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
                 // remote has nuked our session.  Check for any links that were
                 // left open and forcibly detach them, since no detaches will

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ecfd3252/tests/system_test.py
----------------------------------------------------------------------
diff --git a/tests/system_test.py b/tests/system_test.py
index e4e1109..3159301 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -389,37 +389,32 @@ class Qdrouterd(Process):
         """Return list of configured ports for all listeners"""
         return [l['port'] for l in self.config.sections('listener')]
 
+    def _cfg_2_host_port(self, c):
+        host = c['host']
+        port = c['port']
+        protocol_family = c.get('protocolFamily', 'IPv4')
+        if protocol_family == 'IPv6':
+            return "[%s]:%s" % (host, port)
+        elif protocol_family == 'IPv4':
+            return "%s:%s" % (host, port)
+        raise Exception("Unknown protocol family: %s" % protocol_family)
+
     @property
     def addresses(self):
         """Return amqp://host:port addresses for all listeners"""
-        address_list = []
-        for l in self.config.sections('listener'):
-            protocol_family = l.get('protocolFamily')
-            if protocol_family == 'IPv6':
-                address_list.append("amqp://[%s]:%s"%(l['host'], l['port']))
-            elif protocol_family == 'IPv4':
-                address_list.append("amqp://%s:%s"%(l['host'], l['port']))
-            else:
-                # Default to IPv4
-                address_list.append("amqp://%s:%s"%(l['host'], l['port']))
+        cfg = self.config.sections('listener')
+        return ["amqp://%s" % self._cfg_2_host_port(l) for l in cfg]
 
-        return address_list
+    @property
+    def connector_addresses(self):
+        """Return list of amqp://host:port for all connectors"""
+        cfg = self.config.sections('connector')
+        return ["amqp://%s" % self._cfg_2_host_port(c) for c in cfg]
 
     @property
     def hostports(self):
         """Return host:port for all listeners"""
-        address_list = []
-        for l in self.config.sections('listener'):
-            protocol_family = l.get('protocolFamily')
-            if protocol_family == 'IPv6':
-                address_list.append("[%s]:%s"%(l['host'], l['port']))
-            elif protocol_family == 'IPv4':
-                address_list.append("%s:%s"%(l['host'], l['port']))
-            else:
-                # Default to IPv4
-                address_list.append("%s:%s"%(l['host'], l['port']))
-
-        return address_list
+        return [self._cfg_2_host_port(l) for l in self.config.sections('listener')]
 
     def is_connected(self, port, host='127.0.0.1'):
         """If router has a connection to host:port:identity return the management info.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ecfd3252/tests/system_tests_link_routes.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 57e6d41..71a3528 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -24,13 +24,14 @@ from __future__ import print_function
 
 import unittest2 as unittest
 from time import sleep, time
+from threading import Thread
 from subprocess import PIPE, STDOUT
 
 from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process
 
 from proton import Message
 from proton.handlers import MessagingHandler
-from proton.reactor import AtMostOnce, Container, DynamicNodeProperties, LinkOption
+from proton.reactor import AtMostOnce, Container, DynamicNodeProperties, LinkOption, AtLeastOnce
 from proton.utils import BlockingConnection
 from system_tests_drain_support import DrainMessagesHandler, DrainOneMessageHandler, DrainNoMessagesHandler,
DrainNoMoreMessagesHandler
 
@@ -1438,5 +1439,127 @@ class MultiLinkSendReceive(MessagingHandler):
     def run(self):
         Container(self).run()
 
+
+class LinkRouteProtocolTest(TestCase):
+    """
+    Test link route implementation against "misbehaving" containers
+
+    Uses a custom fake broker (not a router) that can do weird things at the
+    protocol level.
+
+             +-------------+         +---------+         +-----------------+
+             |             | <------ |         | <-----  | blocking_sender |
+             | fake broker |         |  QDR.A  |         +-----------------+
+             |             | ------> |         | ------> +-------------------+
+             +-------------+         +---------+         | blocking_receiver |
+                                                         +-------------------+
+    """
+    @classmethod
+    def setUpClass(cls):
+        """Configure and start QDR.A"""
+        super(LinkRouteProtocolTest, cls).setUpClass()
+        config = [
+            ('router', {'mode': 'standalone', 'id': 'QDR.A'}),
+            # for client connections:
+            ('listener', {'role': 'normal',
+                          'host': '0.0.0.0',
+                          'port': cls.tester.get_port(),
+                          'saslMechanisms': 'ANONYMOUS'}),
+            # to connect to the fake broker
+            ('connector', {'name': 'broker',
+                           'role': 'route-container',
+                           'host': '127.0.0.1',
+                           'port': cls.tester.get_port(),
+                           'saslMechanisms': 'ANONYMOUS'}),
+
+            # forward 'org.apache' messages to + from fake broker:
+            ('linkRoute', {'prefix': 'org.apache', 'containerId': 'FakeBroker', 'direction':
'in'}),
+            ('linkRoute', {'prefix': 'org.apache', 'containerId': 'FakeBroker', 'direction':
'out'})
+        ]
+        config = Qdrouterd.Config(config)
+        cls.router = cls.tester.qdrouterd('A', config, wait=False)
+
+    def _fake_broker(self, cls):
+        """Spawn a fake broker listening on the broker's connector
+        """
+        fake_broker = cls(self.router.connector_addresses[0])
+        # wait until the connection to the fake broker activates
+        self.router.wait_connectors()
+        return fake_broker
+
+    def test_DISPATCH_1092(self):
+        # This fake broker will force the session closed after the link
+        # detaches.  Verify that the session comes back up correctly when the
+        # next client attaches
+        killer = self._fake_broker(SessionKiller)
+        for i in range(2):
+            bconn = BlockingConnection(self.router.addresses[0])
+            bsender = bconn.create_sender(address="org.apache",
+                                          options=AtLeastOnce())
+            msg = Message(body="Hey!")
+            bsender.send(msg)
+            bsender.close()
+            bconn.close()
+        killer.join()
+
+
+class _FakeBroker(MessagingHandler):
+    """Base class for creating customized fake brokers
+    """
+    def __init__(self, address):
+        super(_FakeBroker, self).__init__()
+        self.address = address
+        self.listener = None
+        self._container = Container(self)
+        self._container.container_id = 'FakeBroker'
+        self._thread = Thread(target=self._main)
+        self._thread.daemon = True
+        self._thread.start()
+        self._stop_thread = False
+
+    def _main(self):
+        self._container.timeout = 1.0
+        self._container.start()
+        while self._container.process():
+            if self._stop_thread:
+                break
+        self._container.stop()
+        self._container.process()
+
+    def join(self):
+        self._stop_thread = True
+        self._container.wakeup()
+        self._thread.join(timeout=10)
+        if self._thread.is_alive():
+            raise Exception("Fake Broker did not exit")
+
+    def on_start(self, event):
+        self.listener = event.container.listen(self.address)
+
+    def on_connection_opening(self, event):
+        pn_conn = event.connection
+        pn_conn.container = self._container.container_id
+
+    def on_connection_closed(self, event):
+        if self.listener:
+            self.listener.close()
+            self.listener = None
+
+    def on_link_opening(self, event):
+        # just copy the addresses and open the link
+        event.link.target.address = event.link.remote_target.address
+        event.link.source.address = event.link.remote_source.address
+        event.link.open()
+
+
+class SessionKiller(_FakeBroker):
+    """DISPATCH-1092: force a session close when the link closes.  This should
+    cause the router to re-create the session when the next client attaches.
+    """
+    def on_link_closing(self, event):
+        event.link.close()
+        event.session.close()
+
+
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message