This is an automated email from the ASF dual-hosted git repository.
aconway pushed a commit to branch schedule-zero
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit de5c509d073f279a5b66668b38016e8555b13e7a
Author: Alan Conway <aconway@redhat.com>
AuthorDate: Fri Mar 1 10:49:54 2019 -0500
DISPATCH-1274: Optimize qd_timer_schedule(0)
Introduced qd_immediate_t, a simpler schedule for immediate requests.
qd_timer_schedule delegates schedule(0) requests.
---
src/CMakeLists.txt | 1 +
src/immediate.c | 96 ++++++++++++++++++++++++++++
src/{timer_private.h => immediate_private.h} | 39 +++++------
src/server.c | 20 ++++--
src/server_private.h | 1 +
src/timer.c | 15 ++++-
src/timer_private.h | 2 +
7 files changed, 150 insertions(+), 24 deletions(-)
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 1544db9..cb8ad70 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -66,6 +66,7 @@ set(qpid_dispatch_SOURCES
entity_cache.c
failoverlist.c
hash.c
+ immediate.c
iterator.c
log.c
message.c
diff --git a/src/immediate.c b/src/immediate.c
new file mode 100644
index 0000000..6149be7
--- /dev/null
+++ b/src/immediate.c
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "immediate_private.h"
+#include "server_private.h"
+
+#include <qpid/dispatch/threading.h>
+#include <assert.h>
+
+struct qd_immediate_t { /* One immediate action: a handler to be called ASAP in a server thread */
+ qd_server_t *server; /* Server to interrupt when armed; NULL if created without a dispatch */
+ void (*handler)(void* context); /* Callback invoked by qd_immediate_visit() */
+ void *context; /* Argument passed to handler */
+ bool armed; /* True while a handler call is pending; accessed under the file-level lock */
+};
+
+/* Array rather than list for fast access and cache locality. Slots [0, count) have been handed out. */
+static qd_immediate_t immediates[256] = {0};
+static size_t count = 0; /* Number of slots handed out by qd_immediate(); only ever incremented in this file */
+static sys_mutex_t *lock = NULL; /* Guards immediates[] and count; created in qd_immediate_initialize() */
+
+void qd_immediate_initialize(void) { /* Create the mutex used by every other function in this file */
+ lock = sys_mutex();
+}
+
+void qd_immediate_finalize(void) {      /* Release the mutex created by qd_immediate_initialize() */
+    sys_mutex_free(lock);
+    lock = NULL;                        /* Do not keep a pointer to the freed mutex */
+}
+
+/* Claim the next unused slot for handler(context). Returns NULL (and asserts in debug builds)
+ * when every slot is taken. The lock is released on every path, including the failure path. */
+qd_immediate_t *qd_immediate(qd_dispatch_t *qd, void (*handler)(void*), void* context) {
+    qd_immediate_t *i = NULL;
+    sys_mutex_lock(lock);
+    if (count < sizeof(immediates)/sizeof(immediates[0])) {
+        i = &immediates[count++];
+        *i = (qd_immediate_t){ .server = qd ? qd->server : NULL,  /* NULL qd: no server to interrupt */
+                               .handler = handler, .context = context, .armed = false };
+    }
+    sys_mutex_unlock(lock);
+    assert(i != NULL && "exceeded max number of qd_immediate_t objects");
+    return i;
+}
+
+/* Mark *i armed; the server is interrupted only on the unarmed -> armed transition. */
+void qd_immediate_arm(qd_immediate_t *i) {
+    sys_mutex_lock(lock);
+    const bool newly_armed = !i->armed;   /* An already-armed entry needs no second interrupt */
+    i->armed = true;
+    sys_mutex_unlock(lock);
+    if (!newly_armed || !i->server) {
+        return;
+    }
+    qd_server_interrupt(i->server);       /* Called after the lock has been released */
+}
+
+void qd_immediate_disarm(qd_immediate_t *i) { /* Cancel a pending handler call, if there is one */
+ sys_mutex_lock(lock);
+ i->armed = false; /* qd_immediate_visit() only calls handlers of armed entries */
+ sys_mutex_unlock(lock);
+}
+
+void qd_immediate_free(qd_immediate_t *i) {
+ /* Just disarm; it's harmless to leave the entry in place. NOTE(review): the slot is never reused, so the fixed array size limits the total number of qd_immediate() calls, not the number alive at once - confirm this is acceptable. */
+ qd_immediate_disarm(i);
+}
+
+void qd_immediate_visit(void) {         /* Call the handler of every armed entry; (void) matches the header prototype */
+    sys_mutex_lock(lock);
+    for (qd_immediate_t *i = immediates; i < immediates + count; ++i) {
+        if (i->armed) {
+            i->armed = false;            /* Disarm first so a re-arm made by the handler is not lost */
+            sys_mutex_unlock(lock);      /* The handler runs without the lock held */
+            i->handler(i->context);
+            sys_mutex_lock(lock);        /* Re-take the lock before reading count and the next slot */
+        }
+    }
+    sys_mutex_unlock(lock);
+}
diff --git a/src/timer_private.h b/src/immediate_private.h
similarity index 52%
copy from src/timer_private.h
copy to src/immediate_private.h
index 537eb4b..cd8d11b 100644
--- a/src/timer_private.h
+++ b/src/immediate_private.h
@@ -1,5 +1,5 @@
-#ifndef __timer_private_h__
-#define __timer_private_h__ 1
+#ifndef __immediate_private_h__
+#define __immediate_private_h__ 1
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -19,26 +19,27 @@
* under the License.
*/
-#include <qpid/dispatch/ctools.h>
-#include <qpid/dispatch/timer.h>
-#include <qpid/dispatch/threading.h>
-struct qd_timer_t {
- DEQ_LINKS(qd_timer_t);
- qd_server_t *server;
- qd_timer_cb_t handler;
- void *context;
- qd_timestamp_t delta_time;
- bool scheduled; /* true means on scheduled list, false on idle list */
-};
+#include <qpid/dispatch/dispatch.h>
+#include <qpid/dispatch/server.h>
+#include <stdint.h>
-DEQ_DECLARE(qd_timer_t, qd_timer_list_t);
+/* Immediate actions - used by timer to optimize schedule(0) */
-void qd_timer_initialize(sys_mutex_t *server_lock);
-void qd_timer_finalize(void);
-void qd_timer_visit();
+void qd_immediate_initialize(void);
+void qd_immediate_finalize(void);
+void qd_immediate_visit(void);
-/// For tests only
-sys_mutex_t* qd_timer_lock();
+typedef struct qd_immediate_t qd_immediate_t;
+
+qd_immediate_t *qd_immediate(qd_dispatch_t *qd, void (*handler)(void*), void* context);
+
+/* Arm causes a call to handler(context) ASAP in a server thread. */
+void qd_immediate_arm(qd_immediate_t *);
+
+/* After disarm() returns, there will be no handler() call unless re-armed. */
+void qd_immediate_disarm(qd_immediate_t *);
+
+void qd_immediate_free(qd_immediate_t *);
#endif
diff --git a/src/server.c b/src/server.c
index 24add4e..1863546 100644
--- a/src/server.c
+++ b/src/server.c
@@ -38,6 +38,7 @@
#include "entity.h"
#include "entity_cache.h"
#include "dispatch_private.h"
+#include "immediate_private.h"
#include "policy.h"
#include "server_private.h"
#include "timer_private.h"
@@ -68,6 +69,7 @@ struct qd_server_t {
uint64_t next_connection_id;
void *py_displayname_obj;
qd_http_server_t *http;
+ bool stopping;
};
#define HEARTBEAT_INTERVAL 1000
@@ -905,10 +907,15 @@ static bool handle(qd_server_t *qd_server, pn_event_t *e, pn_connection_t
*pn_co
switch (pn_event_type(e)) {
case PN_PROACTOR_INTERRUPT:
- /* Interrupt the next thread */
- pn_proactor_interrupt(qd_server->proactor);
- /* Stop the current thread */
- return false;
+ if (qd_server->stopping) {
+ /* Interrupt the next thread */
+ pn_proactor_interrupt(qd_server->proactor);
+ /* Stop the current thread */
+ return false;
+ } else {
+ /* Check for immediate tasks */
+ qd_immediate_visit();
+ }
case PN_PROACTOR_TIMEOUT:
qd_timer_visit();
@@ -1296,6 +1303,7 @@ void qd_server_run(qd_dispatch_t *qd)
void qd_server_stop(qd_dispatch_t *qd)
{
+ qd->server->stopping = true;
/* Interrupt the proactor, async-signal-safe */
pn_proactor_interrupt(qd->server->proactor);
}
@@ -1505,6 +1513,10 @@ void qd_server_timeout(qd_server_t *server, qd_duration_t duration)
{
pn_proactor_set_timeout(server->proactor, duration);
}
+void qd_server_interrupt(qd_server_t *server) { /* Interrupt the server's proactor; called by qd_immediate_arm() */
+ pn_proactor_interrupt(server->proactor);
+}
+
qd_dispatch_t* qd_server_dispatch(qd_server_t *server) { return server->qd; }
const char* qd_connection_name(const qd_connection_t *c) {
diff --git a/src/server_private.h b/src/server_private.h
index 9f3c75c..9fcec6c 100644
--- a/src/server_private.h
+++ b/src/server_private.h
@@ -38,6 +38,7 @@
qd_dispatch_t* qd_server_dispatch(qd_server_t *server);
void qd_server_timeout(qd_server_t *server, qd_duration_t delay);
+void qd_server_interrupt(qd_server_t *server);
qd_connection_t *qd_server_connection(qd_server_t *server, qd_server_config_t* config);
diff --git a/src/timer.c b/src/timer.c
index a4aae2a..0fd87c7 100644
--- a/src/timer.c
+++ b/src/timer.c
@@ -18,8 +18,9 @@
*/
#include "dispatch_private.h"
-#include "timer_private.h"
+#include "immediate_private.h"
#include "server_private.h"
+#include "timer_private.h"
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/threading.h>
#include <qpid/dispatch/alloc.h>
@@ -54,6 +55,7 @@ static void timer_cancel_LH(qd_timer_t *timer)
DEQ_INSERT_TAIL(idle_timers, timer);
timer->scheduled = false;
}
+ qd_immediate_disarm(timer->immediate);
}
/* Adjust timer's time_base and delays for the current time. */
@@ -95,6 +97,7 @@ qd_timer_t *qd_timer(qd_dispatch_t *qd, qd_timer_cb_t cb, void* context)
timer->context = context;
timer->delta_time = 0;
timer->scheduled = false;
+ timer->immediate = qd_immediate(qd, cb, context);
sys_mutex_lock(lock);
DEQ_INSERT_TAIL(idle_timers, timer);
sys_mutex_unlock(lock);
@@ -109,6 +112,7 @@ void qd_timer_free(qd_timer_t *timer)
sys_mutex_lock(lock);
timer_cancel_LH(timer);
DEQ_REMOVE(idle_timers, timer);
+ qd_immediate_free(timer->immediate);
sys_mutex_unlock(lock);
free_qd_timer_t(timer);
}
@@ -124,6 +128,11 @@ qd_timestamp_t qd_timer_now() {
void qd_timer_schedule(qd_timer_t *timer, qd_duration_t duration)
{
sys_mutex_lock(lock);
+ if (duration == 0) {
+ qd_immediate_arm(timer->immediate);
+ sys_mutex_unlock(lock);
+ return;
+ }
timer_cancel_LH(timer); // Timer is now on the idle list
DEQ_REMOVE(idle_timers, timer);
@@ -175,6 +184,7 @@ void qd_timer_cancel(qd_timer_t *timer)
void qd_timer_initialize(sys_mutex_t *server_lock)
{
+ qd_immediate_initialize();
lock = server_lock;
DEQ_INIT(idle_timers);
DEQ_INIT(scheduled_timers);
@@ -185,6 +195,7 @@ void qd_timer_initialize(sys_mutex_t *server_lock)
void qd_timer_finalize(void)
{
lock = 0;
+ qd_immediate_finalize();
}
@@ -196,6 +207,7 @@ void qd_timer_visit()
qd_timer_t *timer = DEQ_HEAD(scheduled_timers);
while (timer && timer->delta_time == 0) {
timer_cancel_LH(timer); /* Removes timer from scheduled_timers */
+ qd_immediate_disarm(timer->immediate);
sys_mutex_unlock(lock);
timer->handler(timer->context); /* Call the handler outside the lock, may re-schedule
*/
sys_mutex_lock(lock);
@@ -206,4 +218,5 @@ void qd_timer_visit()
qd_server_timeout(first->server, first->delta_time);
}
sys_mutex_unlock(lock);
+ qd_immediate_visit();
}
diff --git a/src/timer_private.h b/src/timer_private.h
index 537eb4b..263fca5 100644
--- a/src/timer_private.h
+++ b/src/timer_private.h
@@ -19,6 +19,7 @@
* under the License.
*/
+#include "immediate_private.h"
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/timer.h>
#include <qpid/dispatch/threading.h>
@@ -29,6 +30,7 @@ struct qd_timer_t {
qd_timer_cb_t handler;
void *context;
qd_timestamp_t delta_time;
+ qd_immediate_t *immediate; /* Optimized path for schedule(0) */
bool scheduled; /* true means on scheduled list, false on idle list */
};
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
|