qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tr...@apache.org
Subject qpid-dispatch git commit: DISPATCH-1099 - Added core-thread timer facility (based on a PR by Ganesh Murthy with edits and a test) This closes #362.
Date Wed, 22 Aug 2018 21:04:15 GMT
Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 385bbe9eb -> 2469f4bca


DISPATCH-1099 - Added core-thread timer facility (based on a PR by Ganesh Murthy with edits
and a test)
This closes #362.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/2469f4bc
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/2469f4bc
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/2469f4bc

Branch: refs/heads/master
Commit: 2469f4bca88f95fc76b1b1252a1b435615689466
Parents: 385bbe9
Author: Ted Ross <tross@redhat.com>
Authored: Wed Aug 22 17:02:09 2018 -0400
Committer: Ted Ross <tross@redhat.com>
Committed: Wed Aug 22 17:02:09 2018 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h   |  14 +++
 src/CMakeLists.txt                    |   1 +
 src/router_core/core_timer.c          | 122 +++++++++++++++++++
 src/router_core/router_core_private.h |  50 ++++++++
 src/router_node.c                     |   3 +
 tests/CMakeLists.txt                  |   1 +
 tests/core_timer_test.c               | 187 +++++++++++++++++++++++++++++
 tests/run_unit_tests.c                |   2 +
 8 files changed, 380 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2469f4bc/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index ec2f1b4..5990ac1 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -60,6 +60,20 @@ void qdr_core_free(qdr_core_t *core);
 
 /**
  ******************************************************************************
+ * Miscellaneous functions
+ ******************************************************************************
+ */
+
+/**
+ * Drive the core-internal timer every one second.
+ *
+ * @param core Pointer to the core object returned by qd_core()
+ */
+void qdr_process_tick(qdr_core_t *core);
+
+
+/**
+ ******************************************************************************
  * Route table maintenance functions (Router Control)
  ******************************************************************************
  */

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2469f4bc/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index bc132dd..91e7783 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -95,6 +95,7 @@ set(qpid_dispatch_SOURCES
   router_core/management_agent.c
   router_core/terminus.c
   router_core/transfer.c
+  router_core/core_timer.c
   router_node.c
   router_pynode.c
   schema_enum.c

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2469f4bc/src/router_core/core_timer.c
----------------------------------------------------------------------
diff --git a/src/router_core/core_timer.c b/src/router_core/core_timer.c
new file mode 100644
index 0000000..0312290
--- /dev/null
+++ b/src/router_core/core_timer.c
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <qpid/dispatch/ctools.h>
+#include "router_core_private.h"
+
+ALLOC_DECLARE(qdr_core_timer_t);
+ALLOC_DEFINE(qdr_core_timer_t);
+
+void qdr_process_tick_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+
+
+void qdr_process_tick(qdr_core_t *core)
+{
+    qdr_action_t *action = qdr_action(qdr_process_tick_CT, "process_tick");
+    qdr_action_enqueue(core, action);
+}
+
+
+
+qdr_core_timer_t *qdr_core_timer_CT(qdr_core_t *core, qdr_timer_cb_t callback, void *timer_context)
+{
+    qdr_core_timer_t *timer = new_qdr_core_timer_t();
+    if (!timer)
+        return 0;
+
+    ZERO(timer);
+    timer->handler = callback;
+    timer->context = timer_context;
+    DEQ_ITEM_INIT(timer);
+
+    return timer;
+}
+
+
+void qdr_core_timer_schedule_CT(qdr_core_t *core, qdr_core_timer_t *timer, uint32_t delay)
+{
+    qdr_core_timer_t *ptr         = DEQ_HEAD(core->scheduled_timers);
+    uint32_t          time_before = 0;
+
+    while (ptr && time_before + ptr->delta_time_seconds <= delay) {
+        time_before += ptr->delta_time_seconds;
+        ptr = DEQ_NEXT(ptr);
+    }
+
+    //
+    // ptr is the first timer to exceed duration or NULL if we ran out
+    //
+    timer->delta_time_seconds = delay - time_before;
+    timer->scheduled          = true;
+
+    if (!ptr)
+        DEQ_INSERT_TAIL(core->scheduled_timers, timer);
+    else {
+        ptr->delta_time_seconds -= timer->delta_time_seconds;
+        ptr = DEQ_PREV(ptr);
+        if (ptr)
+            DEQ_INSERT_AFTER(core->scheduled_timers, timer, ptr);
+        else
+            DEQ_INSERT_HEAD(core->scheduled_timers, timer);
+    }
+}
+
+
+void qdr_core_timer_cancel_CT(qdr_core_t *core, qdr_core_timer_t *timer)
+{
+    if (timer->scheduled && DEQ_NEXT(timer)) {
+        DEQ_NEXT(timer)->delta_time_seconds += timer->delta_time_seconds;
+        DEQ_REMOVE(core->scheduled_timers, timer);
+        timer->scheduled = false;
+    }
+}
+
+
+void qdr_core_timer_free_CT(qdr_core_t *core, qdr_core_timer_t *timer)
+{
+    if (!timer)
+        return;
+
+    qdr_core_timer_cancel_CT(core, timer);
+    free_qdr_core_timer_t(timer);
+}
+
+
+void qdr_process_tick_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+    if (discard)
+        return;
+
+    qdr_core_timer_t *timer = DEQ_HEAD(core->scheduled_timers);
+
+    while (timer && timer->delta_time_seconds == 0) {
+        assert(timer->scheduled);
+        if (timer->handler)
+            timer->handler(core, timer->context);
+        timer->scheduled = false;
+        timer = DEQ_NEXT(timer);
+        DEQ_REMOVE_HEAD(core->scheduled_timers);
+    }
+
+    timer = DEQ_HEAD(core->scheduled_timers);
+    if (timer)
+        timer->delta_time_seconds--;
+}
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2469f4bc/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index c82f3c8..2ebe38c 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -647,6 +647,20 @@ struct qdr_conn_identifier_t {
 ALLOC_DECLARE(qdr_conn_identifier_t);
 DEQ_DECLARE(qdr_exchange_t, qdr_exchange_list_t);
 
+// Core timer related field/data structures
+typedef void (*qdr_timer_cb_t)(qdr_core_t *core, void* context);
+
+typedef struct qdr_core_timer_t {
+    DEQ_LINKS(struct qdr_core_timer_t);
+    qdr_timer_cb_t  handler;
+    void           *context;
+    uint32_t        delta_time_seconds;
+    bool            scheduled;
+} qdr_core_timer_t;
+
+ALLOC_DECLARE(qdr_core_timer_t);
+DEQ_DECLARE(qdr_core_timer_t, qdr_core_timer_list_t);
+
 
 struct qdr_core_t {
     qd_dispatch_t     *qd;
@@ -659,6 +673,7 @@ struct qdr_core_t {
     sys_mutex_t       *action_lock;
 
     sys_mutex_t             *work_lock;
+    qdr_core_timer_list_t    scheduled_timers;
     qdr_general_work_list_t  work_list;
     qd_timer_t              *work_timer;
 
@@ -765,6 +780,7 @@ struct qdr_terminus_t {
 
 ALLOC_DECLARE(qdr_terminus_t);
 
+
 void *router_core_thread(void *arg);
 uint64_t qdr_identifier(qdr_core_t* core);
 void qdr_management_agent_on_message(void *context, qd_message_t *msg, int link_id, int cost);
@@ -847,4 +863,38 @@ qdr_query_t *qdr_query(qdr_core_t              *core,
                        qd_router_entity_type_t  type,
                        qd_composed_field_t     *body);
 
+/**
+ * Create a new timer which will only be used inside the code thread.
+ *
+ * @param core Pointer to the core object returned by qd_core()
+ * @callback Callback function to be invoked when timer fires.
+ * @timer_context Context to be used when firing callback
+ */
+qdr_core_timer_t *qdr_core_timer_CT(qdr_core_t *core, qdr_timer_cb_t callback, void *timer_context);
+
+
+/**
+ * Schedules a core timer with a delay. The timer will fire after "delay" seconds
+ * @param core Pointer to the core object returned by qd_core()
+ * @param timer Timer object that needs to be scheduled.
+ * @param delay The number of seconds to wait before firing the timer
+ */
+void qdr_core_timer_schedule_CT(qdr_core_t *core, qdr_core_timer_t *timer, uint32_t delay);
+
+/**
+ * Cancels an already scheduled timeer. This does not free the timer. It is the responsibility
of the person who
+ * created the timer to free it.
+ * @param core Pointer to the core object returned by qd_core()
+ * @param timer Timer object that needs to be scheduled.
+ *
+ */
+void qdr_core_timer_cancel_CT(qdr_core_t *core, qdr_core_timer_t *timer);
+
+/**
+ * Cancels the timer if it is scheduled and and free it.
+ * @param core Pointer to the core object returned by qd_core()
+ * @param timer Timer object that needs to be scheduled.
+ */
+void qdr_core_timer_free_CT(qdr_core_t *core, qdr_core_timer_t *timer);
+
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2469f4bc/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 5285894..c7bec23 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -1179,6 +1179,9 @@ static void qd_router_timer_handler(void *context)
     // Periodic processing.
     //
     qd_pyrouter_tick(router);
+
+    // This sends a tick into the core and this happens every second.
+    qdr_process_tick(router->router_core);
     qd_timer_schedule(router->timer, 1000);
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2469f4bc/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 313ee9d..4185aa5 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -32,6 +32,7 @@ set(unit_test_SOURCES
     tool_test.c
     failoverlist_test.c
     timer_test.c
+    core_timer_test.c
     parse_tree_tests
     proton_utils_tests.c
     )

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2469f4bc/tests/core_timer_test.c
----------------------------------------------------------------------
diff --git a/tests/core_timer_test.c b/tests/core_timer_test.c
new file mode 100644
index 0000000..9a31057
--- /dev/null
+++ b/tests/core_timer_test.c
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "test_case.h"
+#include <stdio.h>
+#include <string.h>
+#include "router_core/router_core_private.h"
+
+void qdr_process_tick_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+
+
+static int results[5];
+
+static void callback(qdr_core_t *unused, void *context) {
+    results[(long) context]++;
+}
+
+
+static char* test_core_timer(void *context)
+{
+    qdr_core_t *core = NEW(qdr_core_t);
+    ZERO(core);
+
+    qdr_core_timer_t *timers[5];
+
+    for (long i = 0; i < 5; i++) {
+        timers[i]  = qdr_core_timer_CT(core, callback, (void*) i);
+        results[i] = 0;
+    }
+
+    qdr_core_timer_schedule_CT(core, timers[0], 5);
+    qdr_core_timer_schedule_CT(core, timers[1], 10);
+    qdr_core_timer_schedule_CT(core, timers[2], 1);
+    qdr_core_timer_schedule_CT(core, timers[3], 15);
+    qdr_core_timer_schedule_CT(core, timers[4], 0);
+
+    //
+    // Test the discard
+    //
+    qdr_process_tick_CT(core, 0, true);
+    qdr_process_tick_CT(core, 0, true);
+    qdr_process_tick_CT(core, 0, true);
+    if (results[0] != 0 ||
+        results[1] != 0 ||
+        results[2] != 0 ||
+        results[3] != 0 ||
+        results[4] != 0)
+        return "Received a callback on a discard tick events";
+
+    //
+    // Test zero-length timer
+    //
+    qdr_process_tick_CT(core, 0, false);
+    if (results[0] != 0 ||
+        results[1] != 0 ||
+        results[2] != 0 ||
+        results[3] != 0 ||
+        results[4] != 1)
+        return "Expected zero-length timer to fire once";
+    
+    //
+    // Test 1-timer
+    //
+    qdr_process_tick_CT(core, 0, false);
+    if (results[0] != 0 ||
+        results[1] != 0 ||
+        results[2] != 1 ||
+        results[3] != 0 ||
+        results[4] != 1)
+        return "Expected timer(1) to fire once";
+
+    //
+    // Cancel 10-timer
+    //
+    qdr_core_timer_cancel_CT(core, timers[1]);
+    
+    //
+    // Test 5-timer
+    //
+    qdr_process_tick_CT(core, 0, false);
+    qdr_process_tick_CT(core, 0, false);
+    qdr_process_tick_CT(core, 0, false);
+    if (results[0] != 0 ||
+        results[1] != 0 ||
+        results[2] != 1 ||
+        results[3] != 0 ||
+        results[4] != 1)
+        return "Expected timer(5) to not have fired yet";
+    qdr_process_tick_CT(core, 0, false);
+    if (results[0] != 1 ||
+        results[1] != 0 ||
+        results[2] != 1 ||
+        results[3] != 0 ||
+        results[4] != 1)
+        return "Expected timer(5) to fire once";
+
+    //
+    // Test 15-timer
+    //
+    for (long i = 0; i < 9; i++)
+        qdr_process_tick_CT(core, 0, false);
+    if (results[0] != 1 ||
+        results[1] != 0 ||
+        results[2] != 1 ||
+        results[3] != 0 ||
+        results[4] != 1)
+        return "Expected timer(15) and timer(10) to not have fired";
+    qdr_process_tick_CT(core, 0, false);
+    if (results[0] != 1 ||
+        results[1] != 0 ||
+        results[2] != 1 ||
+        results[3] != 1 ||
+        results[4] != 1)
+        return "Expected timer(15) to fire once";
+
+    //
+    // Run with no timers for awhile
+    //
+    for (long i = 0; i < 100; i++)
+        qdr_process_tick_CT(core, 0, false);
+    if (results[0] != 1 ||
+        results[1] != 0 ||
+        results[2] != 1 ||
+        results[3] != 1 ||
+        results[4] != 1)
+        return "Expected no timers to have fired when all are not scheduled";
+
+    //
+    // Re-schedule some timers at the same time
+    //
+    qdr_core_timer_schedule_CT(core, timers[0], 5);
+    qdr_core_timer_schedule_CT(core, timers[1], 5);
+    qdr_process_tick_CT(core, 0, false);
+    qdr_process_tick_CT(core, 0, false);
+    qdr_process_tick_CT(core, 0, false);
+    qdr_process_tick_CT(core, 0, false);
+    qdr_process_tick_CT(core, 0, false);
+    if (results[0] != 1 ||
+        results[1] != 0 ||
+        results[2] != 1 ||
+        results[3] != 1 ||
+        results[4] != 1)
+        return "Expected no timers to have fired while waiting for 5-timers";
+    qdr_process_tick_CT(core, 0, false);
+    if (results[0] != 2 ||
+        results[1] != 1 ||
+        results[2] != 1 ||
+        results[3] != 1 ||
+        results[4] != 1)
+        return "Expected both 5-timers to have fired";
+
+    //
+    // Free up resources
+    //
+    for (long i = 0; i < 5; i++)
+        qdr_core_timer_free_CT(core, timers[i]);
+
+    return 0;
+}
+
+
+int core_timer_tests(void)
+{
+    int result = 0;
+    char *test_group = "core_timer_tests";
+
+    TEST_CASE(test_core_timer, 0);
+
+    return result;
+}
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2469f4bc/tests/run_unit_tests.c
----------------------------------------------------------------------
diff --git a/tests/run_unit_tests.c b/tests/run_unit_tests.c
index ff95786..3434a90 100644
--- a/tests/run_unit_tests.c
+++ b/tests/run_unit_tests.c
@@ -24,6 +24,7 @@
 
 int tool_tests(void);
 int timer_tests(qd_dispatch_t*);
+int core_timer_tests(void);
 int alloc_tests(void);
 int compose_tests(void);
 int policy_tests(void);
@@ -63,6 +64,7 @@ int main(int argc, char** argv)
     result += failoverlist_tests();
     result += parse_tree_tests();
     result += proton_utils_tests();
+    result += core_timer_tests();
 
     qd_dispatch_free(qd);       // dispatch_free last.
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message