qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gmur...@apache.org
Subject qpid-dispatch git commit: DISPATCH-1008 - Back out previous change that was storing every failover url obtained from every connection. Modified the code to wipe out the failover urls obtained from the previous connection if the current connection returne
Date Mon, 06 Aug 2018 20:18:12 GMT
Repository: qpid-dispatch
Updated Branches:
  refs/heads/master d28002a49 -> 83f5d524a


DISPATCH-1008 - Back out previous change that was storing every failover url obtained from
every connection. Modified the code to wipe out the failover urls obtained from the previous
connection if the current connection returned an empty list for failover urls. This closes
#348.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/83f5d524
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/83f5d524
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/83f5d524

Branch: refs/heads/master
Commit: 83f5d524a63dec84f648a8afa126f794cbcafccb
Parents: d28002a
Author: Ganesh Murthy <gmurthy@redhat.com>
Authored: Thu Jul 12 14:56:10 2018 -0400
Committer: Ganesh Murthy <gmurthy@redhat.com>
Committed: Mon Aug 6 16:11:22 2018 -0400

----------------------------------------------------------------------
 CMakeLists.txt                        |   1 +
 src/router_node.c                     |  73 +++++++++++++++++--
 tests/failoverserver.py.in            |  72 +++++++++++++++++++
 tests/system_tests_handle_failover.py | 111 ++++++++++++++++++++++-------
 4 files changed, 228 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/83f5d524/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index a6620a4..17dcb79 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -241,6 +241,7 @@ add_subdirectory(src) # Build src first so other subdirs can use QPID_DISPATCH_L
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/run.py.in ${CMAKE_CURRENT_BINARY_DIR}/run.py)
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/run.py.in ${CMAKE_CURRENT_BINARY_DIR}/tests/run.py)
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/tests/authservice.py.in ${CMAKE_CURRENT_SOURCE_DIR}/tests/authservice.py)
+configure_file(${CMAKE_CURRENT_SOURCE_DIR}/tests/failoverserver.py.in ${CMAKE_CURRENT_SOURCE_DIR}/tests/failoverserver.py)
 execute_process(COMMAND ${RUN} --sh OUTPUT_FILE config.sh)
 
 if (NOT UNITTEST2_MISSING)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/83f5d524/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index dbc7b59..8c97b8b 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -807,6 +807,61 @@ static void bind_connection_context(qdr_connection_t *qdrc, void* token)
     qdr_connection_set_context(qdrc, conn);
 }
 
