This is an automated email from the ASF dual-hosted git repository.
gsim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new 8222d0a DISPATCH-1278: initial support for prometheus metrics export
8222d0a is described below
commit 8222d0ae63f187ffc42086dd709ac0e7e17700ab
Author: Gordon Sim <gsim@redhat.com>
AuthorDate: Wed Mar 6 10:30:50 2019 +0000
DISPATCH-1278: initial support for prometheus metrics export
---
include/qpid/dispatch/router_core.h | 24 ++++
include/qpid/dispatch/server.h | 5 +
python/qpid_dispatch/management/qdrouter.json | 6 +
src/connection_manager.c | 4 +
src/dispatch.c | 4 +
src/dispatch_private.h | 2 +
src/http-libwebsockets.c | 170 +++++++++++++++++++++++++-
src/router_core/router_core.c | 42 +++++++
src/router_core/router_core_private.h | 12 +-
tests/system_tests_http.py | 36 ++++++
10 files changed, 303 insertions(+), 2 deletions(-)
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index f7d6c51..2f7da25 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -837,4 +837,28 @@ qdr_connection_info_t *qdr_connection_info(bool is_encrypted,
int ssl_ssf,
bool ssl);
+
+typedef struct {
+ size_t connections;
+ size_t links;
+ size_t addrs;
+ size_t routers;
+ size_t link_routes;
+ size_t auto_links;
+ size_t presettled_deliveries;
+ size_t dropped_presettled_deliveries;
+ size_t accepted_deliveries;
+ size_t rejected_deliveries;
+ size_t released_deliveries;
+ size_t modified_deliveries;
+ size_t deliveries_ingress;
+ size_t deliveries_egress;
+ size_t deliveries_transit;
+ size_t deliveries_ingress_route_container;
+ size_t deliveries_egress_route_container;
+} qdr_global_stats_t;
+ALLOC_DECLARE(qdr_global_stats_t);
+typedef void (*qdr_global_stats_handler_t) (void *context);
+void qdr_request_global_stats(qdr_core_t *core, qdr_global_stats_t *stats, qdr_global_stats_handler_t
callback, void *context);
+
#endif
diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h
index e56519f..94f14cc 100644
--- a/include/qpid/dispatch/server.h
+++ b/include/qpid/dispatch/server.h
@@ -133,6 +133,11 @@ typedef struct qd_server_config_t {
char *protocol_family;
/**
+ * Export metrics.
+ */
+ bool metrics;
+
+ /**
* Accept HTTP connections, allow WebSocket "amqp" protocol upgrades.
*/
bool http;
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 023c7c2..5a5900e 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -828,6 +828,12 @@
"deprecationName": "failoverList",
"description": "A comma-separated list of failover urls to be supplied
to connected clients. Form: [(amqp|amqps|ws|wss)://]host_or_ip[:port]"
},
+ "metrics": {
+ "type": "boolean",
+ "default": true,
+ "description": "Export metrics in prometheus text format for the router
(using path /metrics).",
+ "create": true
+ },
"http": {
"type": "boolean",
"default": false,
diff --git a/src/connection_manager.c b/src/connection_manager.c
index 98eadd4..8f8cbe7 100644
--- a/src/connection_manager.c
+++ b/src/connection_manager.c
@@ -312,6 +312,7 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t
*conf
config->role = qd_entity_get_string(entity, "role");
CHECK();
config->inter_router_cost = qd_entity_opt_long(entity, "cost", 1);
CHECK();
config->protocol_family = qd_entity_opt_string(entity, "protocolFamily", 0);
CHECK();
+ config->metrics = qd_entity_opt_bool(entity, "metrics", true);
CHECK();
config->http = qd_entity_opt_bool(entity, "http", false);
CHECK();
config->http_root_dir = qd_entity_opt_string(entity, "httpRootDir", false);
CHECK();
@@ -323,6 +324,9 @@ static qd_error_t load_server_config(qd_dispatch_t *qd, qd_server_config_t
*conf
if (config->http && ! config->http_root_dir) {
qd_log(qd->connection_manager->log_source, QD_LOG_INFO, "HTTP service is requested
but no httpRootDir specified. The router will serve AMQP-over-websockets but no static content.");
}
+ if (config->metrics && !config->http) {
+ qd_log(qd->connection_manager->log_source, QD_LOG_INFO, "Metrics can only be
exported on listener with http enabled.");
+ }
config->max_frame_size = qd_entity_get_long(entity, "maxFrameSize");
CHECK();
config->max_sessions = qd_entity_get_long(entity, "maxSessions");
CHECK();
diff --git a/src/dispatch.c b/src/dispatch.c
index 4906b39..eb2d195 100644
--- a/src/dispatch.c
+++ b/src/dispatch.c
@@ -367,3 +367,7 @@ void qd_dispatch_free(qd_dispatch_t *qd)
void qd_dispatch_router_lock(qd_dispatch_t *qd) { sys_mutex_lock(qd->router->lock);
}
void qd_dispatch_router_unlock(qd_dispatch_t *qd) { sys_mutex_unlock(qd->router->lock);
}
+
+qdr_core_t* qd_dispatch_router_core(qd_dispatch_t *qd) {
+ return qd->router->router_core;
+}
diff --git a/src/dispatch_private.h b/src/dispatch_private.h
index f5a089c..a01db9b 100644
--- a/src/dispatch_private.h
+++ b/src/dispatch_private.h
@@ -124,4 +124,6 @@ void qd_dispatch_unregister_entity(qd_dispatch_t *qd, void *impl);
/** Set the agent */
void qd_dispatch_set_agent(qd_dispatch_t *qd, void *agent);
+qdr_core_t* qd_dispatch_router_core(qd_dispatch_t *qd);
+
#endif
diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c
index b2f0d9c..0690c0c 100644
--- a/src/http-libwebsockets.c
+++ b/src/http-libwebsockets.c
@@ -19,6 +19,7 @@
#include <qpid/dispatch/atomic.h>
#include <qpid/dispatch/amqp.h>
+#include <qpid/dispatch/router_core.h>
#include <qpid/dispatch/threading.h>
#include <qpid/dispatch/timer.h>
@@ -95,6 +96,14 @@ typedef struct connection_t {
struct lws *wsi;
} connection_t;
+typedef struct stats_t {
+ size_t current;
+ bool headers_sent;
+ qdr_global_stats_t stats;
+ qd_http_server_t *server;
+ struct lws *wsi;
+} stats_t;
+
/* Navigating from WSI pointer to qd objects */
static qd_http_server_t *wsi_server(struct lws *wsi);
static qd_http_listener_t *wsi_listener(struct lws *wsi);
@@ -106,6 +115,8 @@ static int callback_http(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len);
static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len);
+static int callback_metrics(struct lws *wsi, enum lws_callback_reasons reason,
+ void *user, void *in, size_t len);
static struct lws_protocols protocols[] = {
/* HTTP only protocol comes first */
@@ -128,6 +139,11 @@ static struct lws_protocols protocols[] = {
callback_amqpws,
sizeof(connection_t),
},
+ {
+ "http",
+ callback_metrics,
+ sizeof(stats_t),
+ },
{ NULL, NULL, 0, 0 } /* terminator */
};
@@ -161,7 +177,7 @@ static int handle_events(connection_t* c) {
/* The server has a bounded, thread-safe queue for external work */
typedef struct work_t {
- enum { W_NONE, W_LISTEN, W_CLOSE, W_WAKE, W_STOP } type;
+ enum { W_NONE, W_LISTEN, W_CLOSE, W_WAKE, W_STOP, W_HANDLE_STATS } type;
void *value;
} work_t;
@@ -177,6 +193,7 @@ typedef struct work_queue_t {
/* HTTP Server runs in a single thread, communication from other threads via work_queue */
struct qd_http_server_t {
qd_server_t *server;
+ qdr_core_t *core;
sys_thread_t *thread;
work_queue_t work;
qd_log_source_t *log;
@@ -230,6 +247,7 @@ struct qd_http_listener_t {
qd_http_server_t *server;
struct lws_vhost *vhost;
struct lws_http_mount mount;
+ struct lws_http_mount metrics;
};
void qd_http_listener_free(qd_http_listener_t *hl) {
@@ -283,6 +301,14 @@ static void listener_start(qd_http_listener_t *hl, qd_http_server_t *hs)
{
m->def = "index.html"; /* Default file name */
m->origin_protocol = LWSMPRO_FILE; /* mount type is a directory in a filesystem */
m->extra_mimetypes = mime_types;
+ if (config->metrics) {
+ struct lws_http_mount *metrics = &hl->metrics;
+ m->mount_next = metrics;
+ metrics->mountpoint = "/metrics";
+ metrics->mountpoint_len = strlen(metrics->mountpoint);
+ metrics->origin_protocol = LWSMPRO_CALLBACK;
+ metrics->protocol = "http";
+ }
struct lws_context_creation_info info = {0};
info.mounts = m;
@@ -361,6 +387,143 @@ static void connection_wake(qd_connection_t *qd_conn)
}
}
+static void handle_stats_results(void *context)
+{
+ stats_t* stats = (stats_t*) context;
+ qd_http_server_t *hs = stats->server;
+ if (hs) {
+ work_t w = { W_HANDLE_STATS, stats->wsi };
+ work_push(hs, w);
+ }
+}
+
+typedef int (*int_metric) (qdr_global_stats_t *stats);
+typedef struct metric_definition {
+ const char* name;
+ const char* type;
+ int_metric value;
+} metric_definition;
+
+static int stats_get_connections(qdr_global_stats_t *stats) { return stats->connections;
}
+static int stats_get_links(qdr_global_stats_t *stats) { return stats->links; }
+static int stats_get_addrs(qdr_global_stats_t *stats) { return stats->addrs; }
+static int stats_get_routers(qdr_global_stats_t *stats) { return stats->routers; }
+static int stats_get_link_routes(qdr_global_stats_t *stats) { return stats->link_routes;
}
+static int stats_get_auto_links(qdr_global_stats_t *stats) { return stats->auto_links;
}
+static int stats_get_presettled_deliveries(qdr_global_stats_t *stats) { return stats->presettled_deliveries;
}
+static int stats_get_dropped_presettled_deliveries(qdr_global_stats_t *stats) { return stats->dropped_presettled_deliveries;
}
+static int stats_get_accepted_deliveries(qdr_global_stats_t *stats) { return stats->accepted_deliveries;
}
+static int stats_get_released_deliveries(qdr_global_stats_t *stats) { return stats->released_deliveries;
}
+static int stats_get_rejected_deliveries(qdr_global_stats_t *stats) { return stats->rejected_deliveries;
}
+static int stats_get_modified_deliveries(qdr_global_stats_t *stats) { return stats->modified_deliveries;
}
+static int stats_get_deliveries_ingress(qdr_global_stats_t *stats) { return stats->deliveries_ingress;
}
+static int stats_get_deliveries_egress(qdr_global_stats_t *stats) { return stats->deliveries_egress;
}
+static int stats_get_deliveries_transit(qdr_global_stats_t *stats) { return stats->deliveries_transit;
}
+static int stats_get_deliveries_ingress_route_container(qdr_global_stats_t *stats) { return
stats->deliveries_ingress_route_container; }
+static int stats_get_deliveries_egress_route_container(qdr_global_stats_t *stats) { return
stats->deliveries_egress_route_container; }
+
+static struct metric_definition metrics[] = {
+ {"connections", "gauge", stats_get_connections},
+ {"links", "gauge", stats_get_links},
+ {"addresses", "gauge", stats_get_addrs},
+ {"routers", "gauge", stats_get_routers},
+ {"link_routes", "gauge", stats_get_link_routes},
+ {"auto_links", "gauge", stats_get_auto_links},
+ {"presettled_deliveries", "counter", stats_get_presettled_deliveries},
+ {"dropped_presettled_deliveries", "counter", stats_get_dropped_presettled_deliveries},
+ {"accepted_deliveries", "counter", stats_get_accepted_deliveries},
+ {"released_deliveries", "counter", stats_get_released_deliveries},
+ {"rejected_deliveries", "counter", stats_get_rejected_deliveries},
+ {"modified_deliveries", "counter", stats_get_modified_deliveries},
+ {"deliveries_ingress", "counter", stats_get_deliveries_ingress},
+ {"deliveries_egress", "counter", stats_get_deliveries_egress},
+ {"deliveries_transit", "counter", stats_get_deliveries_transit},
+ {"deliveries_ingress_route_container", "counter", stats_get_deliveries_ingress_route_container},
+ {"deliveries_egress_route_container", "counter", stats_get_deliveries_egress_route_container}
+};
+static size_t metrics_length = sizeof(metrics)/sizeof(metrics[0]);
+
+static bool write_stats(uint8_t **position, const uint8_t * const end, const char* name,
const char* type, int value)
+{
+ //11 chars + type + 2*name + 20 chars for int
+ size_t length = 11 + strlen(type) + strlen(name)*2 + 20;
+ if (end - *position >= length) {
+ *position += lws_snprintf((char*) *position, end - *position, "# TYPE %s %s\n", name,
type);
+ *position += lws_snprintf((char*) *position, end - *position, "%s %i\n", name, value);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+static bool write_metric(uint8_t **position, const uint8_t * const end, metric_definition*
definition, qdr_global_stats_t* stats)
+{
+ return write_stats(position, end, definition->name, definition->type, definition->value(stats));
+}
+
+static int add_header_by_name(struct lws *wsi, const char* name, const char* value, uint8_t**
position, uint8_t* end)
+{
+ return lws_add_http_header_by_name(wsi, (unsigned char*) name, (unsigned char*) value,
strlen(value), position, end);
+}
+
+static int callback_metrics(struct lws *wsi, enum lws_callback_reasons reason,
+ void *user, void *in, size_t len)
+{
+ qd_http_server_t *hs = wsi_server(wsi);
+ stats_t *stats = (stats_t*) user;
+ uint8_t buffer[LWS_PRE + 2048];
+ uint8_t *start = &buffer[LWS_PRE], *position = start, *end = &buffer[sizeof(buffer)
- LWS_PRE - 1];
+
+ switch (reason) {
+
+ case LWS_CALLBACK_HTTP: {
+ stats->wsi = wsi;
+ stats->server = hs;
+ //request stats from core thread
+ qdr_request_global_stats(hs->core, &stats->stats, handle_stats_results,
(void*) stats);
+ return 0;
+ }
+
+ case LWS_CALLBACK_HTTP_WRITEABLE: {
+ //encode stats into buffer
+ if (!stats->headers_sent) {
+ if (lws_add_http_header_status(wsi, HTTP_STATUS_OK, &position, end)
+ || add_header_by_name(wsi, "content-type:", "text/plain", &position,
end)
+ || add_header_by_name(wsi, "connection:", "close", &position, end))
+ return 1;
+ if (lws_finalize_http_header(wsi, &position, end))
+ return 1;
+ stats->headers_sent = true;
+ }
+
+ while (stats->current < metrics_length) {
+ if (write_metric(&position, end, &metrics[stats->current], &stats->stats))
{
+ stats->current++;
+ qd_log(hs->log, QD_LOG_DEBUG, "wrote metric %i of %i", stats->current,
metrics_length);
+ } else {
+ qd_log(hs->log, QD_LOG_DEBUG, "insufficient space in buffer");
+ break;
+ }
+ }
+ int n = stats->current < metrics_length ? LWS_WRITE_HTTP : LWS_WRITE_HTTP_FINAL;
+
+ //write buffer
+ size_t available = position - start;
+ if (lws_write(wsi, (unsigned char*) start, available, n) != available)
+ return 1;
+ if (n == LWS_WRITE_HTTP_FINAL) {
+ if (lws_http_transaction_completed(wsi)) return -1;
+ } else {
+ lws_callback_on_writable(wsi);
+ }
+ return 0;
+ }
+
+ default:
+ return 0;
+ }
+}
+
/* Callbacks for promoted AMQP over WS connections. */
static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
void *user, void *in, size_t len)
@@ -494,6 +657,9 @@ static void* http_thread_run(void* v) {
case W_CLOSE:
listener_close((qd_http_listener_t*)w.value, hs);
break;
+ case W_HANDLE_STATS:
+ lws_callback_on_writable((struct lws*) w.value);
+ break;
case W_WAKE: {
connection_t *c = w.value;
pn_collector_put(c->driver.collector, PN_OBJECT, c->driver.connection,
@@ -546,6 +712,7 @@ qd_http_server_t *qd_http_server(qd_server_t *s, qd_log_source_t *log)
{
hs->context = lws_create_context(&info);
hs->server = s;
hs->log = log; /* For messages from this file */
+ hs->core = 0; // not yet available
if (!hs->context) {
qd_log(hs->log, QD_LOG_CRITICAL, "No memory starting HTTP server");
qd_http_server_free(hs);
@@ -559,6 +726,7 @@ qd_http_server_t *qd_http_server(qd_server_t *s, qd_log_source_t *log)
{
qd_http_listener_t *qd_http_server_listen(qd_http_server_t *hs, qd_listener_t *li)
{
+ hs->core = qd_dispatch_router_core(qd_server_dispatch(hs->server));
sys_mutex_lock(hs->work.lock);
if (!hs->thread) {
hs->thread = sys_thread(http_thread_run, hs);
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 99fe99b..5b5fbec 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -749,3 +749,45 @@ void qdr_connection_work_free_CT(qdr_connection_work_t *work)
qdr_terminus_free(work->target);
free_qdr_connection_work_t(work);
}
+
+static void qdr_post_global_stats_response(qdr_core_t *core, qdr_general_work_t *work)
+{
+ work->stats_handler(work->context);
+}
+
+static void qdr_global_stats_request_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+ qdr_global_stats_t *stats = action->args.stats_request.stats;
+ stats->addrs = DEQ_SIZE(core->addrs);
+ stats->links = DEQ_SIZE(core->open_links);
+ stats->routers = DEQ_SIZE(core->routers);
+ stats->connections = DEQ_SIZE(core->open_connections);
+ stats->link_routes = DEQ_SIZE(core->link_routes);
+ stats->auto_links = DEQ_SIZE(core->auto_links);
+ stats->presettled_deliveries = core->presettled_deliveries;
+ stats->dropped_presettled_deliveries = core->dropped_presettled_deliveries;
+ stats->accepted_deliveries = core->accepted_deliveries;
+ stats->rejected_deliveries = core->rejected_deliveries;
+ stats->released_deliveries = core->released_deliveries;
+ stats->modified_deliveries = core->modified_deliveries;
+ stats->deliveries_ingress = core->deliveries_ingress;
+ stats->deliveries_egress = core->deliveries_egress;
+ stats->deliveries_transit = core->deliveries_transit;
+ stats->deliveries_ingress_route_container = core->deliveries_ingress_route_container;
+ stats->deliveries_egress_route_container = core->deliveries_egress_route_container;
+
+ qdr_general_work_t *work = qdr_general_work(qdr_post_global_stats_response);
+ work->stats_handler = action->args.stats_request.handler;
+ work->context = action->args.stats_request.context;
+ qdr_post_general_work_CT(core, work);
+}
+
+void qdr_request_global_stats(qdr_core_t *core, qdr_global_stats_t *stats, qdr_global_stats_handler_t
callback, void *context)
+{
+ qdr_action_t *action = qdr_action(qdr_global_stats_request_CT, "global_stats_request");
+ action->args.stats_request.stats = stats;
+ action->args.stats_request.handler = callback;
+ action->args.stats_request.context = context;
+ qdr_action_enqueue(core, action);
+}
+
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 894d19e..6ddaff1 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -166,6 +166,15 @@ struct qdr_action_t {
} agent;
//
+ // Arguments for stats request actions
+ //
+ struct {
+ qdr_global_stats_t *stats;
+ qdr_global_stats_handler_t handler;
+ void *context;
+ } stats_request;
+
+ //
// Arguments for general use
//
struct {
@@ -218,6 +227,8 @@ struct qdr_general_work_t {
uint64_t in_conn_id;
int treatment;
qdr_delivery_cleanup_list_t delivery_cleanup_list;
+ qdr_global_stats_handler_t stats_handler;
+ void *context;
};
ALLOC_DECLARE(qdr_general_work_t);
@@ -316,7 +327,6 @@ struct qdr_query_t {
DEQ_DECLARE(qdr_query_t, qdr_query_list_t);
-
struct qdr_node_t {
DEQ_LINKS(qdr_node_t);
qdr_address_t *owning_addr;
diff --git a/tests/system_tests_http.py b/tests/system_tests_http.py
index 4da09eb..f7e901c 100644
--- a/tests/system_tests_http.py
+++ b/tests/system_tests_http.py
@@ -145,6 +145,42 @@ class RouterTestHttp(TestCase):
# https not configured
self.assertRaises(URLError, urlopen, "https://localhost:%d/nosuch" % r.ports[0])
+ def test_http_metrics(self):
+
+ if not sys.version_info >= (2, 7):
+ return
+
+ config = Qdrouterd.Config([
+ ('router', {'id': 'QDR.METRICS'}),
+ ('listener', {'port': self.get_port(), 'http': 'yes'}),
+ ('listener', {'port': self.get_port(), 'httpRootDir': os.path.dirname(__file__)}),
+ ])
+ r = self.qdrouterd('metrics-test-router', config)
+
+ def test(port):
+ result = urlopen("http://localhost:%d/metrics" % port, cafile=self.ssl_file('ca-certificate.pem'))
+ self.assertEqual(200, result.getcode())
+ data = result.read().decode('utf-8')
+ assert('connections' in data)
+ assert('deliveries_ingress' in data)
+
+ # Sequential calls on multiple ports
+ for port in r.ports: test(port)
+
+ # Concurrent calls on multiple ports
+ class TestThread(threading.Thread):
+ def __init__(self, port):
+ threading.Thread.__init__(self)
+ self.port, self.ex = port, None
+ self.start()
+ def run(self):
+ try: test(self.port)
+ except Exception as e: self.ex = e
+ threads = [TestThread(p) for p in r.ports + r.ports]
+ for t in threads: t.join()
+ for t in threads:
+ if t.ex: raise t.ex
+
def test_https_get(self):
def listener(**kwargs):
args = dict(kwargs)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
|