qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject [qpid-dispatch] branch master updated: DISPATCH-1310: refactor the receive handler code
Date Mon, 06 May 2019 11:01:57 GMT
This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f0632a  DISPATCH-1310: refactor the receive handler code
2f0632a is described below

commit 2f0632a4c5005c657d664700d29114be31e3a97a
Author: Kenneth Giusti <kgiusti@apache.org>
AuthorDate: Wed Apr 17 13:14:36 2019 -0400

    DISPATCH-1310: refactor the receive handler code
    
    Refactor the rx handler to exit early if the message is being
    discarded or the delivery has already been handed to the core thread.
    
    Avoid calling the receive code entirely if the PN_DELIVERY being read
    does not contain the link's current delivery.
    
    This closes #492
---
 src/container.c                   |   7 +-
 src/router_core/delivery.h        |   3 +-
 src/router_core/transfer.c        |   4 +-
 src/router_node.c                 | 171 +++++++++++++++++++++++---------------
 tests/system_tests_link_routes.py |  63 ++++++++++++++
 5 files changed, 172 insertions(+), 76 deletions(-)

diff --git a/src/container.c b/src/container.c
index 3b28eda..9c54cd8 100644
--- a/src/container.c
+++ b/src/container.c
@@ -177,9 +177,8 @@ static void handle_link_open(qd_container_t *container, pn_link_t *pn_link)
 }
 
 