+
+static void save_original_and_current_conn_info(qd_connection_t *conn)
+{
+    // The failover list is present but it is empty. We will wipe the old backup information
from the failover list.
+    // The only items we want to keep in this list is the original connection information
(from the config file)
+    // and the current connection information.
+
+    if (conn->connector && DEQ_SIZE(conn->connector->conn_info_list) >
1) {
+        // Here we are simply removing all other failover information except the original
connection information and the one we used to make a successful connection.
+        int i = 1;
+        qd_failover_item_t *item = DEQ_HEAD(conn->connector->conn_info_list);
+        qd_failover_item_t *next_item = 0;
+
+        bool match_found = false;
+        int dec_conn_index=0;
+
+        while(item) {
+
+            //The first item on this list is always the original connector, so we want to
keep that item in place
+            // We have to delete items in the list that were left over from the previous
failover list from the previous connection
+            // because the new connection might have its own failover list.
+            if (i != conn->connector->conn_index) {
+                if (item != DEQ_HEAD(conn->connector->conn_info_list)) {
+                    next_item = DEQ_NEXT(item);
+                    free(item->scheme);
+                    free(item->host);
+                    free(item->port);
+                    free(item->hostname);
+                    free(item->host_port);
+
+                    DEQ_REMOVE(conn->connector->conn_info_list, item);
+
+                    free(item);
+                    item = next_item;
+
+                    // We are removing an item from the list before the conn_index match
was found. We need to
+                    // decrement the conn_index
+                    if (!match_found)
+                        dec_conn_index += 1;
+                }
+                else {
+                    item = DEQ_NEXT(item);
+                }
+            }
+            else {
+                match_found = true;
+                item = DEQ_NEXT(item);
+            }
+            i += 1;
+        }
+
+        conn->connector->conn_index -= dec_conn_index;
+    }
+}
+
 static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool inbound)
 {
     qdr_connection_role_t  role = 0;
@@ -899,11 +954,14 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t
*conn, bool
             pn_data_enter(props);
 
             //
-            // We are attempting to find a connection property called failover-server-list.
-            // which looks something like this
+            // We are attempting to find a connection property called failover-server-list
which is a list of failover host names and ports..
+            // failover-server-list looks something like this
             //      :"failover-server-list"=[{:"network-host"="some-host", :port="35000"},
{:"network-host"="localhost", :port="25000"}]
-            // Note that the failover-list can contain one or more maps that contain failover
connection information.
-            // In the following code, we are trying to get the contents of each map into
the qd_failover_item_t object.
+            // There are three cases here -
+            // 1. The failover-server-list is present but the content of the list is empty
in which case we scrub the failover list except we keep the original connector information
and current connection information.
+            // 2. If the failover list contains one or more maps that contain failover connection
information, that information will be appended to the list which already contains the original
connection information
+            //    and the current connection information. Any other failover information
left over from the previous connection is deleted
+            // 3. If the failover-server-list is not present at all in the connection properties,
the failover list we maintain in untoched.
             //
             while (pn_data_next(props)) {
                 if (pn_data_type(props) == PN_SYMBOL) {
@@ -918,6 +976,8 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn,
bool
 
                     if (list_num_items > 0) {
 
+                        save_original_and_current_conn_info(conn);
+
                         pn_data_enter(props); // enter list
 
                         for (int i=0; i < list_num_items; i++) {
@@ -1007,6 +1067,7 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t
*conn, bool
                                         DEQ_INSERT_TAIL(conn->connector->conn_info_list,
item);
                                         qd_log(router->log_source, QD_LOG_DEBUG, "Added
%s as backup host", item->host_port);
                                     }
+
                                 }
                                 else {
                                         free(item->scheme);
@@ -1020,6 +1081,10 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t
*conn, bool
                             pn_data_exit(props);
                         }
                     } // list_num_items > 0
+                    else {
+                        save_original_and_current_conn_info(conn);
+
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/83f5d524/tests/failoverserver.py.in
----------------------------------------------------------------------
diff --git a/tests/failoverserver.py.in b/tests/failoverserver.py.in
new file mode 100644
index 0000000..b973a13
--- /dev/null
+++ b/tests/failoverserver.py.in
@@ -0,0 +1,72 @@
+#!/usr/bin/env ${PY_STRING}
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import print_function
+
+import signal, optparse, sys
+from proton.handlers import MessagingHandler
+from proton.reactor import Container
+from proton import symbol
+
+
+class FailoverServer(MessagingHandler):
+    def __init__(self, address):
+        super(FailoverServer, self).__init__()
+        self.listener = None
+        self.address = address
+
+    def on_start(self, event):
+        self.listener = event.container.listen(self.address)
+
+    def stop(self):
+        if self.listener:
+            self.listener.close()
+
+    def on_connection_opening(self, event):
+        # Sends an empty failover list.
+        # This will test the case where we deliberately send an empty failover list so that
the router
+        # receiving this open frame will clean out its failover list.
+        event.connection.properties = {
+            symbol('failover-server-list'): []
+        }
+
+
+parser = optparse.OptionParser(usage="usage: %prog [options]",
+                               description="Testing Router failover support")
+
+parser.add_option("-a", "--address", default="localhost:55671",
+                  help="address to listen on (default %default)")
+
+opts, args = parser.parse_args()
+
+handler = FailoverServer(opts.address)
+
+
+def sigterm_handler(_signo, _stack_frame):
+    sys.exit(0)
+
+
+signal.signal(signal.SIGTERM, sigterm_handler)
+
+Container(handler).run()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/83f5d524/tests/system_tests_handle_failover.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_handle_failover.py b/tests/system_tests_handle_failover.py
index 8ff6311..9020e7d 100644
--- a/tests/system_tests_handle_failover.py
+++ b/tests/system_tests_handle_failover.py
@@ -22,6 +22,7 @@ from __future__ import division
 from __future__ import absolute_import
 from __future__ import print_function
 
+import os
 from threading import Timer
 import unittest2 as unittest
 import json, re
@@ -49,6 +50,7 @@ class FailoverTest(TestCase):
         cls.inter_router_port_1 = cls.tester.get_port()
         cls.backup_port = cls.tester.get_port()
         cls.backup_url = 'amqp://0.0.0.0:' + str(cls.backup_port)
+        cls.my_server_port = cls.tester.get_port()
 
         cls.failover_list = 'amqp://third-host:5671, ' + cls.backup_url
 
@@ -126,15 +128,19 @@ class FailoverTest(TestCase):
 
     def test_1_connector_has_failover_list(self):
         """
-        Makes a qdmanage connector query and checks if Router A is storing the failover information
received from
-        Router B.
-        :return:
+        This is the most simple and straightforward case. Router A connects to Router B.
Router B sends
+        failover information to Router A.
+        We make a qdmanage connector query to Router A which checks if Router A is storing
the failover information
+        received from  Router B.The failover list must consist of the original connection
info (from the connector)
+        followed by the two items sent by the Router B (stored in cls.failover_list)
+        The 'failoverUrls' is comma separated.
         """
         long_type = 'org.apache.qpid.dispatch.connector'
         query_command = 'QUERY --type=' + long_type
         output = json.loads(self.run_qdmanage(query_command))
-        self.assertEqual("amqp://127.0.0.1:" + str(FailoverTest.inter_router_port) + ", "
+ FailoverTest.failover_list,
-                         output[0]['failoverUrls'])
+        expected = "amqp://127.0.0.1:" + str(FailoverTest.inter_router_port) + ", " + FailoverTest.failover_list
+
+        self.assertEqual(expected, output[0]['failoverUrls'])
 
     def schedule_B_to_C_failover_test(self):
         if self.attempts < self.max_attempts:
@@ -143,10 +149,6 @@ class FailoverTest(TestCase):
                 self.attempts += 1
 
     def check_C_connector(self):
-        # Router A should now try to connect to Router C. Router C does NOT have failoverUrls.
-        # Query Router A which previously had failoverUrls in its connector (because Router
B sent it failoverUrls)
-        # does not have it anymore.
-        # Further wait for the connection from Router A to show up in Router C's management
stack
         long_type = 'org.apache.qpid.dispatch.connector'
         query_command = 'QUERY --type=' + long_type
         output = json.loads(self.run_qdmanage(query_command, address=self.routers[1].addresses[0]))
@@ -154,16 +156,10 @@ class FailoverTest(TestCase):
         expected = FailoverTest.backup_url  + ", " + "amqp://127.0.0.1:" + str(FailoverTest.inter_router_port)
\
                    + ", " + "amqp://third-host:5671"
 
-        if output[0].get('failoverUrls') != expected:
-            self.schedule_B_to_C_failover_test()
+        if output[0].get('failoverUrls') == expected:
+            self.success = True
         else:
-            # Router A now sees the proper failover list when connected to Router C
-            # Stall until an inter-router connection shows up in Router C's status
-            outs = self.run_qdstat(['--connections'], address=self.routers[2].addresses[1])
-            if not "inter-router" in outs:
-                self.schedule_B_to_C_failover_test()
-            else:
-                self.success = True
+            self.schedule_B_to_C_failover_test()
 
     def can_terminate(self):
         if self.attempts == self.max_attempts:
@@ -175,6 +171,13 @@ class FailoverTest(TestCase):
         return False
 
     def test_2_remove_router_B(self):
+        """
+        In this test, we are killing Router B. As a result, Router A should try to connect
to Router C.
+        Router C does NOT have a failover list, so the open frame that Router C sends to
Router A will not contain
+        the failover-server-list property..Hence the failoverUrls list will remain unchanged
except that the order of
+        the URLs would be different.
+        """
+
         # First make sure there are no inter-router connections on router C
         outs = self.run_qdstat(['--connections'], address=self.routers[2].addresses[1])
 
@@ -193,9 +196,6 @@ class FailoverTest(TestCase):
 
         self.assertTrue(self.success)
 
-        # Since router B has been killed, router A should now try to connect to a listener
on router C.
-        # Use qdstat to connect to router C and determine that there is an inter-router connection
with router A.
-        self.run_qdstat(['--connections'], regexp=r'A.*inter-router.*', address=self.routers[2].addresses[1])
 
     def schedule_C_to_B_failover_test(self):
         if self.attempts < self.max_attempts:
@@ -209,7 +209,12 @@ class FailoverTest(TestCase):
         query_command = 'QUERY --type=' + long_type
         output = json.loads(self.run_qdmanage(query_command, address=self.routers[1].addresses[0]))
 
-        expected = "amqp://127.0.0.1:" + str(FailoverTest.inter_router_port) + ", " + FailoverTest.failover_list
+        # The order that the URLs appear in the failoverUrls is important. This is the order
in which the router
+        # will attempt to make connections in case the existing connection goes down.
+
+        expected = "amqp://127.0.0.1:" + str(FailoverTest.inter_router_port) + ", " + \
+                   FailoverTest.failover_list + \
+                   ', amqp://127.0.0.1:%d' % FailoverTest.my_server_port
 
         if output[0].get('failoverUrls') == expected:
             self.success = True
@@ -217,13 +222,21 @@ class FailoverTest(TestCase):
             self.schedule_C_to_B_failover_test()
 
     def test_3_reinstate_router_B(self):
+        """
+        In this test, we are restarting Router B and killing Router C. Router A should now
try to connect back to
+        Router B since it maintains the original connection info to Router B from the connector
config information.
+        Before starting Router B back again, we
+        have a small config change to Router B  wherein we are adding a new failover url
to the original list.
+        This new failover url
+        points to our own server which will accept connections. This server will actually
be used in the next test
+        but this test maskes sure that the new server url also shows up in the failoverUrls
list.
+        """
         FailoverTest.router('B', [
                         ('router', {'mode': 'interior', 'id': 'B'}),
                         ('listener', {'host': '0.0.0.0', 'role': 'inter-router', 'port':
FailoverTest.inter_router_port,
-                                      'failoverUrls': FailoverTest.failover_list}),
+                                      'failoverUrls': FailoverTest.failover_list +  ', amqp://127.0.0.1:%d'
% FailoverTest.my_server_port}),
                         ('listener', {'host': '0.0.0.0', 'role': 'normal', 'port': FailoverTest.tester.get_port()}),
-                        ]
-              )
+                        ])
 
         FailoverTest.routers[3].wait_ready()
 
