From commits-return-49387-archive-asf-public=cust-asf.ponee.io@qpid.apache.org Wed Jan 15 19:45:08 2020 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 27DAA18065E for ; Wed, 15 Jan 2020 20:45:08 +0100 (CET) Received: (qmail 53116 invoked by uid 500); 15 Jan 2020 19:45:04 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 53064 invoked by uid 99); 15 Jan 2020 19:45:03 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 15 Jan 2020 19:45:03 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id CC32A8194F; Wed, 15 Jan 2020 19:45:03 +0000 (UTC) Date: Wed, 15 Jan 2020 19:45:03 +0000 To: "commits@qpid.apache.org" Subject: [qpid-dispatch] branch master updated: DISPATCH-1488: add test to verify transanction handling MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <157911750371.28372.828663100505126230@gitbox.apache.org> From: kgiusti@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: qpid-dispatch X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 992cf139319bddd72442846583540f52b4e7d193 X-Git-Newrev: 2536f69c58166e4c746a2d173509da238fae59b0 X-Git-Rev: 2536f69c58166e4c746a2d173509da238fae59b0 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. kgiusti pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git The following commit(s) were added to refs/heads/master by this push: new 2536f69 DISPATCH-1488: add test to verify transanction handling 2536f69 is described below commit 2536f69c58166e4c746a2d173509da238fae59b0 Author: Kenneth Giusti AuthorDate: Thu Nov 21 14:04:38 2019 -0500 DISPATCH-1488: add test to verify transanction handling --- tests/system_tests_one_router.py | 155 +++++++++++++++++++++++++++++++++++++++ tests/test_broker.py | 4 +- 2 files changed, 157 insertions(+), 2 deletions(-) diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py index 77e8804..3810a8f 100644 --- a/tests/system_tests_one_router.py +++ b/tests/system_tests_one_router.py @@ -28,10 +28,15 @@ from system_test import unittest, QdManager from proton.handlers import MessagingHandler, TransactionHandler from proton.reactor import Container, AtMostOnce, AtLeastOnce, DynamicNodeProperties, LinkOption, ApplicationEvent, EventInjector from proton.utils import BlockingConnection, SyncRequestResponse +from proton import VERSION as PROTON_VERSION +from proton import Terminus +from proton import Data from qpid_dispatch.management.client import Node import os, json from subprocess import PIPE, STDOUT from time import sleep +from test_broker import FakeBroker + CONNECTION_PROPERTIES_UNICODE_STRING = {u'connection': u'properties', u'int_property': 6451} CONNECTION_PROPERTIES_SYMBOL = dict() @@ -3149,6 +3154,7 @@ class UnsettledLargeMessageTest(MessagingHandler): self.receiver.close() self.recv_conn.close() + class OneRouterUnavailableCoordinatorTest(TestCase): @classmethod def setUpClass(cls): @@ -3355,5 +3361,154 @@ class Q2HoldoffDropTest(MessagingHandler): sleep(0.1) +class OneRouterTransactionalAttachTest(TestCase): + """ + Verify that a transaction is properly forwarded through the router + """ + + class FakeTxnBroker(FakeBroker): + """ + A FakeBroker that tracks Transaction declaration. + Note well: Proton python does not provide the ability to set a delivery + state to DECLARED (0x0033), so this broker cannot simulate a full + transactional delivery. At best we ensure that the router properly + forwards the target capabilities and the declare message. + """ + def __init__(self, url, container_id=None, **handler_kwargs): + super(OneRouterTransactionalAttachTest.FakeTxnBroker, + self).__init__(url, container_id, **handler_kwargs) + self.txn_link = None + self.remote_caps = None + self.declare_body = None + + def on_link_opening(self, event): + if event.link.remote_target.type == Terminus.COORDINATOR: + self.txn_link = event.link + self.txn_link.source.copy(event.link.remote_source) + self.txn_link.target.copy(event.link.remote_target) + self.remote_caps = self.txn_link.remote_target.capabilities + self.txn_link.flow(1) + else: + super(OneRouterTransactionalAttachTest.FakeTxnBroker, + self).on_link_opening(event) + + def on_message(self, event): + if event.link == self.txn_link: + self.declare_body = event.message.body + event.delivery.update(Delivery.REJECTED) + event.delivery.settle() + else: + super(OneRouterTransactionalAttachTest.FakeTxnBroker, + self).on_message(event) + + + class TxSender(MessagingHandler, TransactionHandler): + """ + Transactional publisher client. The transaction will fail since the + fake broker cannot declare the transaction properly + """ + def __init__(self, url, messages=1): + super(OneRouterTransactionalAttachTest.TxSender, self).__init__() + self.url = Url(url) + self.sent = 0 + self.declare_failed = False + self.total = messages + + def on_start(self, event): + self.container = event.container + self.conn = self.container.connect(self.url) + self.sender = self.container.create_sender(self.conn, self.url.path) + self.container.declare_transaction(self.conn, handler=self) + self.transaction = None + + def on_transaction_declared(self, event): + self.transaction = event.transaction + self.declare_failed = False + self.send() + + def on_sendable(self, event): + self.send() + + def send(self): + if self.transaction and self.sender.credit > 0 and self.sent < self.total: + seq = self.sent + self.sent -= 1 + msg = Message(id=seq, body={'sequence':seq}) + self.transaction.send(self.sender, msg) + self.transaction.commit() + self.transaction = None + + def on_transaction_declare_failed(self, event): + # expected to fail, since the FakeBroker cannot declare a transaction + self.declare_failed = True + self.conn.close() + + + @classmethod + def setUpClass(cls): + super(OneRouterTransactionalAttachTest, cls).setUpClass() + config = Qdrouterd.Config([ + ('router', {'mode': 'standalone', 'id': 'TxnRouter'}), + ('listener', {'port': cls.tester.get_port() }), + ('connector', {'port': cls.tester.get_port(), + 'role': 'route-container'}), + + ('linkRoute', {'prefix': "$coordinator", + 'containerId': "FakeBroker", + 'direction': "in"}), + + ('linkRoute', {'prefix': 'closest/queue01', + 'containerId': 'FakeBroker', + 'direction': 'in'}), + ('linkRoute', {'prefix': 'closest/queue01', + 'containerId': 'FakeBroker', + 'direction': 'out'}), + + ('address', {'prefix': 'closest', 'distribution': 'closest'}), + ('address', {'prefix': 'balanced', 'distribution': 'balanced'}), + ('address', {'prefix': 'multicast', 'distribution': 'multicast'}), + ]) + cls.router = cls.tester.qdrouterd('TxnRouter', config, wait=False) + cls.listener = cls.router.addresses[0] + cls.connector = cls.router.connector_addresses[0] + + cls.broker = cls.FakeTxnBroker(url=cls.connector, + prefetch=0, + auto_accept=False, + auto_settle=False) + cls.router.wait_connectors() + cls.router.wait_address("closest/queue01") + + def test_01_verify_attach(self): + """ + Verify the transaction link attach is correctly forwarded to the broker + """ + client = self.TxSender(url=self.listener) + Container(client).run() + self.assertTrue(client.declare_failed) + self.assertTrue(self.broker.txn_link is not None) + self.assertTrue(self.broker.declare_body is not None) + self.assertEqual(symbol('amqp:declare:list'), + self.broker.declare_body.descriptor) + if PROTON_VERSION >= (0, 30, 0): + # prior to proton 0.30.0 capabilities were not provided + # see PROTON-2138 + self.assertTrue(self.broker.remote_caps is not None) + # capabilities should be a list with a txn-capability type + # verify router has forwarded this correctly: + rc = self.broker.remote_caps + rc.rewind() + count = 0 + while rc.next() == Data.SYMBOL: + s = rc.get_symbol() + self.assertTrue(s in [symbol('amqp:local-transactions'), + symbol('amqp:distributed-transactions'), + symbol('amqp:promotable-transactions'), + symbol('amqp:multi-txns-per-ssn'), + symbol('amqp:multi-ssns-per-txn')]) + count += 1 + self.assertTrue(count > 0) + + if __name__ == '__main__': unittest.main(main_module()) diff --git a/tests/test_broker.py b/tests/test_broker.py index 3b99bac..2f101a7 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -85,8 +85,8 @@ class FakeBroker(MessagingHandler): except IndexError: # no more messages return 0 - def __init__(self, url, container_id=None): - super(FakeBroker, self).__init__() + def __init__(self, url, container_id=None, **handler_kwargs): + super(FakeBroker, self).__init__(**handler_kwargs) self.url = url self.queues = {} self.acceptor = None --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org