qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject qpid-dispatch git commit: DISPATCH-1100: add auto delete linkRoutes
Date Fri, 24 Aug 2018 20:32:37 GMT
Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 4892bfaa6 -> c03a9f0ab


DISPATCH-1100: add auto delete linkRoutes

This closes #359


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/c03a9f0a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/c03a9f0a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/c03a9f0a

Branch: refs/heads/master
Commit: c03a9f0ab6773471faa9b7411a76e4f68b3023d2
Parents: 4892bfa
Author: Kenneth Giusti <kgiusti@apache.org>
Authored: Fri Aug 3 10:57:43 2018 -0400
Committer: Kenneth Giusti <kgiusti@apache.org>
Committed: Fri Aug 24 16:31:40 2018 -0400

----------------------------------------------------------------------
 python/qpid_dispatch/management/qdrouter.json |  11 +-
 src/router_core/agent_config_link_route.c     |  13 ++-
 src/router_core/agent_config_link_route.h     |   2 +-
 src/router_core/route_control.c               |  29 +++--
 src/router_core/route_control.h               |   3 +-
 src/router_core/router_core.c                 |   5 +-
 src/router_core/router_core_private.h         |   5 +-
 tests/system_tests_link_routes.py             | 122 ++++++++++++++++++---
 8 files changed, 154 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c03a9f0a/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 0511343..5e66bdc 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1146,13 +1146,13 @@
                 },
                 "containerId": {
                     "type": "string",
-                    "description": "ContainerID for the target container. Only one of containerId or connection should be specified for a linkRoute. Specifying both will result in the linkRoute not being created.",
+                    "description": "ContainerID for the target container. Only one of containerId or connection should be specified for a linkRoute. Specifying both will result in the linkRoute not being created. For linkRoutes created with a lifetimePolicy of 'connection-close' the linkRoute configuration will be deleted when the last connection through which this container can be accessed is closed.",
                     "create": true,
                     "required": false
                 },
                 "connection": {
                     "type": "string",
-                    "description": "The name from a connector or listener. Only one of containerId or connection should be specified for a linkRoute. Specifying both will result in the linkRoute not being created.",
+                    "description": "The name from a connector or listener. Only one of containerId or connection should be specified for a linkRoute. Specifying both will result in the linkRoute not being created. For linkRoutes created with a lifetimePolicy of 'connection-close' the linkRoute configuration will be deleted when the connection is closed.",
                     "create": true,
                     "required": false
                 },
@@ -1174,6 +1174,13 @@
                     "type": ["inactive", "active"],
                     "description": "The operational status of this linkRoute: inactive - The remote container is not connected; active - the remote container is connected and ready to accept link routed attachments.",
                     "create": false
+                },
+                "deleteOnClose": {
+                    "type": "boolean",
+                    "description": "By default a configured linkRoute exists until it is removed via the DELETE management operation. When deleteOnClose is set to 'true' the router will automatically delete the linkRoute when its associated connection or container closes.  This allows transient servers to configure linkRoutes for their services such that the linkRoutes are removed when the server disconnects.",
+                    "default": false,
+                    "required": false,
+                    "create": true
                 }
             }
         },

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c03a9f0a/src/router_core/agent_config_link_route.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_config_link_route.c b/src/router_core/agent_config_link_route.c
index e94ad23..7f1ee9f 100644
--- a/src/router_core/agent_config_link_route.c
+++ b/src/router_core/agent_config_link_route.c
@@ -36,6 +36,7 @@
 #define QDR_CONFIG_LINK_ROUTE_PATTERN       10
 #define QDR_CONFIG_LINK_ROUTE_ADD_EXTERNAL_PREFIX 11
 #define QDR_CONFIG_LINK_ROUTE_DEL_EXTERNAL_PREFIX 12
+#define QDR_CONFIG_LINK_ROUTE_DELETE_ON_CLOSE     13
 
 const char *qdr_config_link_route_columns[] =
     {"name",
@@ -51,6 +52,7 @@ const char *qdr_config_link_route_columns[] =
      "pattern",
      "addExternalPrefix",
      "delExternalPrefix",
+     "deleteOnClose",
      0};
 
 const char *CONFIG_LINKROUTE_TYPE = "org.apache.qpid.dispatch.router.config.linkRoute";
