qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject [qpid-dispatch] branch master updated: DISPATCH-1488: add test to verify transaction handling
Date Wed, 15 Jan 2020 19:45:03 GMT
This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 2536f69  DISPATCH-1488: add test to verify transaction handling
2536f69 is described below

commit 2536f69c58166e4c746a2d173509da238fae59b0
Author: Kenneth Giusti <kgiusti@apache.org>
AuthorDate: Thu Nov 21 14:04:38 2019 -0500

    DISPATCH-1488: add test to verify transaction handling
---
 tests/system_tests_one_router.py | 155 +++++++++++++++++++++++++++++++++++++++
 tests/test_broker.py             |   4 +-
 2 files changed, 157 insertions(+), 2 deletions(-)

diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index 77e8804..3810a8f 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -28,10 +28,15 @@ from system_test import unittest, QdManager
 from proton.handlers import MessagingHandler, TransactionHandler
 from proton.reactor import Container, AtMostOnce, AtLeastOnce, DynamicNodeProperties, LinkOption, ApplicationEvent, EventInjector
 from proton.utils import BlockingConnection, SyncRequestResponse
+from proton import VERSION as PROTON_VERSION
+from proton import Terminus
+from proton import Data
 from qpid_dispatch.management.client import Node
 import os, json
 from subprocess import PIPE, STDOUT
 from time import sleep
+from test_broker import FakeBroker
+
 
 CONNECTION_PROPERTIES_UNICODE_STRING = {u'connection': u'properties', u'int_property': 6451}
 CONNECTION_PROPERTIES_SYMBOL = dict()
@@ -3149,6 +3154,7 @@ class UnsettledLargeMessageTest(MessagingHandler):
             self.receiver.close()
             self.recv_conn.close()
 
+
 class OneRouterUnavailableCoordinatorTest(TestCase):
     @classmethod
     def setUpClass(cls):
@@ -3355,5 +3361,154 @@ class Q2HoldoffDropTest(MessagingHandler):
                 sleep(0.1)
 
 
