qpid-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ted-ross <...@git.apache.org>
Subject [GitHub] qpid-dispatch pull request #410: DISPATCH-1160 - Added edge address tracking...
Date Wed, 31 Oct 2018 17:22:18 GMT
Github user ted-ross commented on a diff in the pull request:

    https://github.com/apache/qpid-dispatch/pull/410#discussion_r229790940
  
    --- Diff: src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c ---
    @@ -0,0 +1,372 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +#include <qpid/dispatch/ctools.h>
    +#include <qpid/dispatch/amqp.h>
    +#include "module.h"
    +#include "core_link_endpoint.h"
    +#include "edge_addr_tracking.h"
    +#include <stdio.h>
    +
    +
    +struct qdr_addr_endpoint_state_t {
    +    DEQ_LINKS(qdr_addr_endpoint_state_t);
    +    qdrc_endpoint_t                    *endpoint;
    +    qdr_connection_t                   *conn;    // The connection associated with the
endpoint.
    +    qdr_addr_tracking_module_context_t *mc;
    +};
    +
    +DEQ_DECLARE(qdr_addr_endpoint_state_t, qdr_addr_endpoint_state_list_t);
    +ALLOC_DECLARE(qdr_addr_endpoint_state_t);
    +ALLOC_DEFINE(qdr_addr_endpoint_state_t);
    +
    +struct  qdr_addr_tracking_module_context_t {
    +    qdr_core_t                     *core;
    +    qdr_addr_endpoint_state_list_t  endpoint_state_list;
    +    qdrc_event_subscription_t      *event_sub;
    +    qdrc_endpoint_desc_t           addr_tracking_endpoint;
    +};
    +
    +
    +static qd_message_t *qdcm_edge_create_address_dlv(qdr_core_t *core, qdr_address_t   *addr,
bool insert_addr)
    +{
    +    qd_message_t *msg = qd_message();
    +
    +    //
    +    // Start header
    +    //
    +    qd_composed_field_t *fld   = qd_compose(QD_PERFORMATIVE_HEADER, 0);
    +    qd_compose_start_list(fld);
    +    qd_compose_insert_bool(fld, 0);     // durable
    +    qd_compose_end_list(fld);
    +
    +    qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
    +
    +    qd_compose_start_list(body);
    +
    +    const char *addr_str = (const char *)qd_hash_key_by_handle(addr->hash_handle);
    +
    +    qd_compose_insert_string(body, addr_str);
    +    qd_compose_insert_bool(body, insert_addr);
    +    qd_compose_end_list(body);
    +
    +    // Finally, compose and retuen the message so it can be sent out.
    +    qd_message_compose_3(msg, fld, body);
    +
    +    return msg;
    +}
    +
    +static qdr_addr_endpoint_state_t *qdrc_get_endpoint_state_for_connection(qdr_addr_endpoint_state_list_t
 endpoint_state_list, qdr_connection_t *conn, qdr_link_t *link)
    +{
    +    qdr_addr_endpoint_state_t *endpoint_state = DEQ_HEAD(endpoint_state_list);
    +    while(endpoint_state) {
    +        if (endpoint_state->conn == conn) {
    +            return endpoint_state;
    +        }
    +        endpoint_state = DEQ_NEXT(endpoint_state);
    +    }
    +    return 0;
    +}
    +
    +
    +static void qdrc_address_endpoint_first_attach(void              *bind_context,
    +                                               qdrc_endpoint_t   *endpoint,
    +                                               void             **link_context,
    +                                               qdr_terminus_t   *remote_source,
    +                                               qdr_terminus_t   *remote_target)
    +{
    +    qdr_addr_tracking_module_context_t *bc = (qdr_addr_tracking_module_context_t *) bind_context;
    +
    +    qdr_addr_endpoint_state_t *endpoint_state = new_qdr_addr_endpoint_state_t();
    +
    +    ZERO(endpoint_state);
    +    endpoint_state->endpoint = endpoint;
    +    endpoint_state->mc       = bc;
    +    endpoint_state->conn     = qdrc_endpoint_get_connection_CT(endpoint);
    +
    +    DEQ_INSERT_TAIL(bc->endpoint_state_list, endpoint_state);
    +
    +
    +    //
    +    // The link to hard coded address QD_TERMINUS_EDGE_ADDRESS_TRACKING should be created
only if this is a receiver link
    +    // and if this link is created inside the QDR_ROLE_EDGE_CONNECTION connection.
    +    //
    +    if (qdrc_endpoint_get_direction_CT(endpoint) == QD_OUTGOING && qdrc_endpoint_get_connection_CT(endpoint)->role
== QDR_ROLE_EDGE_CONNECTION) {
    +        *link_context = endpoint_state;
    +        qdrc_endpoint_second_attach_CT(bc->core, endpoint, remote_source, remote_target);
    +   }
    +    else {
    +        //
    +        // We simply detach any links that dont match the above condition.
    +        //
    +        *link_context = 0;
    +        qdrc_endpoint_detach_CT(bc->core, endpoint, 0);
    +    }
    +}
    +
    +
    +static void qdrc_address_endpoint_on_first_detach(void *link_context,
    +                                              qdr_error_t *error)
    +{
    +    qdr_addr_endpoint_state_t *endpoint_state  = (qdr_addr_endpoint_state_t *)link_context;
    +    qdrc_endpoint_detach_CT(endpoint_state->mc->core, endpoint_state->endpoint,
0);
    +    qdr_addr_tracking_module_context_t *mc = endpoint_state->mc;
    +    DEQ_REMOVE(mc->endpoint_state_list, endpoint_state);
    +    free_qdr_addr_endpoint_state_t(endpoint_state);
    +}
    +
    +
    +static bool qdrc_can_send_address(qdr_address_t *addr, qdr_connection_t *conn)
    +{
    +    if (!addr)
    +        return false;
    +
    +    bool can_send = false;
    +    if (DEQ_SIZE(addr->rlinks) > 1 || qd_bitmask_cardinality(addr->rnodes) >
0) {
    +        // There is at least one receiver for this address somewhere in the router network
    +        can_send = true;
    +    }
    +    if (!can_send) {
    +        if (DEQ_SIZE(addr->rlinks) == 1) {
    +            qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
    +            if (link_ref->link->conn != conn)
    +                can_send=true;
    +        }
    +    }
    +    return can_send;
    +}
    +
    +
    +static void qdrc_send_message(qdr_core_t *core, qdr_address_t *addr, qdrc_endpoint_t
*endpoint, bool insert_addr)
    +{
    +    if (!addr)
    +        return;
    +
    +    if (!endpoint)
    +        return;
    +
    +    qd_message_t *msg = qdcm_edge_create_address_dlv(core, addr, insert_addr);
    +    qdr_delivery_t *dlv = qdrc_endpoint_delivery_CT(core, endpoint, msg);
    +
    +    qdrc_endpoint_send_CT(core, endpoint, dlv, true);
    +}
    +
    +static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr)
    +{
    +    // We only care about mobile addresses.
    +    if(!qdr_address_is_mobile_CT(addr))
    +        return;
    +
    +    qdr_addr_tracking_module_context_t *addr_tracking = (qdr_addr_tracking_module_context_t*)
context;
    +    switch (event) {
    +        case QDRC_EVENT_ADDR_BECAME_LOCAL_DEST : {
    +            //
    +            // This address transitioned from zero to one local destination. If this
address already has more than zero remote destinations, don't do anything
    +            //
    +            if (qd_bitmask_cardinality(addr->rnodes) == 0) {
    +                qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
    +                //
    +                // Every inlink that has an edge context must be informed of the appearence
of this address.
    +                //
    +                while (inlink) {
    +                    if(inlink->link->edge_context != 0) {
    +                        qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t
*)inlink->link->edge_context;
    +                        if (qdrc_can_send_address(addr, endpoint_state->conn) ) {
    +                            qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
    +                            qdrc_send_message(addr_tracking->core, addr, endpoint,
true);
    +                        }
    +                    }
    +                    inlink = DEQ_NEXT(inlink);
    +                }
    +            }
    +            break;
    +        }
    +        case QDRC_EVENT_ADDR_BECAME_DEST : {
    +            //
    +            // This address transitioned from zero to one destination. If this address
already had local destinations
    +            //
    +            qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
    +            //
    +            // Every inlink that has an edge context must be informed of the appearence
of this address.
    +            //
    +            while (inlink) {
    +                if(inlink->link->edge_context != 0) {
    +                    qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t
*)inlink->link->edge_context;
    +                    if (qdrc_can_send_address(addr, endpoint_state->conn) ) {
    +                        qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
    +                        qdrc_send_message(addr_tracking->core, addr, endpoint, true);
    +                    }
    +                }
    +                inlink = DEQ_NEXT(inlink);
    +            }
    +        }
    +        break;
    +
    +        case QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST : {
    +            // The address no longer has any local destinations.
    +            // If there are no remote destinations either, we have to tell the edge routers
to delete their sender links
    +            if (qd_bitmask_cardinality(addr->rnodes) == 0) {
    +                qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
    +                //
    +                // Every inlink that has an edge context must be informed of the disappearence
of this address.
    +                //
    +                while (inlink) {
    +                    if(inlink->link->edge_context != 0) {
    +                        qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t
*)inlink->link->edge_context;
    +                        qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
    +                        qdrc_send_message(addr_tracking->core, addr, endpoint, false);
    +                    }
    +                    inlink = DEQ_NEXT(inlink);
    +                }
    +            }
    +
    +            break;
    +        }
    +        case QDRC_EVENT_ADDR_ONE_LOCAL_DEST: {
    +            //
    +            // This address transitioned from N destinations to one local dest
    +            // If this address already has non-zero remote destinations, there is no
need to tell the edge routers about it
    +            //
    +            if (qd_bitmask_cardinality(addr->rnodes) == 0) {
    --- End diff --
    
    This is an unnecessary check.  The cardinality of rnodes is, by definition, zero.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org


Mime
View raw message