@@ -156,9 +158,13 @@ static void qdr_config_link_route_insert_column_CT(qdr_link_route_t *lr, int col
         break;
 
     case QDR_CONFIG_LINK_ROUTE_OPER_STATUS:
-        text = lr->active ? "active" : "inactive";
+        text = lr->active_ct > 0 ? "active" : "inactive";
         qd_compose_insert_string(body, text);
         break;
+
+    case QDR_CONFIG_LINK_ROUTE_DELETE_ON_CLOSE:
+        qd_compose_insert_bool(body, lr->delete_on_close);
+        break;
     }
 }
 
@@ -416,6 +422,7 @@ void qdra_config_link_route_create_CT(qdr_core_t        *core,
         qd_parsed_field_t *connection_field = qd_parse_value_by_key(in_body, qdr_config_link_route_columns[QDR_CONFIG_LINK_ROUTE_CONNECTION]);
         qd_parsed_field_t *container_field  = qd_parse_value_by_key(in_body, qdr_config_link_route_columns[QDR_CONFIG_LINK_ROUTE_CONTAINER_ID]);
         qd_parsed_field_t *dir_field        = qd_parse_value_by_key(in_body, qdr_config_link_route_columns[QDR_CONFIG_LINK_ROUTE_DIRECTION]);
+        qd_parsed_field_t *del_on_close_field  = qd_parse_value_by_key(in_body, qdr_config_link_route_columns[QDR_CONFIG_LINK_ROUTE_DELETE_ON_CLOSE]);
         if (! dir_field) {
             dir_field        = qd_parse_value_by_key(in_body, qdr_config_link_route_columns[QDR_CONFIG_LINK_ROUTE_DIR]);
             if (dir_field)
@@ -472,11 +479,13 @@ void qdra_config_link_route_create_CT(qdr_core_t        *core,
             break;
         }
 
+        bool auto_delete = del_on_close_field ? qd_parse_as_bool(del_on_close_field) : false;
+
         //
         // The request is good.  Create the entity.
         //
 
-        lr = qdr_route_add_link_route_CT(core, name, prefix_field, pattern_field, add_prefix_field, del_prefix_field, container_field, connection_field, trt, dir);
+        lr = qdr_route_add_link_route_CT(core, name, prefix_field, pattern_field, add_prefix_field, del_prefix_field, container_field, connection_field, trt, dir, auto_delete);
 
         //
         // Compose the result map for the response.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c03a9f0a/src/router_core/agent_config_link_route.h
----------------------------------------------------------------------
diff --git a/src/router_core/agent_config_link_route.h b/src/router_core/agent_config_link_route.h
index 465fd22..ef74221 100644
--- a/src/router_core/agent_config_link_route.h
+++ b/src/router_core/agent_config_link_route.h
@@ -33,7 +33,7 @@ void qdra_config_link_route_get_CT(qdr_core_t    *core,
                                    qdr_query_t   *query,
                                    const char    *qdr_config_link_route_columns[]);
 
-#define QDR_CONFIG_LINK_ROUTE_COLUMN_COUNT 13
+#define QDR_CONFIG_LINK_ROUTE_COLUMN_COUNT 14
 
 const char *qdr_config_link_route_columns[QDR_CONFIG_LINK_ROUTE_COLUMN_COUNT + 1];
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c03a9f0a/src/router_core/route_control.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c
index fff2c74..6e5a16d 100644
--- a/src/router_core/route_control.c
+++ b/src/router_core/route_control.c
@@ -196,19 +196,19 @@ static void qdr_link_route_activate_CT(qdr_core_t *core, qdr_link_route_t *lr, q
     // activation for this address, notify the router module of the added address.
     //
     if (lr->addr) {
+        lr->active_ct += 1;
         qdr_add_connection_ref(&lr->addr->conns, conn);
-        if (DEQ_SIZE(lr->addr->conns) == 1) {
+        if (lr->active_ct == 1) {
             address = qdr_link_route_pattern_to_address(lr->pattern, lr->dir);
             qd_log(core->log, QD_LOG_TRACE, "Activating link route pattern [%s]", address);
             qdr_post_mobile_added_CT(core, address);
             free(address);
         }
     }
-
-    lr->active = true;
 }
 
 
+// note that this function may free the qdr_link_route_t
 static void qdr_link_route_deactivate_CT(qdr_core_t *core, qdr_link_route_t *lr, qdr_connection_t *conn)
 {
     qdr_route_log_CT(core, "Link Route Deactivated", lr->name, lr->identity, conn);
@@ -216,9 +216,10 @@ static void qdr_link_route_deactivate_CT(qdr_core_t *core, qdr_link_route_t *lr,
     //
     // Deactivate the address(es) for link-routed destinations.
     //
-    if (lr->addr) {
-        qdr_del_connection_ref(&lr->addr->conns, conn);
-        if (DEQ_IS_EMPTY(lr->addr->conns)) {
+    if (lr->addr && qdr_del_connection_ref(&lr->addr->conns, conn)) {
+        assert(lr->active_ct > 0);
+        lr->active_ct -= 1;
+        if (lr->active_ct == 0) {
             char *address = qdr_link_route_pattern_to_address(lr->pattern, lr->dir);
             qd_log(core->log, QD_LOG_TRACE, "Deactivating link route pattern [%s]", address);
             qdr_post_mobile_removed_CT(core, address);
@@ -226,7 +227,9 @@ static void qdr_link_route_deactivate_CT(qdr_core_t *core, qdr_link_route_t *lr,
         }
     }
 
-    lr->active = false;
+    if (lr->delete_on_close && lr->active_ct == 0) {
+        qdr_route_del_link_route_CT(core, lr);  // note: frees 'lr'
+    }
 }
 
 
@@ -305,7 +308,8 @@ qdr_link_route_t *qdr_route_add_link_route_CT(qdr_core_t             *core,
                                               qd_parsed_field_t      *container_field,
                                               qd_parsed_field_t      *connection_field,
                                               qd_address_treatment_t  treatment,
-                                              qd_direction_t          dir)
+                                              qd_direction_t          dir,
+                                              bool                    auto_delete)
 {
     const bool is_prefix = !!prefix_field;
     qd_iterator_t *iter = qd_parse_raw(is_prefix ? prefix_field : pattern_field);
@@ -339,6 +343,7 @@ qdr_link_route_t *qdr_route_add_link_route_CT(qdr_core_t             *core,
     lr->treatment = treatment;
     lr->is_prefix = is_prefix;
     lr->pattern   = pattern;
+    lr->delete_on_close = auto_delete;
 
     if (!!add_prefix_field) {
         qd_iterator_t *ap_iter = qd_parse_raw(add_prefix_field);
@@ -454,6 +459,7 @@ void qdr_route_del_link_route_CT(qdr_core_t *core, qdr_link_route_t *lr)
     // Disassociate from the connection identifier.  Check to see if the identifier
     // should be removed.
     //
+    lr->delete_on_close = false;  // prevent recursion via deactivate
     qdr_conn_identifier_t *cid = lr->conn_id;
     if (cid) {
         qdr_connection_ref_t * cref = DEQ_HEAD(cid->connection_refs);
@@ -627,12 +633,15 @@ void qdr_route_connection_closed_CT(qdr_core_t *core, qdr_connection_t *conn)
     qdr_conn_identifier_t *cid = conn->conn_id;
     if (cid) {
         //
-        // Deactivate all link-routes associated with this remote container.
+        // Deactivate all link-routes associated with this remote
+        // container.  This may delete deleteOnClose=True link routes
+        // so be careful walking the linked list
         //
         qdr_link_route_t *lr = DEQ_HEAD(cid->link_route_refs);
         while (lr) {
+            qdr_link_route_t *next = DEQ_NEXT_N(REF, lr);
             qdr_link_route_deactivate_CT(core, lr, conn);
-            lr = DEQ_NEXT_N(REF, lr);
+            lr = next;
         }
 
         //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c03a9f0a/src/router_core/route_control.h
----------------------------------------------------------------------
diff --git a/src/router_core/route_control.h b/src/router_core/route_control.h
index a927948..21d0fde 100644
--- a/src/router_core/route_control.h
+++ b/src/router_core/route_control.h
@@ -30,7 +30,8 @@ qdr_link_route_t *qdr_route_add_link_route_CT(qdr_core_t             *core,
                                               qd_parsed_field_t      *container_field,
                                               qd_parsed_field_t      *connection_field,
                                               qd_address_treatment_t  treatment,
-                                              qd_direction_t          dir);
+                                              qd_direction_t          dir,
+                                              bool                    auto_delete);
 
 void qdr_route_del_link_route_CT(qdr_core_t *core, qdr_link_route_t *lr);
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c03a9f0a/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index ef9e382..4e3e442 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -454,17 +454,18 @@ void qdr_add_connection_ref(qdr_connection_ref_list_t *ref_list, qdr_connection_
 }
 
 
-void qdr_del_connection_ref(qdr_connection_ref_list_t *ref_list, qdr_connection_t *conn)
+bool qdr_del_connection_ref(qdr_connection_ref_list_t *ref_list, qdr_connection_t *conn)
 {
     qdr_connection_ref_t *ref = DEQ_HEAD(*ref_list);
     while (ref) {
         if (ref->conn == conn) {
             DEQ_REMOVE(*ref_list, ref);
             free_qdr_connection_ref_t(ref);
-            break;
+            return true;
         }
         ref = DEQ_NEXT(ref);
     }
+    return false;
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c03a9f0a/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index e9d7af4..c895c99 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -444,7 +444,7 @@ ALLOC_DECLARE(qdr_connection_ref_t);
 DEQ_DECLARE(qdr_connection_ref_t, qdr_connection_ref_list_t);
 
 void qdr_add_connection_ref(qdr_connection_ref_list_t *ref_list, qdr_connection_t *conn);
-void qdr_del_connection_ref(qdr_connection_ref_list_t *ref_list, qdr_connection_t *conn);
+bool qdr_del_connection_ref(qdr_connection_ref_list_t *ref_list, qdr_connection_t *conn);
 
 struct qdr_address_t {
     DEQ_LINKS(qdr_address_t);
@@ -592,8 +592,9 @@ struct qdr_link_route_t {
     qd_direction_t          dir;
     qdr_conn_identifier_t  *conn_id;
     qd_address_treatment_t  treatment;
-    bool                    active;
+    int                     active_ct;
     bool                    is_prefix;
+    bool                    delete_on_close;
     char                   *pattern;
     char                   *add_prefix;
     char                   *del_prefix;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c03a9f0a/tests/system_tests_link_routes.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 71a3528..d345b73 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -24,7 +24,7 @@ from __future__ import print_function
 
 import unittest2 as unittest
 from time import sleep, time
-from threading import Thread
+from threading import Thread, Event
 from subprocess import PIPE, STDOUT
 
 from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, Process
@@ -1440,12 +1440,12 @@ class MultiLinkSendReceive(MessagingHandler):
         Container(self).run()
 
 
-class LinkRouteProtocolTest(TestCase):
+class LinkRouteContainerTest(TestCase):
     """
-    Test link route implementation against "misbehaving" containers
+    Test link route implementation against custom containers
+
+    Custom fake brokers (not a router) tailored for particular test cases.
 
-    Uses a custom fake broker (not a router) that can do weird things at the
-    protocol level.
 
              +-------------+         +---------+         +-----------------+
              |             | <------ |         | <-----  | blocking_sender |
@@ -1457,7 +1457,7 @@ class LinkRouteProtocolTest(TestCase):
     @classmethod
     def setUpClass(cls):
         """Configure and start QDR.A"""
-        super(LinkRouteProtocolTest, cls).setUpClass()
+        super(LinkRouteContainerTest, cls).setUpClass()
         config = [
             ('router', {'mode': 'standalone', 'id': 'QDR.A'}),
             # for client connections:
@@ -1482,7 +1482,7 @@ class LinkRouteProtocolTest(TestCase):
     def _fake_broker(self, cls):
         """Spawn a fake broker listening on the broker's connector
         """
-        fake_broker = cls(self.router.connector_addresses[0])
+        fake_broker = cls(self.router)
         # wait until the connection to the fake broker activates
         self.router.wait_connectors()
         return fake_broker
@@ -1495,27 +1495,64 @@ class LinkRouteProtocolTest(TestCase):
         for i in range(2):
             bconn = BlockingConnection(self.router.addresses[0])
             bsender = bconn.create_sender(address="org.apache",
-                                          options=AtLeastOnce())
+                                          options=AtMostOnce())
             msg = Message(body="Hey!")
             bsender.send(msg)
             bsender.close()
             bconn.close()
         killer.join()
 
+    def test_delete_on_close(self):
+        # create some deleteOnClose link routes:
+        mgmt = self.router.management
+        mgmt.create(type="org.apache.qpid.dispatch.router.config.linkRoute",
+                    name="testy-in",
+                    attributes={'prefix': 'testy',
+                                'containerId': 'FakeBroker',
+                                'direction': 'in',
+                                'deleteOnClose': True})
+        mgmt.create(type="org.apache.qpid.dispatch.router.config.linkRoute",
+                    name="testy-out",
+                    attributes={'prefix': 'testy',
+                                'containerId': 'FakeBroker',
+                                'direction': 'out',
+                                'deleteOnClose': True})
+        def count_link_routes():
+            rsp = mgmt.query(type="org.apache.qpid.dispatch.router.config.linkRoute")
+            total = count = 0
+            for r in rsp.iter_dicts():
+                total += 1
+                if r.get("deleteOnClose", False):
+                    count += 1
+            return total, count
+
+        self.assertEqual(4, count_link_routes()[0])
+        self.assertEqual(2, count_link_routes()[1])
+        broker = self._fake_broker(OnDeleteBroker)
+        broker.done.wait(timeout=TIMEOUT)
+        broker.join()
+        self.assertEqual(2, count_link_routes()[0])
+        self.assertEqual(0, count_link_routes()[1])
+
 
 class _FakeBroker(MessagingHandler):
-    """Base class for creating customized fake brokers
+    """Base class for creating customized fake brokers.  This class spawns a
+    broker thread that accepts incoming links and forwards based on the link
+    address
     """
-    def __init__(self, address):
+    def __init__(self, router):
         super(_FakeBroker, self).__init__()
-        self.address = address
+        self.router = router
+        self.address = router.connector_addresses[0]
         self.listener = None
         self._container = Container(self)
         self._container.container_id = 'FakeBroker'
+        self._stop_thread = False
+        self._connections = []
+        self._senders = {}
         self._thread = Thread(target=self._main)
         self._thread.daemon = True
         self._thread.start()
-        self._stop_thread = False
 
     def _main(self):
         self._container.timeout = 1.0
@@ -1523,10 +1560,17 @@ class _FakeBroker(MessagingHandler):
         while self._container.process():
             if self._stop_thread:
                 break
+        if self.listener:
+            self.listener.close()
+        for c in self._connections:
+            c.close()
+        while self._container.process():
+            pass
         self._container.stop()
         self._container.process()
 
     def join(self):
+        # stop thread cleanly
         self._stop_thread = True
         self._container.wakeup()
         self._thread.join(timeout=10)
@@ -1537,20 +1581,27 @@ class _FakeBroker(MessagingHandler):
         self.listener = event.container.listen(self.address)
 
     def on_connection_opening(self, event):
+        # remotely initiated connection, need to set my id
         pn_conn = event.connection
         pn_conn.container = self._container.container_id
 
+    def on_connection_opened(self, event):
+        self._connections.append(event.connection)
+
     def on_connection_closed(self, event):
-        if self.listener:
-            self.listener.close()
-            self.listener = None
+        self._connections.remove(event.connection)
 
     def on_link_opening(self, event):
-        # just copy the addresses and open the link
         event.link.target.address = event.link.remote_target.address
         event.link.source.address = event.link.remote_source.address
+        if event.link.is_sender:
+            self._senders[event.link.remote_source.address] = event.link
         event.link.open()
 
+    def on_message(self, event):
+        out_link = self._senders.get(event.link.remote_target.address)
+        out_link and out_link.send(event.message)
+
 
 class SessionKiller(_FakeBroker):
     """DISPATCH-1092: force a session close when the link closes.  This should
@@ -1561,5 +1612,44 @@ class SessionKiller(_FakeBroker):
         event.session.close()
 
 
+class OnDeleteBroker(_FakeBroker):
+    """Test the automatic deletion of linkRoute configuration entities
+    """
+    def __init__(self, router):
+        super(OnDeleteBroker, self).__init__(router)
+        self._router_address = router.addresses[0]
+        self._app_conn = None
+        self._app_sender = None
+        self._app_receiver = None
+        self._app_sent = False
+        self.done = Event()
+
+    def on_connection_opened(self, event):
+        super(OnDeleteBroker, self).on_connection_opened(event)
+        if self._app_conn is None:
+            # connector from router has been opened. Create application
+            # sender/receiver
+            self._app_conn = event.container.connect(self._router_address)
+            self._app_receiver = event.container.create_receiver(self._app_conn,
+                                                                 source="testy/one",
+                                                                 name="app-receiver")
+            self._app_sender = event.container.create_sender(self._app_conn,
+                                                             target="testy/one",
+                                                             name="app-sender")
+
+    def on_sendable(self, event):
+        if event.sender == self._app_sender and not self._app_sent:
+            self._app_sent = True
+            event.sender.send(Message(body="deleteOnClose"))
+        else:
+            super(OnDeleteBroker, self).on_sendable(event)
+
+    def on_message(self, event):
+        if event.receiver == self._app_receiver:
+            self.done.set()
+        else:
+            super(OnDeleteBroker, self).on_message(event)
+
+
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message