qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tr...@apache.org
Subject [qpid-dispatch] branch master updated: DISPATCH-1551 - Assure that differential MAUs are expected before acc… (#667)
Date Mon, 20 Jan 2020 19:12:07 GMT
This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new c8a292f  DISPATCH-1551 - Assure that differential MAUs are expected before acc…
(#667)
c8a292f is described below

commit c8a292fd96612b491f9fc38419315ef65696b579
Author: Ted Ross <tross@redhat.com>
AuthorDate: Mon Jan 20 14:11:55 2020 -0500

    DISPATCH-1551 - Assure that differential MAUs are expected before acc… (#667)
    
    * DISPATCH-1551 - Assure that differential MAUs are expected before accepting them.
    
    * DISPATCH-1551 - Clarified the usage of the addr_sync mask bits.
    Made another address unique in the link-routes test set.
---
 src/router_core/modules/mobile_sync/mobile.c | 65 ++++++++++++++++++----------
 src/router_core/router_core_private.h        |  1 +
 tests/system_tests_link_routes.py            |  4 +-
 3 files changed, 44 insertions(+), 26 deletions(-)

diff --git a/src/router_core/modules/mobile_sync/mobile.c b/src/router_core/modules/mobile_sync/mobile.c
index 5055f2f..1a95053 100644
--- a/src/router_core/modules/mobile_sync/mobile.c
+++ b/src/router_core/modules/mobile_sync/mobile.c
@@ -44,12 +44,17 @@ static const char *EXIST      = "exist";
 static const char *HAVE_SEQ   = "have_seq";
 
 //
-// Address.sync_mask bit values
+// qdr_address_t.sync_mask bit values
 //
-#define ADDR_SYNC_IN_ADD_LIST           0x00000001
-#define ADDR_SYNC_IN_DEL_LIST           0x00000002
-#define ADDR_SYNC_TO_BE_DELETED         0x00000004
-#define ADDR_SYNC_MOBILE_TRACKING       0x00000008
+#define ADDR_SYNC_ADDRESS_IN_ADD_LIST     0x00000001
+#define ADDR_SYNC_ADDRESS_IN_DEL_LIST     0x00000002
+#define ADDR_SYNC_ADDRESS_TO_BE_DELETED   0x00000004
+#define ADDR_SYNC_ADDRESS_MOBILE_TRACKING 0x00000008
+
+//
+// qdr_node_t.sync_mask bit values
+//
+#define ADDR_SYNC_ROUTER_MA_REQUESTED          0x00000001
 
 #define BIT_SET(M,B)   M |= B
 #define BIT_CLEAR(M,B) M &= ~B
@@ -67,6 +72,7 @@ typedef struct {
     qdr_address_list_t         deleted_addrs;
 } qdrm_mobile_sync_t;
 
+static void qcm_mobile_sync_on_router_advanced_CT(qdrm_mobile_sync_t *msync, qdr_node_t *router);
 
 //================================================================================
 // Helper Functions
@@ -118,7 +124,7 @@ qdr_node_t *qdc_mobile_sync_router_by_id(qdrm_mobile_sync_t *msync, qd_parsed_fi
  */
 static void qcm_mobile_sync_start_tracking(qdr_address_t *addr)
 {
-    BIT_SET(addr->sync_mask, ADDR_SYNC_MOBILE_TRACKING);
+    BIT_SET(addr->sync_mask, ADDR_SYNC_ADDRESS_MOBILE_TRACKING);
     addr->ref_count++;
 }
 
@@ -129,7 +135,7 @@ static void qcm_mobile_sync_start_tracking(qdr_address_t *addr)
  */
 static void qcm_mobile_sync_stop_tracking(qdr_core_t *core, qdr_address_t *addr)
 {
-    BIT_CLEAR(addr->sync_mask, ADDR_SYNC_MOBILE_TRACKING);
+    BIT_CLEAR(addr->sync_mask, ADDR_SYNC_ADDRESS_MOBILE_TRACKING);
     if (--addr->ref_count == 0)
         qdr_check_addr_CT(core, addr);
 }
@@ -170,10 +176,10 @@ static void qcm_mobile_sync_compose_diff_addr_list(qdrm_mobile_sync_t
*msync, qd
         qd_compose_insert_string(field, hash_key);
         if (is_added) {
             DEQ_REMOVE_HEAD_N(SYNC_ADD, *list);
-            BIT_CLEAR(addr->sync_mask, ADDR_SYNC_IN_ADD_LIST);
+            BIT_CLEAR(addr->sync_mask, ADDR_SYNC_ADDRESS_IN_ADD_LIST);
         } else {
             DEQ_REMOVE_HEAD_N(SYNC_DEL, *list);
-            BIT_CLEAR(addr->sync_mask, ADDR_SYNC_IN_DEL_LIST);
+            BIT_CLEAR(addr->sync_mask, ADDR_SYNC_ADDRESS_IN_DEL_LIST);
             qcm_mobile_sync_stop_tracking(msync->core, addr);
         }
         addr = DEQ_HEAD(*list);
@@ -285,8 +291,8 @@ static qd_message_t *qcm_mobile_sync_compose_absolute_mau(qdrm_mobile_sync_t
*ms
         //
         if (qcm_mobile_sync_addr_is_mobile(addr)
             && ((DEQ_SIZE(addr->rlinks) > 0 || DEQ_SIZE(addr->conns) >
0 || !!addr->exchange)
-                || BIT_IS_SET(addr->sync_mask, ADDR_SYNC_IN_DEL_LIST))
-            && !BIT_IS_SET(addr->sync_mask, ADDR_SYNC_IN_ADD_LIST)) {
+                || BIT_IS_SET(addr->sync_mask, ADDR_SYNC_ADDRESS_IN_DEL_LIST))
+            && !BIT_IS_SET(addr->sync_mask, ADDR_SYNC_ADDRESS_IN_ADD_LIST)) {
             const char *hash_key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
             qd_compose_insert_string(body, hash_key);
         }
@@ -303,8 +309,8 @@ static qd_message_t *qcm_mobile_sync_compose_absolute_mau(qdrm_mobile_sync_t
*ms
         //
         if (qcm_mobile_sync_addr_is_mobile(addr)
             && ((DEQ_SIZE(addr->rlinks) > 0 || DEQ_SIZE(addr->conns) >
0 || !!addr->exchange)
-                || BIT_IS_SET(addr->sync_mask, ADDR_SYNC_IN_DEL_LIST))
-            && !BIT_IS_SET(addr->sync_mask, ADDR_SYNC_IN_ADD_LIST))
+                || BIT_IS_SET(addr->sync_mask, ADDR_SYNC_ADDRESS_IN_DEL_LIST))
+            && !BIT_IS_SET(addr->sync_mask, ADDR_SYNC_ADDRESS_IN_ADD_LIST))
             qd_compose_insert_int(body, addr->treatment);
         addr = DEQ_NEXT(addr);
     }
@@ -474,8 +480,19 @@ static void qcm_mobile_sync_on_mau_CT(qdrm_mobile_sync_t *msync, qd_parsed_field
             }
 
             //
+            // If this is a differential MAU and it doesn't represent the next expected
+            // update, treat this like a sequence-advance and send a MAR
+            //
+            if (!exist_field && router->mobile_seq != mobile_seq - 1 &&
!BIT_IS_SET(router->sync_mask, ADDR_SYNC_ROUTER_MA_REQUESTED)) {
+                qcm_mobile_sync_on_router_advanced_CT(msync, router);
+                BIT_SET(router->sync_mask, ADDR_SYNC_ROUTER_MA_REQUESTED);
+                return;
+            }
+
+            //
             // Record the new mobile sequence for the remote router.
             //
+            BIT_CLEAR(router->sync_mask, ADDR_SYNC_ROUTER_MA_REQUESTED);
             router->mobile_seq = mobile_seq;
 
             //
@@ -502,7 +519,7 @@ static void qcm_mobile_sync_on_mau_CT(qdrm_mobile_sync_t *msync, qd_parsed_field
                 addr = DEQ_HEAD(msync->core->addrs);
                 while (!!addr) {
                     if (qcm_mobile_sync_addr_is_mobile(addr) && !!qd_bitmask_value(addr->rnodes,
router->mask_bit))
-                        BIT_SET(addr->sync_mask, ADDR_SYNC_TO_BE_DELETED);
+                        BIT_SET(addr->sync_mask, ADDR_SYNC_ADDRESS_TO_BE_DELETED);
                     addr = DEQ_NEXT(addr);
                 }
             }
@@ -550,7 +567,7 @@ static void qcm_mobile_sync_on_mau_CT(qdrm_mobile_sync_t *msync, qd_parsed_field
                         }
                     }
 
-                    BIT_CLEAR(addr->sync_mask, ADDR_SYNC_TO_BE_DELETED);
+                    BIT_CLEAR(addr->sync_mask, ADDR_SYNC_ADDRESS_TO_BE_DELETED);
                     if (!qd_bitmask_value(addr->rnodes, router->mask_bit)) {
                         qd_bitmask_set_bit(addr->rnodes, router->mask_bit);
                         router->ref_count++;
@@ -613,7 +630,7 @@ static void qcm_mobile_sync_on_mau_CT(qdrm_mobile_sync_t *msync, qd_parsed_field
                     qdr_address_t *next_addr = DEQ_NEXT(addr);
                     if (qcm_mobile_sync_addr_is_mobile(addr)
                         && !!qd_bitmask_value(addr->rnodes, router->mask_bit)
-                        && BIT_IS_SET(addr->sync_mask, ADDR_SYNC_TO_BE_DELETED))
{
+                        && BIT_IS_SET(addr->sync_mask, ADDR_SYNC_ADDRESS_TO_BE_DELETED))
{
                         qd_bitmask_clear_bit(addr->rnodes, router->mask_bit);
                         router->ref_count--;
                         addr->cost_epoch--;
@@ -682,18 +699,18 @@ static void qcm_mobile_sync_on_became_local_dest_CT(qdrm_mobile_sync_t
*msync, q
 
     qd_log(msync->log, QD_LOG_DEBUG, "Became Local Dest: %s", (const char*) qd_hash_key_by_handle(addr->hash_handle));
 
-    if (BIT_IS_SET(addr->sync_mask, ADDR_SYNC_IN_ADD_LIST))
+    if (BIT_IS_SET(addr->sync_mask, ADDR_SYNC_ADDRESS_IN_ADD_LIST))
         return;
 
-    if (BIT_IS_SET(addr->sync_mask, ADDR_SYNC_IN_DEL_LIST)) {
+    if (BIT_IS_SET(addr->sync_mask, ADDR_SYNC_ADDRESS_IN_DEL_LIST)) {
         //
         // If the address was deleted since the last update, simply forget that it was deleted.
         //
         DEQ_REMOVE_N(SYNC_DEL, msync->deleted_addrs, addr);
-        BIT_CLEAR(addr->sync_mask, ADDR_SYNC_IN_DEL_LIST);
+        BIT_CLEAR(addr->sync_mask, ADDR_SYNC_ADDRESS_IN_DEL_LIST);
     } else {
         DEQ_INSERT_TAIL_N(SYNC_ADD, msync->added_addrs, addr);
-        BIT_SET(addr->sync_mask, ADDR_SYNC_IN_ADD_LIST);
+        BIT_SET(addr->sync_mask, ADDR_SYNC_ADDRESS_IN_ADD_LIST);
         qcm_mobile_sync_start_tracking(addr);
     }
 }
@@ -706,19 +723,19 @@ static void qcm_mobile_sync_on_no_longer_local_dest_CT(qdrm_mobile_sync_t
*msync
 
     qd_log(msync->log, QD_LOG_DEBUG, "No Longer Local Dest: %s", (const char*) qd_hash_key_by_handle(addr->hash_handle));
 
-    if (BIT_IS_SET(addr->sync_mask, ADDR_SYNC_IN_DEL_LIST))
+    if (BIT_IS_SET(addr->sync_mask, ADDR_SYNC_ADDRESS_IN_DEL_LIST))
         return;
 
-    if (BIT_IS_SET(addr->sync_mask, ADDR_SYNC_IN_ADD_LIST)) {
+    if (BIT_IS_SET(addr->sync_mask, ADDR_SYNC_ADDRESS_IN_ADD_LIST)) {
         //
         // If the address was added since the last update, simply forget that it was added.
         //
         DEQ_REMOVE_N(SYNC_ADD, msync->added_addrs, addr);
-        BIT_CLEAR(addr->sync_mask, ADDR_SYNC_IN_ADD_LIST);
+        BIT_CLEAR(addr->sync_mask, ADDR_SYNC_ADDRESS_IN_ADD_LIST);
         qcm_mobile_sync_stop_tracking(msync->core, addr);
     } else {
         DEQ_INSERT_TAIL_N(SYNC_DEL, msync->deleted_addrs, addr);
-        BIT_SET(addr->sync_mask, ADDR_SYNC_IN_DEL_LIST);
+        BIT_SET(addr->sync_mask, ADDR_SYNC_ADDRESS_IN_DEL_LIST);
     }
 }
 
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 77abf9e..3c68b59 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -351,6 +351,7 @@ struct qdr_node_t {
     int               cost;
     uint64_t          mobile_seq;
     char             *wire_address_ma;    ///< The address of this router's mobile-sync
agent in non-hashed form
+    uint32_t          sync_mask;          ///< Bitmask for mobile-address-sync
 };
 
 DEQ_DECLARE(qdr_node_t, qdr_node_list_t);
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 5730340..fea6c3a 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -532,12 +532,12 @@ class LinkRouteTest(TestCase):
         blocking_connection = BlockingConnection(addr)
 
         # Receive on org.apache
-        blocking_receiver = blocking_connection.create_receiver(address="org.apache")
+        blocking_receiver = blocking_connection.create_receiver(address="org.apache.2")
 
         apply_options = AtMostOnce()
 
         # Sender to  to org.apache
-        blocking_sender = blocking_connection.create_sender(address="org.apache", options=apply_options)
+        blocking_sender = blocking_connection.create_sender(address="org.apache.2", options=apply_options)
         msg = Message(body=hello_world_3)
         annotations = {'custom-annotation': '1/Custom_Annotation'}
         msg.annotations = annotations


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message