+class OneRouterTransactionalAttachTest(TestCase):
+    """
+    Verify that a transaction is properly forwarded through the router
+    """
+
+    class FakeTxnBroker(FakeBroker):
+        """
+        A FakeBroker that tracks Transaction declaration.
+        Note well: Proton python does not provide the ability to set a delivery
+        state to DECLARED (0x0033), so this broker cannot simulate a full
+        transactional delivery.  At best we ensure that the router properly
+        forwards the target capabilities and the declare message.
+        """
+        def __init__(self, url, container_id=None, **handler_kwargs):
+            super(OneRouterTransactionalAttachTest.FakeTxnBroker,
+                  self).__init__(url, container_id, **handler_kwargs)
+            self.txn_link = None  # link whose remote target is a coordinator; set in on_link_opening
+            self.remote_caps = None  # remote target capabilities captured from that link
+            self.declare_body = None  # body of the message received on txn_link; set in on_message
+
+        def on_link_opening(self, event):
+            if event.link.remote_target.type == Terminus.COORDINATOR:  # link attaches to a coordinator target
+                self.txn_link = event.link
+                self.txn_link.source.copy(event.link.remote_source)
+                self.txn_link.target.copy(event.link.remote_target)
+                self.remote_caps = self.txn_link.remote_target.capabilities  # inspected by test_01_verify_attach
+                self.txn_link.flow(1)  # NOTE(review): presumably one credit, enough for the declare message - confirm
+            else:
+                super(OneRouterTransactionalAttachTest.FakeTxnBroker,
+                      self).on_link_opening(event)
+
+        def on_message(self, event):
+            if event.link == self.txn_link:  # message arrived on the coordinator link
+                self.declare_body = event.message.body  # kept so the test can check its descriptor
+                event.delivery.update(Delivery.REJECTED)  # DECLARED cannot be set (see class docstring)
+                event.delivery.settle()
+            else:
+                super(OneRouterTransactionalAttachTest.FakeTxnBroker,
+                      self).on_message(event)
+
+
+    class TxSender(MessagingHandler, TransactionHandler):
+        """
+        Transactional publisher client.  The transaction will fail since the
+        fake broker cannot declare the transaction properly
+        """
+        def __init__(self, url, messages=1):
+            super(OneRouterTransactionalAttachTest.TxSender, self).__init__()
+            self.url = Url(url)
+            self.sent = 0  # counter compared against self.total in send()
+            self.declare_failed = False  # set True by on_transaction_declare_failed
+            self.total = messages  # limit that send() checks self.sent against
+
+        def on_start(self, event):
+            self.container = event.container
+            self.conn = self.container.connect(self.url)
+            self.sender = self.container.create_sender(self.conn, self.url.path)
+            self.container.declare_transaction(self.conn, handler=self)  # this object is passed as the transaction handler
+            self.transaction = None  # assigned in on_transaction_declared
+
+        def on_transaction_declared(self, event):
+            self.transaction = event.transaction  # send() does nothing until this is set
+            self.declare_failed = False
+            self.send()
+
+        def on_sendable(self, event):
+            self.send()  # send() itself checks the transaction, the sender credit and the count
+
+        def send(self):  # send one message inside the current transaction, then commit it
+            if self.transaction and self.sender.credit > 0 and self.sent < self.total:
+                seq = self.sent  # the pre-increment count is used as message id and sequence
+                self.sent += 1  # count upwards so that self.sent < self.total can become false
+                msg = Message(id=seq, body={'sequence':seq})
+                self.transaction.send(self.sender, msg)
+                self.transaction.commit()
+                self.transaction = None  # no further sends until on_transaction_declared sets it again
+
+        def on_transaction_declare_failed(self, event):
+            # expected to fail, since the FakeBroker cannot declare a transaction
+            self.declare_failed = True  # asserted by test_01_verify_attach
+            self.conn.close()  # NOTE(review): presumably what lets Container(client).run() return - confirm
+
+
+    @classmethod
+    def setUpClass(cls):
+        super(OneRouterTransactionalAttachTest, cls).setUpClass()
+        config = Qdrouterd.Config([
+            ('router', {'mode': 'standalone', 'id': 'TxnRouter'}),
+            ('listener', {'port': cls.tester.get_port() }),  # address read back below as cls.listener
+            ('connector', {'port': cls.tester.get_port(),
+                           'role': 'route-container'}),  # address read back below as cls.connector
+
+            ('linkRoute', {'prefix': "$coordinator",
+                           'containerId': "FakeBroker",
+                           'direction': "in"}),  # link route for the $coordinator prefix
+
+            ('linkRoute', {'prefix': 'closest/queue01',
+                           'containerId': 'FakeBroker',
+                           'direction': 'in'}),
+            ('linkRoute', {'prefix': 'closest/queue01',
+                           'containerId': 'FakeBroker',
+                           'direction': 'out'}),
+
+            ('address', {'prefix': 'closest', 'distribution': 'closest'}),
+            ('address', {'prefix': 'balanced', 'distribution': 'balanced'}),
+            ('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
+        ])
+        cls.router = cls.tester.qdrouterd('TxnRouter', config, wait=False)  # NOTE(review): presumably not waited on because the broker is only created below - confirm
+        cls.listener = cls.router.addresses[0]
+        cls.connector = cls.router.connector_addresses[0]
+
+        cls.broker = cls.FakeTxnBroker(url=cls.connector,
+                                       prefetch=0,
+                                       auto_accept=False,
+                                       auto_settle=False)  # extra keywords are passed through as **handler_kwargs
+        cls.router.wait_connectors()
+        cls.router.wait_address("closest/queue01")
+
+    def test_01_verify_attach(self):
+        """
+        Verify the transaction link attach is correctly forwarded to the broker
+        """
+        client = self.TxSender(url=self.listener)
+        Container(client).run()
+        self.assertTrue(client.declare_failed)  # flag set in on_transaction_declare_failed
+        self.assertTrue(self.broker.txn_link is not None)  # broker saw a coordinator-target link
+        self.assertTrue(self.broker.declare_body is not None)  # broker received a message on that link
+        self.assertEqual(symbol('amqp:declare:list'),
+                         self.broker.declare_body.descriptor)
+        if PROTON_VERSION >= (0, 30, 0):
+            # prior to proton 0.30.0 capabilities were not provided
+            # see PROTON-2138
+            self.assertTrue(self.broker.remote_caps is not None)
+            # capabilities should be a list with a txn-capability type
+            # verify router has forwarded this correctly:
+            rc = self.broker.remote_caps
+            rc.rewind()
+            count = 0  # number of symbol entries seen
+            while rc.next() == Data.SYMBOL:  # loop ends once next() returns anything other than Data.SYMBOL
+                s = rc.get_symbol()
+                self.assertTrue(s in [symbol('amqp:local-transactions'),
+                                      symbol('amqp:distributed-transactions'),
+                                      symbol('amqp:promotable-transactions'),
+                                      symbol('amqp:multi-txns-per-ssn'),
+                                      symbol('amqp:multi-ssns-per-txn')])
+                count += 1
+            self.assertTrue(count > 0)  # at least one capability symbol must be present
+
+
 if __name__ == '__main__':
     unittest.main(main_module())
diff --git a/tests/test_broker.py b/tests/test_broker.py
index 3b99bac..2f101a7 100644
--- a/tests/test_broker.py
+++ b/tests/test_broker.py
@@ -85,8 +85,8 @@ class FakeBroker(MessagingHandler):
             except IndexError: # no more messages
                 return 0
 
-    def __init__(self, url, container_id=None):
-        super(FakeBroker, self).__init__()
+    def __init__(self, url, container_id=None, **handler_kwargs):
+        super(FakeBroker, self).__init__(**handler_kwargs)
         self.url = url
         self.queues = {}
         self.acceptor = None


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message