pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhai...@apache.org
Subject [incubator-pulsar] branch master updated: Add some metrics to prometheus. (#2299)
Date Fri, 03 Aug 2018 10:04:14 GMT
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d91dfb   Add some metrics to prometheus. (#2299)
8d91dfb is described below

commit 8d91dfb449ec1be54a7f2fad2e02ea2212bdb1a4
Author: penghui <codelipenghui@gmail.com>
AuthorDate: Fri Aug 3 18:04:12 2018 +0800

     Add some metrics to prometheus. (#2299)
    
    * Add some metrics to prometheus.
    
    1.pulsar_subscription_back_log
    2.pulsar_subscription_msg_rate_redeliver
    3.pulsar_subscription_unacked_massages
    4.pulsar_subscription_blocked_on_unacked_messages
    5.pulsar_consumer_msg_rate_redeliver
    6.pulsar_consumer_unacked_massages
    7.pulsar_consumer_blocked_on_unacked_messages
    
    * Add some metrics to prometheus.
    
    1.pulsar_subscription_back_log
    2.pulsar_subscription_msg_rate_redeliver
    3.pulsar_subscription_unacked_massages
    4.pulsar_subscription_blocked_on_unacked_messages
    5.pulsar_consumer_msg_rate_redeliver
    6.pulsar_consumer_unacked_massages
    7.pulsar_consumer_blocked_on_unacked_messages
---
 conf/broker.conf                                   |    3 +
 docker/grafana/dashboards/topic.json               | 1160 ++++++++++++++++++++
 .../apache/pulsar/broker/ServiceConfiguration.java |    9 +
 .../org/apache/pulsar/broker/PulsarService.java    |    2 +-
 .../stats/prometheus/AggregatedConsumerStats.java  |   34 +
 .../stats/prometheus/AggregatedNamespaceStats.java |   20 +
 .../prometheus/AggregatedSubscriptionStats.java    |   41 +
 .../stats/prometheus/NamespaceStatsAggregator.java |   41 +-
 .../prometheus/PrometheusMetricsGenerator.java     |    4 +-
 .../stats/prometheus/PrometheusMetricsServlet.java |    6 +-
 .../pulsar/broker/stats/prometheus/TopicStats.java |   53 +-
 .../pulsar/broker/stats/PrometheusMetricsTest.java |    4 +-
 12 files changed, 1360 insertions(+), 17 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index a639b85..5f4a4e9 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -479,6 +479,9 @@ webSocketSessionIdleTimeoutMillis=300000
 # Enable topic level metrics
 exposeTopicLevelMetricsInPrometheus=true
 
+# Enable consumer level metrics. default is false
+# exposeConsumerLevelMetricsInPrometheus=false
+
 ### --- Functions --- ###
 
 # Enable Functions Worker Service in Broker
diff --git a/docker/grafana/dashboards/topic.json b/docker/grafana/dashboards/topic.json
new file mode 100644
index 0000000..d53a018
--- /dev/null
+++ b/docker/grafana/dashboards/topic.json
@@ -0,0 +1,1160 @@
+{
+  "__inputs": [
+    {
+      "name": "DS_default",
+      "label": "default",
+      "description": "",
+      "type": "datasource",
+      "pluginId": "prometheus",
+      "pluginName": "Prometheus"
+    }
+  ],
+  "__requires": [
+    {
+      "type": "grafana",
+      "id": "grafana",
+      "name": "Grafana",
+      "version": "5.1.0"
+    },
+    {
+      "type": "panel",
+      "id": "graph",
+      "name": "Graph",
+      "version": "5.0.0"
+    },
+    {
+      "type": "datasource",
+      "id": "prometheus",
+      "name": "Prometheus",
+      "version": "5.0.0"
+    }
+  ],
+  "annotations": {
+    "list": [
+      {
+        "builtIn": 1,
+        "datasource": "-- Grafana --",
+        "enable": true,
+        "hide": true,
+        "iconColor": "rgba(0, 211, 255, 1)",
+        "name": "Annotations & Alerts",
+        "type": "dashboard"
+      }
+    ]
+  },
+  "editable": true,
+  "gnetId": null,
+  "graphTooltip": 0,
+  "id": null,
+  "iteration": 1533285490117,
+  "links": [],
+  "panels": [
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_default}",
+      "fill": 1,
+      "gridPos": {
+        "h": 7,
+        "w": 12,
+        "x": 0,
+        "y": 0
+      },
+      "id": 16,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "hideEmpty": false,
+        "max": false,
+        "min": false,
+        "show": false,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "pulsar_rate_in{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"}",
+          "format": "time_series",
+          "hide": false,
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "{{cluster}} - {{namespace}}",
+          "metric": "pulsar_rate_in",
+          "refId": "A",
+          "step": 10
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Local publish rate",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": "msg/s",
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_default}",
+      "fill": 1,
+      "gridPos": {
+        "h": 7,
+        "w": 12,
+        "x": 12,
+        "y": 0
+      },
+      "id": 2,
+      "legend": {
+        "alignAsTable": false,
+        "avg": false,
+        "current": true,
+        "max": false,
+        "min": false,
+        "rightSide": false,
+        "show": true,
+        "total": false,
+        "values": true
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "pulsar_subscription_msg_rate_out{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"}",
+          "format": "time_series",
+          "intervalFactor": 2,
+          "legendFormat": "{{subscription}}",
+          "metric": "pulsar_rate_out",
+          "refId": "A",
+          "step": 10
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Local delivery rate",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": "msg / s",
+          "logBase": 1,
+          "max": null,
+          "min": "0",
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_default}",
+      "fill": 1,
+      "gridPos": {
+        "h": 7,
+        "w": 12,
+        "x": 0,
+        "y": 7
+      },
+      "id": 5,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": false,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "pulsar_throughput_in{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"}",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "{{cluster}} - {{namespace}}",
+          "metric": "pulsar_throughput_in",
+          "refId": "A",
+          "step": 10
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Local publish throughput (bytes/s)",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "Bps",
+          "label": "",
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": false
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_default}",
+      "description": "",
+      "fill": 1,
+      "gridPos": {
+        "h": 7,
+        "w": 12,
+        "x": 12,
+        "y": 7
+      },
+      "id": 8,
+      "legend": {
+        "alignAsTable": false,
+        "avg": false,
+        "current": true,
+        "max": false,
+        "min": false,
+        "rightSide": false,
+        "show": true,
+        "total": false,
+        "values": true
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "pulsar_subscription_msg_throughput_out{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"}",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "{{subscription}}",
+          "metric": "pulsar_throughput_out",
+          "refId": "A",
+          "step": 10
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Local delivery throughput (bytes/s)",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "Bps",
+          "label": "",
+          "logBase": 1,
+          "max": null,
+          "min": "0",
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": false
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_default}",
+      "decimals": 0,
+      "fill": 1,
+      "gridPos": {
+        "h": 7,
+        "w": 12,
+        "x": 0,
+        "y": 14
+      },
+      "id": 7,
+      "legend": {
+        "alignAsTable": false,
+        "avg": false,
+        "current": true,
+        "max": false,
+        "min": false,
+        "rightSide": false,
+        "show": true,
+        "total": false,
+        "values": true
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "sum(pulsar_producers_count{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"})",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "producers",
+          "metric": "pulsar_producers_count",
+          "refId": "A",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_subscriptions_count{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"})",
+          "format": "time_series",
+          "intervalFactor": 2,
+          "legendFormat": "subscriptions",
+          "metric": "pulsar_subscriptions_count",
+          "refId": "B",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_consumers_count{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"})",
+          "format": "time_series",
+          "intervalFactor": 2,
+          "legendFormat": "consumers",
+          "metric": "pulsar_consumers_count",
+          "refId": "C",
+          "step": 10
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Topics - Producers - Subscriptions - Consumers",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": [
+          "current"
+        ]
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": "count",
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": false
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_default}",
+      "fill": 1,
+      "gridPos": {
+        "h": 7,
+        "w": 12,
+        "x": 12,
+        "y": 14
+      },
+      "id": 4,
+      "legend": {
+        "alignAsTable": false,
+        "avg": false,
+        "current": true,
+        "max": false,
+        "min": false,
+        "rightSide": false,
+        "show": true,
+        "total": false,
+        "values": true
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "pulsar_subscription_back_log{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"}",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "{{subscription}}",
+          "metric": "pulsar_msg_backlog",
+          "refId": "A",
+          "step": 10
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Local backlog",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": "Messages",
+          "logBase": 1,
+          "max": null,
+          "min": "0",
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": false
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {
+        "0 - 0.5 ms": "#2F575E",
+        "0.5 - 1 ms": "#3F6833",
+        "1 - 5 ms": "#629E51",
+        "10 - 20 ms": "#E5A8E2",
+        "100 - 200 ms": "#EF843C",
+        "20 - 50 ms": "#65C5DB",
+        "200 ms - 1 s": "#EA6460",
+        "5 - 10 ms": "#1F78C1",
+        "50 - 100 ms": "#E5AC0E",
+        "< +Inf ms": "#BF1B00",
+        "< 0.5 ms": "#508642",
+        "> 1 s": "#BF1B00"
+      },
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_default}",
+      "fill": 5,
+      "gridPos": {
+        "h": 7,
+        "w": 12,
+        "x": 0,
+        "y": 21
+      },
+      "id": 3,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": true,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 0,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [
+        {
+          "alias": "< 100 ms",
+          "yaxis": 1
+        }
+      ],
+      "spaceLength": 10,
+      "stack": true,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "sum(pulsar_storage_write_latency_le_0_5{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "0 - 0.5 ms",
+          "metric": "pulsar_add_entry_latency_le_0_5",
+          "refId": "A",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_storage_write_latency_le_1{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "0.5 - 1 ms",
+          "metric": "pulsar_add_entry_latency_le_1",
+          "refId": "B",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_storage_write_latency_le_5{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "1 - 5 ms",
+          "metric": "pulsar_add_entry_latency_le_5",
+          "refId": "C",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_storage_write_latency_le_10{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "5 - 10 ms",
+          "metric": "pulsar_add_entry_latency_le_10",
+          "refId": "D",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_storage_write_latency_le_20{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "10 - 20 ms",
+          "metric": "pulsar_add_entry_latency_le_20",
+          "refId": "E",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_storage_write_latency_le_50{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "20 - 50 ms",
+          "metric": "pulsar_add_entry_latency_le_50",
+          "refId": "F",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_storage_write_latency_le_100{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "hide": false,
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "50 - 100 ms",
+          "metric": "pulsar_add_entry_latency_le_100",
+          "refId": "G",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_storage_write_latency_le_200{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "intervalFactor": 2,
+          "legendFormat": "100 - 200 ms",
+          "metric": "pulsar_add_entry_latency_le_200",
+          "refId": "H",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_storage_write_latency_le_1000{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "intervalFactor": 2,
+          "legendFormat": "200 ms - 1 s",
+          "metric": "pulsar_add_entry_latency_le_1000",
+          "refId": "I",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_storage_write_latency_overflow{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "> 1 s",
+          "metric": "pulsar_add_entry_latency_overflow",
+          "refId": "J",
+          "step": 10
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Storage Write Latency",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": "msg / s",
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": false
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_default}",
+      "fill": 1,
+      "gridPos": {
+        "h": 7,
+        "w": 12,
+        "x": 12,
+        "y": 21
+      },
+      "id": 9,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": false,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 1,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [],
+      "spaceLength": 10,
+      "stack": false,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "sum(pulsar_storage_size{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"})",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "$namespace",
+          "metric": "pulsar_storage_size",
+          "refId": "A",
+          "step": 10
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Storage Size",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "decbytes",
+          "label": "",
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    },
+    {
+      "aliasColors": {},
+      "bars": false,
+      "dashLength": 10,
+      "dashes": false,
+      "datasource": "${DS_default}",
+      "fill": 6,
+      "gridPos": {
+        "h": 7,
+        "w": 12,
+        "x": 0,
+        "y": 28
+      },
+      "id": 12,
+      "legend": {
+        "avg": false,
+        "current": false,
+        "max": false,
+        "min": false,
+        "show": true,
+        "total": false,
+        "values": false
+      },
+      "lines": true,
+      "linewidth": 0,
+      "links": [],
+      "nullPointMode": "null",
+      "percentage": false,
+      "pointradius": 5,
+      "points": false,
+      "renderer": "flot",
+      "seriesOverrides": [
+        {
+          "alias": "< 2 KB",
+          "yaxis": 1
+        }
+      ],
+      "spaceLength": 10,
+      "stack": true,
+      "steppedLine": false,
+      "targets": [
+        {
+          "expr": "sum(pulsar_entry_size_le_128{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "< 128 bytes",
+          "metric": "pulsar_entry_size_le_128",
+          "refId": "A",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_entry_size_le_512{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "< 512 bytes",
+          "metric": "pulsar_entry_size_le_512",
+          "refId": "B",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_entry_size_le_1_kb{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "< 1 KB",
+          "metric": "pulsar_entry_size_le_1_kb",
+          "refId": "C",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_entry_size_le_2_kb{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "< 2 KB",
+          "metric": "pulsar_entry_size_le_2_kb",
+          "refId": "D",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_entry_size_le_4_kb{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "< 4 KB",
+          "metric": "pulsar_entry_size_le_4_kb",
+          "refId": "E",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_entry_size_le_16_kb{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "< 16 KB",
+          "metric": "pulsar_entry_size_le_16_kb",
+          "refId": "F",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_entry_size_le_100_kb{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "< 100 KB",
+          "metric": "pulsar_entry_size_le_100_kb",
+          "refId": "G",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_entry_size_le_1_mb{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "< 1 MB",
+          "metric": "pulsar_entry_size_le_1_mb",
+          "refId": "H",
+          "step": 10
+        },
+        {
+          "expr": "sum(pulsar_entry_size_overflow{cluster=~\"$cluster\", namespace=~\"$namespace\", topic=~\"$topic\"}) / 60.0",
+          "format": "time_series",
+          "interval": "",
+          "intervalFactor": 2,
+          "legendFormat": "> 1 MB",
+          "metric": "pulsar_entry_size_le_overflow",
+          "refId": "I",
+          "step": 10
+        }
+      ],
+      "thresholds": [],
+      "timeFrom": null,
+      "timeShift": null,
+      "title": "Storage entry size",
+      "tooltip": {
+        "shared": true,
+        "sort": 0,
+        "value_type": "individual"
+      },
+      "type": "graph",
+      "xaxis": {
+        "buckets": null,
+        "mode": "time",
+        "name": null,
+        "show": true,
+        "values": []
+      },
+      "yaxes": [
+        {
+          "format": "short",
+          "label": "msg / s",
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": true
+        },
+        {
+          "format": "short",
+          "label": null,
+          "logBase": 1,
+          "max": null,
+          "min": null,
+          "show": false
+        }
+      ],
+      "yaxis": {
+        "align": false,
+        "alignLevel": null
+      }
+    }
+  ],
+  "refresh": "5s",
+  "schemaVersion": 16,
+  "style": "dark",
+  "tags": [],
+  "templating": {
+    "list": [
+      {
+        "allValue": null,
+        "current": {},
+        "datasource": "${DS_default}",
+        "hide": 0,
+        "includeAll": false,
+        "label": "Cluster",
+        "multi": false,
+        "name": "cluster",
+        "options": [],
+        "query": "{cluster=~\".+\"}",
+        "refresh": 1,
+        "regex": "/.*[^_]cluster=\\\"([^\\\"]+)\\\".*/",
+        "sort": 1,
+        "tagValuesQuery": "",
+        "tags": [],
+        "tagsQuery": "",
+        "type": "query",
+        "useTags": false
+      },
+      {
+        "allValue": null,
+        "current": {},
+        "datasource": "${DS_default}",
+        "hide": 0,
+        "includeAll": true,
+        "label": "Namespace",
+        "multi": false,
+        "name": "namespace",
+        "options": [],
+        "query": "{namespace=~\".+\"}",
+        "refresh": 2,
+        "regex": "/.*namespace=\\\"([^\\\"]+)\\\".*/",
+        "sort": 1,
+        "tagValuesQuery": "",
+        "tags": [],
+        "tagsQuery": "",
+        "type": "query",
+        "useTags": false
+      },
+      {
+        "allValue": null,
+        "current": {},
+        "datasource": "${DS_default}",
+        "hide": 0,
+        "includeAll": true,
+        "label": "Topic",
+        "multi": false,
+        "name": "topic",
+        "options": [],
+        "query": "{topic=~\".+\"}",
+        "refresh": 2,
+        "regex": "/.*topic=\\\"([^\\\"]+)\\\".*/",
+        "sort": 1,
+        "tagValuesQuery": "",
+        "tags": [],
+        "tagsQuery": "",
+        "type": "query",
+        "useTags": false
+      }
+    ]
+  },
+  "time": {
+    "from": "now-5m",
+    "to": "now"
+  },
+  "timepicker": {
+    "refresh_intervals": [
+      "5s",
+      "10s",
+      "30s",
+      "1m",
+      "5m",
+      "15m",
+      "30m",
+      "1h",
+      "2h",
+      "1d"
+    ],
+    "time_options": [
+      "5m",
+      "15m",
+      "1h",
+      "6h",
+      "12h",
+      "24h",
+      "2d",
+      "7d",
+      "30d"
+    ]
+  },
+  "timezone": "browser",
+  "title": "Pulsar - Topic",
+  "uid": "3xEtii5mk",
+  "version": 10
+}
\ No newline at end of file
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 182bc09..7656ecc 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -470,6 +470,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
     /**** --- Metrics --- ****/
     // If true, export topic level metrics otherwise namespace level
     private boolean exposeTopicLevelMetricsInPrometheus = true;
+    private boolean exposeConsumerLevelMetricsInPrometheus = false;
 
     /**** --- Functions --- ****/
     private boolean functionsWorkerEnabled = false;
@@ -1615,10 +1616,18 @@ public class ServiceConfiguration implements PulsarConfiguration {
         return exposeTopicLevelMetricsInPrometheus;
     }
 
+    public boolean exposeConsumerLevelMetricsInPrometheus() {
+        return exposeConsumerLevelMetricsInPrometheus;
+    }
+
     public void setExposeTopicLevelMetricsInPrometheus(boolean exposeTopicLevelMetricsInPrometheus) {
         this.exposeTopicLevelMetricsInPrometheus = exposeTopicLevelMetricsInPrometheus;
     }
 
+    public void setExposeConsumerLevelMetricsInPrometheus(boolean exposeConsumerLevelMetricsInPrometheus) {
+        this.exposeConsumerLevelMetricsInPrometheus = exposeConsumerLevelMetricsInPrometheus;
+    }
+
     public String getSchemaRegistryStorageClassName() {
        return schemaRegistryStorageClassName;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 8a811ac..93a045e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -357,7 +357,7 @@ public class PulsarService implements AutoCloseable {
             this.webService.addRestResources("/lookup", "org.apache.pulsar.broker.lookup", true, attributeMap);
 
             this.webService.addServlet("/metrics",
-                    new ServletHolder(new PrometheusMetricsServlet(this, config.exposeTopicLevelMetricsInPrometheus())),
+                    new ServletHolder(new PrometheusMetricsServlet(this, config.exposeTopicLevelMetricsInPrometheus(), config.exposeConsumerLevelMetricsInPrometheus())),
                     false, attributeMap);
 
             if (config.isWebSocketServiceEnabled()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
new file mode 100644
index 0000000..0fadf3e
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+public class AggregatedConsumerStats {
+
+    public boolean blockedSubscriptionOnUnackedMsgs;
+
+    public double msgRateRedeliver;
+
+    public long unackedMessages;
+
+    public double msgRateOut;
+
+    public double msgThroughputOut;
+
+    public long availablePermits;
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 299af88..995effc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -46,6 +46,8 @@ public class AggregatedNamespaceStats {
 
     public Map<String, AggregatedReplicationStats> replicationStats = new HashMap<>();
 
+    public Map<String, AggregatedSubscriptionStats> subscriptionStats = new HashMap<>();
+
     void updateStats(TopicStats stats) {
         topicsCount++;
 
@@ -77,6 +79,22 @@ public class AggregatedNamespaceStats {
             replStats.msgThroughputOut += as.msgThroughputOut;
             replStats.replicationBacklog += as.replicationBacklog;
         });
+
+        stats.subscriptionStats.forEach((n, as) -> {
+            AggregatedSubscriptionStats subsStats =
+                    subscriptionStats.computeIfAbsent(n, k -> new AggregatedSubscriptionStats());
+            subsStats.blockedSubscriptionOnUnackedMsgs = as.blockedSubscriptionOnUnackedMsgs;
+            subsStats.msgBacklog += as.msgBacklog;
+            subsStats.msgRateRedeliver += as.msgRateRedeliver;
+            subsStats.unackedMessages += as.unackedMessages;
+            as.consumerStat.forEach((c, v) -> {
+                AggregatedConsumerStats consumerStats =
+                        subsStats.consumerStat.computeIfAbsent(c, k -> new AggregatedConsumerStats());
+                consumerStats.blockedSubscriptionOnUnackedMsgs = v.blockedSubscriptionOnUnackedMsgs;
+                consumerStats.msgRateRedeliver += v.msgRateRedeliver;
+                consumerStats.unackedMessages += v.unackedMessages;
+            });
+        });
     }
 
     public void reset() {
@@ -95,6 +113,8 @@ public class AggregatedNamespaceStats {
         storageReadRate = 0;
 
         replicationStats.clear();
+        subscriptionStats.clear();
+
         storageWriteLatencyBuckets.reset();
         entrySizeBuckets.reset();
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
new file mode 100644
index 0000000..c46bbf5
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus;
+
+import org.apache.pulsar.broker.service.Consumer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AggregatedSubscriptionStats {
+
+    public long msgBacklog;
+
+    public boolean blockedSubscriptionOnUnackedMsgs;
+
+    public double msgRateRedeliver;
+
+    public long unackedMessages;
+
+    public double msgRateOut;
+
+    public double msgThroughputOut;
+
+    public Map<Consumer, AggregatedConsumerStats> consumerStat = new HashMap<>();
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 22115ea..d07bc38 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -43,7 +43,7 @@ public class NamespaceStatsAggregator {
         }
     };
 
-    public static void generate(PulsarService pulsar, boolean includeTopicMetrics, SimpleTextOutputStream stream) {
+    public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, SimpleTextOutputStream stream) {
         String cluster = pulsar.getConfiguration().getClusterName();
         AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
         TopicStats topicStats = localTopicStats.get();
@@ -53,7 +53,7 @@ public class NamespaceStatsAggregator {
 
             bundlesMap.forEach((bundle, topicsMap) -> {
                 topicsMap.forEach((name, topic) -> {
-                    getTopicStats(topic, topicStats);
+                    getTopicStats(topic, topicStats, includeConsumerMetrics);
 
                     if (includeTopicMetrics) {
                         TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats);
@@ -71,12 +71,12 @@ public class NamespaceStatsAggregator {
         });
     }
 
-    private static void getTopicStats(Topic topic, TopicStats stats) {
+    private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics) {
         stats.reset();
 
         if (topic instanceof PersistentTopic) {
             // Managed Ledger stats
-            ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) ((PersistentTopic)topic).getManagedLedger().getStats();
+            ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) ((PersistentTopic) topic).getManagedLedger().getStats();
 
             stats.storageSize = mlStats.getStoredMessagesSize();
 
@@ -108,7 +108,32 @@ public class NamespaceStatsAggregator {
             stats.subscriptionsCount++;
             stats.msgBacklog += subscription.getNumberOfEntriesInBacklog();
 
+            AggregatedSubscriptionStats subsStats = stats.subscriptionStats
+                    .computeIfAbsent(name, k -> new AggregatedSubscriptionStats());
+            subsStats.msgBacklog = subscription.getNumberOfEntriesInBacklog();
+
             subscription.getConsumers().forEach(consumer -> {
+
+                // Consumer stats can be a lot if a subscription has many consumers
+                if (includeConsumerMetrics) {
+                    AggregatedConsumerStats consumerStats = subsStats.consumerStat
+                            .computeIfAbsent(consumer, k -> new AggregatedConsumerStats());
+                    consumerStats.unackedMessages = consumer.getStats().unackedMessages;
+                    consumerStats.msgRateRedeliver = consumer.getStats().msgRateRedeliver;
+                    consumerStats.msgRateOut = consumer.getStats().msgRateOut;
+                    consumerStats.msgThroughputOut = consumer.getStats().msgThroughputOut;
+                    consumerStats.availablePermits = consumer.getStats().availablePermits;
+                    consumerStats.blockedSubscriptionOnUnackedMsgs = consumer.getStats().blockedConsumerOnUnackedMsgs;
+                }
+
+                subsStats.unackedMessages += consumer.getStats().unackedMessages;
+                subsStats.msgRateRedeliver += consumer.getStats().msgRateRedeliver;
+                subsStats.msgRateOut += consumer.getStats().msgRateOut;
+                subsStats.msgThroughputOut += consumer.getStats().msgThroughputOut;
+                if (!subsStats.blockedSubscriptionOnUnackedMsgs && consumer.getStats().blockedConsumerOnUnackedMsgs) {
+                    subsStats.blockedSubscriptionOnUnackedMsgs = true;
+                }
+
                 stats.consumersCount++;
                 stats.rateOut += consumer.getStats().msgRateOut;
                 stats.throughputOut += consumer.getStats().msgThroughputOut;
@@ -127,7 +152,7 @@ public class NamespaceStatsAggregator {
     }
 
     private static void printNamespaceStats(SimpleTextOutputStream stream, String cluster, String namespace,
-            AggregatedNamespaceStats stats) {
+                                            AggregatedNamespaceStats stats) {
         metric(stream, cluster, namespace, "pulsar_topics_count", stats.topicsCount);
         metric(stream, cluster, namespace, "pulsar_subscriptions_count", stats.subscriptionsCount);
         metric(stream, cluster, namespace, "pulsar_producers_count", stats.producersCount);
@@ -192,19 +217,19 @@ public class NamespaceStatsAggregator {
     }
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
-            long value) {
+                               long value) {
         stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
     }
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
-            double value) {
+                               double value) {
         stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
     }
 
     private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace,
-            String name, String remoteCluster, double value) {
+                                                String name, String remoteCluster, double value) {
         stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace);
         stream.write("\", remote_cluster=\"").write(remoteCluster).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 167ec1c..8f0d30d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -63,14 +63,14 @@ public class PrometheusMetricsGenerator {
         }).register(CollectorRegistry.defaultRegistry);
     }
 
-    public static void generate(PulsarService pulsar, boolean includeTopicMetrics, OutputStream out) throws IOException {
+    public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, OutputStream out) throws IOException {
         ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
         try {
             SimpleTextOutputStream stream = new SimpleTextOutputStream(buf);
 
             generateSystemMetrics(stream, pulsar.getConfiguration().getClusterName());
 
-            NamespaceStatsAggregator.generate(pulsar, includeTopicMetrics, stream);
+            NamespaceStatsAggregator.generate(pulsar, includeTopicMetrics, includeConsumerMetrics, stream);
 
             FunctionsStatsGenerator.generate(pulsar.getWorkerService(),
                     pulsar.getConfiguration().getClusterName(), stream);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
index b924177..12058c1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
@@ -43,12 +43,14 @@ public class PrometheusMetricsServlet extends HttpServlet {
 
     private final PulsarService pulsar;
     private final boolean shouldExportTopicMetrics;
+    private final boolean shouldExportConsumerMetrics;
 
     private ExecutorService executor = null;
 
-    public PrometheusMetricsServlet(PulsarService pulsar, boolean includeTopicMetrics) {
+    public PrometheusMetricsServlet(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics) {
         this.pulsar = pulsar;
         this.shouldExportTopicMetrics = includeTopicMetrics;
+        this.shouldExportConsumerMetrics = includeConsumerMetrics;
     }
 
     @Override
@@ -65,7 +67,7 @@ public class PrometheusMetricsServlet extends HttpServlet {
             try {
                 res.setStatus(HttpStatus.OK_200);
                 res.setContentType("text/plain");
-                PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, res.getOutputStream());
+                PrometheusMetricsGenerator.generate(pulsar, shouldExportTopicMetrics, shouldExportConsumerMetrics, res.getOutputStream());
                 context.complete();
 
             } catch (IOException e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index f5938b6..c592b8f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -44,6 +44,7 @@ class TopicStats {
     double storageReadRate;
 
     Map<String, AggregatedReplicationStats> replicationStats = new HashMap<>();
+    Map<String, AggregatedSubscriptionStats> subscriptionStats = new HashMap<>();
 
     public void reset() {
         subscriptionsCount = 0;
@@ -60,12 +61,13 @@ class TopicStats {
         storageReadRate = 0;
 
         replicationStats.clear();
+        subscriptionStats.clear();
         storageWriteLatencyBuckets.reset();
         entrySizeBuckets.reset();
     }
 
     static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-            TopicStats stats) {
+                                TopicStats stats) {
 
         metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount);
         metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount);
@@ -108,12 +110,59 @@ class TopicStats {
         metric(stream, cluster, namespace, topic, "pulsar_entry_size_count", stats.entrySizeBuckets.getCount());
         metric(stream, cluster, namespace, topic, "pulsar_entry_size_sum", stats.entrySizeBuckets.getSum());
 
+        stats.subscriptionStats.forEach((n, subsStats) -> {
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log", subsStats.msgBacklog);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_redeliver", subsStats.msgRateRedeliver);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_unacked_massages", subsStats.unackedMessages);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_blocked_on_unacked_messages", subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out", subsStats.msgRateOut);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out", subsStats.msgThroughputOut);
+            subsStats.consumerStat.forEach((c, consumerStats) -> {
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver);
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_unacked_massages", consumerStats.unackedMessages);
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_blocked_on_unacked_messages", consumerStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0);
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut);
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut);
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_available_permits", consumerStats.availablePermits);
+            });
+        });
+
     }
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-            String name, double value) {
+                               String name, double value) {
         stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
                 .write("\", topic=\"").write(topic).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
     }
+
+    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription,
+                               String name, long value) {
+        stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
+                .write("\", topic=\"").write(topic).write("\", subscription=\"").write(subscription).write("\"} ");
+        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+    }
+
+    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription,
+                               String name, double value) {
+        stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
+                .write("\", topic=\"").write(topic).write("\", subscription=\"").write(subscription).write("\"} ");
+        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+    }
+
+    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription,
+                               String consumerName, long consumerId, String name, long value) {
+        stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
+                .write("\", topic=\"").write(topic).write("\", subscription=\"").write(subscription)
+                .write("\", consumer_name=\"").write(consumerName).write("\", consumer_id=\"").write(consumerId).write("\"} ");
+        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+    }
+
+    private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription,
+                               String consumerName, long consumerId, String name, double value) {
+        stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
+                .write("\", topic=\"").write(topic).write("\", subscription=\"").write(subscription)
+                .write("\", consumer_name=\"").write(consumerName).write("\", consumer_id=\"").write(consumerId).write("\"} ");
+        stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 62d490e..6ad8d6d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -64,7 +64,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, true, statsOut);
+        PrometheusMetricsGenerator.generate(pulsar, true, false, statsOut);
         String metricsStr = new String(statsOut.toByteArray());
 
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);
@@ -110,7 +110,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         }
 
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
-        PrometheusMetricsGenerator.generate(pulsar, false, statsOut);
+        PrometheusMetricsGenerator.generate(pulsar, false, false, statsOut);
         String metricsStr = new String(statsOut.toByteArray());
 
         Multimap<String, Metric> metrics = parseMetrics(metricsStr);


Mime
View raw message