@@ -243,6 +256,54 @@ class FailoverTest(TestCase):
 
         self.assertTrue(self.success)
 
+    def check_A_connector(self):
+        # Router A should now try to connect to Router B again since we killed Router C.
+        long_type = 'org.apache.qpid.dispatch.connector'
+        query_command = 'QUERY --type=' + long_type
+        output = json.loads(self.run_qdmanage(query_command, address=self.routers[1].addresses[0]))
+
+        # The order that the URLs appear in the failoverUrls is important. This is the order
in which the router
+        # will attempt to make connections in case the existing connection goes down.
+        expected = 'amqp://127.0.0.1:%d' % FailoverTest.my_server_port + ", " + "amqp://127.0.0.1:"
+ str(FailoverTest.inter_router_port)
+
+        if output[0].get('failoverUrls') == expected:
+            self.success = True
+        else:
+            self.schedule_B_to_my_server_failover_test()
+
+    def schedule_B_to_my_server_failover_test(self):
+        if self.attempts < self.max_attempts:
+            if not self.success:
+                Timer(self.timer_delay, self.check_A_connector).start()
+                self.attempts += 1
+
+    def test_4_remove_router_B_connect_to_my_server(self):
+        """
+        This test kills Router B again and makes sure that Router A now connects to our custom
server that
+        accepts connections. This custom server intentionally sends an empty list for failover-server-list
+        Router A must look at this empty list and wipe out all failover information except
the original connector information
+        and the current connection info.
+        """
+
+
+        # Start MyServer
+        proc = FailoverTest.tester.popen(
+            ['/usr/bin/env', 'python', os.path.join(os.path.dirname(os.path.abspath(__file__)),
'failoverserver.py'), '-a',
+             'amqp://127.0.0.1:%d' % FailoverTest.my_server_port], expect=Process.RUNNING)
+
+        # Kill the router B again
+        FailoverTest.routers[3].teardown()
+
+        self.success = False
+        self.attempts = 0
+
+        self.schedule_B_to_my_server_failover_test()
+
+        while not self.can_terminate():
+            pass
+
+        self.assertTrue(self.success)
+
 
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message