-static void do_receive(pn_delivery_t *pnd)
+static void do_receive(pn_link_t *pn_link, pn_delivery_t *pnd)
 {
-    pn_link_t     *pn_link  = pn_delivery_link(pnd);
     qd_link_t     *link     = (qd_link_t*) pn_link_get_context(pn_link);
 
     if (link) {
@@ -642,8 +641,8 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
         delivery = pn_event_delivery(event);
         pn_link  = pn_event_link(event);
 
-        if (pn_link_is_receiver(pn_link))
-            do_receive(delivery);
+        if (pn_delivery_readable(delivery))
+            do_receive(pn_link, delivery);
 
         if (pn_delivery_updated(delivery) || pn_delivery_settled(delivery)) {
             do_updated(delivery);
diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h
index daeabb1..e3fd88b 100644
--- a/src/router_core/delivery.h
+++ b/src/router_core/delivery.h
@@ -22,6 +22,7 @@
 
 #include "router_core_private.h"
 
+#define QDR_DELIVERY_TAG_MAX 32
 
 typedef enum {
     QDR_DELIVERY_NOWHERE = 0,
@@ -49,7 +50,7 @@ struct qdr_delivery_t {
     bool                    settled;
     bool                    presettled;
     qdr_delivery_where_t    where;
-    uint8_t                 tag[32];
+    uint8_t                 tag[QDR_DELIVERY_TAG_MAX];
     int                     tag_length;
     qd_bitmask_t           *link_exclusion;
     qdr_address_t          *tracking_addr;
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index f27cbf1..cd6bc81 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -99,9 +99,6 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t
*
                                                 const uint8_t *tag, int tag_length,
                                                 uint64_t disposition, pn_data_t* disposition_data)
 {
-    if (tag_length > 32)
-        return 0;
-    
     qdr_action_t   *action = qdr_action(qdr_link_deliver_CT, "link_deliver");
     qdr_delivery_t *dlv    = new_qdr_delivery_t();
 
@@ -120,6 +117,7 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t
*
     action->args.connection.delivery = dlv;
     action->args.connection.more = !qd_message_receive_complete(msg);
     action->args.connection.tag_length = tag_length;
+    assert(tag_length <= QDR_DELIVERY_TAG_MAX);
     memcpy(action->args.connection.tag, tag, tag_length);
     qdr_action_enqueue(link->core, action);
     return dlv;
diff --git a/src/router_node.c b/src/router_node.c
index c898379..141ae63 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -305,23 +305,29 @@ static void log_link_message(qd_connection_t *conn, pn_link_t *pn_link,
qd_messa
 
 /**
  * Inbound Delivery Handler
+ *
+ * @return true if we've advanced to the next delivery on this link and it is
+ * ready for rx processing.  This will cause the container to immediately
+ * re-call this function with the next delivery.
  */
 static bool AMQP_rx_handler(void* context, qd_link_t *link)
 {
-    qd_router_t    *router       = (qd_router_t*) context;
-    pn_link_t      *pn_link      = qd_link_pn(link);
-    bool            next_delivery = false;
+    qd_router_t    *router  = (qd_router_t*) context;
+    pn_link_t      *pn_link = qd_link_pn(link);
+
     assert(pn_link);
 
     if (!pn_link)
-        return next_delivery;
+        return false;
 
-    pn_delivery_t  *pnd          = pn_link_current(pn_link);
+    // ensure the current delivery is readable
+    pn_delivery_t *pnd = pn_link_current(pn_link);
     if (!pnd)
-        return next_delivery;
-    qdr_link_t     *rlink        = (qdr_link_t*) qd_link_get_context(link);
-    qd_connection_t  *conn       = qd_link_connection(link);
-    qdr_delivery_t *delivery     = qdr_node_delivery_qdr_from_pn(pnd);
+        return false;
+
+    qd_connection_t  *conn   = qd_link_connection(link);
+    qdr_delivery_t *delivery = qdr_node_delivery_qdr_from_pn(pnd);
+    bool       next_delivery = false;
 
     //
     // Receive the message into a local representation.
@@ -340,24 +346,53 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
 
         if (qdr_delivery_disposition(delivery) != 0)
             pn_delivery_update(pnd, qdr_delivery_disposition(delivery));
+    }
 
+    if (qd_message_is_discard(msg)) {
         //
-        // The entire message has been received but this message needs to be discarded
+        // Message has been marked for discard, no further processing necessary
         //
-        if (qd_message_is_discard(msg)) {
-            qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd);
+        if (receive_complete) {
+            // note: expected that the code that set discard has handled
+            // setting disposition and updating flow!
             pn_delivery_settle(pnd);
-            return next_delivery;
+            if (delivery) {
+                // if delivery already exists then the core thread discarded this
+                // delivery, it will eventually free the qdr_delivery_t and its
+                // associated message - do not free it here.
+                qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd);
+            } else {
+                qd_message_free(msg);
+            }
         }
+        return next_delivery;
+    }
+
+    //
+    // If the delivery already exists we've already passed it to the core (2nd
+    // frame for a multi-frame transfer). Simply continue.
+    //
+
+    if (delivery) {
+        qdr_deliver_continue(router->router_core, delivery);
+        return next_delivery;
     }
 
     //
-    // If there's no router link, free the message and finish.  It's likely that the link
-    // is closing.
+    // No pre-existing delivery means we're starting a new delivery or
+    // continuing a delivery that has not accumulated enough of the message
+    // for forwarding.
     //
+
+    qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link);
     if (!rlink) {
-        if (receive_complete) // The entire message has been received but there is nowhere
to send it to, free it and do nothing.
+        // receive link was closed or deleted - can't be forwarded
+        // so no use setting disposition or adding flow
+        qd_message_set_discard(msg, true);
+        if (receive_complete) {
+            pn_delivery_settle(pnd);
             qd_message_free(msg);
+        }
         return next_delivery;
     }
 
@@ -366,31 +401,29 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
     //
     if (qdr_link_is_routed(rlink)) {
         pn_delivery_tag_t dtag = pn_delivery_tag(pnd);
-        //
-        // A delivery object was already available via pn_delivery_get_context. This means
a qdr_delivery was already created. Use it to continue delivery.
-        //
-        if (delivery) {
 
-            //
-            // Call continue only if the discard flag on the message is not set
-            // We should not continue processing the message after it has been discarded
-            //
-            if (!qd_message_is_discard(msg)) {
-                qdr_deliver_continue(router->router_core, delivery);
+        if (dtag.size > QDR_DELIVERY_TAG_MAX) {
+            qd_log(router->log_source, QD_LOG_DEBUG, "link route delivery failure: msg
tag size exceeded %zd (max=%d)",
+                   dtag.size, QDR_DELIVERY_TAG_MAX);
+            qd_message_set_discard(msg, true);
+            pn_link_flow(pn_link, 1);
+            pn_delivery_update(pnd, PN_REJECTED);
+            if (receive_complete) {
+                pn_delivery_settle(pnd);
+                qd_message_free(msg);
             }
-        }
-        else {
-            delivery = qdr_link_deliver_to_routed_link(rlink,
-                                                       msg,
-                                                       pn_delivery_settled(pnd),
-                                                       (uint8_t*) dtag.start,
-                                                       dtag.size,
-                                                       pn_disposition_type(pn_delivery_remote(pnd)),
-                                                       pn_disposition_data(pn_delivery_remote(pnd)));
-            qdr_node_connect_deliveries(link, delivery, pnd);
-            qdr_delivery_decref(router->router_core, delivery, "release protection of
return from deliver_to_routed_link");
+            return next_delivery;
         }
 
+        delivery = qdr_link_deliver_to_routed_link(rlink,
+                                                   msg,
+                                                   pn_delivery_settled(pnd),
+                                                   (uint8_t*) dtag.start,
+                                                   dtag.size,
+                                                   pn_disposition_type(pn_delivery_remote(pnd)),
+                                                   pn_disposition_data(pn_delivery_remote(pnd)));
+        qdr_node_connect_deliveries(link, delivery, pnd);
+        qdr_delivery_decref(router->router_core, delivery, "release protection of return
from deliver_to_routed_link");
         return next_delivery;
     }
 
@@ -432,30 +465,18 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
     qd_message_depth_t  validation_depth = (anonymous_link || check_user) ? QD_DEPTH_PROPERTIES
: QD_DEPTH_MESSAGE_ANNOTATIONS;
     bool                valid_message    = qd_message_check(msg, validation_depth);
 
-    if (!valid_message && receive_complete) {
-        //
-        // The entire message has been received and the message is still invalid.  Reject
the message.
-        //
-        qd_message_set_discard(msg, true);
-        pn_link_flow(pn_link, 1);
-        pn_delivery_update(pnd, PN_REJECTED);
-        pn_delivery_settle(pnd);
-        qd_message_free(msg);
-    }
-
     if (!valid_message) {
-        return next_delivery;
-    }
-
-    if (delivery) {
-        //
-        // Call continue only if the discard flag on the message is not set
-        // We should not continue processing the message after it has been discarded
-        //
-        if (!qd_message_is_discard(msg)) {
-            qdr_deliver_continue(router->router_core, delivery);
+        if (receive_complete) {
+            //
+            // The entire message has been received and the message is still invalid.  Reject
the message.
+            //
+            qd_message_set_discard(msg, true);
+            pn_link_flow(pn_link, 1);
+            pn_delivery_update(pnd, PN_REJECTED);
+            pn_delivery_settle(pnd);
+            qd_message_free(msg);
         }
-
+        // otherwise wait until more data arrives and re-try the validation
         return next_delivery;
     }
 
@@ -469,10 +490,13 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
                 if (!qd_iterator_equal(userid_iter, (const unsigned char *)conn->user_id))
{
                     // This message is rejected: attempted user proxy is disallowed
                     qd_log(router->log_source, QD_LOG_DEBUG, "Message rejected due to
user_id proxy violation. User:%s", conn->user_id);
+                    qd_message_set_discard(msg, true);
                     pn_link_flow(pn_link, 1);
                     pn_delivery_update(pnd, PN_REJECTED);
-                    pn_delivery_settle(pnd);
-                    qd_message_free(msg);
+                    if (receive_complete) {
+                        pn_delivery_settle(pnd);
+                        qd_message_free(msg);
+                    }
                     qd_iterator_free(userid_iter);
                     return next_delivery;
                 }
@@ -495,12 +519,14 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
     // destinations via shorter paths.
     //
     if (distance > (router->topology_radius + 1)) {
+        qd_bitmask_free(link_exclusions);
         qd_message_set_discard(msg, true);
         pn_link_flow(pn_link, 1);
         pn_delivery_update(pnd, PN_RELEASED);
-        pn_delivery_settle(pnd);
-        qd_message_free(msg);
-        qd_bitmask_free(link_exclusions);
+        if (receive_complete) {
+            pn_delivery_settle(pnd);
+            qd_message_free(msg);
+        }
         return next_delivery;
     }
 
@@ -546,10 +572,13 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
             } else {
                 //reject
                 qd_log(router->log_source, QD_LOG_DEBUG, "Message rejected due to policy
violation on target. User:%s", conn->user_id);
+                qd_message_set_discard(msg, true);
                 pn_link_flow(pn_link, 1);
                 pn_delivery_update(pnd, PN_REJECTED);
-                pn_delivery_settle(pnd);
-                qd_message_free(msg);
+                if (receive_complete) {
+                    pn_delivery_settle(pnd);
+                    qd_message_free(msg);
+                }
                 qd_iterator_free(addr_iter);
                 qd_bitmask_free(link_exclusions);
                 return next_delivery;
@@ -589,6 +618,10 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
         delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd), link_exclusions,
ingress_index);
     }
 
+    //
+    // End of new delivery processing
+    //
+
     if (delivery) {
         qdr_node_connect_deliveries(link, delivery, pnd);
         qdr_delivery_decref(router->router_core, delivery, "release protection of return
from deliver");
@@ -600,8 +633,10 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
         qd_message_set_discard(msg, true);
         pn_link_flow(pn_link, 1);
         pn_delivery_update(pnd, PN_REJECTED);
-        pn_delivery_settle(pnd);
-        qd_message_free(msg);
+        if (receive_complete) {
+            pn_delivery_settle(pnd);
+            qd_message_free(msg);
+        }
     }
 
     return next_delivery;
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index dd1631d..2e5e6b0 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -34,6 +34,7 @@ from system_test import QdManager
 from system_test import MgmtMsgProxy
 from test_broker import FakeBroker
 
+from proton import Delivery
 from proton import Message
 from proton.handlers import MessagingHandler
 from proton.reactor import AtMostOnce, Container, DynamicNodeProperties, LinkOption, AtLeastOnce
@@ -666,6 +667,11 @@ class LinkRouteTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
+    def test_yyy_invalid_delivery_tag(self):
+        test = InvalidTagTest(self.routers[2].addresses[0])
+        test.run()
+        self.assertEqual(None, test.error)
+
     def test_close_with_unsettled(self):
         test = CloseWithUnsettledTest(self.routers[1].addresses[0], self.routers[1].addresses[1])
         test.run()
@@ -1990,5 +1996,62 @@ class ConnLinkRouteMgmtProxy(object):
         return _func
 
 
+class InvalidTagTest(MessagingHandler):
+    """Verify that a message with an invalid tag length is rejected
+    """
+    def __init__(self, router_addr):
+        super(InvalidTagTest, self).__init__(auto_accept=False, auto_settle=False)
+        self.test_conn = None
+        self.test_address = router_addr
+        self.tx_ct = 0;
+        self.accept_ct = 0;
+        self.reject_ct = 0;
+        self.error = None
+
+    def timeout(self):
+        self.error = "Timeout expired: sent=%d rcvd=%d" % (self.tx_ct,
+                                                           self.accept_ct
+                                                           + self.reject_ct)
+        if self.test_conn:
+            self.test_conn.close()
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.test_conn = event.container.connect(self.test_address)
+        rx = event.container.create_receiver(self.test_conn, "org.apache.foo")
+
+    def on_link_opened(self, event):
+        if event.receiver:
+            event.receiver.flow(100)
+            event.container.create_sender(event.connection, "org.apache.foo")
+
+    def on_sendable(self, event):
+        if self.tx_ct < 10:
+            self.tx_ct += 1
+            if self.tx_ct == 5:
+                event.sender.send(Message(body="YO"), tag=str("X" * 64))
+            else:
+                event.sender.send(Message(body="YO"), tag=str("BLAH%d" %
+                                                              self.tx_ct))
+
+    def on_accepted(self, event):
+        self.accept_ct += 1
+        event.delivery.settle()
+        if self.accept_ct == 9 and self.reject_ct == 1:
+            event.connection.close()
+            self.timer.cancel()
+
+    def on_rejected(self, event):
+        self.reject_ct += 1
+        event.delivery.settle()
+
+    def on_message(self, event):
+        event.delivery.update(Delivery.ACCEPTED)
+        event.delivery.settle()
+
+    def run(self):
+        Container(self).run()
+
+
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message